sui_bridge/
monitor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! `BridgeMonitor` receives all `SuiBridgeEvent` and `EthBridgeEvent`
5//! and handles them accordingly.
6
7use crate::abi::{
8    EthBridgeCommitteeEvents, EthBridgeConfigEvents, EthBridgeEvent, EthBridgeLimiterEvents,
9    EthCommitteeUpgradeableContractEvents, EthSuiBridgeEvents,
10};
11use crate::client::bridge_authority_aggregator::BridgeAuthorityAggregator;
12use crate::crypto::BridgeAuthorityPublicKeyBytes;
13use crate::events::{BlocklistValidatorEvent, CommitteeMemberUrlUpdateEvent};
14use crate::events::{EmergencyOpEvent, SuiBridgeEvent};
15use crate::metrics::BridgeMetrics;
16use crate::retry_with_max_elapsed_time;
17use crate::sui_client::{SuiClient, SuiClientInner};
18use crate::types::{BridgeCommittee, IsBridgePaused};
19use arc_swap::ArcSwap;
20use std::collections::HashMap;
21use std::sync::Arc;
22use sui_types::TypeTag;
23use tokio::time::Duration;
24use tracing::{error, info, warn};
25
/// Number of staleness re-checks each `get_latest_*` helper in this file performs
/// before it gives up waiting for the on-chain state to match the observed event
/// and returns whatever the chain currently reports.
const REFRESH_BRIDGE_RETRY_TIMES: u64 = 3;
27
/// Consumes `SuiBridgeEvent`s and `EthBridgeEvent`s from two channels, records
/// governance-action metrics for them, and keeps the shared bridge state
/// (authority aggregator, pause flag, token type tags) up to date.
pub struct BridgeMonitor<C> {
    /// Client used to re-read the bridge committee and bridge summary.
    sui_client: Arc<SuiClient<C>>,
    /// Incoming Sui-side bridge events.
    sui_monitor_rx: mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
    /// Incoming Ethereum-side bridge events (only used for metrics here).
    eth_monitor_rx: mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
    /// Shared aggregator that is replaced whenever the committee changes.
    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
    /// Watch channel on which the latest pause status is published.
    bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
    /// Shared token-id -> type-tag map, republished when a new token is added.
    sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
    /// Metrics handle; `observed_governance_actions` is bumped per event.
    bridge_metrics: Arc<BridgeMetrics>,
}
37
38impl<C> BridgeMonitor<C>
39where
40    C: SuiClientInner + 'static,
41{
42    pub fn new(
43        sui_client: Arc<SuiClient<C>>,
44        sui_monitor_rx: mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
45        eth_monitor_rx: mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
46        bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
47        bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
48        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
49        bridge_metrics: Arc<BridgeMetrics>,
50    ) -> Self {
51        Self {
52            sui_client,
53            sui_monitor_rx,
54            eth_monitor_rx,
55            bridge_auth_agg,
56            bridge_paused_watch_tx,
57            sui_token_type_tags,
58            bridge_metrics,
59        }
60    }
61
62    pub async fn run(self) {
63        tracing::info!("Starting BridgeMonitor");
64        let Self {
65            sui_client,
66            mut sui_monitor_rx,
67            mut eth_monitor_rx,
68            bridge_auth_agg,
69            bridge_paused_watch_tx,
70            sui_token_type_tags,
71            bridge_metrics,
72        } = self;
73        let mut latest_token_config = (*sui_token_type_tags.load().clone()).clone();
74
75        loop {
76            tokio::select! {
77                sui_event = sui_monitor_rx.recv() => {
78                    if let Some(sui_event) = sui_event {
79                        Self::handle_sui_events(
80                            sui_event,
81                            &sui_client,
82                            &bridge_auth_agg,
83                            &bridge_paused_watch_tx,
84                            &sui_token_type_tags,
85                            &bridge_metrics,
86                            &mut latest_token_config,
87                        )
88                        .await;
89                    } else {
90                        panic!("BridgeMonitor sui events channel was closed unexpectedly");
91                    }
92                }
93                eth_event = eth_monitor_rx.recv() => {
94                    if let Some(eth_event) = eth_event {
95                        Self::handle_eth_events(eth_event, &bridge_metrics);
96                    } else {
97                        panic!("BridgeMonitor eth events channel was closed unexpectedly");
98                    }
99                }
100            }
101        }
102    }
103
104    async fn handle_sui_events(
105        event: SuiBridgeEvent,
106        sui_client: &Arc<SuiClient<C>>,
107        bridge_auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
108        bridge_paused_watch_tx: &tokio::sync::watch::Sender<IsBridgePaused>,
109        sui_token_type_tags: &Arc<ArcSwap<HashMap<u8, TypeTag>>>,
110        bridge_metrics: &Arc<BridgeMetrics>,
111        latest_token_config: &mut HashMap<u8, TypeTag>,
112    ) {
113        info!("Received SuiBridgeEvent: {:?}", event);
114        macro_rules! bump_sui_counter {
115            ($action:expr) => {
116                bridge_metrics
117                    .observed_governance_actions
118                    .with_label_values(&[$action, "sui"])
119                    .inc();
120            };
121        }
122
123        match event {
124            SuiBridgeEvent::SuiToEthTokenBridgeV1(_) => (),
125            SuiBridgeEvent::SuiToEthTokenBridgeV2(_) => (),
126            SuiBridgeEvent::TokenTransferApproved(_) => (),
127            SuiBridgeEvent::TokenTransferClaimed(_) => (),
128            SuiBridgeEvent::TokenTransferAlreadyApproved(_) => (),
129            SuiBridgeEvent::TokenTransferAlreadyClaimed(_) => (),
130            SuiBridgeEvent::TokenTransferLimitExceed(_) => {
131                // TODO do we want to do anything here?
132            }
133
134            SuiBridgeEvent::EmergencyOpEvent(event) => {
135                bump_sui_counter!(if event.frozen {
136                    "bridge_paused"
137                } else {
138                    "bridge_unpaused"
139                });
140                let is_paused = get_latest_bridge_pause_status_with_emergency_event(
141                    sui_client.clone(),
142                    event,
143                    Duration::from_secs(10),
144                )
145                .await;
146                bridge_paused_watch_tx
147                    .send(is_paused)
148                    .expect("Bridge pause status watch channel should not be closed");
149            }
150
151            SuiBridgeEvent::CommitteeMemberRegistration(_) => {
152                bump_sui_counter!("committee_member_registered");
153            }
154            SuiBridgeEvent::CommitteeUpdateEvent(_) => {
155                bump_sui_counter!("committee_updated");
156            }
157
158            SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(event) => {
159                bump_sui_counter!("committee_member_url_updated");
160                let new_committee = get_latest_bridge_committee_with_url_update_event(
161                    sui_client.clone(),
162                    event,
163                    Duration::from_secs(10),
164                )
165                .await;
166                let committee_names = bridge_auth_agg.load().committee_keys_to_names.clone();
167                bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(
168                    Arc::new(new_committee),
169                    bridge_metrics.clone(),
170                    committee_names,
171                )));
172                info!("Committee updated with CommitteeMemberUrlUpdateEvent");
173            }
174
175            SuiBridgeEvent::BlocklistValidatorEvent(event) => {
176                bump_sui_counter!(if event.blocklisted {
177                    "validator_blocklisted"
178                } else {
179                    "validator_unblocklisted"
180                });
181                let new_committee = get_latest_bridge_committee_with_blocklist_event(
182                    sui_client.clone(),
183                    event,
184                    Duration::from_secs(10),
185                )
186                .await;
187                let committee_names = bridge_auth_agg.load().committee_keys_to_names.clone();
188                bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(
189                    Arc::new(new_committee),
190                    bridge_metrics.clone(),
191                    committee_names,
192                )));
193                info!("Committee updated with BlocklistValidatorEvent");
194            }
195
196            SuiBridgeEvent::TokenRegistrationEvent(_) => {
197                bump_sui_counter!("new_token_registered");
198            }
199
200            SuiBridgeEvent::NewTokenEvent(event) => {
201                bump_sui_counter!("new_token_added");
202                if let std::collections::hash_map::Entry::Vacant(entry) =
203                    // We only add new tokens but not remove so it's ok to just insert
204                    latest_token_config.entry(event.token_id)
205                {
206                    entry.insert(event.type_name.clone());
207                    sui_token_type_tags.store(Arc::new(latest_token_config.clone()));
208                } else {
209                    // invariant
210                    assert_eq!(event.type_name, latest_token_config[&event.token_id]);
211                }
212            }
213
214            SuiBridgeEvent::UpdateTokenPriceEvent(_event) => {
215                bump_sui_counter!("token_price_updated");
216            }
217
218            SuiBridgeEvent::UpdateRouteLimitEvent(_event) => {
219                bump_sui_counter!("limit_updated");
220            }
221        }
222    }
223
224    fn handle_eth_events(event: EthBridgeEvent, bridge_metrics: &Arc<BridgeMetrics>) {
225        info!("Received EthBridgeEvent: {:?}", event);
226
227        macro_rules! bump_eth_counter {
228            ($action:expr) => {
229                bridge_metrics
230                    .observed_governance_actions
231                    .with_label_values(&[$action, "eth"])
232                    .inc()
233            };
234        }
235
236        match event {
237            EthBridgeEvent::EthBridgeCommitteeEvents(event) => match event {
238                EthBridgeCommitteeEvents::BlocklistUpdated(event) => {
239                    bump_eth_counter!(if event.isBlocklisted {
240                        "validator_blocklisted"
241                    } else {
242                        "validator_unblocklisted"
243                    });
244                }
245                EthBridgeCommitteeEvents::Initialized(_) => {
246                    bump_eth_counter!("committee_contract_initialized");
247                }
248                EthBridgeCommitteeEvents::Upgraded(_) => {
249                    bump_eth_counter!("committee_contract_upgraded");
250                }
251                EthBridgeCommitteeEvents::BlocklistUpdatedV2(e) => {
252                    bump_eth_counter!(if e.isBlocklisted {
253                        "validator_blocklisted"
254                    } else {
255                        "validator_unblocklisted"
256                    });
257                }
258                EthBridgeCommitteeEvents::ContractUpgraded(_) => {
259                    bump_eth_counter!("committee_contract_upgraded");
260                }
261            },
262            EthBridgeEvent::EthBridgeLimiterEvents(event) => match event {
263                EthBridgeLimiterEvents::Initialized(_) => {
264                    bump_eth_counter!("limiter_contract_initialized");
265                }
266                EthBridgeLimiterEvents::Upgraded(_) => {
267                    bump_eth_counter!("limiter_contract_upgraded");
268                }
269                EthBridgeLimiterEvents::OwnershipTransferred(_) => {
270                    bump_eth_counter!("limiter_contract_ownership_transferred");
271                }
272                EthBridgeLimiterEvents::LimitUpdated(_) => {
273                    bump_eth_counter!("limit_updated");
274                }
275                // This event is deprecated but we keep it for ABI compatibility
276                // TODO: We can safely update abi and remove it once the testnet bridge contract is upgraded
277                EthBridgeLimiterEvents::HourlyTransferAmountUpdated(_) => (),
278                EthBridgeLimiterEvents::ContractUpgraded(_) => {
279                    bump_eth_counter!("limiter_contract_upgraded");
280                }
281                EthBridgeLimiterEvents::LimitUpdatedV2(_) => {
282                    bump_eth_counter!("limit_updated");
283                }
284            },
285            EthBridgeEvent::EthBridgeConfigEvents(event) => match event {
286                EthBridgeConfigEvents::Initialized(_) => {
287                    bump_eth_counter!("config_contract_initialized");
288                }
289                EthBridgeConfigEvents::Upgraded(_) => {
290                    bump_eth_counter!("config_contract_upgraded");
291                }
292                EthBridgeConfigEvents::TokenAdded(_) => {
293                    bump_eth_counter!("new_token_added");
294                }
295                EthBridgeConfigEvents::TokenPriceUpdated(_) => {
296                    bump_eth_counter!("update_token_price");
297                }
298                EthBridgeConfigEvents::ContractUpgraded(_) => {
299                    bump_eth_counter!("config_contract_upgraded");
300                }
301                EthBridgeConfigEvents::TokenPriceUpdatedV2(_) => {
302                    bump_eth_counter!("update_token_price");
303                }
304                EthBridgeConfigEvents::TokensAddedV2(_) => {
305                    bump_eth_counter!("new_token_added");
306                }
307            },
308            EthBridgeEvent::EthCommitteeUpgradeableContractEvents(event) => match event {
309                EthCommitteeUpgradeableContractEvents::Initialized(_) => {
310                    bump_eth_counter!("upgradeable_contract_initialized");
311                }
312                EthCommitteeUpgradeableContractEvents::Upgraded(_) => {
313                    bump_eth_counter!("upgradeable_contract_upgraded");
314                }
315            },
316            EthBridgeEvent::EthSuiBridgeEvents(event) => match event {
317                EthSuiBridgeEvents::TokensClaimed(_) => (),
318                EthSuiBridgeEvents::TokensDeposited(_) => (),
319                EthSuiBridgeEvents::TokensDepositedV2(_) => (),
320                EthSuiBridgeEvents::Paused(_) => bump_eth_counter!("bridge_paused"),
321                EthSuiBridgeEvents::Unpaused(_) => bump_eth_counter!("bridge_unpaused"),
322                EthSuiBridgeEvents::Upgraded(_) => {
323                    bump_eth_counter!("bridge_contract_upgraded")
324                }
325                EthSuiBridgeEvents::Initialized(_) => {
326                    bump_eth_counter!("bridge_contract_initialized")
327                }
328                EthSuiBridgeEvents::ContractUpgraded(_) => {
329                    bump_eth_counter!("bridge_contract_upgraded")
330                }
331                EthSuiBridgeEvents::EmergencyOperation(e) => {
332                    if e.paused {
333                        bump_eth_counter!("bridge_paused")
334                    } else {
335                        bump_eth_counter!("bridge_unpaused")
336                    }
337                }
338            },
339        }
340    }
341}
342
343async fn get_latest_bridge_committee_with_url_update_event<C: SuiClientInner>(
344    sui_client: Arc<SuiClient<C>>,
345    event: CommitteeMemberUrlUpdateEvent,
346    staleness_retry_interval: Duration,
347) -> BridgeCommittee {
348    let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
349    loop {
350        let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
351            sui_client.get_bridge_committee(),
352            Duration::from_secs(600)
353        ) else {
354            error!("Failed to get bridge committee after retry");
355            continue;
356        };
357        let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(&event.member));
358        let Some(member) = member else {
359            // This is possible when a node is processing an older event while the member quitted at a later point, which is fine.
360            // Or fullnode returns a stale committee that the member hasn't joined, which is rare and tricy to handle so we just log it.
361            warn!(
362                "Committee member not found in the committee: {:?}",
363                event.member
364            );
365            return committee;
366        };
367        if member.base_url == event.new_url {
368            return committee;
369        }
370        // If url does not match, it could be:
371        // 1. the query is sent to a stale fullnode that does not have the latest data yet
372        // 2. the node is processing an older message, and the latest url has changed again
373        // In either case, we retry a few times. If it still fails to match, we assume it's the latter case.
374        tokio::time::sleep(staleness_retry_interval).await;
375        remaining_retry_times -= 1;
376        if remaining_retry_times == 0 {
377            warn!(
378                "Committee member url {:?} does not match onchain record {:?} after retry",
379                event.member, member
380            );
381            return committee;
382        }
383    }
384}
385
386async fn get_latest_bridge_committee_with_blocklist_event<C: SuiClientInner>(
387    sui_client: Arc<SuiClient<C>>,
388    event: BlocklistValidatorEvent,
389    staleness_retry_interval: Duration,
390) -> BridgeCommittee {
391    let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
392    loop {
393        let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
394            sui_client.get_bridge_committee(),
395            Duration::from_secs(600)
396        ) else {
397            error!("Failed to get bridge committee after retry");
398            continue;
399        };
400        let mut any_mismatch = false;
401        for pk in &event.public_keys {
402            let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(pk));
403            let Some(member) = member else {
404                // This is possible when a node is processing an older event while the member
405                // quitted at a later point. Or fullnode returns a stale committee that
406                // the member hasn't joined.
407                warn!("Committee member not found in the committee: {:?}", pk);
408                any_mismatch = true;
409                break;
410            };
411            if member.is_blocklisted != event.blocklisted {
412                warn!(
413                    "Committee member blocklist status does not match onchain record: {:?}",
414                    member
415                );
416                any_mismatch = true;
417                break;
418            }
419        }
420        if !any_mismatch {
421            return committee;
422        }
423        // If there is any match, it could be:
424        // 1. the query is sent to a stale fullnode that does not have the latest data yet
425        // 2. the node is processing an older message, and the latest blocklist status has changed again
426        // In either case, we retry a few times. If it still fails to match, we assume it's the latter case.
427        tokio::time::sleep(staleness_retry_interval).await;
428        remaining_retry_times -= 1;
429        if remaining_retry_times == 0 {
430            warn!(
431                "Committee member blocklist status {:?} does not match onchain record after retry",
432                event
433            );
434            return committee;
435        }
436    }
437}
438
439async fn get_latest_bridge_pause_status_with_emergency_event<C: SuiClientInner>(
440    sui_client: Arc<SuiClient<C>>,
441    event: EmergencyOpEvent,
442    staleness_retry_interval: Duration,
443) -> IsBridgePaused {
444    let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
445    loop {
446        let Ok(Ok(summary)) =
447            retry_with_max_elapsed_time!(sui_client.get_bridge_summary(), Duration::from_secs(600))
448        else {
449            error!("Failed to get bridge summary after retry");
450            continue;
451        };
452        if summary.is_frozen == event.frozen {
453            return summary.is_frozen;
454        }
455        // If the onchain status does not match, it could be:
456        // 1. the query is sent to a stale fullnode that does not have the latest data yet
457        // 2. the node is processing an older message, and the latest status has changed again
458        // In either case, we retry a few times. If it still fails to match, we assume it's the latter case.
459        tokio::time::sleep(staleness_retry_interval).await;
460        remaining_retry_times -= 1;
461        if remaining_retry_times == 0 {
462            warn!(
463                "Bridge pause status {:?} does not match onchain record {:?} after retry",
464                event, summary.is_frozen
465            );
466            return summary.is_frozen;
467        }
468    }
469}
470
471#[cfg(test)]
472mod tests {
473    use std::str::FromStr;
474
475    use super::*;
476    use crate::events::{NewTokenEvent, init_all_struct_tags};
477    use crate::test_utils::{
478        bridge_committee_to_bridge_committee_summary, get_test_authority_and_key,
479    };
480    use crate::types::{BRIDGE_PAUSED, BRIDGE_UNPAUSED, BridgeAuthority};
481    use fastcrypto::traits::KeyPair;
482    use prometheus::Registry;
483    use sui_types::base_types::SuiAddress;
484    use sui_types::bridge::BridgeCommitteeSummary;
485    use sui_types::bridge::MoveTypeCommitteeMember;
486    use sui_types::crypto::get_key_pair;
487
488    use crate::{sui_mock_client::SuiMockClient, types::BridgeCommittee};
489    use sui_types::crypto::ToFromBytes;
490
491    #[tokio::test]
492    async fn test_get_latest_bridge_committee_with_url_update_event() {
493        telemetry_subscribers::init_for_testing();
494        let sui_client_mock = SuiMockClient::default();
495        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
496        let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
497        let pk = kp.public().clone();
498        let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
499        let pk_bytes = pk_as_bytes.as_bytes().to_vec();
500        let event = CommitteeMemberUrlUpdateEvent {
501            member: pk,
502            new_url: "http://new.url".to_string(),
503        };
504        let summary = BridgeCommitteeSummary {
505            members: vec![(
506                pk_bytes.clone(),
507                MoveTypeCommitteeMember {
508                    sui_address: SuiAddress::random_for_testing_only(),
509                    bridge_pubkey_bytes: pk_bytes.clone(),
510                    voting_power: 10000,
511                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
512                    blocklisted: false,
513                },
514            )],
515            member_registration: vec![],
516            last_committee_update_epoch: 0,
517        };
518
519        // Test the regular case, the onchain url matches
520        sui_client_mock.set_bridge_committee(summary.clone());
521        let timer = std::time::Instant::now();
522        let committee = get_latest_bridge_committee_with_url_update_event(
523            sui_client.clone(),
524            event.clone(),
525            Duration::from_secs(2),
526        )
527        .await;
528        assert_eq!(
529            committee.member(&pk_as_bytes).unwrap().base_url,
530            "http://new.url"
531        );
532        assert!(timer.elapsed().as_millis() < 500);
533
534        // Test the case where the onchain url is older. Then update onchain url in 1 second.
535        // Since the retry interval is 2 seconds, it should return the next retry.
536        let old_summary = BridgeCommitteeSummary {
537            members: vec![(
538                pk_bytes.clone(),
539                MoveTypeCommitteeMember {
540                    sui_address: SuiAddress::random_for_testing_only(),
541                    bridge_pubkey_bytes: pk_bytes.clone(),
542                    voting_power: 10000,
543                    http_rest_url: "http://old.url".to_string().as_bytes().to_vec(),
544                    blocklisted: false,
545                },
546            )],
547            member_registration: vec![],
548            last_committee_update_epoch: 0,
549        };
550        sui_client_mock.set_bridge_committee(old_summary.clone());
551        let timer = std::time::Instant::now();
552        // update the url to "http://new.url" in 1 second
553        let sui_client_mock_clone = sui_client_mock.clone();
554        tokio::spawn(async move {
555            tokio::time::sleep(Duration::from_secs(1)).await;
556            sui_client_mock_clone.set_bridge_committee(summary.clone());
557        });
558        let committee = get_latest_bridge_committee_with_url_update_event(
559            sui_client.clone(),
560            event.clone(),
561            Duration::from_secs(2),
562        )
563        .await;
564        assert_eq!(
565            committee.member(&pk_as_bytes).unwrap().base_url,
566            "http://new.url"
567        );
568        let elapsed = timer.elapsed().as_millis();
569        assert!(elapsed > 1000 && elapsed < 3000);
570
571        // Test the case where the onchain url is newer. It should retry up to
572        // REFRESH_BRIDGE_RETRY_TIMES time then return the onchain record.
573        let newer_summary = BridgeCommitteeSummary {
574            members: vec![(
575                pk_bytes.clone(),
576                MoveTypeCommitteeMember {
577                    sui_address: SuiAddress::random_for_testing_only(),
578                    bridge_pubkey_bytes: pk_bytes.clone(),
579                    voting_power: 10000,
580                    http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
581                    blocklisted: false,
582                },
583            )],
584            member_registration: vec![],
585            last_committee_update_epoch: 0,
586        };
587        sui_client_mock.set_bridge_committee(newer_summary.clone());
588        let timer = std::time::Instant::now();
589        let committee = get_latest_bridge_committee_with_url_update_event(
590            sui_client.clone(),
591            event.clone(),
592            Duration::from_millis(500),
593        )
594        .await;
595        assert_eq!(
596            committee.member(&pk_as_bytes).unwrap().base_url,
597            "http://newer.url"
598        );
599        let elapsed = timer.elapsed().as_millis();
600        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
601
602        // Test the case where the member is not found in the committee
603        // It should return the onchain record.
604        let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
605        let pk2 = kp2.public().clone();
606        let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
607        let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
608        let newer_summary = BridgeCommitteeSummary {
609            members: vec![(
610                pk_bytes2.clone(),
611                MoveTypeCommitteeMember {
612                    sui_address: SuiAddress::random_for_testing_only(),
613                    bridge_pubkey_bytes: pk_bytes2.clone(),
614                    voting_power: 10000,
615                    http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
616                    blocklisted: false,
617                },
618            )],
619            member_registration: vec![],
620            last_committee_update_epoch: 0,
621        };
622        sui_client_mock.set_bridge_committee(newer_summary.clone());
623        let timer = std::time::Instant::now();
624        let committee = get_latest_bridge_committee_with_url_update_event(
625            sui_client.clone(),
626            event.clone(),
627            Duration::from_secs(1),
628        )
629        .await;
630        assert_eq!(
631            committee.member(&pk_as_bytes2).unwrap().base_url,
632            "http://newer.url"
633        );
634        assert!(committee.member(&pk_as_bytes).is_none());
635        let elapsed = timer.elapsed().as_millis();
636        assert!(elapsed < 1000);
637    }
638
639    #[tokio::test]
640    async fn test_get_latest_bridge_committee_with_blocklist_event() {
641        telemetry_subscribers::init_for_testing();
642        let sui_client_mock = SuiMockClient::default();
643        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
644        let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
645        let pk = kp.public().clone();
646        let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
647        let pk_bytes = pk_as_bytes.as_bytes().to_vec();
648
649        // Test the case where the onchain status is the same as the event (blocklisted)
650        let event = BlocklistValidatorEvent {
651            blocklisted: true,
652            public_keys: vec![pk.clone()],
653        };
654        let summary = BridgeCommitteeSummary {
655            members: vec![(
656                pk_bytes.clone(),
657                MoveTypeCommitteeMember {
658                    sui_address: SuiAddress::random_for_testing_only(),
659                    bridge_pubkey_bytes: pk_bytes.clone(),
660                    voting_power: 10000,
661                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
662                    blocklisted: true,
663                },
664            )],
665            member_registration: vec![],
666            last_committee_update_epoch: 0,
667        };
668        sui_client_mock.set_bridge_committee(summary.clone());
669        let timer = std::time::Instant::now();
670        let committee = get_latest_bridge_committee_with_blocklist_event(
671            sui_client.clone(),
672            event.clone(),
673            Duration::from_secs(2),
674        )
675        .await;
676        assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
677        assert!(timer.elapsed().as_millis() < 500);
678
679        // Test the case where the onchain status is the same as the event (unblocklisted)
680        let event = BlocklistValidatorEvent {
681            blocklisted: false,
682            public_keys: vec![pk.clone()],
683        };
684        let summary = BridgeCommitteeSummary {
685            members: vec![(
686                pk_bytes.clone(),
687                MoveTypeCommitteeMember {
688                    sui_address: SuiAddress::random_for_testing_only(),
689                    bridge_pubkey_bytes: pk_bytes.clone(),
690                    voting_power: 10000,
691                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
692                    blocklisted: false,
693                },
694            )],
695            member_registration: vec![],
696            last_committee_update_epoch: 0,
697        };
698        sui_client_mock.set_bridge_committee(summary.clone());
699        let timer = std::time::Instant::now();
700        let committee = get_latest_bridge_committee_with_blocklist_event(
701            sui_client.clone(),
702            event.clone(),
703            Duration::from_secs(2),
704        )
705        .await;
706        assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
707        assert!(timer.elapsed().as_millis() < 500);
708
709        // Test the case where the onchain status is older. Then update onchain status in 1 second.
710        // Since the retry interval is 2 seconds, it should return the next retry.
711        let old_summary = BridgeCommitteeSummary {
712            members: vec![(
713                pk_bytes.clone(),
714                MoveTypeCommitteeMember {
715                    sui_address: SuiAddress::random_for_testing_only(),
716                    bridge_pubkey_bytes: pk_bytes.clone(),
717                    voting_power: 10000,
718                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
719                    blocklisted: true,
720                },
721            )],
722            member_registration: vec![],
723            last_committee_update_epoch: 0,
724        };
725        sui_client_mock.set_bridge_committee(old_summary.clone());
726        let timer = std::time::Instant::now();
727        // update unblocklisted in 1 second
728        let sui_client_mock_clone = sui_client_mock.clone();
729        tokio::spawn(async move {
730            tokio::time::sleep(Duration::from_secs(1)).await;
731            sui_client_mock_clone.set_bridge_committee(summary.clone());
732        });
733        let committee = get_latest_bridge_committee_with_blocklist_event(
734            sui_client.clone(),
735            event.clone(),
736            Duration::from_secs(2),
737        )
738        .await;
739        assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
740        let elapsed = timer.elapsed().as_millis();
741        assert!(elapsed > 1000 && elapsed < 3000);
742
743        // Test the case where the onchain url is newer. It should retry up to
744        // REFRESH_BRIDGE_RETRY_TIMES time then return the onchain record.
745        let newer_summary = BridgeCommitteeSummary {
746            members: vec![(
747                pk_bytes.clone(),
748                MoveTypeCommitteeMember {
749                    sui_address: SuiAddress::random_for_testing_only(),
750                    bridge_pubkey_bytes: pk_bytes.clone(),
751                    voting_power: 10000,
752                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
753                    blocklisted: true,
754                },
755            )],
756            member_registration: vec![],
757            last_committee_update_epoch: 0,
758        };
759        sui_client_mock.set_bridge_committee(newer_summary.clone());
760        let timer = std::time::Instant::now();
761        let committee = get_latest_bridge_committee_with_blocklist_event(
762            sui_client.clone(),
763            event.clone(),
764            Duration::from_millis(500),
765        )
766        .await;
767        assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
768        let elapsed = timer.elapsed().as_millis();
769        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
770
771        // Test the case where the member onchain url is not found in the committee
772        // It should return the onchain record after retrying a few times.
773        let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
774        let pk2 = kp2.public().clone();
775        let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
776        let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
777        let summary = BridgeCommitteeSummary {
778            members: vec![(
779                pk_bytes2.clone(),
780                MoveTypeCommitteeMember {
781                    sui_address: SuiAddress::random_for_testing_only(),
782                    bridge_pubkey_bytes: pk_bytes2.clone(),
783                    voting_power: 10000,
784                    http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
785                    blocklisted: false,
786                },
787            )],
788            member_registration: vec![],
789            last_committee_update_epoch: 0,
790        };
791        sui_client_mock.set_bridge_committee(summary.clone());
792        let timer = std::time::Instant::now();
793        let committee = get_latest_bridge_committee_with_blocklist_event(
794            sui_client.clone(),
795            event.clone(),
796            Duration::from_secs(1),
797        )
798        .await;
799        assert_eq!(
800            committee.member(&pk_as_bytes2).unwrap().base_url,
801            "http://newer.url"
802        );
803        assert!(committee.member(&pk_as_bytes).is_none());
804        let elapsed = timer.elapsed().as_millis();
805        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
806
807        // Test any mismtach in the blocklist status should retry a few times
808        let event = BlocklistValidatorEvent {
809            blocklisted: true,
810            public_keys: vec![pk, pk2],
811        };
812        let summary = BridgeCommitteeSummary {
813            members: vec![
814                (
815                    pk_bytes.clone(),
816                    MoveTypeCommitteeMember {
817                        sui_address: SuiAddress::random_for_testing_only(),
818                        bridge_pubkey_bytes: pk_bytes.clone(),
819                        voting_power: 5000,
820                        http_rest_url: "http://pk.url".to_string().as_bytes().to_vec(),
821                        blocklisted: true,
822                    },
823                ),
824                (
825                    pk_bytes2.clone(),
826                    MoveTypeCommitteeMember {
827                        sui_address: SuiAddress::random_for_testing_only(),
828                        bridge_pubkey_bytes: pk_bytes2.clone(),
829                        voting_power: 5000,
830                        http_rest_url: "http://pk2.url".to_string().as_bytes().to_vec(),
831                        blocklisted: false,
832                    },
833                ),
834            ],
835            member_registration: vec![],
836            last_committee_update_epoch: 0,
837        };
838        sui_client_mock.set_bridge_committee(summary.clone());
839        let timer = std::time::Instant::now();
840        let committee = get_latest_bridge_committee_with_blocklist_event(
841            sui_client.clone(),
842            event.clone(),
843            Duration::from_millis(500),
844        )
845        .await;
846        assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
847        assert!(!committee.member(&pk_as_bytes2).unwrap().is_blocklisted);
848        let elapsed = timer.elapsed().as_millis();
849        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
850    }
851
852    #[tokio::test]
853    async fn test_get_bridge_pause_status_with_emergency_event() {
854        telemetry_subscribers::init_for_testing();
855        let sui_client_mock = SuiMockClient::default();
856        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
857
858        // Test event and onchain status match
859        let event = EmergencyOpEvent { frozen: true };
860        sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
861        let timer = std::time::Instant::now();
862        assert!(
863            get_latest_bridge_pause_status_with_emergency_event(
864                sui_client.clone(),
865                event.clone(),
866                Duration::from_secs(2),
867            )
868            .await
869        );
870        assert!(timer.elapsed().as_millis() < 500);
871
872        let event = EmergencyOpEvent { frozen: false };
873        sui_client_mock.set_is_bridge_paused(BRIDGE_UNPAUSED);
874        let timer = std::time::Instant::now();
875        assert!(
876            !get_latest_bridge_pause_status_with_emergency_event(
877                sui_client.clone(),
878                event.clone(),
879                Duration::from_secs(2),
880            )
881            .await
882        );
883        assert!(timer.elapsed().as_millis() < 500);
884
885        // Test the case where the onchain status (paused) is older. Then update onchain status in 1 second.
886        // Since the retry interval is 2 seconds, it should return the next retry.
887        sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
888        let timer = std::time::Instant::now();
889        // update the bridge to unpaused in 1 second
890        let sui_client_mock_clone = sui_client_mock.clone();
891        tokio::spawn(async move {
892            tokio::time::sleep(Duration::from_secs(1)).await;
893            sui_client_mock_clone.set_is_bridge_paused(BRIDGE_UNPAUSED);
894        });
895        assert!(
896            !get_latest_bridge_pause_status_with_emergency_event(
897                sui_client.clone(),
898                event.clone(),
899                Duration::from_secs(2),
900            )
901            .await
902        );
903        let elapsed = timer.elapsed().as_millis();
904        assert!(elapsed > 1000 && elapsed < 3000, "{}", elapsed);
905
906        // Test the case where the onchain status (paused) is newer. It should retry up to
907        // REFRESH_BRIDGE_RETRY_TIMES time then return the onchain record.
908        sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
909        let timer = std::time::Instant::now();
910        assert!(
911            get_latest_bridge_pause_status_with_emergency_event(
912                sui_client.clone(),
913                event.clone(),
914                Duration::from_secs(2),
915            )
916            .await
917        );
918        let elapsed = timer.elapsed().as_millis();
919        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
920    }
921
922    #[tokio::test]
923    async fn test_update_bridge_authority_aggregation_with_url_change_event() {
924        let (
925            sui_monitor_tx,
926            sui_monitor_rx,
927            _eth_monitor_tx,
928            eth_monitor_rx,
929            sui_client_mock,
930            sui_client,
931            bridge_pause_tx,
932            _bridge_pause_rx,
933            mut authorities,
934            bridge_metrics,
935        ) = setup();
936        let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
937        let agg = Arc::new(ArcSwap::new(Arc::new(
938            BridgeAuthorityAggregator::new_for_testing(Arc::new(old_committee)),
939        )));
940        let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
941        let _handle = tokio::task::spawn(
942            BridgeMonitor::new(
943                sui_client.clone(),
944                sui_monitor_rx,
945                eth_monitor_rx,
946                agg.clone(),
947                bridge_pause_tx,
948                sui_token_type_tags,
949                bridge_metrics,
950            )
951            .run(),
952        );
953        let new_url = "http://new.url".to_string();
954        authorities[0].base_url = new_url.clone();
955        let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
956        let new_committee_summary =
957            bridge_committee_to_bridge_committee_summary(new_committee.clone());
958        sui_client_mock.set_bridge_committee(new_committee_summary.clone());
959        sui_monitor_tx
960            .send(SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(
961                CommitteeMemberUrlUpdateEvent {
962                    member: authorities[0].pubkey.clone(),
963                    new_url: new_url.clone(),
964                },
965            ))
966            .await
967            .unwrap();
968        // Wait for the monitor to process the event
969        tokio::time::sleep(Duration::from_secs(1)).await;
970        // Now expect the committee to be updated
971        assert_eq!(
972            agg.load()
973                .committee
974                .member(&BridgeAuthorityPublicKeyBytes::from(&authorities[0].pubkey))
975                .unwrap()
976                .base_url,
977            new_url
978        );
979    }
980
981    #[tokio::test]
982    async fn test_update_bridge_authority_aggregation_with_blocklist_event() {
983        let (
984            sui_monitor_tx,
985            sui_monitor_rx,
986            _eth_monitor_tx,
987            eth_monitor_rx,
988            sui_client_mock,
989            sui_client,
990            bridge_pause_tx,
991            _bridge_pause_rx,
992            mut authorities,
993            bridge_metrics,
994        ) = setup();
995        let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
996        let agg = Arc::new(ArcSwap::new(Arc::new(
997            BridgeAuthorityAggregator::new_for_testing(Arc::new(old_committee)),
998        )));
999        let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1000        let _handle = tokio::task::spawn(
1001            BridgeMonitor::new(
1002                sui_client.clone(),
1003                sui_monitor_rx,
1004                eth_monitor_rx,
1005                agg.clone(),
1006                bridge_pause_tx,
1007                sui_token_type_tags,
1008                bridge_metrics,
1009            )
1010            .run(),
1011        );
1012        authorities[0].is_blocklisted = true;
1013        let to_blocklist = &authorities[0];
1014        let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
1015        let new_committee_summary =
1016            bridge_committee_to_bridge_committee_summary(new_committee.clone());
1017        sui_client_mock.set_bridge_committee(new_committee_summary.clone());
1018        sui_monitor_tx
1019            .send(SuiBridgeEvent::BlocklistValidatorEvent(
1020                BlocklistValidatorEvent {
1021                    public_keys: vec![to_blocklist.pubkey.clone()],
1022                    blocklisted: true,
1023                },
1024            ))
1025            .await
1026            .unwrap();
1027        // Wait for the monitor to process the event
1028        tokio::time::sleep(Duration::from_secs(1)).await;
1029        assert!(
1030            agg.load()
1031                .committee
1032                .member(&BridgeAuthorityPublicKeyBytes::from(&to_blocklist.pubkey))
1033                .unwrap()
1034                .is_blocklisted,
1035        );
1036    }
1037
1038    #[tokio::test]
1039    async fn test_update_bridge_pause_status_with_emergency_event() {
1040        let (
1041            sui_monitor_tx,
1042            sui_monitor_rx,
1043            _eth_monitor_tx,
1044            eth_monitor_rx,
1045            sui_client_mock,
1046            sui_client,
1047            bridge_pause_tx,
1048            bridge_pause_rx,
1049            authorities,
1050            bridge_metrics,
1051        ) = setup();
1052        let event = EmergencyOpEvent {
1053            frozen: !*bridge_pause_tx.borrow(), // toggle the bridge pause status
1054        };
1055        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
1056        let agg = Arc::new(ArcSwap::new(Arc::new(
1057            BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1058        )));
1059        let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1060        let _handle = tokio::task::spawn(
1061            BridgeMonitor::new(
1062                sui_client.clone(),
1063                sui_monitor_rx,
1064                eth_monitor_rx,
1065                agg.clone(),
1066                bridge_pause_tx,
1067                sui_token_type_tags,
1068                bridge_metrics,
1069            )
1070            .run(),
1071        );
1072
1073        sui_client_mock.set_is_bridge_paused(event.frozen);
1074        sui_monitor_tx
1075            .send(SuiBridgeEvent::EmergencyOpEvent(event.clone()))
1076            .await
1077            .unwrap();
1078        // Wait for the monitor to process the event
1079        tokio::time::sleep(Duration::from_secs(1)).await;
1080        // Now expect the committee to be updated
1081        assert!(*bridge_pause_rx.borrow() == event.frozen);
1082    }
1083
1084    #[tokio::test]
1085    async fn test_update_sui_token_type_tags() {
1086        let (
1087            sui_monitor_tx,
1088            sui_monitor_rx,
1089            _eth_monitor_tx,
1090            eth_monitor_rx,
1091            _sui_client_mock,
1092            sui_client,
1093            bridge_pause_tx,
1094            _bridge_pause_rx,
1095            authorities,
1096            bridge_metrics,
1097        ) = setup();
1098        let event = NewTokenEvent {
1099            token_id: 255,
1100            type_name: TypeTag::from_str("0xbeef::beef::BEEF").unwrap(),
1101            native_token: false,
1102            decimal_multiplier: 1000000,
1103            notional_value: 100000000,
1104        };
1105        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
1106        let agg = Arc::new(ArcSwap::new(Arc::new(
1107            BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1108        )));
1109        let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1110        let sui_token_type_tags_clone = sui_token_type_tags.clone();
1111        let _handle = tokio::task::spawn(
1112            BridgeMonitor::new(
1113                sui_client.clone(),
1114                sui_monitor_rx,
1115                eth_monitor_rx,
1116                agg.clone(),
1117                bridge_pause_tx,
1118                sui_token_type_tags_clone,
1119                bridge_metrics,
1120            )
1121            .run(),
1122        );
1123
1124        sui_monitor_tx
1125            .send(SuiBridgeEvent::NewTokenEvent(event.clone()))
1126            .await
1127            .unwrap();
1128        // Wait for the monitor to process the event
1129        tokio::time::sleep(Duration::from_secs(1)).await;
1130        // Now expect new token type tags to appear in sui_token_type_tags
1131        sui_token_type_tags
1132            .load()
1133            .clone()
1134            .get(&event.token_id)
1135            .unwrap();
1136    }
1137
1138    #[allow(clippy::type_complexity)]
1139    fn setup() -> (
1140        mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
1141        mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
1142        mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
1143        mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
1144        SuiMockClient,
1145        Arc<SuiClient<SuiMockClient>>,
1146        tokio::sync::watch::Sender<IsBridgePaused>,
1147        tokio::sync::watch::Receiver<IsBridgePaused>,
1148        Vec<BridgeAuthority>,
1149        Arc<BridgeMetrics>,
1150    ) {
1151        telemetry_subscribers::init_for_testing();
1152        let registry = Registry::new();
1153        mysten_metrics::init_metrics(&registry);
1154        init_all_struct_tags();
1155        let bridge_metrics = Arc::new(BridgeMetrics::new_for_testing());
1156
1157        let sui_client_mock = SuiMockClient::default();
1158        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1159        let (sui_monitor_tx, sui_monitor_rx) = mysten_metrics::metered_channel::channel(
1160            10000,
1161            &mysten_metrics::get_metrics()
1162                .unwrap()
1163                .channel_inflight
1164                .with_label_values(&["sui_monitor_queue"]),
1165        );
1166        let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
1167            10000,
1168            &mysten_metrics::get_metrics()
1169                .unwrap()
1170                .channel_inflight
1171                .with_label_values(&["eth_monitor_queue"]),
1172        );
1173
1174        let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1175        let authorities = vec![
1176            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
1177            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
1178            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
1179            get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
1180        ];
1181        (
1182            sui_monitor_tx,
1183            sui_monitor_rx,
1184            eth_monitor_tx,
1185            eth_monitor_rx,
1186            sui_client_mock,
1187            sui_client,
1188            bridge_pause_tx,
1189            bridge_pause_rx,
1190            authorities,
1191            bridge_metrics,
1192        )
1193    }
1194}