1use crate::abi::{
8 EthBridgeCommitteeEvents, EthBridgeConfigEvents, EthBridgeEvent, EthBridgeLimiterEvents,
9 EthCommitteeUpgradeableContractEvents, EthSuiBridgeEvents,
10};
11use crate::client::bridge_authority_aggregator::BridgeAuthorityAggregator;
12use crate::crypto::BridgeAuthorityPublicKeyBytes;
13use crate::events::{BlocklistValidatorEvent, CommitteeMemberUrlUpdateEvent};
14use crate::events::{EmergencyOpEvent, SuiBridgeEvent};
15use crate::metrics::BridgeMetrics;
16use crate::retry_with_max_elapsed_time;
17use crate::sui_client::{SuiClient, SuiClientInner};
18use crate::types::{BridgeCommittee, IsBridgePaused};
19use arc_swap::ArcSwap;
20use std::collections::HashMap;
21use std::sync::Arc;
22use sui_types::TypeTag;
23use tokio::time::Duration;
24use tracing::{error, info, warn};
25
/// Number of times each `get_latest_bridge_*` helper in this file re-checks
/// on-chain state against the triggering event before giving up and returning
/// whatever it fetched last.
const REFRESH_BRIDGE_RETRY_TIMES: u64 = 3;
27
/// Consumes Sui and Ethereum bridge events and keeps the shared bridge state
/// (authority aggregator, pause flag, token type tags) and the governance
/// metrics up to date with them.
pub struct BridgeMonitor<C> {
    /// Client used to re-query the bridge committee and bridge summary when a
    /// committee or emergency-op event is handled.
    sui_client: Arc<SuiClient<C>>,
    /// Incoming Sui bridge events; drained by `run`.
    sui_monitor_rx: mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
    /// Incoming Ethereum bridge events; drained by `run`.
    eth_monitor_rx: mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
    /// Shared authority aggregator; swapped for a freshly built one when a
    /// URL-update or blocklist event is handled.
    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
    /// Watch channel on which the pause status is published after an
    /// emergency-op event.
    bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
    /// Shared map from token id to type tag; replaced when a new-token event
    /// introduces an id that was not known before.
    sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
    /// Metrics handle; its `observed_governance_actions` counter is bumped by
    /// the event handlers.
    bridge_metrics: Arc<BridgeMetrics>,
}
37
38impl<C> BridgeMonitor<C>
39where
40 C: SuiClientInner + 'static,
41{
42 pub fn new(
43 sui_client: Arc<SuiClient<C>>,
44 sui_monitor_rx: mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
45 eth_monitor_rx: mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
46 bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
47 bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
48 sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
49 bridge_metrics: Arc<BridgeMetrics>,
50 ) -> Self {
51 Self {
52 sui_client,
53 sui_monitor_rx,
54 eth_monitor_rx,
55 bridge_auth_agg,
56 bridge_paused_watch_tx,
57 sui_token_type_tags,
58 bridge_metrics,
59 }
60 }
61
62 pub async fn run(self) {
63 tracing::info!("Starting BridgeMonitor");
64 let Self {
65 sui_client,
66 mut sui_monitor_rx,
67 mut eth_monitor_rx,
68 bridge_auth_agg,
69 bridge_paused_watch_tx,
70 sui_token_type_tags,
71 bridge_metrics,
72 } = self;
73 let mut latest_token_config = (*sui_token_type_tags.load().clone()).clone();
74
75 loop {
76 tokio::select! {
77 sui_event = sui_monitor_rx.recv() => {
78 if let Some(sui_event) = sui_event {
79 Self::handle_sui_events(
80 sui_event,
81 &sui_client,
82 &bridge_auth_agg,
83 &bridge_paused_watch_tx,
84 &sui_token_type_tags,
85 &bridge_metrics,
86 &mut latest_token_config,
87 )
88 .await;
89 } else {
90 panic!("BridgeMonitor sui events channel was closed unexpectedly");
91 }
92 }
93 eth_event = eth_monitor_rx.recv() => {
94 if let Some(eth_event) = eth_event {
95 Self::handle_eth_events(eth_event, &bridge_metrics);
96 } else {
97 panic!("BridgeMonitor eth events channel was closed unexpectedly");
98 }
99 }
100 }
101 }
102 }
103
104 async fn handle_sui_events(
105 event: SuiBridgeEvent,
106 sui_client: &Arc<SuiClient<C>>,
107 bridge_auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
108 bridge_paused_watch_tx: &tokio::sync::watch::Sender<IsBridgePaused>,
109 sui_token_type_tags: &Arc<ArcSwap<HashMap<u8, TypeTag>>>,
110 bridge_metrics: &Arc<BridgeMetrics>,
111 latest_token_config: &mut HashMap<u8, TypeTag>,
112 ) {
113 info!("Received SuiBridgeEvent: {:?}", event);
114 macro_rules! bump_sui_counter {
115 ($action:expr) => {
116 bridge_metrics
117 .observed_governance_actions
118 .with_label_values(&[$action, "sui"])
119 .inc();
120 };
121 }
122
123 match event {
124 SuiBridgeEvent::SuiToEthTokenBridgeV1(_) => (),
125 SuiBridgeEvent::SuiToEthTokenBridgeV2(_) => (),
126 SuiBridgeEvent::TokenTransferApproved(_) => (),
127 SuiBridgeEvent::TokenTransferClaimed(_) => (),
128 SuiBridgeEvent::TokenTransferAlreadyApproved(_) => (),
129 SuiBridgeEvent::TokenTransferAlreadyClaimed(_) => (),
130 SuiBridgeEvent::TokenTransferLimitExceed(_) => {
131 }
133
134 SuiBridgeEvent::EmergencyOpEvent(event) => {
135 bump_sui_counter!(if event.frozen {
136 "bridge_paused"
137 } else {
138 "bridge_unpaused"
139 });
140 let is_paused = get_latest_bridge_pause_status_with_emergency_event(
141 sui_client.clone(),
142 event,
143 Duration::from_secs(10),
144 )
145 .await;
146 bridge_paused_watch_tx
147 .send(is_paused)
148 .expect("Bridge pause status watch channel should not be closed");
149 }
150
151 SuiBridgeEvent::CommitteeMemberRegistration(_) => {
152 bump_sui_counter!("committee_member_registered");
153 }
154 SuiBridgeEvent::CommitteeUpdateEvent(_) => {
155 bump_sui_counter!("committee_updated");
156 }
157
158 SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(event) => {
159 bump_sui_counter!("committee_member_url_updated");
160 let new_committee = get_latest_bridge_committee_with_url_update_event(
161 sui_client.clone(),
162 event,
163 Duration::from_secs(10),
164 )
165 .await;
166 let committee_names = bridge_auth_agg.load().committee_keys_to_names.clone();
167 bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(
168 Arc::new(new_committee),
169 bridge_metrics.clone(),
170 committee_names,
171 )));
172 info!("Committee updated with CommitteeMemberUrlUpdateEvent");
173 }
174
175 SuiBridgeEvent::BlocklistValidatorEvent(event) => {
176 bump_sui_counter!(if event.blocklisted {
177 "validator_blocklisted"
178 } else {
179 "validator_unblocklisted"
180 });
181 let new_committee = get_latest_bridge_committee_with_blocklist_event(
182 sui_client.clone(),
183 event,
184 Duration::from_secs(10),
185 )
186 .await;
187 let committee_names = bridge_auth_agg.load().committee_keys_to_names.clone();
188 bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(
189 Arc::new(new_committee),
190 bridge_metrics.clone(),
191 committee_names,
192 )));
193 info!("Committee updated with BlocklistValidatorEvent");
194 }
195
196 SuiBridgeEvent::TokenRegistrationEvent(_) => {
197 bump_sui_counter!("new_token_registered");
198 }
199
200 SuiBridgeEvent::NewTokenEvent(event) => {
201 bump_sui_counter!("new_token_added");
202 if let std::collections::hash_map::Entry::Vacant(entry) =
203 latest_token_config.entry(event.token_id)
205 {
206 entry.insert(event.type_name.clone());
207 sui_token_type_tags.store(Arc::new(latest_token_config.clone()));
208 } else {
209 assert_eq!(event.type_name, latest_token_config[&event.token_id]);
211 }
212 }
213
214 SuiBridgeEvent::UpdateTokenPriceEvent(_event) => {
215 bump_sui_counter!("token_price_updated");
216 }
217
218 SuiBridgeEvent::UpdateRouteLimitEvent(_event) => {
219 bump_sui_counter!("limit_updated");
220 }
221 }
222 }
223
224 fn handle_eth_events(event: EthBridgeEvent, bridge_metrics: &Arc<BridgeMetrics>) {
225 info!("Received EthBridgeEvent: {:?}", event);
226
227 macro_rules! bump_eth_counter {
228 ($action:expr) => {
229 bridge_metrics
230 .observed_governance_actions
231 .with_label_values(&[$action, "eth"])
232 .inc()
233 };
234 }
235
236 match event {
237 EthBridgeEvent::EthBridgeCommitteeEvents(event) => match event {
238 EthBridgeCommitteeEvents::BlocklistUpdated(event) => {
239 bump_eth_counter!(if event.isBlocklisted {
240 "validator_blocklisted"
241 } else {
242 "validator_unblocklisted"
243 });
244 }
245 EthBridgeCommitteeEvents::Initialized(_) => {
246 bump_eth_counter!("committee_contract_initialized");
247 }
248 EthBridgeCommitteeEvents::Upgraded(_) => {
249 bump_eth_counter!("committee_contract_upgraded");
250 }
251 EthBridgeCommitteeEvents::BlocklistUpdatedV2(e) => {
252 bump_eth_counter!(if e.isBlocklisted {
253 "validator_blocklisted"
254 } else {
255 "validator_unblocklisted"
256 });
257 }
258 EthBridgeCommitteeEvents::ContractUpgraded(_) => {
259 bump_eth_counter!("committee_contract_upgraded");
260 }
261 },
262 EthBridgeEvent::EthBridgeLimiterEvents(event) => match event {
263 EthBridgeLimiterEvents::Initialized(_) => {
264 bump_eth_counter!("limiter_contract_initialized");
265 }
266 EthBridgeLimiterEvents::Upgraded(_) => {
267 bump_eth_counter!("limiter_contract_upgraded");
268 }
269 EthBridgeLimiterEvents::OwnershipTransferred(_) => {
270 bump_eth_counter!("limiter_contract_ownership_transferred");
271 }
272 EthBridgeLimiterEvents::LimitUpdated(_) => {
273 bump_eth_counter!("limit_updated");
274 }
275 EthBridgeLimiterEvents::HourlyTransferAmountUpdated(_) => (),
278 EthBridgeLimiterEvents::ContractUpgraded(_) => {
279 bump_eth_counter!("limiter_contract_upgraded");
280 }
281 EthBridgeLimiterEvents::LimitUpdatedV2(_) => {
282 bump_eth_counter!("limit_updated");
283 }
284 },
285 EthBridgeEvent::EthBridgeConfigEvents(event) => match event {
286 EthBridgeConfigEvents::Initialized(_) => {
287 bump_eth_counter!("config_contract_initialized");
288 }
289 EthBridgeConfigEvents::Upgraded(_) => {
290 bump_eth_counter!("config_contract_upgraded");
291 }
292 EthBridgeConfigEvents::TokenAdded(_) => {
293 bump_eth_counter!("new_token_added");
294 }
295 EthBridgeConfigEvents::TokenPriceUpdated(_) => {
296 bump_eth_counter!("update_token_price");
297 }
298 EthBridgeConfigEvents::ContractUpgraded(_) => {
299 bump_eth_counter!("config_contract_upgraded");
300 }
301 EthBridgeConfigEvents::TokenPriceUpdatedV2(_) => {
302 bump_eth_counter!("update_token_price");
303 }
304 EthBridgeConfigEvents::TokensAddedV2(_) => {
305 bump_eth_counter!("new_token_added");
306 }
307 },
308 EthBridgeEvent::EthCommitteeUpgradeableContractEvents(event) => match event {
309 EthCommitteeUpgradeableContractEvents::Initialized(_) => {
310 bump_eth_counter!("upgradeable_contract_initialized");
311 }
312 EthCommitteeUpgradeableContractEvents::Upgraded(_) => {
313 bump_eth_counter!("upgradeable_contract_upgraded");
314 }
315 },
316 EthBridgeEvent::EthSuiBridgeEvents(event) => match event {
317 EthSuiBridgeEvents::TokensClaimed(_) => (),
318 EthSuiBridgeEvents::TokensDeposited(_) => (),
319 EthSuiBridgeEvents::TokensDepositedV2(_) => (),
320 EthSuiBridgeEvents::Paused(_) => bump_eth_counter!("bridge_paused"),
321 EthSuiBridgeEvents::Unpaused(_) => bump_eth_counter!("bridge_unpaused"),
322 EthSuiBridgeEvents::Upgraded(_) => {
323 bump_eth_counter!("bridge_contract_upgraded")
324 }
325 EthSuiBridgeEvents::Initialized(_) => {
326 bump_eth_counter!("bridge_contract_initialized")
327 }
328 EthSuiBridgeEvents::ContractUpgraded(_) => {
329 bump_eth_counter!("bridge_contract_upgraded")
330 }
331 EthSuiBridgeEvents::EmergencyOperation(e) => {
332 if e.paused {
333 bump_eth_counter!("bridge_paused")
334 } else {
335 bump_eth_counter!("bridge_unpaused")
336 }
337 }
338 },
339 }
340 }
341}
342
343async fn get_latest_bridge_committee_with_url_update_event<C: SuiClientInner>(
344 sui_client: Arc<SuiClient<C>>,
345 event: CommitteeMemberUrlUpdateEvent,
346 staleness_retry_interval: Duration,
347) -> BridgeCommittee {
348 let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
349 loop {
350 let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
351 sui_client.get_bridge_committee(),
352 Duration::from_secs(600)
353 ) else {
354 error!("Failed to get bridge committee after retry");
355 continue;
356 };
357 let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(&event.member));
358 let Some(member) = member else {
359 warn!(
362 "Committee member not found in the committee: {:?}",
363 event.member
364 );
365 return committee;
366 };
367 if member.base_url == event.new_url {
368 return committee;
369 }
370 tokio::time::sleep(staleness_retry_interval).await;
375 remaining_retry_times -= 1;
376 if remaining_retry_times == 0 {
377 warn!(
378 "Committee member url {:?} does not match onchain record {:?} after retry",
379 event.member, member
380 );
381 return committee;
382 }
383 }
384}
385
386async fn get_latest_bridge_committee_with_blocklist_event<C: SuiClientInner>(
387 sui_client: Arc<SuiClient<C>>,
388 event: BlocklistValidatorEvent,
389 staleness_retry_interval: Duration,
390) -> BridgeCommittee {
391 let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
392 loop {
393 let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
394 sui_client.get_bridge_committee(),
395 Duration::from_secs(600)
396 ) else {
397 error!("Failed to get bridge committee after retry");
398 continue;
399 };
400 let mut any_mismatch = false;
401 for pk in &event.public_keys {
402 let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(pk));
403 let Some(member) = member else {
404 warn!("Committee member not found in the committee: {:?}", pk);
408 any_mismatch = true;
409 break;
410 };
411 if member.is_blocklisted != event.blocklisted {
412 warn!(
413 "Committee member blocklist status does not match onchain record: {:?}",
414 member
415 );
416 any_mismatch = true;
417 break;
418 }
419 }
420 if !any_mismatch {
421 return committee;
422 }
423 tokio::time::sleep(staleness_retry_interval).await;
428 remaining_retry_times -= 1;
429 if remaining_retry_times == 0 {
430 warn!(
431 "Committee member blocklist status {:?} does not match onchain record after retry",
432 event
433 );
434 return committee;
435 }
436 }
437}
438
439async fn get_latest_bridge_pause_status_with_emergency_event<C: SuiClientInner>(
440 sui_client: Arc<SuiClient<C>>,
441 event: EmergencyOpEvent,
442 staleness_retry_interval: Duration,
443) -> IsBridgePaused {
444 let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
445 loop {
446 let Ok(Ok(summary)) =
447 retry_with_max_elapsed_time!(sui_client.get_bridge_summary(), Duration::from_secs(600))
448 else {
449 error!("Failed to get bridge summary after retry");
450 continue;
451 };
452 if summary.is_frozen == event.frozen {
453 return summary.is_frozen;
454 }
455 tokio::time::sleep(staleness_retry_interval).await;
460 remaining_retry_times -= 1;
461 if remaining_retry_times == 0 {
462 warn!(
463 "Bridge pause status {:?} does not match onchain record {:?} after retry",
464 event, summary.is_frozen
465 );
466 return summary.is_frozen;
467 }
468 }
469}
470
471#[cfg(test)]
472mod tests {
473 use std::str::FromStr;
474
475 use super::*;
476 use crate::events::{NewTokenEvent, init_all_struct_tags};
477 use crate::test_utils::{
478 bridge_committee_to_bridge_committee_summary, get_test_authority_and_key,
479 };
480 use crate::types::{BRIDGE_PAUSED, BRIDGE_UNPAUSED, BridgeAuthority};
481 use fastcrypto::traits::KeyPair;
482 use prometheus::Registry;
483 use sui_types::base_types::SuiAddress;
484 use sui_types::bridge::BridgeCommitteeSummary;
485 use sui_types::bridge::MoveTypeCommitteeMember;
486 use sui_types::crypto::get_key_pair;
487
488 use crate::{sui_mock_client::SuiMockClient, types::BridgeCommittee};
489 use sui_types::crypto::ToFromBytes;
490
    /// Exercises `get_latest_bridge_committee_with_url_update_event` against a
    /// mock client in four scenarios: immediate match, match after one retry,
    /// no match within the retry budget, and member absent from the committee.
    /// The assertions rely on wall-clock timing.
    #[tokio::test]
    async fn test_get_latest_bridge_committee_with_url_update_event() {
        telemetry_subscribers::init_for_testing();
        let sui_client_mock = SuiMockClient::default();
        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
        let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
        let pk = kp.public().clone();
        let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
        let pk_bytes = pk_as_bytes.as_bytes().to_vec();
        let event = CommitteeMemberUrlUpdateEvent {
            member: pk,
            new_url: "http://new.url".to_string(),
        };
        let summary = BridgeCommitteeSummary {
            members: vec![(
                pk_bytes.clone(),
                MoveTypeCommitteeMember {
                    sui_address: SuiAddress::random_for_testing_only(),
                    bridge_pubkey_bytes: pk_bytes.clone(),
                    voting_power: 10000,
                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
                    blocklisted: false,
                },
            )],
            member_registration: vec![],
            last_committee_update_epoch: 0,
        };

        // Scenario 1: the mock already reports the event's URL, so the call
        // returns without sleeping.
        sui_client_mock.set_bridge_committee(summary.clone());
        let timer = std::time::Instant::now();
        let committee = get_latest_bridge_committee_with_url_update_event(
            sui_client.clone(),
            event.clone(),
            Duration::from_secs(2),
        )
        .await;
        assert_eq!(
            committee.member(&pk_as_bytes).unwrap().base_url,
            "http://new.url"
        );
        assert!(timer.elapsed().as_millis() < 500);

        // Scenario 2: the mock starts with a different URL and is switched to
        // the matching summary after one second, so the match is seen on the
        // re-check that follows the first 2-second sleep.
        let old_summary = BridgeCommitteeSummary {
            members: vec![(
                pk_bytes.clone(),
                MoveTypeCommitteeMember {
                    sui_address: SuiAddress::random_for_testing_only(),
                    bridge_pubkey_bytes: pk_bytes.clone(),
                    voting_power: 10000,
                    http_rest_url: "http://old.url".to_string().as_bytes().to_vec(),
                    blocklisted: false,
                },
            )],
            member_registration: vec![],
            last_committee_update_epoch: 0,
        };
        sui_client_mock.set_bridge_committee(old_summary.clone());
        let timer = std::time::Instant::now();
        let sui_client_mock_clone = sui_client_mock.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(1)).await;
            sui_client_mock_clone.set_bridge_committee(summary.clone());
        });
        let committee = get_latest_bridge_committee_with_url_update_event(
            sui_client.clone(),
            event.clone(),
            Duration::from_secs(2),
        )
        .await;
        assert_eq!(
            committee.member(&pk_as_bytes).unwrap().base_url,
            "http://new.url"
        );
        let elapsed = timer.elapsed().as_millis();
        assert!(elapsed > 1000 && elapsed < 3000);

        // Scenario 3: the mock reports a URL that never equals the event's, so
        // the retry budget is exhausted and the mock's committee is returned
        // as-is.
        let newer_summary = BridgeCommitteeSummary {
            members: vec![(
                pk_bytes.clone(),
                MoveTypeCommitteeMember {
                    sui_address: SuiAddress::random_for_testing_only(),
                    bridge_pubkey_bytes: pk_bytes.clone(),
                    voting_power: 10000,
                    http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
                    blocklisted: false,
                },
            )],
            member_registration: vec![],
            last_committee_update_epoch: 0,
        };
        sui_client_mock.set_bridge_committee(newer_summary.clone());
        let timer = std::time::Instant::now();
        let committee = get_latest_bridge_committee_with_url_update_event(
            sui_client.clone(),
            event.clone(),
            Duration::from_millis(500),
        )
        .await;
        assert_eq!(
            committee.member(&pk_as_bytes).unwrap().base_url,
            "http://newer.url"
        );
        let elapsed = timer.elapsed().as_millis();
        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);

        // Scenario 4: the committee contains only a different key, so the
        // event's member is not found and the call returns without retrying.
        let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
        let pk2 = kp2.public().clone();
        let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
        let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
        let newer_summary = BridgeCommitteeSummary {
            members: vec![(
                pk_bytes2.clone(),
                MoveTypeCommitteeMember {
                    sui_address: SuiAddress::random_for_testing_only(),
                    bridge_pubkey_bytes: pk_bytes2.clone(),
                    voting_power: 10000,
                    http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
                    blocklisted: false,
                },
            )],
            member_registration: vec![],
            last_committee_update_epoch: 0,
        };
        sui_client_mock.set_bridge_committee(newer_summary.clone());
        let timer = std::time::Instant::now();
        let committee = get_latest_bridge_committee_with_url_update_event(
            sui_client.clone(),
            event.clone(),
            Duration::from_secs(1),
        )
        .await;
        assert_eq!(
            committee.member(&pk_as_bytes2).unwrap().base_url,
            "http://newer.url"
        );
        assert!(committee.member(&pk_as_bytes).is_none());
        let elapsed = timer.elapsed().as_millis();
        assert!(elapsed < 1000);
    }
638
    /// Exercises `get_latest_bridge_committee_with_blocklist_event` against a
    /// mock client: immediate match for both flag values, match after one
    /// retry, and three ways of exhausting the retry budget (flag never
    /// matches, member absent, only some of several keys match).
    /// The assertions rely on wall-clock timing.
    #[tokio::test]
    async fn test_get_latest_bridge_committee_with_blocklist_event() {
        telemetry_subscribers::init_for_testing();
        let sui_client_mock = SuiMockClient::default();
        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
        let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
        let pk = kp.public().clone();
        let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
        let pk_bytes = pk_as_bytes.as_bytes().to_vec();

        // Scenario 1: event blocklists the key and the mock already agrees.
        let event = BlocklistValidatorEvent {
            blocklisted: true,
            public_keys: vec![pk.clone()],
        };
        let summary = BridgeCommitteeSummary {
            members: vec![(
                pk_bytes.clone(),
                MoveTypeCommitteeMember {
                    sui_address: SuiAddress::random_for_testing_only(),
                    bridge_pubkey_bytes: pk_bytes.clone(),
                    voting_power: 10000,
                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
                    blocklisted: true,
                },
            )],
            member_registration: vec![],
            last_committee_update_epoch: 0,
        };
        sui_client_mock.set_bridge_committee(summary.clone());
        let timer = std::time::Instant::now();
        let committee = get_latest_bridge_committee_with_blocklist_event(
            sui_client.clone(),
            event.clone(),
            Duration::from_secs(2),
        )
        .await;
        assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
        assert!(timer.elapsed().as_millis() < 500);

        // Scenario 2: event un-blocklists the key and the mock already agrees.
        // This `event` (blocklisted: false) stays in use through scenario 5.
        let event = BlocklistValidatorEvent {
            blocklisted: false,
            public_keys: vec![pk.clone()],
        };
        let summary = BridgeCommitteeSummary {
            members: vec![(
                pk_bytes.clone(),
                MoveTypeCommitteeMember {
                    sui_address: SuiAddress::random_for_testing_only(),
                    bridge_pubkey_bytes: pk_bytes.clone(),
                    voting_power: 10000,
                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
                    blocklisted: false,
                },
            )],
            member_registration: vec![],
            last_committee_update_epoch: 0,
        };
        sui_client_mock.set_bridge_committee(summary.clone());
        let timer = std::time::Instant::now();
        let committee = get_latest_bridge_committee_with_blocklist_event(
            sui_client.clone(),
            event.clone(),
            Duration::from_secs(2),
        )
        .await;
        assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
        assert!(timer.elapsed().as_millis() < 500);

        // Scenario 3: the mock starts with the opposite flag and is switched to
        // the matching summary after one second, so the match is seen on the
        // re-check that follows the first 2-second sleep.
        let old_summary = BridgeCommitteeSummary {
            members: vec![(
                pk_bytes.clone(),
                MoveTypeCommitteeMember {
                    sui_address: SuiAddress::random_for_testing_only(),
                    bridge_pubkey_bytes: pk_bytes.clone(),
                    voting_power: 10000,
                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
                    blocklisted: true,
                },
            )],
            member_registration: vec![],
            last_committee_update_epoch: 0,
        };
        sui_client_mock.set_bridge_committee(old_summary.clone());
        let timer = std::time::Instant::now();
        let sui_client_mock_clone = sui_client_mock.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(1)).await;
            sui_client_mock_clone.set_bridge_committee(summary.clone());
        });
        let committee = get_latest_bridge_committee_with_blocklist_event(
            sui_client.clone(),
            event.clone(),
            Duration::from_secs(2),
        )
        .await;
        assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
        let elapsed = timer.elapsed().as_millis();
        assert!(elapsed > 1000 && elapsed < 3000);

        // Scenario 4: the mock keeps reporting the opposite flag, so the retry
        // budget is exhausted and the mock's committee is returned as-is.
        let newer_summary = BridgeCommitteeSummary {
            members: vec![(
                pk_bytes.clone(),
                MoveTypeCommitteeMember {
                    sui_address: SuiAddress::random_for_testing_only(),
                    bridge_pubkey_bytes: pk_bytes.clone(),
                    voting_power: 10000,
                    http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
                    blocklisted: true,
                },
            )],
            member_registration: vec![],
            last_committee_update_epoch: 0,
        };
        sui_client_mock.set_bridge_committee(newer_summary.clone());
        let timer = std::time::Instant::now();
        let committee = get_latest_bridge_committee_with_blocklist_event(
            sui_client.clone(),
            event.clone(),
            Duration::from_millis(500),
        )
        .await;
        assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
        let elapsed = timer.elapsed().as_millis();
        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);

        // Scenario 5: the committee contains only a different key. For the
        // blocklist helper a missing member is a mismatch, so the retry budget
        // is exhausted before the committee is returned.
        let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
        let pk2 = kp2.public().clone();
        let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
        let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
        let summary = BridgeCommitteeSummary {
            members: vec![(
                pk_bytes2.clone(),
                MoveTypeCommitteeMember {
                    sui_address: SuiAddress::random_for_testing_only(),
                    bridge_pubkey_bytes: pk_bytes2.clone(),
                    voting_power: 10000,
                    http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
                    blocklisted: false,
                },
            )],
            member_registration: vec![],
            last_committee_update_epoch: 0,
        };
        sui_client_mock.set_bridge_committee(summary.clone());
        let timer = std::time::Instant::now();
        let committee = get_latest_bridge_committee_with_blocklist_event(
            sui_client.clone(),
            event.clone(),
            Duration::from_secs(1),
        )
        .await;
        assert_eq!(
            committee.member(&pk_as_bytes2).unwrap().base_url,
            "http://newer.url"
        );
        assert!(committee.member(&pk_as_bytes).is_none());
        let elapsed = timer.elapsed().as_millis();
        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);

        // Scenario 6: the event blocklists two keys but the mock reports only
        // the first as blocklisted, so the retry budget is exhausted.
        let event = BlocklistValidatorEvent {
            blocklisted: true,
            public_keys: vec![pk, pk2],
        };
        let summary = BridgeCommitteeSummary {
            members: vec![
                (
                    pk_bytes.clone(),
                    MoveTypeCommitteeMember {
                        sui_address: SuiAddress::random_for_testing_only(),
                        bridge_pubkey_bytes: pk_bytes.clone(),
                        voting_power: 5000,
                        http_rest_url: "http://pk.url".to_string().as_bytes().to_vec(),
                        blocklisted: true,
                    },
                ),
                (
                    pk_bytes2.clone(),
                    MoveTypeCommitteeMember {
                        sui_address: SuiAddress::random_for_testing_only(),
                        bridge_pubkey_bytes: pk_bytes2.clone(),
                        voting_power: 5000,
                        http_rest_url: "http://pk2.url".to_string().as_bytes().to_vec(),
                        blocklisted: false,
                    },
                ),
            ],
            member_registration: vec![],
            last_committee_update_epoch: 0,
        };
        sui_client_mock.set_bridge_committee(summary.clone());
        let timer = std::time::Instant::now();
        let committee = get_latest_bridge_committee_with_blocklist_event(
            sui_client.clone(),
            event.clone(),
            Duration::from_millis(500),
        )
        .await;
        assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
        assert!(!committee.member(&pk_as_bytes2).unwrap().is_blocklisted);
        let elapsed = timer.elapsed().as_millis();
        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
    }
851
    /// Exercises `get_latest_bridge_pause_status_with_emergency_event` against
    /// a mock client: immediate match for both flag values, match after one
    /// retry, and retry budget exhausted.
    /// The assertions rely on wall-clock timing.
    #[tokio::test]
    async fn test_get_bridge_pause_status_with_emergency_event() {
        telemetry_subscribers::init_for_testing();
        let sui_client_mock = SuiMockClient::default();
        let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));

        // Scenario 1: event says frozen and the mock already reports paused.
        let event = EmergencyOpEvent { frozen: true };
        sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
        let timer = std::time::Instant::now();
        assert!(
            get_latest_bridge_pause_status_with_emergency_event(
                sui_client.clone(),
                event.clone(),
                Duration::from_secs(2),
            )
            .await
        );
        assert!(timer.elapsed().as_millis() < 500);

        // Scenario 2: event says unfrozen and the mock already reports unpaused.
        // This `event` (frozen: false) stays in use for the remaining scenarios.
        let event = EmergencyOpEvent { frozen: false };
        sui_client_mock.set_is_bridge_paused(BRIDGE_UNPAUSED);
        let timer = std::time::Instant::now();
        assert!(
            !get_latest_bridge_pause_status_with_emergency_event(
                sui_client.clone(),
                event.clone(),
                Duration::from_secs(2),
            )
            .await
        );
        assert!(timer.elapsed().as_millis() < 500);

        // Scenario 3: the mock starts paused and is switched to unpaused after
        // one second, so the match is seen on the re-check that follows the
        // first 2-second sleep.
        sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
        let timer = std::time::Instant::now();
        let sui_client_mock_clone = sui_client_mock.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(1)).await;
            sui_client_mock_clone.set_is_bridge_paused(BRIDGE_UNPAUSED);
        });
        assert!(
            !get_latest_bridge_pause_status_with_emergency_event(
                sui_client.clone(),
                event.clone(),
                Duration::from_secs(2),
            )
            .await
        );
        let elapsed = timer.elapsed().as_millis();
        assert!(elapsed > 1000 && elapsed < 3000, "{}", elapsed);

        // Scenario 4: the mock stays paused while the event says unfrozen, so
        // the retry budget is exhausted and the mock's status is returned.
        sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
        let timer = std::time::Instant::now();
        assert!(
            get_latest_bridge_pause_status_with_emergency_event(
                sui_client.clone(),
                event.clone(),
                Duration::from_secs(2),
            )
            .await
        );
        let elapsed = timer.elapsed().as_millis();
        assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
    }
921
922 #[tokio::test]
923 async fn test_update_bridge_authority_aggregation_with_url_change_event() {
924 let (
925 sui_monitor_tx,
926 sui_monitor_rx,
927 _eth_monitor_tx,
928 eth_monitor_rx,
929 sui_client_mock,
930 sui_client,
931 bridge_pause_tx,
932 _bridge_pause_rx,
933 mut authorities,
934 bridge_metrics,
935 ) = setup();
936 let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
937 let agg = Arc::new(ArcSwap::new(Arc::new(
938 BridgeAuthorityAggregator::new_for_testing(Arc::new(old_committee)),
939 )));
940 let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
941 let _handle = tokio::task::spawn(
942 BridgeMonitor::new(
943 sui_client.clone(),
944 sui_monitor_rx,
945 eth_monitor_rx,
946 agg.clone(),
947 bridge_pause_tx,
948 sui_token_type_tags,
949 bridge_metrics,
950 )
951 .run(),
952 );
953 let new_url = "http://new.url".to_string();
954 authorities[0].base_url = new_url.clone();
955 let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
956 let new_committee_summary =
957 bridge_committee_to_bridge_committee_summary(new_committee.clone());
958 sui_client_mock.set_bridge_committee(new_committee_summary.clone());
959 sui_monitor_tx
960 .send(SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(
961 CommitteeMemberUrlUpdateEvent {
962 member: authorities[0].pubkey.clone(),
963 new_url: new_url.clone(),
964 },
965 ))
966 .await
967 .unwrap();
968 tokio::time::sleep(Duration::from_secs(1)).await;
970 assert_eq!(
972 agg.load()
973 .committee
974 .member(&BridgeAuthorityPublicKeyBytes::from(&authorities[0].pubkey))
975 .unwrap()
976 .base_url,
977 new_url
978 );
979 }
980
981 #[tokio::test]
982 async fn test_update_bridge_authority_aggregation_with_blocklist_event() {
983 let (
984 sui_monitor_tx,
985 sui_monitor_rx,
986 _eth_monitor_tx,
987 eth_monitor_rx,
988 sui_client_mock,
989 sui_client,
990 bridge_pause_tx,
991 _bridge_pause_rx,
992 mut authorities,
993 bridge_metrics,
994 ) = setup();
995 let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
996 let agg = Arc::new(ArcSwap::new(Arc::new(
997 BridgeAuthorityAggregator::new_for_testing(Arc::new(old_committee)),
998 )));
999 let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1000 let _handle = tokio::task::spawn(
1001 BridgeMonitor::new(
1002 sui_client.clone(),
1003 sui_monitor_rx,
1004 eth_monitor_rx,
1005 agg.clone(),
1006 bridge_pause_tx,
1007 sui_token_type_tags,
1008 bridge_metrics,
1009 )
1010 .run(),
1011 );
1012 authorities[0].is_blocklisted = true;
1013 let to_blocklist = &authorities[0];
1014 let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
1015 let new_committee_summary =
1016 bridge_committee_to_bridge_committee_summary(new_committee.clone());
1017 sui_client_mock.set_bridge_committee(new_committee_summary.clone());
1018 sui_monitor_tx
1019 .send(SuiBridgeEvent::BlocklistValidatorEvent(
1020 BlocklistValidatorEvent {
1021 public_keys: vec![to_blocklist.pubkey.clone()],
1022 blocklisted: true,
1023 },
1024 ))
1025 .await
1026 .unwrap();
1027 tokio::time::sleep(Duration::from_secs(1)).await;
1029 assert!(
1030 agg.load()
1031 .committee
1032 .member(&BridgeAuthorityPublicKeyBytes::from(&to_blocklist.pubkey))
1033 .unwrap()
1034 .is_blocklisted,
1035 );
1036 }
1037
1038 #[tokio::test]
1039 async fn test_update_bridge_pause_status_with_emergency_event() {
1040 let (
1041 sui_monitor_tx,
1042 sui_monitor_rx,
1043 _eth_monitor_tx,
1044 eth_monitor_rx,
1045 sui_client_mock,
1046 sui_client,
1047 bridge_pause_tx,
1048 bridge_pause_rx,
1049 authorities,
1050 bridge_metrics,
1051 ) = setup();
1052 let event = EmergencyOpEvent {
1053 frozen: !*bridge_pause_tx.borrow(), };
1055 let committee = BridgeCommittee::new(authorities.clone()).unwrap();
1056 let agg = Arc::new(ArcSwap::new(Arc::new(
1057 BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1058 )));
1059 let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1060 let _handle = tokio::task::spawn(
1061 BridgeMonitor::new(
1062 sui_client.clone(),
1063 sui_monitor_rx,
1064 eth_monitor_rx,
1065 agg.clone(),
1066 bridge_pause_tx,
1067 sui_token_type_tags,
1068 bridge_metrics,
1069 )
1070 .run(),
1071 );
1072
1073 sui_client_mock.set_is_bridge_paused(event.frozen);
1074 sui_monitor_tx
1075 .send(SuiBridgeEvent::EmergencyOpEvent(event.clone()))
1076 .await
1077 .unwrap();
1078 tokio::time::sleep(Duration::from_secs(1)).await;
1080 assert!(*bridge_pause_rx.borrow() == event.frozen);
1082 }
1083
1084 #[tokio::test]
1085 async fn test_update_sui_token_type_tags() {
1086 let (
1087 sui_monitor_tx,
1088 sui_monitor_rx,
1089 _eth_monitor_tx,
1090 eth_monitor_rx,
1091 _sui_client_mock,
1092 sui_client,
1093 bridge_pause_tx,
1094 _bridge_pause_rx,
1095 authorities,
1096 bridge_metrics,
1097 ) = setup();
1098 let event = NewTokenEvent {
1099 token_id: 255,
1100 type_name: TypeTag::from_str("0xbeef::beef::BEEF").unwrap(),
1101 native_token: false,
1102 decimal_multiplier: 1000000,
1103 notional_value: 100000000,
1104 };
1105 let committee = BridgeCommittee::new(authorities.clone()).unwrap();
1106 let agg = Arc::new(ArcSwap::new(Arc::new(
1107 BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1108 )));
1109 let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1110 let sui_token_type_tags_clone = sui_token_type_tags.clone();
1111 let _handle = tokio::task::spawn(
1112 BridgeMonitor::new(
1113 sui_client.clone(),
1114 sui_monitor_rx,
1115 eth_monitor_rx,
1116 agg.clone(),
1117 bridge_pause_tx,
1118 sui_token_type_tags_clone,
1119 bridge_metrics,
1120 )
1121 .run(),
1122 );
1123
1124 sui_monitor_tx
1125 .send(SuiBridgeEvent::NewTokenEvent(event.clone()))
1126 .await
1127 .unwrap();
1128 tokio::time::sleep(Duration::from_secs(1)).await;
1130 sui_token_type_tags
1132 .load()
1133 .clone()
1134 .get(&event.token_id)
1135 .unwrap();
1136 }
1137
1138 #[allow(clippy::type_complexity)]
1139 fn setup() -> (
1140 mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
1141 mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
1142 mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
1143 mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
1144 SuiMockClient,
1145 Arc<SuiClient<SuiMockClient>>,
1146 tokio::sync::watch::Sender<IsBridgePaused>,
1147 tokio::sync::watch::Receiver<IsBridgePaused>,
1148 Vec<BridgeAuthority>,
1149 Arc<BridgeMetrics>,
1150 ) {
1151 telemetry_subscribers::init_for_testing();
1152 let registry = Registry::new();
1153 mysten_metrics::init_metrics(®istry);
1154 init_all_struct_tags();
1155 let bridge_metrics = Arc::new(BridgeMetrics::new_for_testing());
1156
1157 let sui_client_mock = SuiMockClient::default();
1158 let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1159 let (sui_monitor_tx, sui_monitor_rx) = mysten_metrics::metered_channel::channel(
1160 10000,
1161 &mysten_metrics::get_metrics()
1162 .unwrap()
1163 .channel_inflight
1164 .with_label_values(&["sui_monitor_queue"]),
1165 );
1166 let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
1167 10000,
1168 &mysten_metrics::get_metrics()
1169 .unwrap()
1170 .channel_inflight
1171 .with_label_values(&["eth_monitor_queue"]),
1172 );
1173
1174 let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1175 let authorities = vec![
1176 get_test_authority_and_key(2500, 0 ).0,
1177 get_test_authority_and_key(2500, 0 ).0,
1178 get_test_authority_and_key(2500, 0 ).0,
1179 get_test_authority_and_key(2500, 0 ).0,
1180 ];
1181 (
1182 sui_monitor_tx,
1183 sui_monitor_rx,
1184 eth_monitor_tx,
1185 eth_monitor_rx,
1186 sui_client_mock,
1187 sui_client,
1188 bridge_pause_tx,
1189 bridge_pause_rx,
1190 authorities,
1191 bridge_metrics,
1192 )
1193 }
1194}