1use crate::abi::{
8 EthBridgeCommitteeEvents, EthBridgeConfigEvents, EthBridgeEvent, EthBridgeLimiterEvents,
9 EthCommitteeUpgradeableContractEvents, EthSuiBridgeEvents,
10};
11use crate::client::bridge_authority_aggregator::BridgeAuthorityAggregator;
12use crate::crypto::BridgeAuthorityPublicKeyBytes;
13use crate::events::{BlocklistValidatorEvent, CommitteeMemberUrlUpdateEvent};
14use crate::events::{EmergencyOpEvent, SuiBridgeEvent};
15use crate::metrics::BridgeMetrics;
16use crate::retry_with_max_elapsed_time;
17use crate::sui_client::{SuiClient, SuiClientInner};
18use crate::types::{BridgeCommittee, IsBridgePaused};
19use arc_swap::ArcSwap;
20use std::collections::HashMap;
21use std::sync::Arc;
22use sui_types::TypeTag;
23use tokio::time::Duration;
24use tracing::{error, info, warn};
25
/// Number of staleness re-checks the `get_latest_*` helpers in this file perform (sleeping
/// between checks) before they stop waiting and return the last state they fetched.
const REFRESH_BRIDGE_RETRY_TIMES: u64 = 3;
27
/// Consumes bridge events observed on Sui and Ethereum, bumps governance-action metrics,
/// and keeps shared node state (committee aggregator, pause flag, token type tags) updated.
pub struct BridgeMonitor<C> {
    /// Client used to re-read the bridge committee and bridge summary after an event.
    sui_client: Arc<SuiClient<C>>,
    /// Incoming Sui bridge events; `run` panics if this channel is closed.
    sui_monitor_rx: mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
    /// Incoming Ethereum bridge events; `run` panics if this channel is closed.
    eth_monitor_rx: mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
    /// Shared aggregator, replaced whenever a URL-update or blocklist event is handled.
    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
    /// Watch channel on which the pause status is published after an emergency-op event.
    bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
    /// Shared token-id -> type-tag map, republished when a previously unseen token is added.
    sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
    /// Metrics handle; the monitor increments `observed_governance_actions`.
    bridge_metrics: Arc<BridgeMetrics>,
}
37
38impl<C> BridgeMonitor<C>
39where
40 C: SuiClientInner + 'static,
41{
42 pub fn new(
43 sui_client: Arc<SuiClient<C>>,
44 sui_monitor_rx: mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
45 eth_monitor_rx: mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
46 bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
47 bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
48 sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
49 bridge_metrics: Arc<BridgeMetrics>,
50 ) -> Self {
51 Self {
52 sui_client,
53 sui_monitor_rx,
54 eth_monitor_rx,
55 bridge_auth_agg,
56 bridge_paused_watch_tx,
57 sui_token_type_tags,
58 bridge_metrics,
59 }
60 }
61
62 pub async fn run(self) {
63 tracing::info!("Starting BridgeMonitor");
64 let Self {
65 sui_client,
66 mut sui_monitor_rx,
67 mut eth_monitor_rx,
68 bridge_auth_agg,
69 bridge_paused_watch_tx,
70 sui_token_type_tags,
71 bridge_metrics,
72 } = self;
73 let mut latest_token_config = (*sui_token_type_tags.load().clone()).clone();
74
75 loop {
76 tokio::select! {
77 sui_event = sui_monitor_rx.recv() => {
78 if let Some(sui_event) = sui_event {
79 Self::handle_sui_events(
80 sui_event,
81 &sui_client,
82 &bridge_auth_agg,
83 &bridge_paused_watch_tx,
84 &sui_token_type_tags,
85 &bridge_metrics,
86 &mut latest_token_config,
87 )
88 .await;
89 } else {
90 panic!("BridgeMonitor sui events channel was closed unexpectedly");
91 }
92 }
93 eth_event = eth_monitor_rx.recv() => {
94 if let Some(eth_event) = eth_event {
95 Self::handle_eth_events(eth_event, &bridge_metrics);
96 } else {
97 panic!("BridgeMonitor eth events channel was closed unexpectedly");
98 }
99 }
100 }
101 }
102 }
103
104 async fn handle_sui_events(
105 event: SuiBridgeEvent,
106 sui_client: &Arc<SuiClient<C>>,
107 bridge_auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
108 bridge_paused_watch_tx: &tokio::sync::watch::Sender<IsBridgePaused>,
109 sui_token_type_tags: &Arc<ArcSwap<HashMap<u8, TypeTag>>>,
110 bridge_metrics: &Arc<BridgeMetrics>,
111 latest_token_config: &mut HashMap<u8, TypeTag>,
112 ) {
113 info!("Received SuiBridgeEvent: {:?}", event);
114 macro_rules! bump_sui_counter {
115 ($action:expr) => {
116 bridge_metrics
117 .observed_governance_actions
118 .with_label_values(&[$action, "sui"])
119 .inc();
120 };
121 }
122
123 match event {
124 SuiBridgeEvent::SuiToEthTokenBridgeV1(_) => (),
125 SuiBridgeEvent::TokenTransferApproved(_) => (),
126 SuiBridgeEvent::TokenTransferClaimed(_) => (),
127 SuiBridgeEvent::TokenTransferAlreadyApproved(_) => (),
128 SuiBridgeEvent::TokenTransferAlreadyClaimed(_) => (),
129 SuiBridgeEvent::TokenTransferLimitExceed(_) => {
130 }
132
133 SuiBridgeEvent::EmergencyOpEvent(event) => {
134 bump_sui_counter!(if event.frozen {
135 "bridge_paused"
136 } else {
137 "bridge_unpaused"
138 });
139 let is_paused = get_latest_bridge_pause_status_with_emergency_event(
140 sui_client.clone(),
141 event,
142 Duration::from_secs(10),
143 )
144 .await;
145 bridge_paused_watch_tx
146 .send(is_paused)
147 .expect("Bridge pause status watch channel should not be closed");
148 }
149
150 SuiBridgeEvent::CommitteeMemberRegistration(_) => {
151 bump_sui_counter!("committee_member_registered");
152 }
153 SuiBridgeEvent::CommitteeUpdateEvent(_) => {
154 bump_sui_counter!("committee_updated");
155 }
156
157 SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(event) => {
158 bump_sui_counter!("committee_member_url_updated");
159 let new_committee = get_latest_bridge_committee_with_url_update_event(
160 sui_client.clone(),
161 event,
162 Duration::from_secs(10),
163 )
164 .await;
165 let committee_names = bridge_auth_agg.load().committee_keys_to_names.clone();
166 bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(
167 Arc::new(new_committee),
168 bridge_metrics.clone(),
169 committee_names,
170 )));
171 info!("Committee updated with CommitteeMemberUrlUpdateEvent");
172 }
173
174 SuiBridgeEvent::BlocklistValidatorEvent(event) => {
175 bump_sui_counter!(if event.blocklisted {
176 "validator_blocklisted"
177 } else {
178 "validator_unblocklisted"
179 });
180 let new_committee = get_latest_bridge_committee_with_blocklist_event(
181 sui_client.clone(),
182 event,
183 Duration::from_secs(10),
184 )
185 .await;
186 let committee_names = bridge_auth_agg.load().committee_keys_to_names.clone();
187 bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(
188 Arc::new(new_committee),
189 bridge_metrics.clone(),
190 committee_names,
191 )));
192 info!("Committee updated with BlocklistValidatorEvent");
193 }
194
195 SuiBridgeEvent::TokenRegistrationEvent(_) => {
196 bump_sui_counter!("new_token_registered");
197 }
198
199 SuiBridgeEvent::NewTokenEvent(event) => {
200 bump_sui_counter!("new_token_added");
201 if let std::collections::hash_map::Entry::Vacant(entry) =
202 latest_token_config.entry(event.token_id)
204 {
205 entry.insert(event.type_name.clone());
206 sui_token_type_tags.store(Arc::new(latest_token_config.clone()));
207 } else {
208 assert_eq!(event.type_name, latest_token_config[&event.token_id]);
210 }
211 }
212
213 SuiBridgeEvent::UpdateTokenPriceEvent(_event) => {
214 bump_sui_counter!("token_price_updated");
215 }
216
217 SuiBridgeEvent::UpdateRouteLimitEvent(_event) => {
218 bump_sui_counter!("limit_updated");
219 }
220 }
221 }
222
223 fn handle_eth_events(event: EthBridgeEvent, bridge_metrics: &Arc<BridgeMetrics>) {
224 info!("Received EthBridgeEvent: {:?}", event);
225
226 macro_rules! bump_eth_counter {
227 ($action:expr) => {
228 bridge_metrics
229 .observed_governance_actions
230 .with_label_values(&[$action, "eth"])
231 .inc()
232 };
233 }
234
235 match event {
236 EthBridgeEvent::EthBridgeCommitteeEvents(event) => match event {
237 EthBridgeCommitteeEvents::BlocklistUpdatedFilter(event) => {
238 bump_eth_counter!(if event.is_blocklisted {
239 "validator_blocklisted"
240 } else {
241 "validator_unblocklisted"
242 });
243 }
244 EthBridgeCommitteeEvents::InitializedFilter(_) => {
245 bump_eth_counter!("committee_contract_initialized");
246 }
247 EthBridgeCommitteeEvents::UpgradedFilter(_) => {
248 bump_eth_counter!("committee_contract_upgraded");
249 }
250 EthBridgeCommitteeEvents::BlocklistUpdatedV2Filter(e) => {
251 bump_eth_counter!(if e.is_blocklisted {
252 "validator_blocklisted"
253 } else {
254 "validator_unblocklisted"
255 });
256 }
257 EthBridgeCommitteeEvents::ContractUpgradedFilter(_) => {
258 bump_eth_counter!("committee_contract_upgraded");
259 }
260 },
261 EthBridgeEvent::EthBridgeLimiterEvents(event) => match event {
262 EthBridgeLimiterEvents::InitializedFilter(_) => {
263 bump_eth_counter!("limiter_contract_initialized");
264 }
265 EthBridgeLimiterEvents::UpgradedFilter(_) => {
266 bump_eth_counter!("limiter_contract_upgraded");
267 }
268 EthBridgeLimiterEvents::OwnershipTransferredFilter(_) => {
269 bump_eth_counter!("limiter_contract_ownership_transferred");
270 }
271 EthBridgeLimiterEvents::LimitUpdatedFilter(_) => {
272 bump_eth_counter!("limit_updated");
273 }
274 EthBridgeLimiterEvents::HourlyTransferAmountUpdatedFilter(_) => (),
277 EthBridgeLimiterEvents::ContractUpgradedFilter(_) => {
278 bump_eth_counter!("limiter_contract_upgraded");
279 }
280 EthBridgeLimiterEvents::LimitUpdatedV2Filter(_) => {
281 bump_eth_counter!("limit_updated");
282 }
283 },
284 EthBridgeEvent::EthBridgeConfigEvents(event) => match event {
285 EthBridgeConfigEvents::InitializedFilter(_) => {
286 bump_eth_counter!("config_contract_initialized");
287 }
288 EthBridgeConfigEvents::UpgradedFilter(_) => {
289 bump_eth_counter!("config_contract_upgraded");
290 }
291 EthBridgeConfigEvents::TokenAddedFilter(_) => {
292 bump_eth_counter!("new_token_added");
293 }
294 EthBridgeConfigEvents::TokenPriceUpdatedFilter(_) => {
295 bump_eth_counter!("update_token_price");
296 }
297 EthBridgeConfigEvents::ContractUpgradedFilter(_) => {
298 bump_eth_counter!("config_contract_upgraded");
299 }
300 EthBridgeConfigEvents::TokenPriceUpdatedV2Filter(_) => {
301 bump_eth_counter!("update_token_price");
302 }
303 EthBridgeConfigEvents::TokensAddedV2Filter(_) => {
304 bump_eth_counter!("new_token_added");
305 }
306 },
307 EthBridgeEvent::EthCommitteeUpgradeableContractEvents(event) => match event {
308 EthCommitteeUpgradeableContractEvents::InitializedFilter(_) => {
309 bump_eth_counter!("upgradeable_contract_initialized");
310 }
311 EthCommitteeUpgradeableContractEvents::UpgradedFilter(_) => {
312 bump_eth_counter!("upgradeable_contract_upgraded");
313 }
314 },
315 EthBridgeEvent::EthSuiBridgeEvents(event) => match event {
316 EthSuiBridgeEvents::TokensClaimedFilter(_) => (),
317 EthSuiBridgeEvents::TokensDepositedFilter(_) => (),
318 EthSuiBridgeEvents::PausedFilter(_) => bump_eth_counter!("bridge_paused"),
319 EthSuiBridgeEvents::UnpausedFilter(_) => bump_eth_counter!("bridge_unpaused"),
320 EthSuiBridgeEvents::UpgradedFilter(_) => {
321 bump_eth_counter!("bridge_contract_upgraded")
322 }
323 EthSuiBridgeEvents::InitializedFilter(_) => {
324 bump_eth_counter!("bridge_contract_initialized")
325 }
326 EthSuiBridgeEvents::ContractUpgradedFilter(_) => {
327 bump_eth_counter!("bridge_contract_upgraded")
328 }
329 EthSuiBridgeEvents::EmergencyOperationFilter(e) => {
330 if e.paused {
331 bump_eth_counter!("bridge_paused")
332 } else {
333 bump_eth_counter!("bridge_unpaused")
334 }
335 }
336 },
337 }
338 }
339}
340
341async fn get_latest_bridge_committee_with_url_update_event<C: SuiClientInner>(
342 sui_client: Arc<SuiClient<C>>,
343 event: CommitteeMemberUrlUpdateEvent,
344 staleness_retry_interval: Duration,
345) -> BridgeCommittee {
346 let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
347 loop {
348 let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
349 sui_client.get_bridge_committee(),
350 Duration::from_secs(600)
351 ) else {
352 error!("Failed to get bridge committee after retry");
353 continue;
354 };
355 let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(&event.member));
356 let Some(member) = member else {
357 warn!(
360 "Committee member not found in the committee: {:?}",
361 event.member
362 );
363 return committee;
364 };
365 if member.base_url == event.new_url {
366 return committee;
367 }
368 tokio::time::sleep(staleness_retry_interval).await;
373 remaining_retry_times -= 1;
374 if remaining_retry_times == 0 {
375 warn!(
376 "Committee member url {:?} does not match onchain record {:?} after retry",
377 event.member, member
378 );
379 return committee;
380 }
381 }
382}
383
384async fn get_latest_bridge_committee_with_blocklist_event<C: SuiClientInner>(
385 sui_client: Arc<SuiClient<C>>,
386 event: BlocklistValidatorEvent,
387 staleness_retry_interval: Duration,
388) -> BridgeCommittee {
389 let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
390 loop {
391 let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
392 sui_client.get_bridge_committee(),
393 Duration::from_secs(600)
394 ) else {
395 error!("Failed to get bridge committee after retry");
396 continue;
397 };
398 let mut any_mismatch = false;
399 for pk in &event.public_keys {
400 let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(pk));
401 let Some(member) = member else {
402 warn!("Committee member not found in the committee: {:?}", pk);
406 any_mismatch = true;
407 break;
408 };
409 if member.is_blocklisted != event.blocklisted {
410 warn!(
411 "Committee member blocklist status does not match onchain record: {:?}",
412 member
413 );
414 any_mismatch = true;
415 break;
416 }
417 }
418 if !any_mismatch {
419 return committee;
420 }
421 tokio::time::sleep(staleness_retry_interval).await;
426 remaining_retry_times -= 1;
427 if remaining_retry_times == 0 {
428 warn!(
429 "Committee member blocklist status {:?} does not match onchain record after retry",
430 event
431 );
432 return committee;
433 }
434 }
435}
436
437async fn get_latest_bridge_pause_status_with_emergency_event<C: SuiClientInner>(
438 sui_client: Arc<SuiClient<C>>,
439 event: EmergencyOpEvent,
440 staleness_retry_interval: Duration,
441) -> IsBridgePaused {
442 let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
443 loop {
444 let Ok(Ok(summary)) =
445 retry_with_max_elapsed_time!(sui_client.get_bridge_summary(), Duration::from_secs(600))
446 else {
447 error!("Failed to get bridge summary after retry");
448 continue;
449 };
450 if summary.is_frozen == event.frozen {
451 return summary.is_frozen;
452 }
453 tokio::time::sleep(staleness_retry_interval).await;
458 remaining_retry_times -= 1;
459 if remaining_retry_times == 0 {
460 warn!(
461 "Bridge pause status {:?} does not match onchain record {:?} after retry",
462 event, summary.is_frozen
463 );
464 return summary.is_frozen;
465 }
466 }
467}
468
469#[cfg(test)]
470mod tests {
471 use std::str::FromStr;
472
473 use super::*;
474 use crate::events::{NewTokenEvent, init_all_struct_tags};
475 use crate::test_utils::{
476 bridge_committee_to_bridge_committee_summary, get_test_authority_and_key,
477 };
478 use crate::types::{BRIDGE_PAUSED, BRIDGE_UNPAUSED, BridgeAuthority};
479 use fastcrypto::traits::KeyPair;
480 use prometheus::Registry;
481 use sui_types::base_types::SuiAddress;
482 use sui_types::bridge::BridgeCommitteeSummary;
483 use sui_types::bridge::MoveTypeCommitteeMember;
484 use sui_types::crypto::get_key_pair;
485
486 use crate::{sui_mock_client::SuiMockClient, types::BridgeCommittee};
487 use sui_types::crypto::ToFromBytes;
488
489 #[tokio::test]
490 async fn test_get_latest_bridge_committee_with_url_update_event() {
491 telemetry_subscribers::init_for_testing();
492 let sui_client_mock = SuiMockClient::default();
493 let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
494 let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
495 let pk = kp.public().clone();
496 let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
497 let pk_bytes = pk_as_bytes.as_bytes().to_vec();
498 let event = CommitteeMemberUrlUpdateEvent {
499 member: pk,
500 new_url: "http://new.url".to_string(),
501 };
502 let summary = BridgeCommitteeSummary {
503 members: vec![(
504 pk_bytes.clone(),
505 MoveTypeCommitteeMember {
506 sui_address: SuiAddress::random_for_testing_only(),
507 bridge_pubkey_bytes: pk_bytes.clone(),
508 voting_power: 10000,
509 http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
510 blocklisted: false,
511 },
512 )],
513 member_registration: vec![],
514 last_committee_update_epoch: 0,
515 };
516
517 sui_client_mock.set_bridge_committee(summary.clone());
519 let timer = std::time::Instant::now();
520 let committee = get_latest_bridge_committee_with_url_update_event(
521 sui_client.clone(),
522 event.clone(),
523 Duration::from_secs(2),
524 )
525 .await;
526 assert_eq!(
527 committee.member(&pk_as_bytes).unwrap().base_url,
528 "http://new.url"
529 );
530 assert!(timer.elapsed().as_millis() < 500);
531
532 let old_summary = BridgeCommitteeSummary {
535 members: vec![(
536 pk_bytes.clone(),
537 MoveTypeCommitteeMember {
538 sui_address: SuiAddress::random_for_testing_only(),
539 bridge_pubkey_bytes: pk_bytes.clone(),
540 voting_power: 10000,
541 http_rest_url: "http://old.url".to_string().as_bytes().to_vec(),
542 blocklisted: false,
543 },
544 )],
545 member_registration: vec![],
546 last_committee_update_epoch: 0,
547 };
548 sui_client_mock.set_bridge_committee(old_summary.clone());
549 let timer = std::time::Instant::now();
550 let sui_client_mock_clone = sui_client_mock.clone();
552 tokio::spawn(async move {
553 tokio::time::sleep(Duration::from_secs(1)).await;
554 sui_client_mock_clone.set_bridge_committee(summary.clone());
555 });
556 let committee = get_latest_bridge_committee_with_url_update_event(
557 sui_client.clone(),
558 event.clone(),
559 Duration::from_secs(2),
560 )
561 .await;
562 assert_eq!(
563 committee.member(&pk_as_bytes).unwrap().base_url,
564 "http://new.url"
565 );
566 let elapsed = timer.elapsed().as_millis();
567 assert!(elapsed > 1000 && elapsed < 3000);
568
569 let newer_summary = BridgeCommitteeSummary {
572 members: vec![(
573 pk_bytes.clone(),
574 MoveTypeCommitteeMember {
575 sui_address: SuiAddress::random_for_testing_only(),
576 bridge_pubkey_bytes: pk_bytes.clone(),
577 voting_power: 10000,
578 http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
579 blocklisted: false,
580 },
581 )],
582 member_registration: vec![],
583 last_committee_update_epoch: 0,
584 };
585 sui_client_mock.set_bridge_committee(newer_summary.clone());
586 let timer = std::time::Instant::now();
587 let committee = get_latest_bridge_committee_with_url_update_event(
588 sui_client.clone(),
589 event.clone(),
590 Duration::from_millis(500),
591 )
592 .await;
593 assert_eq!(
594 committee.member(&pk_as_bytes).unwrap().base_url,
595 "http://newer.url"
596 );
597 let elapsed = timer.elapsed().as_millis();
598 assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
599
600 let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
603 let pk2 = kp2.public().clone();
604 let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
605 let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
606 let newer_summary = BridgeCommitteeSummary {
607 members: vec![(
608 pk_bytes2.clone(),
609 MoveTypeCommitteeMember {
610 sui_address: SuiAddress::random_for_testing_only(),
611 bridge_pubkey_bytes: pk_bytes2.clone(),
612 voting_power: 10000,
613 http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
614 blocklisted: false,
615 },
616 )],
617 member_registration: vec![],
618 last_committee_update_epoch: 0,
619 };
620 sui_client_mock.set_bridge_committee(newer_summary.clone());
621 let timer = std::time::Instant::now();
622 let committee = get_latest_bridge_committee_with_url_update_event(
623 sui_client.clone(),
624 event.clone(),
625 Duration::from_secs(1),
626 )
627 .await;
628 assert_eq!(
629 committee.member(&pk_as_bytes2).unwrap().base_url,
630 "http://newer.url"
631 );
632 assert!(committee.member(&pk_as_bytes).is_none());
633 let elapsed = timer.elapsed().as_millis();
634 assert!(elapsed < 1000);
635 }
636
637 #[tokio::test]
638 async fn test_get_latest_bridge_committee_with_blocklist_event() {
639 telemetry_subscribers::init_for_testing();
640 let sui_client_mock = SuiMockClient::default();
641 let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
642 let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
643 let pk = kp.public().clone();
644 let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
645 let pk_bytes = pk_as_bytes.as_bytes().to_vec();
646
647 let event = BlocklistValidatorEvent {
649 blocklisted: true,
650 public_keys: vec![pk.clone()],
651 };
652 let summary = BridgeCommitteeSummary {
653 members: vec![(
654 pk_bytes.clone(),
655 MoveTypeCommitteeMember {
656 sui_address: SuiAddress::random_for_testing_only(),
657 bridge_pubkey_bytes: pk_bytes.clone(),
658 voting_power: 10000,
659 http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
660 blocklisted: true,
661 },
662 )],
663 member_registration: vec![],
664 last_committee_update_epoch: 0,
665 };
666 sui_client_mock.set_bridge_committee(summary.clone());
667 let timer = std::time::Instant::now();
668 let committee = get_latest_bridge_committee_with_blocklist_event(
669 sui_client.clone(),
670 event.clone(),
671 Duration::from_secs(2),
672 )
673 .await;
674 assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
675 assert!(timer.elapsed().as_millis() < 500);
676
677 let event = BlocklistValidatorEvent {
679 blocklisted: false,
680 public_keys: vec![pk.clone()],
681 };
682 let summary = BridgeCommitteeSummary {
683 members: vec![(
684 pk_bytes.clone(),
685 MoveTypeCommitteeMember {
686 sui_address: SuiAddress::random_for_testing_only(),
687 bridge_pubkey_bytes: pk_bytes.clone(),
688 voting_power: 10000,
689 http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
690 blocklisted: false,
691 },
692 )],
693 member_registration: vec![],
694 last_committee_update_epoch: 0,
695 };
696 sui_client_mock.set_bridge_committee(summary.clone());
697 let timer = std::time::Instant::now();
698 let committee = get_latest_bridge_committee_with_blocklist_event(
699 sui_client.clone(),
700 event.clone(),
701 Duration::from_secs(2),
702 )
703 .await;
704 assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
705 assert!(timer.elapsed().as_millis() < 500);
706
707 let old_summary = BridgeCommitteeSummary {
710 members: vec![(
711 pk_bytes.clone(),
712 MoveTypeCommitteeMember {
713 sui_address: SuiAddress::random_for_testing_only(),
714 bridge_pubkey_bytes: pk_bytes.clone(),
715 voting_power: 10000,
716 http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
717 blocklisted: true,
718 },
719 )],
720 member_registration: vec![],
721 last_committee_update_epoch: 0,
722 };
723 sui_client_mock.set_bridge_committee(old_summary.clone());
724 let timer = std::time::Instant::now();
725 let sui_client_mock_clone = sui_client_mock.clone();
727 tokio::spawn(async move {
728 tokio::time::sleep(Duration::from_secs(1)).await;
729 sui_client_mock_clone.set_bridge_committee(summary.clone());
730 });
731 let committee = get_latest_bridge_committee_with_blocklist_event(
732 sui_client.clone(),
733 event.clone(),
734 Duration::from_secs(2),
735 )
736 .await;
737 assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
738 let elapsed = timer.elapsed().as_millis();
739 assert!(elapsed > 1000 && elapsed < 3000);
740
741 let newer_summary = BridgeCommitteeSummary {
744 members: vec![(
745 pk_bytes.clone(),
746 MoveTypeCommitteeMember {
747 sui_address: SuiAddress::random_for_testing_only(),
748 bridge_pubkey_bytes: pk_bytes.clone(),
749 voting_power: 10000,
750 http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
751 blocklisted: true,
752 },
753 )],
754 member_registration: vec![],
755 last_committee_update_epoch: 0,
756 };
757 sui_client_mock.set_bridge_committee(newer_summary.clone());
758 let timer = std::time::Instant::now();
759 let committee = get_latest_bridge_committee_with_blocklist_event(
760 sui_client.clone(),
761 event.clone(),
762 Duration::from_millis(500),
763 )
764 .await;
765 assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
766 let elapsed = timer.elapsed().as_millis();
767 assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
768
769 let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
772 let pk2 = kp2.public().clone();
773 let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
774 let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
775 let summary = BridgeCommitteeSummary {
776 members: vec![(
777 pk_bytes2.clone(),
778 MoveTypeCommitteeMember {
779 sui_address: SuiAddress::random_for_testing_only(),
780 bridge_pubkey_bytes: pk_bytes2.clone(),
781 voting_power: 10000,
782 http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
783 blocklisted: false,
784 },
785 )],
786 member_registration: vec![],
787 last_committee_update_epoch: 0,
788 };
789 sui_client_mock.set_bridge_committee(summary.clone());
790 let timer = std::time::Instant::now();
791 let committee = get_latest_bridge_committee_with_blocklist_event(
792 sui_client.clone(),
793 event.clone(),
794 Duration::from_secs(1),
795 )
796 .await;
797 assert_eq!(
798 committee.member(&pk_as_bytes2).unwrap().base_url,
799 "http://newer.url"
800 );
801 assert!(committee.member(&pk_as_bytes).is_none());
802 let elapsed = timer.elapsed().as_millis();
803 assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
804
805 let event = BlocklistValidatorEvent {
807 blocklisted: true,
808 public_keys: vec![pk, pk2],
809 };
810 let summary = BridgeCommitteeSummary {
811 members: vec![
812 (
813 pk_bytes.clone(),
814 MoveTypeCommitteeMember {
815 sui_address: SuiAddress::random_for_testing_only(),
816 bridge_pubkey_bytes: pk_bytes.clone(),
817 voting_power: 5000,
818 http_rest_url: "http://pk.url".to_string().as_bytes().to_vec(),
819 blocklisted: true,
820 },
821 ),
822 (
823 pk_bytes2.clone(),
824 MoveTypeCommitteeMember {
825 sui_address: SuiAddress::random_for_testing_only(),
826 bridge_pubkey_bytes: pk_bytes2.clone(),
827 voting_power: 5000,
828 http_rest_url: "http://pk2.url".to_string().as_bytes().to_vec(),
829 blocklisted: false,
830 },
831 ),
832 ],
833 member_registration: vec![],
834 last_committee_update_epoch: 0,
835 };
836 sui_client_mock.set_bridge_committee(summary.clone());
837 let timer = std::time::Instant::now();
838 let committee = get_latest_bridge_committee_with_blocklist_event(
839 sui_client.clone(),
840 event.clone(),
841 Duration::from_millis(500),
842 )
843 .await;
844 assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
845 assert!(!committee.member(&pk_as_bytes2).unwrap().is_blocklisted);
846 let elapsed = timer.elapsed().as_millis();
847 assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
848 }
849
850 #[tokio::test]
851 async fn test_get_bridge_pause_status_with_emergency_event() {
852 telemetry_subscribers::init_for_testing();
853 let sui_client_mock = SuiMockClient::default();
854 let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
855
856 let event = EmergencyOpEvent { frozen: true };
858 sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
859 let timer = std::time::Instant::now();
860 assert!(
861 get_latest_bridge_pause_status_with_emergency_event(
862 sui_client.clone(),
863 event.clone(),
864 Duration::from_secs(2),
865 )
866 .await
867 );
868 assert!(timer.elapsed().as_millis() < 500);
869
870 let event = EmergencyOpEvent { frozen: false };
871 sui_client_mock.set_is_bridge_paused(BRIDGE_UNPAUSED);
872 let timer = std::time::Instant::now();
873 assert!(
874 !get_latest_bridge_pause_status_with_emergency_event(
875 sui_client.clone(),
876 event.clone(),
877 Duration::from_secs(2),
878 )
879 .await
880 );
881 assert!(timer.elapsed().as_millis() < 500);
882
883 sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
886 let timer = std::time::Instant::now();
887 let sui_client_mock_clone = sui_client_mock.clone();
889 tokio::spawn(async move {
890 tokio::time::sleep(Duration::from_secs(1)).await;
891 sui_client_mock_clone.set_is_bridge_paused(BRIDGE_UNPAUSED);
892 });
893 assert!(
894 !get_latest_bridge_pause_status_with_emergency_event(
895 sui_client.clone(),
896 event.clone(),
897 Duration::from_secs(2),
898 )
899 .await
900 );
901 let elapsed = timer.elapsed().as_millis();
902 assert!(elapsed > 1000 && elapsed < 3000, "{}", elapsed);
903
904 sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
907 let timer = std::time::Instant::now();
908 assert!(
909 get_latest_bridge_pause_status_with_emergency_event(
910 sui_client.clone(),
911 event.clone(),
912 Duration::from_secs(2),
913 )
914 .await
915 );
916 let elapsed = timer.elapsed().as_millis();
917 assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
918 }
919
920 #[tokio::test]
921 async fn test_update_bridge_authority_aggregation_with_url_change_event() {
922 let (
923 sui_monitor_tx,
924 sui_monitor_rx,
925 _eth_monitor_tx,
926 eth_monitor_rx,
927 sui_client_mock,
928 sui_client,
929 bridge_pause_tx,
930 _bridge_pause_rx,
931 mut authorities,
932 bridge_metrics,
933 ) = setup();
934 let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
935 let agg = Arc::new(ArcSwap::new(Arc::new(
936 BridgeAuthorityAggregator::new_for_testing(Arc::new(old_committee)),
937 )));
938 let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
939 let _handle = tokio::task::spawn(
940 BridgeMonitor::new(
941 sui_client.clone(),
942 sui_monitor_rx,
943 eth_monitor_rx,
944 agg.clone(),
945 bridge_pause_tx,
946 sui_token_type_tags,
947 bridge_metrics,
948 )
949 .run(),
950 );
951 let new_url = "http://new.url".to_string();
952 authorities[0].base_url = new_url.clone();
953 let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
954 let new_committee_summary =
955 bridge_committee_to_bridge_committee_summary(new_committee.clone());
956 sui_client_mock.set_bridge_committee(new_committee_summary.clone());
957 sui_monitor_tx
958 .send(SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(
959 CommitteeMemberUrlUpdateEvent {
960 member: authorities[0].pubkey.clone(),
961 new_url: new_url.clone(),
962 },
963 ))
964 .await
965 .unwrap();
966 tokio::time::sleep(Duration::from_secs(1)).await;
968 assert_eq!(
970 agg.load()
971 .committee
972 .member(&BridgeAuthorityPublicKeyBytes::from(&authorities[0].pubkey))
973 .unwrap()
974 .base_url,
975 new_url
976 );
977 }
978
979 #[tokio::test]
980 async fn test_update_bridge_authority_aggregation_with_blocklist_event() {
981 let (
982 sui_monitor_tx,
983 sui_monitor_rx,
984 _eth_monitor_tx,
985 eth_monitor_rx,
986 sui_client_mock,
987 sui_client,
988 bridge_pause_tx,
989 _bridge_pause_rx,
990 mut authorities,
991 bridge_metrics,
992 ) = setup();
993 let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
994 let agg = Arc::new(ArcSwap::new(Arc::new(
995 BridgeAuthorityAggregator::new_for_testing(Arc::new(old_committee)),
996 )));
997 let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
998 let _handle = tokio::task::spawn(
999 BridgeMonitor::new(
1000 sui_client.clone(),
1001 sui_monitor_rx,
1002 eth_monitor_rx,
1003 agg.clone(),
1004 bridge_pause_tx,
1005 sui_token_type_tags,
1006 bridge_metrics,
1007 )
1008 .run(),
1009 );
1010 authorities[0].is_blocklisted = true;
1011 let to_blocklist = &authorities[0];
1012 let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
1013 let new_committee_summary =
1014 bridge_committee_to_bridge_committee_summary(new_committee.clone());
1015 sui_client_mock.set_bridge_committee(new_committee_summary.clone());
1016 sui_monitor_tx
1017 .send(SuiBridgeEvent::BlocklistValidatorEvent(
1018 BlocklistValidatorEvent {
1019 public_keys: vec![to_blocklist.pubkey.clone()],
1020 blocklisted: true,
1021 },
1022 ))
1023 .await
1024 .unwrap();
1025 tokio::time::sleep(Duration::from_secs(1)).await;
1027 assert!(
1028 agg.load()
1029 .committee
1030 .member(&BridgeAuthorityPublicKeyBytes::from(&to_blocklist.pubkey))
1031 .unwrap()
1032 .is_blocklisted,
1033 );
1034 }
1035
1036 #[tokio::test]
1037 async fn test_update_bridge_pause_status_with_emergency_event() {
1038 let (
1039 sui_monitor_tx,
1040 sui_monitor_rx,
1041 _eth_monitor_tx,
1042 eth_monitor_rx,
1043 sui_client_mock,
1044 sui_client,
1045 bridge_pause_tx,
1046 bridge_pause_rx,
1047 authorities,
1048 bridge_metrics,
1049 ) = setup();
1050 let event = EmergencyOpEvent {
1051 frozen: !*bridge_pause_tx.borrow(), };
1053 let committee = BridgeCommittee::new(authorities.clone()).unwrap();
1054 let agg = Arc::new(ArcSwap::new(Arc::new(
1055 BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1056 )));
1057 let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1058 let _handle = tokio::task::spawn(
1059 BridgeMonitor::new(
1060 sui_client.clone(),
1061 sui_monitor_rx,
1062 eth_monitor_rx,
1063 agg.clone(),
1064 bridge_pause_tx,
1065 sui_token_type_tags,
1066 bridge_metrics,
1067 )
1068 .run(),
1069 );
1070
1071 sui_client_mock.set_is_bridge_paused(event.frozen);
1072 sui_monitor_tx
1073 .send(SuiBridgeEvent::EmergencyOpEvent(event.clone()))
1074 .await
1075 .unwrap();
1076 tokio::time::sleep(Duration::from_secs(1)).await;
1078 assert!(*bridge_pause_rx.borrow() == event.frozen);
1080 }
1081
1082 #[tokio::test]
1083 async fn test_update_sui_token_type_tags() {
1084 let (
1085 sui_monitor_tx,
1086 sui_monitor_rx,
1087 _eth_monitor_tx,
1088 eth_monitor_rx,
1089 _sui_client_mock,
1090 sui_client,
1091 bridge_pause_tx,
1092 _bridge_pause_rx,
1093 authorities,
1094 bridge_metrics,
1095 ) = setup();
1096 let event = NewTokenEvent {
1097 token_id: 255,
1098 type_name: TypeTag::from_str("0xbeef::beef::BEEF").unwrap(),
1099 native_token: false,
1100 decimal_multiplier: 1000000,
1101 notional_value: 100000000,
1102 };
1103 let committee = BridgeCommittee::new(authorities.clone()).unwrap();
1104 let agg = Arc::new(ArcSwap::new(Arc::new(
1105 BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1106 )));
1107 let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1108 let sui_token_type_tags_clone = sui_token_type_tags.clone();
1109 let _handle = tokio::task::spawn(
1110 BridgeMonitor::new(
1111 sui_client.clone(),
1112 sui_monitor_rx,
1113 eth_monitor_rx,
1114 agg.clone(),
1115 bridge_pause_tx,
1116 sui_token_type_tags_clone,
1117 bridge_metrics,
1118 )
1119 .run(),
1120 );
1121
1122 sui_monitor_tx
1123 .send(SuiBridgeEvent::NewTokenEvent(event.clone()))
1124 .await
1125 .unwrap();
1126 tokio::time::sleep(Duration::from_secs(1)).await;
1128 sui_token_type_tags
1130 .load()
1131 .clone()
1132 .get(&event.token_id)
1133 .unwrap();
1134 }
1135
1136 #[allow(clippy::type_complexity)]
1137 fn setup() -> (
1138 mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
1139 mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
1140 mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
1141 mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
1142 SuiMockClient,
1143 Arc<SuiClient<SuiMockClient>>,
1144 tokio::sync::watch::Sender<IsBridgePaused>,
1145 tokio::sync::watch::Receiver<IsBridgePaused>,
1146 Vec<BridgeAuthority>,
1147 Arc<BridgeMetrics>,
1148 ) {
1149 telemetry_subscribers::init_for_testing();
1150 let registry = Registry::new();
1151 mysten_metrics::init_metrics(®istry);
1152 init_all_struct_tags();
1153 let bridge_metrics = Arc::new(BridgeMetrics::new_for_testing());
1154
1155 let sui_client_mock = SuiMockClient::default();
1156 let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1157 let (sui_monitor_tx, sui_monitor_rx) = mysten_metrics::metered_channel::channel(
1158 10000,
1159 &mysten_metrics::get_metrics()
1160 .unwrap()
1161 .channel_inflight
1162 .with_label_values(&["sui_monitor_queue"]),
1163 );
1164 let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
1165 10000,
1166 &mysten_metrics::get_metrics()
1167 .unwrap()
1168 .channel_inflight
1169 .with_label_values(&["eth_monitor_queue"]),
1170 );
1171
1172 let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1173 let authorities = vec![
1174 get_test_authority_and_key(2500, 0 ).0,
1175 get_test_authority_and_key(2500, 0 ).0,
1176 get_test_authority_and_key(2500, 0 ).0,
1177 get_test_authority_and_key(2500, 0 ).0,
1178 ];
1179 (
1180 sui_monitor_tx,
1181 sui_monitor_rx,
1182 eth_monitor_tx,
1183 eth_monitor_rx,
1184 sui_client_mock,
1185 sui_client,
1186 bridge_pause_tx,
1187 bridge_pause_rx,
1188 authorities,
1189 bridge_metrics,
1190 )
1191 }
1192}