1use crate::abi::{
8 EthBridgeCommitteeEvents, EthBridgeConfigEvents, EthBridgeEvent, EthBridgeLimiterEvents,
9 EthCommitteeUpgradeableContractEvents, EthSuiBridgeEvents,
10};
11use crate::client::bridge_authority_aggregator::BridgeAuthorityAggregator;
12use crate::crypto::BridgeAuthorityPublicKeyBytes;
13use crate::events::{BlocklistValidatorEvent, CommitteeMemberUrlUpdateEvent};
14use crate::events::{EmergencyOpEvent, SuiBridgeEvent};
15use crate::metrics::BridgeMetrics;
16use crate::retry_with_max_elapsed_time;
17use crate::sui_client::{SuiClient, SuiClientInner};
18use crate::types::{BridgeCommittee, IsBridgePaused};
19use arc_swap::ArcSwap;
20use futures::StreamExt;
21use std::collections::HashMap;
22use std::sync::Arc;
23use sui_rpc::field::{FieldMask, FieldMaskUtil};
24use sui_rpc::proto::sui::rpc::v2::{Checkpoint, SubscribeCheckpointsRequest};
25use sui_types::TypeTag;
26use tokio::time::Duration;
27use tracing::{error, info, warn};
28
/// Number of staleness checks the `get_latest_*` helpers in this file perform
/// before giving up and returning the last value they read from chain.
const REFRESH_BRIDGE_RETRY_TIMES: u64 = 3;
30
/// Consumes Sui and Ethereum bridge events, records governance actions in
/// metrics, and keeps the shared committee aggregator, pause flag and token
/// type map up to date.
pub struct BridgeMonitor<C> {
    /// Client used to re-read the bridge committee and bridge summary.
    sui_client: Arc<SuiClient<C>>,
    /// Incoming Sui bridge events.
    sui_monitor_rx: mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
    /// Incoming Ethereum bridge events.
    eth_monitor_rx: mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
    /// Shared aggregator that is replaced whenever the committee changes.
    bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
    /// Watch channel on which the latest pause status is published.
    bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
    /// Shared token id -> type tag map, extended when a new token is added.
    sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
    /// Metrics handle holding the `observed_governance_actions` counter.
    bridge_metrics: Arc<BridgeMetrics>,
}
40
41impl<C> BridgeMonitor<C>
42where
43 C: SuiClientInner + 'static,
44{
    /// Creates a monitor from its channels and shared state handles.
    ///
    /// Nothing is started here; call [`BridgeMonitor::run`] to begin
    /// processing events.
    pub fn new(
        sui_client: Arc<SuiClient<C>>,
        sui_monitor_rx: mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
        eth_monitor_rx: mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
        bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
        bridge_paused_watch_tx: tokio::sync::watch::Sender<IsBridgePaused>,
        sui_token_type_tags: Arc<ArcSwap<HashMap<u8, TypeTag>>>,
        bridge_metrics: Arc<BridgeMetrics>,
    ) -> Self {
        Self {
            sui_client,
            sui_monitor_rx,
            eth_monitor_rx,
            bridge_auth_agg,
            bridge_paused_watch_tx,
            sui_token_type_tags,
            bridge_metrics,
        }
    }
64
65 pub async fn run(self) {
66 tracing::info!("Starting BridgeMonitor");
67 let Self {
68 sui_client,
69 mut sui_monitor_rx,
70 mut eth_monitor_rx,
71 bridge_auth_agg,
72 bridge_paused_watch_tx,
73 sui_token_type_tags,
74 bridge_metrics,
75 } = self;
76 let mut latest_token_config = (*sui_token_type_tags.load().clone()).clone();
77
78 loop {
79 tokio::select! {
80 sui_event = sui_monitor_rx.recv() => {
81 if let Some(sui_event) = sui_event {
82 Self::handle_sui_events(
83 sui_event,
84 &sui_client,
85 &bridge_auth_agg,
86 &bridge_paused_watch_tx,
87 &sui_token_type_tags,
88 &bridge_metrics,
89 &mut latest_token_config,
90 )
91 .await;
92 } else {
93 panic!("BridgeMonitor sui events channel was closed unexpectedly");
94 }
95 }
96 eth_event = eth_monitor_rx.recv() => {
97 if let Some(eth_event) = eth_event {
98 Self::handle_eth_events(eth_event, &bridge_metrics);
99 } else {
100 panic!("BridgeMonitor eth events channel was closed unexpectedly");
101 }
102 }
103 }
104 }
105 }
106
107 async fn handle_sui_events(
108 event: SuiBridgeEvent,
109 sui_client: &Arc<SuiClient<C>>,
110 bridge_auth_agg: &Arc<ArcSwap<BridgeAuthorityAggregator>>,
111 bridge_paused_watch_tx: &tokio::sync::watch::Sender<IsBridgePaused>,
112 sui_token_type_tags: &Arc<ArcSwap<HashMap<u8, TypeTag>>>,
113 bridge_metrics: &Arc<BridgeMetrics>,
114 latest_token_config: &mut HashMap<u8, TypeTag>,
115 ) {
116 info!("Received SuiBridgeEvent: {:?}", event);
117 macro_rules! bump_sui_counter {
118 ($action:expr) => {
119 bridge_metrics
120 .observed_governance_actions
121 .with_label_values(&[$action, "sui"])
122 .inc();
123 };
124 }
125
126 match event {
127 SuiBridgeEvent::SuiToEthTokenBridgeV1(_) => (),
128 SuiBridgeEvent::SuiToEthTokenBridgeV2(_) => (),
129 SuiBridgeEvent::TokenTransferApproved(_) => (),
130 SuiBridgeEvent::TokenTransferClaimed(_) => (),
131 SuiBridgeEvent::TokenTransferAlreadyApproved(_) => (),
132 SuiBridgeEvent::TokenTransferAlreadyClaimed(_) => (),
133 SuiBridgeEvent::TokenTransferLimitExceed(_) => {
134 }
136
137 SuiBridgeEvent::EmergencyOpEvent(event) => {
138 bump_sui_counter!(if event.frozen {
139 "bridge_paused"
140 } else {
141 "bridge_unpaused"
142 });
143 let is_paused = get_latest_bridge_pause_status_with_emergency_event(
144 sui_client.clone(),
145 event,
146 Duration::from_secs(10),
147 )
148 .await;
149 bridge_paused_watch_tx
150 .send(is_paused)
151 .expect("Bridge pause status watch channel should not be closed");
152 }
153
154 SuiBridgeEvent::CommitteeMemberRegistration(_) => {
155 bump_sui_counter!("committee_member_registered");
156 }
157 SuiBridgeEvent::CommitteeUpdateEvent(_) => {
158 bump_sui_counter!("committee_updated");
159 }
160
161 SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(event) => {
162 bump_sui_counter!("committee_member_url_updated");
163 let new_committee = get_latest_bridge_committee_with_url_update_event(
164 sui_client.clone(),
165 event,
166 Duration::from_secs(10),
167 )
168 .await;
169 let committee_names = bridge_auth_agg.load().committee_keys_to_names.clone();
170 bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(
171 Arc::new(new_committee),
172 bridge_metrics.clone(),
173 committee_names,
174 )));
175 info!("Committee updated with CommitteeMemberUrlUpdateEvent");
176 }
177
178 SuiBridgeEvent::BlocklistValidatorEvent(event) => {
179 bump_sui_counter!(if event.blocklisted {
180 "validator_blocklisted"
181 } else {
182 "validator_unblocklisted"
183 });
184 let new_committee = get_latest_bridge_committee_with_blocklist_event(
185 sui_client.clone(),
186 event,
187 Duration::from_secs(10),
188 )
189 .await;
190 let committee_names = bridge_auth_agg.load().committee_keys_to_names.clone();
191 bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(
192 Arc::new(new_committee),
193 bridge_metrics.clone(),
194 committee_names,
195 )));
196 info!("Committee updated with BlocklistValidatorEvent");
197 }
198
199 SuiBridgeEvent::TokenRegistrationEvent(_) => {
200 bump_sui_counter!("new_token_registered");
201 }
202
203 SuiBridgeEvent::NewTokenEvent(event) => {
204 bump_sui_counter!("new_token_added");
205 if let std::collections::hash_map::Entry::Vacant(entry) =
206 latest_token_config.entry(event.token_id)
208 {
209 entry.insert(event.type_name.clone());
210 sui_token_type_tags.store(Arc::new(latest_token_config.clone()));
211 } else {
212 assert_eq!(event.type_name, latest_token_config[&event.token_id]);
214 }
215 }
216
217 SuiBridgeEvent::UpdateTokenPriceEvent(_event) => {
218 bump_sui_counter!("token_price_updated");
219 }
220
221 SuiBridgeEvent::UpdateRouteLimitEvent(_event) => {
222 bump_sui_counter!("limit_updated");
223 }
224 }
225 }
226
227 fn handle_eth_events(event: EthBridgeEvent, bridge_metrics: &Arc<BridgeMetrics>) {
228 info!("Received EthBridgeEvent: {:?}", event);
229
230 macro_rules! bump_eth_counter {
231 ($action:expr) => {
232 bridge_metrics
233 .observed_governance_actions
234 .with_label_values(&[$action, "eth"])
235 .inc()
236 };
237 }
238
239 match event {
240 EthBridgeEvent::EthBridgeCommitteeEvents(event) => match event {
241 EthBridgeCommitteeEvents::BlocklistUpdated(event) => {
242 bump_eth_counter!(if event.isBlocklisted {
243 "validator_blocklisted"
244 } else {
245 "validator_unblocklisted"
246 });
247 }
248 EthBridgeCommitteeEvents::Initialized(_) => {
249 bump_eth_counter!("committee_contract_initialized");
250 }
251 EthBridgeCommitteeEvents::Upgraded(_) => {
252 bump_eth_counter!("committee_contract_upgraded");
253 }
254 EthBridgeCommitteeEvents::BlocklistUpdatedV2(e) => {
255 bump_eth_counter!(if e.isBlocklisted {
256 "validator_blocklisted"
257 } else {
258 "validator_unblocklisted"
259 });
260 }
261 EthBridgeCommitteeEvents::ContractUpgraded(_) => {
262 bump_eth_counter!("committee_contract_upgraded");
263 }
264 },
265 EthBridgeEvent::EthBridgeLimiterEvents(event) => match event {
266 EthBridgeLimiterEvents::Initialized(_) => {
267 bump_eth_counter!("limiter_contract_initialized");
268 }
269 EthBridgeLimiterEvents::Upgraded(_) => {
270 bump_eth_counter!("limiter_contract_upgraded");
271 }
272 EthBridgeLimiterEvents::OwnershipTransferred(_) => {
273 bump_eth_counter!("limiter_contract_ownership_transferred");
274 }
275 EthBridgeLimiterEvents::LimitUpdated(_) => {
276 bump_eth_counter!("limit_updated");
277 }
278 EthBridgeLimiterEvents::HourlyTransferAmountUpdated(_) => (),
281 EthBridgeLimiterEvents::ContractUpgraded(_) => {
282 bump_eth_counter!("limiter_contract_upgraded");
283 }
284 EthBridgeLimiterEvents::LimitUpdatedV2(_) => {
285 bump_eth_counter!("limit_updated");
286 }
287 },
288 EthBridgeEvent::EthBridgeConfigEvents(event) => match event {
289 EthBridgeConfigEvents::Initialized(_) => {
290 bump_eth_counter!("config_contract_initialized");
291 }
292 EthBridgeConfigEvents::Upgraded(_) => {
293 bump_eth_counter!("config_contract_upgraded");
294 }
295 EthBridgeConfigEvents::TokenAdded(_) => {
296 bump_eth_counter!("new_token_added");
297 }
298 EthBridgeConfigEvents::TokenPriceUpdated(_) => {
299 bump_eth_counter!("update_token_price");
300 }
301 EthBridgeConfigEvents::ContractUpgraded(_) => {
302 bump_eth_counter!("config_contract_upgraded");
303 }
304 EthBridgeConfigEvents::TokenPriceUpdatedV2(_) => {
305 bump_eth_counter!("update_token_price");
306 }
307 EthBridgeConfigEvents::TokensAddedV2(_) => {
308 bump_eth_counter!("new_token_added");
309 }
310 },
311 EthBridgeEvent::EthCommitteeUpgradeableContractEvents(event) => match event {
312 EthCommitteeUpgradeableContractEvents::Initialized(_) => {
313 bump_eth_counter!("upgradeable_contract_initialized");
314 }
315 EthCommitteeUpgradeableContractEvents::Upgraded(_) => {
316 bump_eth_counter!("upgradeable_contract_upgraded");
317 }
318 },
319 EthBridgeEvent::EthSuiBridgeEvents(event) => match event {
320 EthSuiBridgeEvents::TokensClaimed(_) => (),
321 EthSuiBridgeEvents::TokensDeposited(_) => (),
322 EthSuiBridgeEvents::TokensDepositedV2(_) => (),
323 EthSuiBridgeEvents::Paused(_) => bump_eth_counter!("bridge_paused"),
324 EthSuiBridgeEvents::Unpaused(_) => bump_eth_counter!("bridge_unpaused"),
325 EthSuiBridgeEvents::Upgraded(_) => {
326 bump_eth_counter!("bridge_contract_upgraded")
327 }
328 EthSuiBridgeEvents::Initialized(_) => {
329 bump_eth_counter!("bridge_contract_initialized")
330 }
331 EthSuiBridgeEvents::ContractUpgraded(_) => {
332 bump_eth_counter!("bridge_contract_upgraded")
333 }
334 EthSuiBridgeEvents::EmergencyOperation(e) => {
335 if e.paused {
336 bump_eth_counter!("bridge_paused")
337 } else {
338 bump_eth_counter!("bridge_unpaused")
339 }
340 }
341 },
342 }
343 }
344}
345
346async fn get_latest_bridge_committee_with_url_update_event<C: SuiClientInner>(
347 sui_client: Arc<SuiClient<C>>,
348 event: CommitteeMemberUrlUpdateEvent,
349 staleness_retry_interval: Duration,
350) -> BridgeCommittee {
351 let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
352 loop {
353 let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
354 sui_client.get_bridge_committee(),
355 Duration::from_secs(600)
356 ) else {
357 error!("Failed to get bridge committee after retry");
358 continue;
359 };
360 let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(&event.member));
361 let Some(member) = member else {
362 warn!(
365 "Committee member not found in the committee: {:?}",
366 event.member
367 );
368 return committee;
369 };
370 if member.base_url == event.new_url {
371 return committee;
372 }
373 tokio::time::sleep(staleness_retry_interval).await;
378 remaining_retry_times -= 1;
379 if remaining_retry_times == 0 {
380 warn!(
381 "Committee member url {:?} does not match onchain record {:?} after retry",
382 event.member, member
383 );
384 return committee;
385 }
386 }
387}
388
389async fn get_latest_bridge_committee_with_blocklist_event<C: SuiClientInner>(
390 sui_client: Arc<SuiClient<C>>,
391 event: BlocklistValidatorEvent,
392 staleness_retry_interval: Duration,
393) -> BridgeCommittee {
394 let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
395 loop {
396 let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
397 sui_client.get_bridge_committee(),
398 Duration::from_secs(600)
399 ) else {
400 error!("Failed to get bridge committee after retry");
401 continue;
402 };
403 let mut any_mismatch = false;
404 for pk in &event.public_keys {
405 let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(pk));
406 let Some(member) = member else {
407 warn!("Committee member not found in the committee: {:?}", pk);
411 any_mismatch = true;
412 break;
413 };
414 if member.is_blocklisted != event.blocklisted {
415 warn!(
416 "Committee member blocklist status does not match onchain record: {:?}",
417 member
418 );
419 any_mismatch = true;
420 break;
421 }
422 }
423 if !any_mismatch {
424 return committee;
425 }
426 tokio::time::sleep(staleness_retry_interval).await;
431 remaining_retry_times -= 1;
432 if remaining_retry_times == 0 {
433 warn!(
434 "Committee member blocklist status {:?} does not match onchain record after retry",
435 event
436 );
437 return committee;
438 }
439 }
440}
441
442async fn get_latest_bridge_pause_status_with_emergency_event<C: SuiClientInner>(
443 sui_client: Arc<SuiClient<C>>,
444 event: EmergencyOpEvent,
445 staleness_retry_interval: Duration,
446) -> IsBridgePaused {
447 let mut remaining_retry_times = REFRESH_BRIDGE_RETRY_TIMES;
448 loop {
449 let Ok(Ok(summary)) =
450 retry_with_max_elapsed_time!(sui_client.get_bridge_summary(), Duration::from_secs(600))
451 else {
452 error!("Failed to get bridge summary after retry");
453 continue;
454 };
455 if summary.is_frozen == event.frozen {
456 return summary.is_frozen;
457 }
458 tokio::time::sleep(staleness_retry_interval).await;
463 remaining_retry_times -= 1;
464 if remaining_retry_times == 0 {
465 warn!(
466 "Bridge pause status {:?} does not match onchain record {:?} after retry",
467 event, summary.is_frozen
468 );
469 return summary.is_frozen;
470 }
471 }
472}
473
474pub async fn subscribe_bridge_events(
475 mut client: sui_rpc::Client,
476 sender: mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
477) {
478 let subscription_read_mask = FieldMask::from_paths([
479 Checkpoint::path_builder().sequence_number(),
480 Checkpoint::path_builder()
481 .transactions()
482 .events()
483 .bcs()
484 .value(),
485 Checkpoint::path_builder().transactions().digest(),
486 ]);
487
488 loop {
489 let mut subscription = match client
490 .subscription_client()
491 .subscribe_checkpoints(
492 SubscribeCheckpointsRequest::default()
493 .with_read_mask(subscription_read_mask.clone()),
494 )
495 .await
496 {
497 Ok(subscription) => subscription,
498 Err(e) => {
499 tracing::warn!("error trying to subscribe to checkpoints: {e}");
500 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
501 continue;
502 }
503 }
504 .into_inner();
505
506 while let Some(item) = subscription.next().await {
507 let checkpoint = match item {
508 Ok(checkpoint) => checkpoint,
509 Err(e) => {
510 tracing::warn!("error in checkpoint stream: {e}");
511 break;
512 }
513 };
514
515 let ckpt = checkpoint.cursor();
516 tracing::debug!("recieved checkpoint {ckpt}");
517
518 for txn in checkpoint.checkpoint().transactions() {
519 let txn_digest = txn.digest();
520 let Some(bcs_events) = txn.events_opt().map(|events| events.bcs()) else {
521 continue;
522 };
523
524 let Ok(events) = bcs_events.deserialize::<sui_types::effects::TransactionEvents>()
525 else {
526 tracing::warn!(
527 "error deserializing events from txn {txn_digest} in checkpoint {ckpt}"
528 );
529 continue;
530 };
531
532 for event in events.data {
533 match SuiBridgeEvent::try_from_event(&event) {
534 Ok(Some(bridge_event)) => {
535 sender
536 .send(bridge_event)
537 .await
538 .expect("Sending event to monitor channel should not fail");
539 }
540 Ok(None) => {}
541 Err(e) => {
542 tracing::warn!(
543 "error deserializing SuiBridgeEvent from txn {txn_digest} in checkpoint {ckpt}: {e}"
544 );
545 }
546 }
547 }
548 }
549 }
550 }
551}
552
553#[cfg(test)]
554mod tests {
555 use std::str::FromStr;
556
557 use super::*;
558 use crate::events::{NewTokenEvent, init_all_struct_tags};
559 use crate::test_utils::{
560 bridge_committee_to_bridge_committee_summary, get_test_authority_and_key,
561 };
562 use crate::types::{BRIDGE_PAUSED, BRIDGE_UNPAUSED, BridgeAuthority};
563 use fastcrypto::traits::KeyPair;
564 use prometheus::Registry;
565 use sui_types::base_types::SuiAddress;
566 use sui_types::bridge::BridgeCommitteeSummary;
567 use sui_types::bridge::MoveTypeCommitteeMember;
568 use sui_types::crypto::get_key_pair;
569
570 use crate::{sui_mock_client::SuiMockClient, types::BridgeCommittee};
571 use sui_types::crypto::ToFromBytes;
572
573 #[tokio::test]
574 async fn test_get_latest_bridge_committee_with_url_update_event() {
575 telemetry_subscribers::init_for_testing();
576 let sui_client_mock = SuiMockClient::default();
577 let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
578 let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
579 let pk = kp.public().clone();
580 let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
581 let pk_bytes = pk_as_bytes.as_bytes().to_vec();
582 let event = CommitteeMemberUrlUpdateEvent {
583 member: pk,
584 new_url: "http://new.url".to_string(),
585 };
586 let summary = BridgeCommitteeSummary {
587 members: vec![(
588 pk_bytes.clone(),
589 MoveTypeCommitteeMember {
590 sui_address: SuiAddress::random_for_testing_only(),
591 bridge_pubkey_bytes: pk_bytes.clone(),
592 voting_power: 10000,
593 http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
594 blocklisted: false,
595 },
596 )],
597 member_registration: vec![],
598 last_committee_update_epoch: 0,
599 };
600
601 sui_client_mock.set_bridge_committee(summary.clone());
603 let timer = std::time::Instant::now();
604 let committee = get_latest_bridge_committee_with_url_update_event(
605 sui_client.clone(),
606 event.clone(),
607 Duration::from_secs(2),
608 )
609 .await;
610 assert_eq!(
611 committee.member(&pk_as_bytes).unwrap().base_url,
612 "http://new.url"
613 );
614 assert!(timer.elapsed().as_millis() < 500);
615
616 let old_summary = BridgeCommitteeSummary {
619 members: vec![(
620 pk_bytes.clone(),
621 MoveTypeCommitteeMember {
622 sui_address: SuiAddress::random_for_testing_only(),
623 bridge_pubkey_bytes: pk_bytes.clone(),
624 voting_power: 10000,
625 http_rest_url: "http://old.url".to_string().as_bytes().to_vec(),
626 blocklisted: false,
627 },
628 )],
629 member_registration: vec![],
630 last_committee_update_epoch: 0,
631 };
632 sui_client_mock.set_bridge_committee(old_summary.clone());
633 let timer = std::time::Instant::now();
634 let sui_client_mock_clone = sui_client_mock.clone();
636 tokio::spawn(async move {
637 tokio::time::sleep(Duration::from_secs(1)).await;
638 sui_client_mock_clone.set_bridge_committee(summary.clone());
639 });
640 let committee = get_latest_bridge_committee_with_url_update_event(
641 sui_client.clone(),
642 event.clone(),
643 Duration::from_secs(2),
644 )
645 .await;
646 assert_eq!(
647 committee.member(&pk_as_bytes).unwrap().base_url,
648 "http://new.url"
649 );
650 let elapsed = timer.elapsed().as_millis();
651 assert!(elapsed > 1000 && elapsed < 3000);
652
653 let newer_summary = BridgeCommitteeSummary {
656 members: vec![(
657 pk_bytes.clone(),
658 MoveTypeCommitteeMember {
659 sui_address: SuiAddress::random_for_testing_only(),
660 bridge_pubkey_bytes: pk_bytes.clone(),
661 voting_power: 10000,
662 http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
663 blocklisted: false,
664 },
665 )],
666 member_registration: vec![],
667 last_committee_update_epoch: 0,
668 };
669 sui_client_mock.set_bridge_committee(newer_summary.clone());
670 let timer = std::time::Instant::now();
671 let committee = get_latest_bridge_committee_with_url_update_event(
672 sui_client.clone(),
673 event.clone(),
674 Duration::from_millis(500),
675 )
676 .await;
677 assert_eq!(
678 committee.member(&pk_as_bytes).unwrap().base_url,
679 "http://newer.url"
680 );
681 let elapsed = timer.elapsed().as_millis();
682 assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
683
684 let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
687 let pk2 = kp2.public().clone();
688 let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
689 let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
690 let newer_summary = BridgeCommitteeSummary {
691 members: vec![(
692 pk_bytes2.clone(),
693 MoveTypeCommitteeMember {
694 sui_address: SuiAddress::random_for_testing_only(),
695 bridge_pubkey_bytes: pk_bytes2.clone(),
696 voting_power: 10000,
697 http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
698 blocklisted: false,
699 },
700 )],
701 member_registration: vec![],
702 last_committee_update_epoch: 0,
703 };
704 sui_client_mock.set_bridge_committee(newer_summary.clone());
705 let timer = std::time::Instant::now();
706 let committee = get_latest_bridge_committee_with_url_update_event(
707 sui_client.clone(),
708 event.clone(),
709 Duration::from_secs(1),
710 )
711 .await;
712 assert_eq!(
713 committee.member(&pk_as_bytes2).unwrap().base_url,
714 "http://newer.url"
715 );
716 assert!(committee.member(&pk_as_bytes).is_none());
717 let elapsed = timer.elapsed().as_millis();
718 assert!(elapsed < 1000);
719 }
720
721 #[tokio::test]
722 async fn test_get_latest_bridge_committee_with_blocklist_event() {
723 telemetry_subscribers::init_for_testing();
724 let sui_client_mock = SuiMockClient::default();
725 let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
726 let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
727 let pk = kp.public().clone();
728 let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
729 let pk_bytes = pk_as_bytes.as_bytes().to_vec();
730
731 let event = BlocklistValidatorEvent {
733 blocklisted: true,
734 public_keys: vec![pk.clone()],
735 };
736 let summary = BridgeCommitteeSummary {
737 members: vec![(
738 pk_bytes.clone(),
739 MoveTypeCommitteeMember {
740 sui_address: SuiAddress::random_for_testing_only(),
741 bridge_pubkey_bytes: pk_bytes.clone(),
742 voting_power: 10000,
743 http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
744 blocklisted: true,
745 },
746 )],
747 member_registration: vec![],
748 last_committee_update_epoch: 0,
749 };
750 sui_client_mock.set_bridge_committee(summary.clone());
751 let timer = std::time::Instant::now();
752 let committee = get_latest_bridge_committee_with_blocklist_event(
753 sui_client.clone(),
754 event.clone(),
755 Duration::from_secs(2),
756 )
757 .await;
758 assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
759 assert!(timer.elapsed().as_millis() < 500);
760
761 let event = BlocklistValidatorEvent {
763 blocklisted: false,
764 public_keys: vec![pk.clone()],
765 };
766 let summary = BridgeCommitteeSummary {
767 members: vec![(
768 pk_bytes.clone(),
769 MoveTypeCommitteeMember {
770 sui_address: SuiAddress::random_for_testing_only(),
771 bridge_pubkey_bytes: pk_bytes.clone(),
772 voting_power: 10000,
773 http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
774 blocklisted: false,
775 },
776 )],
777 member_registration: vec![],
778 last_committee_update_epoch: 0,
779 };
780 sui_client_mock.set_bridge_committee(summary.clone());
781 let timer = std::time::Instant::now();
782 let committee = get_latest_bridge_committee_with_blocklist_event(
783 sui_client.clone(),
784 event.clone(),
785 Duration::from_secs(2),
786 )
787 .await;
788 assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
789 assert!(timer.elapsed().as_millis() < 500);
790
791 let old_summary = BridgeCommitteeSummary {
794 members: vec![(
795 pk_bytes.clone(),
796 MoveTypeCommitteeMember {
797 sui_address: SuiAddress::random_for_testing_only(),
798 bridge_pubkey_bytes: pk_bytes.clone(),
799 voting_power: 10000,
800 http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
801 blocklisted: true,
802 },
803 )],
804 member_registration: vec![],
805 last_committee_update_epoch: 0,
806 };
807 sui_client_mock.set_bridge_committee(old_summary.clone());
808 let timer = std::time::Instant::now();
809 let sui_client_mock_clone = sui_client_mock.clone();
811 tokio::spawn(async move {
812 tokio::time::sleep(Duration::from_secs(1)).await;
813 sui_client_mock_clone.set_bridge_committee(summary.clone());
814 });
815 let committee = get_latest_bridge_committee_with_blocklist_event(
816 sui_client.clone(),
817 event.clone(),
818 Duration::from_secs(2),
819 )
820 .await;
821 assert!(!committee.member(&pk_as_bytes).unwrap().is_blocklisted);
822 let elapsed = timer.elapsed().as_millis();
823 assert!(elapsed > 1000 && elapsed < 3000);
824
825 let newer_summary = BridgeCommitteeSummary {
828 members: vec![(
829 pk_bytes.clone(),
830 MoveTypeCommitteeMember {
831 sui_address: SuiAddress::random_for_testing_only(),
832 bridge_pubkey_bytes: pk_bytes.clone(),
833 voting_power: 10000,
834 http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
835 blocklisted: true,
836 },
837 )],
838 member_registration: vec![],
839 last_committee_update_epoch: 0,
840 };
841 sui_client_mock.set_bridge_committee(newer_summary.clone());
842 let timer = std::time::Instant::now();
843 let committee = get_latest_bridge_committee_with_blocklist_event(
844 sui_client.clone(),
845 event.clone(),
846 Duration::from_millis(500),
847 )
848 .await;
849 assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
850 let elapsed = timer.elapsed().as_millis();
851 assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
852
853 let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
856 let pk2 = kp2.public().clone();
857 let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
858 let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
859 let summary = BridgeCommitteeSummary {
860 members: vec![(
861 pk_bytes2.clone(),
862 MoveTypeCommitteeMember {
863 sui_address: SuiAddress::random_for_testing_only(),
864 bridge_pubkey_bytes: pk_bytes2.clone(),
865 voting_power: 10000,
866 http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
867 blocklisted: false,
868 },
869 )],
870 member_registration: vec![],
871 last_committee_update_epoch: 0,
872 };
873 sui_client_mock.set_bridge_committee(summary.clone());
874 let timer = std::time::Instant::now();
875 let committee = get_latest_bridge_committee_with_blocklist_event(
876 sui_client.clone(),
877 event.clone(),
878 Duration::from_secs(1),
879 )
880 .await;
881 assert_eq!(
882 committee.member(&pk_as_bytes2).unwrap().base_url,
883 "http://newer.url"
884 );
885 assert!(committee.member(&pk_as_bytes).is_none());
886 let elapsed = timer.elapsed().as_millis();
887 assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
888
889 let event = BlocklistValidatorEvent {
891 blocklisted: true,
892 public_keys: vec![pk, pk2],
893 };
894 let summary = BridgeCommitteeSummary {
895 members: vec![
896 (
897 pk_bytes.clone(),
898 MoveTypeCommitteeMember {
899 sui_address: SuiAddress::random_for_testing_only(),
900 bridge_pubkey_bytes: pk_bytes.clone(),
901 voting_power: 5000,
902 http_rest_url: "http://pk.url".to_string().as_bytes().to_vec(),
903 blocklisted: true,
904 },
905 ),
906 (
907 pk_bytes2.clone(),
908 MoveTypeCommitteeMember {
909 sui_address: SuiAddress::random_for_testing_only(),
910 bridge_pubkey_bytes: pk_bytes2.clone(),
911 voting_power: 5000,
912 http_rest_url: "http://pk2.url".to_string().as_bytes().to_vec(),
913 blocklisted: false,
914 },
915 ),
916 ],
917 member_registration: vec![],
918 last_committee_update_epoch: 0,
919 };
920 sui_client_mock.set_bridge_committee(summary.clone());
921 let timer = std::time::Instant::now();
922 let committee = get_latest_bridge_committee_with_blocklist_event(
923 sui_client.clone(),
924 event.clone(),
925 Duration::from_millis(500),
926 )
927 .await;
928 assert!(committee.member(&pk_as_bytes).unwrap().is_blocklisted);
929 assert!(!committee.member(&pk_as_bytes2).unwrap().is_blocklisted);
930 let elapsed = timer.elapsed().as_millis();
931 assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
932 }
933
934 #[tokio::test]
935 async fn test_get_bridge_pause_status_with_emergency_event() {
936 telemetry_subscribers::init_for_testing();
937 let sui_client_mock = SuiMockClient::default();
938 let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
939
940 let event = EmergencyOpEvent { frozen: true };
942 sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
943 let timer = std::time::Instant::now();
944 assert!(
945 get_latest_bridge_pause_status_with_emergency_event(
946 sui_client.clone(),
947 event.clone(),
948 Duration::from_secs(2),
949 )
950 .await
951 );
952 assert!(timer.elapsed().as_millis() < 500);
953
954 let event = EmergencyOpEvent { frozen: false };
955 sui_client_mock.set_is_bridge_paused(BRIDGE_UNPAUSED);
956 let timer = std::time::Instant::now();
957 assert!(
958 !get_latest_bridge_pause_status_with_emergency_event(
959 sui_client.clone(),
960 event.clone(),
961 Duration::from_secs(2),
962 )
963 .await
964 );
965 assert!(timer.elapsed().as_millis() < 500);
966
967 sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
970 let timer = std::time::Instant::now();
971 let sui_client_mock_clone = sui_client_mock.clone();
973 tokio::spawn(async move {
974 tokio::time::sleep(Duration::from_secs(1)).await;
975 sui_client_mock_clone.set_is_bridge_paused(BRIDGE_UNPAUSED);
976 });
977 assert!(
978 !get_latest_bridge_pause_status_with_emergency_event(
979 sui_client.clone(),
980 event.clone(),
981 Duration::from_secs(2),
982 )
983 .await
984 );
985 let elapsed = timer.elapsed().as_millis();
986 assert!(elapsed > 1000 && elapsed < 3000, "{}", elapsed);
987
988 sui_client_mock.set_is_bridge_paused(BRIDGE_PAUSED);
991 let timer = std::time::Instant::now();
992 assert!(
993 get_latest_bridge_pause_status_with_emergency_event(
994 sui_client.clone(),
995 event.clone(),
996 Duration::from_secs(2),
997 )
998 .await
999 );
1000 let elapsed = timer.elapsed().as_millis();
1001 assert!(elapsed > 500 * REFRESH_BRIDGE_RETRY_TIMES as u128);
1002 }
1003
1004 #[tokio::test]
1005 async fn test_update_bridge_authority_aggregation_with_url_change_event() {
1006 let (
1007 sui_monitor_tx,
1008 sui_monitor_rx,
1009 _eth_monitor_tx,
1010 eth_monitor_rx,
1011 sui_client_mock,
1012 sui_client,
1013 bridge_pause_tx,
1014 _bridge_pause_rx,
1015 mut authorities,
1016 bridge_metrics,
1017 ) = setup();
1018 let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
1019 let agg = Arc::new(ArcSwap::new(Arc::new(
1020 BridgeAuthorityAggregator::new_for_testing(Arc::new(old_committee)),
1021 )));
1022 let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1023 let _handle = tokio::task::spawn(
1024 BridgeMonitor::new(
1025 sui_client.clone(),
1026 sui_monitor_rx,
1027 eth_monitor_rx,
1028 agg.clone(),
1029 bridge_pause_tx,
1030 sui_token_type_tags,
1031 bridge_metrics,
1032 )
1033 .run(),
1034 );
1035 let new_url = "http://new.url".to_string();
1036 authorities[0].base_url = new_url.clone();
1037 let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
1038 let new_committee_summary =
1039 bridge_committee_to_bridge_committee_summary(new_committee.clone());
1040 sui_client_mock.set_bridge_committee(new_committee_summary.clone());
1041 sui_monitor_tx
1042 .send(SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(
1043 CommitteeMemberUrlUpdateEvent {
1044 member: authorities[0].pubkey.clone(),
1045 new_url: new_url.clone(),
1046 },
1047 ))
1048 .await
1049 .unwrap();
1050 tokio::time::sleep(Duration::from_secs(1)).await;
1052 assert_eq!(
1054 agg.load()
1055 .committee
1056 .member(&BridgeAuthorityPublicKeyBytes::from(&authorities[0].pubkey))
1057 .unwrap()
1058 .base_url,
1059 new_url
1060 );
1061 }
1062
1063 #[tokio::test]
1064 async fn test_update_bridge_authority_aggregation_with_blocklist_event() {
1065 let (
1066 sui_monitor_tx,
1067 sui_monitor_rx,
1068 _eth_monitor_tx,
1069 eth_monitor_rx,
1070 sui_client_mock,
1071 sui_client,
1072 bridge_pause_tx,
1073 _bridge_pause_rx,
1074 mut authorities,
1075 bridge_metrics,
1076 ) = setup();
1077 let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
1078 let agg = Arc::new(ArcSwap::new(Arc::new(
1079 BridgeAuthorityAggregator::new_for_testing(Arc::new(old_committee)),
1080 )));
1081 let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1082 let _handle = tokio::task::spawn(
1083 BridgeMonitor::new(
1084 sui_client.clone(),
1085 sui_monitor_rx,
1086 eth_monitor_rx,
1087 agg.clone(),
1088 bridge_pause_tx,
1089 sui_token_type_tags,
1090 bridge_metrics,
1091 )
1092 .run(),
1093 );
1094 authorities[0].is_blocklisted = true;
1095 let to_blocklist = &authorities[0];
1096 let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
1097 let new_committee_summary =
1098 bridge_committee_to_bridge_committee_summary(new_committee.clone());
1099 sui_client_mock.set_bridge_committee(new_committee_summary.clone());
1100 sui_monitor_tx
1101 .send(SuiBridgeEvent::BlocklistValidatorEvent(
1102 BlocklistValidatorEvent {
1103 public_keys: vec![to_blocklist.pubkey.clone()],
1104 blocklisted: true,
1105 },
1106 ))
1107 .await
1108 .unwrap();
1109 tokio::time::sleep(Duration::from_secs(1)).await;
1111 assert!(
1112 agg.load()
1113 .committee
1114 .member(&BridgeAuthorityPublicKeyBytes::from(&to_blocklist.pubkey))
1115 .unwrap()
1116 .is_blocklisted,
1117 );
1118 }
1119
1120 #[tokio::test]
1121 async fn test_update_bridge_pause_status_with_emergency_event() {
1122 let (
1123 sui_monitor_tx,
1124 sui_monitor_rx,
1125 _eth_monitor_tx,
1126 eth_monitor_rx,
1127 sui_client_mock,
1128 sui_client,
1129 bridge_pause_tx,
1130 bridge_pause_rx,
1131 authorities,
1132 bridge_metrics,
1133 ) = setup();
1134 let event = EmergencyOpEvent {
1135 frozen: !*bridge_pause_tx.borrow(), };
1137 let committee = BridgeCommittee::new(authorities.clone()).unwrap();
1138 let agg = Arc::new(ArcSwap::new(Arc::new(
1139 BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1140 )));
1141 let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1142 let _handle = tokio::task::spawn(
1143 BridgeMonitor::new(
1144 sui_client.clone(),
1145 sui_monitor_rx,
1146 eth_monitor_rx,
1147 agg.clone(),
1148 bridge_pause_tx,
1149 sui_token_type_tags,
1150 bridge_metrics,
1151 )
1152 .run(),
1153 );
1154
1155 sui_client_mock.set_is_bridge_paused(event.frozen);
1156 sui_monitor_tx
1157 .send(SuiBridgeEvent::EmergencyOpEvent(event.clone()))
1158 .await
1159 .unwrap();
1160 tokio::time::sleep(Duration::from_secs(1)).await;
1162 assert!(*bridge_pause_rx.borrow() == event.frozen);
1164 }
1165
1166 #[tokio::test]
1167 async fn test_update_sui_token_type_tags() {
1168 let (
1169 sui_monitor_tx,
1170 sui_monitor_rx,
1171 _eth_monitor_tx,
1172 eth_monitor_rx,
1173 _sui_client_mock,
1174 sui_client,
1175 bridge_pause_tx,
1176 _bridge_pause_rx,
1177 authorities,
1178 bridge_metrics,
1179 ) = setup();
1180 let event = NewTokenEvent {
1181 token_id: 255,
1182 type_name: TypeTag::from_str("0xbeef::beef::BEEF").unwrap(),
1183 native_token: false,
1184 decimal_multiplier: 1000000,
1185 notional_value: 100000000,
1186 };
1187 let committee = BridgeCommittee::new(authorities.clone()).unwrap();
1188 let agg = Arc::new(ArcSwap::new(Arc::new(
1189 BridgeAuthorityAggregator::new_for_testing(Arc::new(committee)),
1190 )));
1191 let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(HashMap::new())));
1192 let sui_token_type_tags_clone = sui_token_type_tags.clone();
1193 let _handle = tokio::task::spawn(
1194 BridgeMonitor::new(
1195 sui_client.clone(),
1196 sui_monitor_rx,
1197 eth_monitor_rx,
1198 agg.clone(),
1199 bridge_pause_tx,
1200 sui_token_type_tags_clone,
1201 bridge_metrics,
1202 )
1203 .run(),
1204 );
1205
1206 sui_monitor_tx
1207 .send(SuiBridgeEvent::NewTokenEvent(event.clone()))
1208 .await
1209 .unwrap();
1210 tokio::time::sleep(Duration::from_secs(1)).await;
1212 sui_token_type_tags
1214 .load()
1215 .clone()
1216 .get(&event.token_id)
1217 .unwrap();
1218 }
1219
1220 #[allow(clippy::type_complexity)]
1221 fn setup() -> (
1222 mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
1223 mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
1224 mysten_metrics::metered_channel::Sender<EthBridgeEvent>,
1225 mysten_metrics::metered_channel::Receiver<EthBridgeEvent>,
1226 SuiMockClient,
1227 Arc<SuiClient<SuiMockClient>>,
1228 tokio::sync::watch::Sender<IsBridgePaused>,
1229 tokio::sync::watch::Receiver<IsBridgePaused>,
1230 Vec<BridgeAuthority>,
1231 Arc<BridgeMetrics>,
1232 ) {
1233 telemetry_subscribers::init_for_testing();
1234 let registry = Registry::new();
1235 mysten_metrics::init_metrics(®istry);
1236 init_all_struct_tags();
1237 let bridge_metrics = Arc::new(BridgeMetrics::new_for_testing());
1238
1239 let sui_client_mock = SuiMockClient::default();
1240 let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
1241 let (sui_monitor_tx, sui_monitor_rx) = mysten_metrics::metered_channel::channel(
1242 10000,
1243 &mysten_metrics::get_metrics()
1244 .unwrap()
1245 .channel_inflight
1246 .with_label_values(&["sui_monitor_queue"]),
1247 );
1248 let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
1249 10000,
1250 &mysten_metrics::get_metrics()
1251 .unwrap()
1252 .channel_inflight
1253 .with_label_values(&["eth_monitor_queue"]),
1254 );
1255
1256 let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(false);
1257 let authorities = vec![
1258 get_test_authority_and_key(2500, 0 ).0,
1259 get_test_authority_and_key(2500, 0 ).0,
1260 get_test_authority_and_key(2500, 0 ).0,
1261 get_test_authority_and_key(2500, 0 ).0,
1262 ];
1263 (
1264 sui_monitor_tx,
1265 sui_monitor_rx,
1266 eth_monitor_tx,
1267 eth_monitor_rx,
1268 sui_client_mock,
1269 sui_client,
1270 bridge_pause_tx,
1271 bridge_pause_rx,
1272 authorities,
1273 bridge_metrics,
1274 )
1275 }
1276}