1use anyhow::{Context, Result, bail};
4use fastcrypto::ed25519::Ed25519PublicKey;
5use fastcrypto::encoding::Base64;
6use fastcrypto::encoding::Encoding;
7use fastcrypto::traits::ToFromBytes;
8use futures::stream::{self, StreamExt};
9use once_cell::sync::Lazy;
10use prometheus::{CounterVec, HistogramVec};
11use prometheus::{register_counter_vec, register_histogram_vec};
12use serde::Deserialize;
13use std::collections::BTreeMap;
14use std::{
15 collections::HashMap,
16 sync::{Arc, RwLock},
17 time::Duration,
18};
19use sui_tls::Allower;
20use sui_types::base_types::SuiAddress;
21use sui_types::bridge::BridgeSummary;
22use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary;
23use tracing::{debug, error, info, warn};
24use url::Url;
25
26static JSON_RPC_STATE: Lazy<CounterVec> = Lazy::new(|| {
27 register_counter_vec!(
28 "json_rpc_state",
29 "Number of successful/failed requests made.",
30 &["rpc_method", "status"]
31 )
32 .unwrap()
33});
34static JSON_RPC_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
35 register_histogram_vec!(
36 "json_rpc_duration_seconds",
37 "The json-rpc latencies in seconds.",
38 &["rpc_method"],
39 vec![
40 0.0008, 0.0016, 0.0032, 0.0064, 0.0128, 0.0256, 0.0512, 0.1024, 0.2048, 0.4096, 0.8192,
41 1.0, 1.25, 1.5, 1.75, 2.0, 4.0, 8.0
42 ],
43 )
44 .unwrap()
45});
46
47pub type AllowedPeers = Arc<RwLock<HashMap<Ed25519PublicKey, AllowedPeer>>>;
49
50type MetricsPubKeys = Arc<RwLock<HashMap<String, Ed25519PublicKey>>>;
51
52#[derive(Hash, PartialEq, Eq, Debug, Clone)]
53pub struct AllowedPeer {
54 pub name: String,
55 pub public_key: Ed25519PublicKey,
56}
57
58#[derive(Debug, Clone)]
63pub struct SuiNodeProvider {
64 sui_nodes: AllowedPeers,
65 bridge_nodes: AllowedPeers,
66 static_nodes: AllowedPeers,
67 rpc_url: String,
68 rpc_poll_interval: Duration,
69}
70
71impl Allower for SuiNodeProvider {
72 fn allowed(&self, key: &Ed25519PublicKey) -> bool {
73 self.static_nodes.read().unwrap().contains_key(key)
74 || self.sui_nodes.read().unwrap().contains_key(key)
75 || self.bridge_nodes.read().unwrap().contains_key(key)
76 }
77}
78
79impl SuiNodeProvider {
80 pub fn new(
81 rpc_url: String,
82 rpc_poll_interval: Duration,
83 static_peers: Vec<AllowedPeer>,
84 ) -> Self {
85 let static_nodes: HashMap<Ed25519PublicKey, AllowedPeer> = static_peers
87 .into_iter()
88 .map(|v| (v.public_key.clone(), v))
89 .collect();
90 let static_nodes = Arc::new(RwLock::new(static_nodes));
91 let sui_nodes = Arc::new(RwLock::new(HashMap::new()));
92 let bridge_nodes = Arc::new(RwLock::new(HashMap::new()));
93 Self {
94 sui_nodes,
95 bridge_nodes,
96 static_nodes,
97 rpc_url,
98 rpc_poll_interval,
99 }
100 }
101
102 pub fn get(&self, key: &Ed25519PublicKey) -> Option<AllowedPeer> {
104 debug!("look for {:?}", key);
105 if let Some(v) = self.static_nodes.read().unwrap().get(key) {
107 return Some(AllowedPeer {
108 name: v.name.to_owned(),
109 public_key: v.public_key.to_owned(),
110 });
111 }
112 if let Some(v) = self.sui_nodes.read().unwrap().get(key) {
114 return Some(AllowedPeer {
115 name: v.name.to_owned(),
116 public_key: v.public_key.to_owned(),
117 });
118 }
119 if let Some(v) = self.bridge_nodes.read().unwrap().get(key) {
121 return Some(AllowedPeer {
122 name: v.name.to_owned(),
123 public_key: v.public_key.to_owned(),
124 });
125 }
126 None
127 }
128
129 pub fn get_sui_mut(&mut self) -> &mut AllowedPeers {
131 &mut self.sui_nodes
132 }
133
134 async fn get_validators(url: String) -> Result<SuiSystemStateSummary> {
136 let rpc_method = "suix_getLatestSuiSystemState";
137 let observe = || {
138 let timer = JSON_RPC_DURATION
139 .with_label_values(&[rpc_method])
140 .start_timer();
141 || {
142 timer.observe_duration();
143 }
144 }();
145 let client = reqwest::Client::builder().build().unwrap();
146 let request = serde_json::json!({
147 "jsonrpc": "2.0",
148 "method":rpc_method,
149 "id":1,
150 });
151 let response = client
152 .post(url)
153 .header(reqwest::header::CONTENT_TYPE, "application/json")
154 .body(request.to_string())
155 .send()
156 .await
157 .with_context(|| {
158 JSON_RPC_STATE
159 .with_label_values(&[rpc_method, "failed_get"])
160 .inc();
161 observe();
162 "unable to perform json rpc"
163 })?;
164
165 let raw = response.bytes().await.with_context(|| {
166 JSON_RPC_STATE
167 .with_label_values(&[rpc_method, "failed_body_extract"])
168 .inc();
169 observe();
170 "unable to extract body bytes from json rpc"
171 })?;
172
173 #[derive(Debug, Deserialize)]
174 struct ResponseBody {
175 result: SuiSystemStateSummary,
176 }
177
178 let body: ResponseBody = match serde_json::from_slice(&raw) {
179 Ok(b) => b,
180 Err(error) => {
181 JSON_RPC_STATE
182 .with_label_values(&[rpc_method, "failed_json_decode"])
183 .inc();
184 observe();
185 bail!(
186 "unable to decode json: {error} response from json rpc: {:?}",
187 raw
188 )
189 }
190 };
191 JSON_RPC_STATE
192 .with_label_values(&[rpc_method, "success"])
193 .inc();
194 observe();
195 Ok(body.result)
196 }
197
198 async fn get_bridge_validators(url: String) -> Result<BridgeSummary> {
200 let rpc_method = "suix_getLatestBridge";
201 let _timer = JSON_RPC_DURATION
202 .with_label_values(&[rpc_method])
203 .start_timer();
204 let client = reqwest::Client::builder().build().unwrap();
205 let request = serde_json::json!({
206 "jsonrpc": "2.0",
207 "method":rpc_method,
208 "id":1,
209 });
210 let response = client
211 .post(url)
212 .header(reqwest::header::CONTENT_TYPE, "application/json")
213 .body(request.to_string())
214 .send()
215 .await
216 .with_context(|| {
217 JSON_RPC_STATE
218 .with_label_values(&[rpc_method, "failed_get"])
219 .inc();
220 "unable to perform json rpc"
221 })?;
222
223 let raw = response.bytes().await.with_context(|| {
224 JSON_RPC_STATE
225 .with_label_values(&[rpc_method, "failed_body_extract"])
226 .inc();
227 "unable to extract body bytes from json rpc"
228 })?;
229
230 #[derive(Debug, Deserialize)]
231 struct ResponseBody {
232 result: BridgeSummary,
233 }
234 let summary: BridgeSummary = match serde_json::from_slice::<ResponseBody>(&raw) {
235 Ok(b) => b.result,
236 Err(error) => {
237 JSON_RPC_STATE
238 .with_label_values(&[rpc_method, "failed_json_decode"])
239 .inc();
240 bail!(
241 "unable to decode json: {error} response from json rpc: {:?}",
242 raw
243 )
244 }
245 };
246 JSON_RPC_STATE
247 .with_label_values(&[rpc_method, "success"])
248 .inc();
249 Ok(summary)
250 }
251
252 async fn update_sui_validator_set(&self) {
253 match Self::get_validators(self.rpc_url.to_owned()).await {
254 Ok(summary) => {
255 let validators = extract(summary);
256 let mut allow = self.sui_nodes.write().unwrap();
257 allow.clear();
258 allow.extend(validators);
259 info!(
260 "{} sui validators managed to make it on the allow list",
261 allow.len()
262 );
263 }
264 Err(error) => {
265 JSON_RPC_STATE
266 .with_label_values(&["update_peer_count", "failed"])
267 .inc();
268 error!("unable to refresh peer list: {error}");
269 }
270 };
271 }
272
273 async fn update_bridge_validator_set(&self, metrics_keys: MetricsPubKeys) {
274 let sui_system = match Self::get_validators(self.rpc_url.to_owned()).await {
275 Ok(summary) => summary,
276 Err(error) => {
277 JSON_RPC_STATE
278 .with_label_values(&["update_bridge_peer_count", "failed"])
279 .inc();
280 error!("unable to get sui system state: {error}");
281 return;
282 }
283 };
284 match Self::get_bridge_validators(self.rpc_url.to_owned()).await {
285 Ok(summary) => {
286 let names = sui_system
287 .active_validators
288 .into_iter()
289 .map(|v| (v.sui_address, v.name))
290 .collect();
291 let validators = extract_bridge(summary, Arc::new(names), metrics_keys).await;
292 let mut allow = self.bridge_nodes.write().unwrap();
293 allow.clear();
294 allow.extend(validators);
295 info!(
296 "{} bridge validators managed to make it on the allow list",
297 allow.len()
298 );
299 }
300 Err(error) => {
301 JSON_RPC_STATE
302 .with_label_values(&["update_bridge_peer_count", "failed"])
303 .inc();
304 error!("unable to refresh sui bridge peer list: {error}");
305 }
306 };
307 }
308
309 pub fn poll_peer_list(&self) {
311 info!("Started polling for peers using rpc: {}", self.rpc_url);
312
313 let rpc_poll_interval = self.rpc_poll_interval;
314 let cloned_self = self.clone();
315 let bridge_metrics_keys: MetricsPubKeys = Arc::new(RwLock::new(HashMap::new()));
316 tokio::spawn(async move {
317 let mut interval = tokio::time::interval(rpc_poll_interval);
318 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
319
320 loop {
321 interval.tick().await;
322
323 cloned_self.update_sui_validator_set().await;
324 cloned_self
325 .update_bridge_validator_set(bridge_metrics_keys.clone())
326 .await;
327 }
328 });
329 }
330}
331
332fn extract(
336 summary: SuiSystemStateSummary,
337) -> impl Iterator<Item = (Ed25519PublicKey, AllowedPeer)> {
338 summary.active_validators.into_iter().filter_map(|vm| {
339 match Ed25519PublicKey::from_bytes(&vm.network_pubkey_bytes) {
340 Ok(public_key) => {
341 debug!(
342 "adding public key {:?} for sui validator {:?}",
343 public_key, vm.name
344 );
345 Some((
346 public_key.clone(),
347 AllowedPeer {
348 name: vm.name,
349 public_key,
350 },
351 )) }
353 Err(error) => {
354 error!(
355 "unable to decode public key for name: {:?} sui_address: {:?} error: {error}",
356 vm.name, vm.sui_address
357 );
358 None }
360 }
361 })
362}
363
364async fn extract_bridge(
365 summary: BridgeSummary,
366 names: Arc<BTreeMap<SuiAddress, String>>,
367 metrics_keys: MetricsPubKeys,
368) -> Vec<(Ed25519PublicKey, AllowedPeer)> {
369 {
370 let mut metrics_keys_write = metrics_keys.write().unwrap();
372 metrics_keys_write.retain(|url, _| {
373 summary.committee.members.iter().any(|(_, cm)| {
374 String::from_utf8(cm.http_rest_url.clone()).ok().as_ref() == Some(url)
375 })
376 });
377 }
378
379 let client = reqwest::Client::builder()
380 .timeout(Duration::from_secs(10))
381 .build()
382 .unwrap();
383 let committee_members = summary.committee.members.clone();
384 let results: Vec<_> = stream::iter(committee_members)
385 .filter_map(|(_, cm)| {
386 let client = client.clone();
387 let metrics_keys = metrics_keys.clone();
388 let names = names.clone();
389 async move {
390 debug!(
391 address =% cm.sui_address,
392 "Extracting metrics public key for bridge node",
393 );
394
395 let url_str = match String::from_utf8(cm.http_rest_url) {
397 Ok(url) => url,
398 Err(_) => {
399 warn!(
400 address =% cm.sui_address,
401 "Invalid UTF-8 sequence in http_rest_url for bridge node ",
402 );
403 return None;
404 }
405 };
406 let bridge_url = match Url::parse(&url_str) {
408 Ok(url) => url,
409 Err(_) => {
410 warn!(url_str, "Unable to parse http_rest_url");
411 return None;
412 }
413 };
414
415 let bridge_url = match append_path_segment(bridge_url, "metrics_pub_key") {
417 Some(url) => url,
418 None => {
419 warn!(url_str, "Unable to append path segment to URL");
420 return None;
421 }
422 };
423
424 let bridge_host = match bridge_url.host_str() {
426 Some(host) => host,
427 None => {
428 warn!(url_str, "Hostname is missing from http_rest_url");
429 return None;
430 }
431 };
432 let bridge_name = names.get(&cm.sui_address).cloned().unwrap_or_else(|| {
433 warn!(
434 address =% cm.sui_address,
435 "Bridge node not found in sui committee, using base URL as the name",
436 );
437 String::from(bridge_host)
438 });
439 let bridge_name = format!("bridge-{}", bridge_name);
440
441 let bridge_request_url = bridge_url.as_str();
442
443 let metrics_pub_key = match client.get(bridge_request_url).send().await {
444 Ok(response) => {
445 let raw = response.bytes().await.ok()?;
446 let metrics_pub_key: String = match serde_json::from_slice(&raw) {
447 Ok(key) => key,
448 Err(error) => {
449 warn!(?error, url_str, "Failed to deserialize response");
450 return fallback_to_cached_key(
451 &metrics_keys,
452 &url_str,
453 &bridge_name,
454 );
455 }
456 };
457 let metrics_bytes = match Base64::decode(&metrics_pub_key) {
458 Ok(pubkey_bytes) => pubkey_bytes,
459 Err(error) => {
460 warn!(
461 ?error,
462 bridge_name, "unable to decode public key for bridge node",
463 );
464 return None;
465 }
466 };
467 match Ed25519PublicKey::from_bytes(&metrics_bytes) {
468 Ok(pubkey) => {
469 let mut metrics_keys_write = metrics_keys.write().unwrap();
471 metrics_keys_write.insert(url_str.clone(), pubkey.clone());
472 debug!(
473 url_str,
474 public_key = ?pubkey,
475 "Successfully added bridge peer to metrics_keys"
476 );
477 pubkey
478 }
479 Err(error) => {
480 warn!(
481 ?error,
482 bridge_request_url,
483 "unable to decode public key for bridge node",
484 );
485 return None;
486 }
487 }
488 }
489 Err(_) => {
490 return fallback_to_cached_key(&metrics_keys, &url_str, &bridge_name);
491 }
492 };
493 Some((
494 metrics_pub_key.clone(),
495 AllowedPeer {
496 public_key: metrics_pub_key,
497 name: bridge_name,
498 },
499 ))
500 }
501 })
502 .collect()
503 .await;
504
505 results
506}
507
508fn fallback_to_cached_key(
509 metrics_keys: &MetricsPubKeys,
510 url_str: &str,
511 bridge_name: &str,
512) -> Option<(Ed25519PublicKey, AllowedPeer)> {
513 let metrics_keys_read = metrics_keys.read().unwrap();
514 if let Some(cached_key) = metrics_keys_read.get(url_str) {
515 debug!(
516 url_str,
517 "Using cached metrics public key after request failure"
518 );
519 Some((
520 cached_key.clone(),
521 AllowedPeer {
522 public_key: cached_key.clone(),
523 name: bridge_name.to_string(),
524 },
525 ))
526 } else {
527 warn!(
528 url_str,
529 "Failed to fetch public key and no cached key available"
530 );
531 None
532 }
533}
534
535fn append_path_segment(mut url: Url, segment: &str) -> Option<Url> {
536 url.path_segments_mut().ok()?.pop_if_empty().push(segment);
537 Some(url)
538}
539
540#[cfg(test)]
541mod tests {
542 use super::*;
543 use crate::admin::{CertKeyPair, generate_self_cert};
544 use serde::Serialize;
545 use sui_types::base_types::SuiAddress;
546 use sui_types::bridge::{BridgeCommitteeSummary, BridgeSummary, MoveTypeCommitteeMember};
547 use sui_types::sui_system_state::sui_system_state_summary::{
548 SuiSystemStateSummary, SuiValidatorSummary,
549 };
550
551 #[test]
555 fn depend_on_sui_sui_system_state_summary() {
556 let CertKeyPair(_, client_pub_key) = generate_self_cert("sui".into());
557 let depends_on = SuiSystemStateSummary {
560 active_validators: vec![SuiValidatorSummary {
561 network_pubkey_bytes: Vec::from(client_pub_key.as_bytes()),
562 primary_address: "empty".into(),
563 worker_address: "empty".into(),
564 ..Default::default()
565 }],
566 ..Default::default()
567 };
568
569 #[derive(Debug, Serialize, Deserialize)]
570 struct ResponseBody {
571 result: SuiSystemStateSummary,
572 }
573
574 let r = serde_json::to_string(&ResponseBody { result: depends_on })
575 .expect("expected to serialize ResponseBody{SuiSystemStateSummary}");
576
577 let deserialized = serde_json::from_str::<ResponseBody>(&r)
578 .expect("expected to deserialize ResponseBody{SuiSystemStateSummary}");
579
580 let peers = extract(deserialized.result);
581 assert_eq!(peers.count(), 1, "peers should have been a length of 1");
582 }
583
584 #[tokio::test]
585 async fn test_extract_bridge_invalid_bridge_url() {
586 let summary = BridgeSummary {
587 committee: BridgeCommitteeSummary {
588 members: vec![(
589 vec![],
590 MoveTypeCommitteeMember {
591 sui_address: SuiAddress::ZERO,
592 http_rest_url: "invalid_bridge_url".as_bytes().to_vec(),
593 ..Default::default()
594 },
595 )],
596 ..Default::default()
597 },
598 ..Default::default()
599 };
600
601 let metrics_keys = Arc::new(RwLock::new(HashMap::new()));
602 {
603 let mut cache = metrics_keys.write().unwrap();
604 cache.insert(
605 "invalid_bridge_url".to_string(),
606 Ed25519PublicKey::from_bytes(&[1u8; 32]).unwrap(),
607 );
608 }
609 let result = extract_bridge(summary, Arc::new(BTreeMap::new()), metrics_keys.clone()).await;
610
611 assert_eq!(
612 result.len(),
613 0,
614 "Should not fall back on cache if invalid bridge url is set"
615 );
616 }
617
618 #[tokio::test]
619 async fn test_extract_bridge_interrupted_response() {
620 let summary = BridgeSummary {
621 committee: BridgeCommitteeSummary {
622 members: vec![(
623 vec![],
624 MoveTypeCommitteeMember {
625 sui_address: SuiAddress::ZERO,
626 http_rest_url: "https://unresponsive_bridge_url".as_bytes().to_vec(),
627 ..Default::default()
628 },
629 )],
630 ..Default::default()
631 },
632 ..Default::default()
633 };
634
635 let metrics_keys = Arc::new(RwLock::new(HashMap::new()));
636 {
637 let mut cache = metrics_keys.write().unwrap();
638 cache.insert(
639 "https://unresponsive_bridge_url".to_string(),
640 Ed25519PublicKey::from_bytes(&[1u8; 32]).unwrap(),
641 );
642 }
643 let result = extract_bridge(summary, Arc::new(BTreeMap::new()), metrics_keys.clone()).await;
644
645 assert_eq!(
646 result.len(),
647 1,
648 "Should fall back on cache if invalid response occurs"
649 );
650 let allowed_peer = &result[0].1;
651 assert_eq!(
652 allowed_peer.public_key.as_bytes(),
653 &[1u8; 32],
654 "Should fall back to the cached public key"
655 );
656
657 let cache = metrics_keys.read().unwrap();
658 assert!(
659 cache.contains_key("https://unresponsive_bridge_url"),
660 "Cache should still contain the original key"
661 );
662 }
663
664 #[test]
665 fn test_append_path_segment() {
666 let test_cases = vec![
667 (
668 "https://example.com",
669 "metrics_pub_key",
670 "https://example.com/metrics_pub_key",
671 ),
672 (
673 "https://example.com/api",
674 "metrics_pub_key",
675 "https://example.com/api/metrics_pub_key",
676 ),
677 (
678 "https://example.com/",
679 "metrics_pub_key",
680 "https://example.com/metrics_pub_key",
681 ),
682 (
683 "https://example.com/api/",
684 "metrics_pub_key",
685 "https://example.com/api/metrics_pub_key",
686 ),
687 (
688 "https://example.com:8080",
689 "metrics_pub_key",
690 "https://example.com:8080/metrics_pub_key",
691 ),
692 (
693 "https://example.com?param=value",
694 "metrics_pub_key",
695 "https://example.com/metrics_pub_key?param=value",
696 ),
697 (
698 "https://example.com:8080/api/v1?param=value",
699 "metrics_pub_key",
700 "https://example.com:8080/api/v1/metrics_pub_key?param=value",
701 ),
702 ];
703
704 for (input_url, segment, expected_output) in test_cases {
705 let url = Url::parse(input_url).unwrap();
706 let result = append_path_segment(url, segment);
707 assert!(
708 result.is_some(),
709 "Failed to append segment for URL: {}",
710 input_url
711 );
712 let result_url = result.unwrap();
713 assert_eq!(
714 result_url.as_str(),
715 expected_output,
716 "Unexpected result for input URL: {}",
717 input_url
718 );
719 }
720 }
721}