sui_proxy/
peers.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3use anyhow::{Context, Result, bail};
4use fastcrypto::ed25519::Ed25519PublicKey;
5use fastcrypto::encoding::Base64;
6use fastcrypto::encoding::Encoding;
7use fastcrypto::traits::ToFromBytes;
8use futures::stream::{self, StreamExt};
9use once_cell::sync::Lazy;
10use prometheus::{CounterVec, HistogramVec};
11use prometheus::{register_counter_vec, register_histogram_vec};
12use serde::Deserialize;
13use std::collections::BTreeMap;
14use std::{
15    collections::HashMap,
16    sync::{Arc, RwLock},
17    time::Duration,
18};
19use sui_tls::Allower;
20use sui_types::base_types::SuiAddress;
21use sui_types::bridge::BridgeSummary;
22use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary;
23use tracing::{debug, error, info, warn};
24use url::Url;
25
26static JSON_RPC_STATE: Lazy<CounterVec> = Lazy::new(|| {
27    register_counter_vec!(
28        "json_rpc_state",
29        "Number of successful/failed requests made.",
30        &["rpc_method", "status"]
31    )
32    .unwrap()
33});
34static JSON_RPC_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
35    register_histogram_vec!(
36        "json_rpc_duration_seconds",
37        "The json-rpc latencies in seconds.",
38        &["rpc_method"],
39        vec![
40            0.0008, 0.0016, 0.0032, 0.0064, 0.0128, 0.0256, 0.0512, 0.1024, 0.2048, 0.4096, 0.8192,
41            1.0, 1.25, 1.5, 1.75, 2.0, 4.0, 8.0
42        ],
43    )
44    .unwrap()
45});
46
/// AllowedPeers is a mapping of public key to AllowedPeer data.  The map sits behind an `Arc<RwLock<..>>`
/// so that every clone of the handle reads and writes the same underlying map.
pub type AllowedPeers = Arc<RwLock<HashMap<Ed25519PublicKey, AllowedPeer>>>;
49
/// MetricsPubKeys is a shared cache of bridge metrics public keys.  Within this file the key of the map is the
/// bridge node's `http_rest_url` string and the value is the last public key successfully fetched from it.
type MetricsPubKeys = Arc<RwLock<HashMap<String, Ed25519PublicKey>>>;
51
52#[derive(Hash, PartialEq, Eq, Debug, Clone)]
53pub struct AllowedPeer {
54    pub name: String,
55    pub public_key: Ed25519PublicKey,
56}
57
58/// SuiNodeProvider queries the sui blockchain and keeps a record of known validators based on the response from
59/// sui_getValidators.  The node name, public key and other info is extracted from the chain and stored in this
60/// data structure.  We pass this struct to the tls verifier and it depends on the state contained within.
61/// Handlers also use this data in an Extractor extension to check incoming clients on the http api against known keys.
62#[derive(Debug, Clone)]
63pub struct SuiNodeProvider {
64    sui_nodes: AllowedPeers,
65    bridge_nodes: AllowedPeers,
66    static_nodes: AllowedPeers,
67    rpc_url: String,
68    rpc_poll_interval: Duration,
69}
70
71impl Allower for SuiNodeProvider {
72    fn allowed(&self, key: &Ed25519PublicKey) -> bool {
73        self.static_nodes.read().unwrap().contains_key(key)
74            || self.sui_nodes.read().unwrap().contains_key(key)
75            || self.bridge_nodes.read().unwrap().contains_key(key)
76    }
77}
78
79impl SuiNodeProvider {
80    pub fn new(
81        rpc_url: String,
82        rpc_poll_interval: Duration,
83        static_peers: Vec<AllowedPeer>,
84    ) -> Self {
85        // build our hashmap with the static pub keys. we only do this one time at binary startup.
86        let static_nodes: HashMap<Ed25519PublicKey, AllowedPeer> = static_peers
87            .into_iter()
88            .map(|v| (v.public_key.clone(), v))
89            .collect();
90        let static_nodes = Arc::new(RwLock::new(static_nodes));
91        let sui_nodes = Arc::new(RwLock::new(HashMap::new()));
92        let bridge_nodes = Arc::new(RwLock::new(HashMap::new()));
93        Self {
94            sui_nodes,
95            bridge_nodes,
96            static_nodes,
97            rpc_url,
98            rpc_poll_interval,
99        }
100    }
101
102    /// get is used to retrieve peer info in our handlers
103    pub fn get(&self, key: &Ed25519PublicKey) -> Option<AllowedPeer> {
104        debug!("look for {:?}", key);
105        // check static nodes first
106        if let Some(v) = self.static_nodes.read().unwrap().get(key) {
107            return Some(AllowedPeer {
108                name: v.name.to_owned(),
109                public_key: v.public_key.to_owned(),
110            });
111        }
112        // check sui validators
113        if let Some(v) = self.sui_nodes.read().unwrap().get(key) {
114            return Some(AllowedPeer {
115                name: v.name.to_owned(),
116                public_key: v.public_key.to_owned(),
117            });
118        }
119        // check bridge validators
120        if let Some(v) = self.bridge_nodes.read().unwrap().get(key) {
121            return Some(AllowedPeer {
122                name: v.name.to_owned(),
123                public_key: v.public_key.to_owned(),
124            });
125        }
126        None
127    }
128
129    /// Get a mutable reference to the allowed sui validator map
130    pub fn get_sui_mut(&mut self) -> &mut AllowedPeers {
131        &mut self.sui_nodes
132    }
133
134    /// get_validators will retrieve known validators
135    async fn get_validators(url: String) -> Result<SuiSystemStateSummary> {
136        let rpc_method = "suix_getLatestSuiSystemState";
137        let observe = || {
138            let timer = JSON_RPC_DURATION
139                .with_label_values(&[rpc_method])
140                .start_timer();
141            || {
142                timer.observe_duration();
143            }
144        }();
145        let client = reqwest::Client::builder().build().unwrap();
146        let request = serde_json::json!({
147            "jsonrpc": "2.0",
148            "method":rpc_method,
149            "id":1,
150        });
151        let response = client
152            .post(url)
153            .header(reqwest::header::CONTENT_TYPE, "application/json")
154            .body(request.to_string())
155            .send()
156            .await
157            .with_context(|| {
158                JSON_RPC_STATE
159                    .with_label_values(&[rpc_method, "failed_get"])
160                    .inc();
161                observe();
162                "unable to perform json rpc"
163            })?;
164
165        let raw = response.bytes().await.with_context(|| {
166            JSON_RPC_STATE
167                .with_label_values(&[rpc_method, "failed_body_extract"])
168                .inc();
169            observe();
170            "unable to extract body bytes from json rpc"
171        })?;
172
173        #[derive(Debug, Deserialize)]
174        struct ResponseBody {
175            result: SuiSystemStateSummary,
176        }
177
178        let body: ResponseBody = match serde_json::from_slice(&raw) {
179            Ok(b) => b,
180            Err(error) => {
181                JSON_RPC_STATE
182                    .with_label_values(&[rpc_method, "failed_json_decode"])
183                    .inc();
184                observe();
185                bail!(
186                    "unable to decode json: {error} response from json rpc: {:?}",
187                    raw
188                )
189            }
190        };
191        JSON_RPC_STATE
192            .with_label_values(&[rpc_method, "success"])
193            .inc();
194        observe();
195        Ok(body.result)
196    }
197
198    /// get_bridge_validators will retrieve known bridge validators
199    async fn get_bridge_validators(url: String) -> Result<BridgeSummary> {
200        let rpc_method = "suix_getLatestBridge";
201        let _timer = JSON_RPC_DURATION
202            .with_label_values(&[rpc_method])
203            .start_timer();
204        let client = reqwest::Client::builder().build().unwrap();
205        let request = serde_json::json!({
206            "jsonrpc": "2.0",
207            "method":rpc_method,
208            "id":1,
209        });
210        let response = client
211            .post(url)
212            .header(reqwest::header::CONTENT_TYPE, "application/json")
213            .body(request.to_string())
214            .send()
215            .await
216            .with_context(|| {
217                JSON_RPC_STATE
218                    .with_label_values(&[rpc_method, "failed_get"])
219                    .inc();
220                "unable to perform json rpc"
221            })?;
222
223        let raw = response.bytes().await.with_context(|| {
224            JSON_RPC_STATE
225                .with_label_values(&[rpc_method, "failed_body_extract"])
226                .inc();
227            "unable to extract body bytes from json rpc"
228        })?;
229
230        #[derive(Debug, Deserialize)]
231        struct ResponseBody {
232            result: BridgeSummary,
233        }
234        let summary: BridgeSummary = match serde_json::from_slice::<ResponseBody>(&raw) {
235            Ok(b) => b.result,
236            Err(error) => {
237                JSON_RPC_STATE
238                    .with_label_values(&[rpc_method, "failed_json_decode"])
239                    .inc();
240                bail!(
241                    "unable to decode json: {error} response from json rpc: {:?}",
242                    raw
243                )
244            }
245        };
246        JSON_RPC_STATE
247            .with_label_values(&[rpc_method, "success"])
248            .inc();
249        Ok(summary)
250    }
251
252    async fn update_sui_validator_set(&self) {
253        match Self::get_validators(self.rpc_url.to_owned()).await {
254            Ok(summary) => {
255                let validators = extract(summary);
256                let mut allow = self.sui_nodes.write().unwrap();
257                allow.clear();
258                allow.extend(validators);
259                info!(
260                    "{} sui validators managed to make it on the allow list",
261                    allow.len()
262                );
263            }
264            Err(error) => {
265                JSON_RPC_STATE
266                    .with_label_values(&["update_peer_count", "failed"])
267                    .inc();
268                error!("unable to refresh peer list: {error}");
269            }
270        };
271    }
272
273    async fn update_bridge_validator_set(&self, metrics_keys: MetricsPubKeys) {
274        let sui_system = match Self::get_validators(self.rpc_url.to_owned()).await {
275            Ok(summary) => summary,
276            Err(error) => {
277                JSON_RPC_STATE
278                    .with_label_values(&["update_bridge_peer_count", "failed"])
279                    .inc();
280                error!("unable to get sui system state: {error}");
281                return;
282            }
283        };
284        match Self::get_bridge_validators(self.rpc_url.to_owned()).await {
285            Ok(summary) => {
286                let names = sui_system
287                    .active_validators
288                    .into_iter()
289                    .map(|v| (v.sui_address, v.name))
290                    .collect();
291                let validators = extract_bridge(summary, Arc::new(names), metrics_keys).await;
292                let mut allow = self.bridge_nodes.write().unwrap();
293                allow.clear();
294                allow.extend(validators);
295                info!(
296                    "{} bridge validators managed to make it on the allow list",
297                    allow.len()
298                );
299            }
300            Err(error) => {
301                JSON_RPC_STATE
302                    .with_label_values(&["update_bridge_peer_count", "failed"])
303                    .inc();
304                error!("unable to refresh sui bridge peer list: {error}");
305            }
306        };
307    }
308
309    /// poll_peer_list will act as a refresh interval for our cache
310    pub fn poll_peer_list(&self) {
311        info!("Started polling for peers using rpc: {}", self.rpc_url);
312
313        let rpc_poll_interval = self.rpc_poll_interval;
314        let cloned_self = self.clone();
315        let bridge_metrics_keys: MetricsPubKeys = Arc::new(RwLock::new(HashMap::new()));
316        tokio::spawn(async move {
317            let mut interval = tokio::time::interval(rpc_poll_interval);
318            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
319
320            loop {
321                interval.tick().await;
322
323                cloned_self.update_sui_validator_set().await;
324                cloned_self
325                    .update_bridge_validator_set(bridge_metrics_keys.clone())
326                    .await;
327            }
328        });
329    }
330}
331
332/// extract will get the network pubkey bytes from a SuiValidatorSummary type.  This type comes from a
333/// full node rpc result.  See get_validators for details.  The key here, if extracted successfully, will
334/// ultimately be stored in the allow list and let us communicate with those actual peers via tls.
335fn extract(
336    summary: SuiSystemStateSummary,
337) -> impl Iterator<Item = (Ed25519PublicKey, AllowedPeer)> {
338    summary.active_validators.into_iter().filter_map(|vm| {
339        match Ed25519PublicKey::from_bytes(&vm.network_pubkey_bytes) {
340            Ok(public_key) => {
341                debug!(
342                    "adding public key {:?} for sui validator {:?}",
343                    public_key, vm.name
344                );
345                Some((
346                    public_key.clone(),
347                    AllowedPeer {
348                        name: vm.name,
349                        public_key,
350                    },
351                )) // scoped to filter_map
352            }
353            Err(error) => {
354                error!(
355                    "unable to decode public key for name: {:?} sui_address: {:?} error: {error}",
356                    vm.name, vm.sui_address
357                );
358                None // scoped to filter_map
359            }
360        }
361    })
362}
363
364async fn extract_bridge(
365    summary: BridgeSummary,
366    names: Arc<BTreeMap<SuiAddress, String>>,
367    metrics_keys: MetricsPubKeys,
368) -> Vec<(Ed25519PublicKey, AllowedPeer)> {
369    {
370        // Clean up the cache: retain only the metrics keys of the up-to-date bridge validator set
371        let mut metrics_keys_write = metrics_keys.write().unwrap();
372        metrics_keys_write.retain(|url, _| {
373            summary.committee.members.iter().any(|(_, cm)| {
374                String::from_utf8(cm.http_rest_url.clone()).ok().as_ref() == Some(url)
375            })
376        });
377    }
378
379    let client = reqwest::Client::builder()
380        .timeout(Duration::from_secs(10))
381        .build()
382        .unwrap();
383    let committee_members = summary.committee.members.clone();
384    let results: Vec<_> = stream::iter(committee_members)
385        .filter_map(|(_, cm)| {
386            let client = client.clone();
387            let metrics_keys = metrics_keys.clone();
388            let names = names.clone();
389            async move {
390                debug!(
391                    address =% cm.sui_address,
392                    "Extracting metrics public key for bridge node",
393                );
394
395                // Convert the Vec<u8> to a String and handle errors properly
396                let url_str = match String::from_utf8(cm.http_rest_url) {
397                    Ok(url) => url,
398                    Err(_) => {
399                        warn!(
400                            address =% cm.sui_address,
401                            "Invalid UTF-8 sequence in http_rest_url for bridge node ",
402                        );
403                        return None;
404                    }
405                };
406                // Parse the URL
407                let bridge_url = match Url::parse(&url_str) {
408                    Ok(url) => url,
409                    Err(_) => {
410                        warn!(url_str, "Unable to parse http_rest_url");
411                        return None;
412                    }
413                };
414
415                // Append "metrics_pub_key" to the path
416                let bridge_url = match append_path_segment(bridge_url, "metrics_pub_key") {
417                    Some(url) => url,
418                    None => {
419                        warn!(url_str, "Unable to append path segment to URL");
420                        return None;
421                    }
422                };
423
424                // Use the host portion of the http_rest_url as the "name"
425                let bridge_host = match bridge_url.host_str() {
426                    Some(host) => host,
427                    None => {
428                        warn!(url_str, "Hostname is missing from http_rest_url");
429                        return None;
430                    }
431                };
432                let bridge_name = names.get(&cm.sui_address).cloned().unwrap_or_else(|| {
433                    warn!(
434                        address =% cm.sui_address,
435                        "Bridge node not found in sui committee, using base URL as the name",
436                    );
437                    String::from(bridge_host)
438                });
439                let bridge_name = format!("bridge-{}", bridge_name);
440
441                let bridge_request_url = bridge_url.as_str();
442
443                let metrics_pub_key = match client.get(bridge_request_url).send().await {
444                    Ok(response) => {
445                        let raw = response.bytes().await.ok()?;
446                        let metrics_pub_key: String = match serde_json::from_slice(&raw) {
447                            Ok(key) => key,
448                            Err(error) => {
449                                warn!(?error, url_str, "Failed to deserialize response");
450                                return fallback_to_cached_key(
451                                    &metrics_keys,
452                                    &url_str,
453                                    &bridge_name,
454                                );
455                            }
456                        };
457                        let metrics_bytes = match Base64::decode(&metrics_pub_key) {
458                            Ok(pubkey_bytes) => pubkey_bytes,
459                            Err(error) => {
460                                warn!(
461                                    ?error,
462                                    bridge_name, "unable to decode public key for bridge node",
463                                );
464                                return None;
465                            }
466                        };
467                        match Ed25519PublicKey::from_bytes(&metrics_bytes) {
468                            Ok(pubkey) => {
469                                // Successfully fetched the key, update the cache
470                                let mut metrics_keys_write = metrics_keys.write().unwrap();
471                                metrics_keys_write.insert(url_str.clone(), pubkey.clone());
472                                debug!(
473                                    url_str,
474                                    public_key = ?pubkey,
475                                    "Successfully added bridge peer to metrics_keys"
476                                );
477                                pubkey
478                            }
479                            Err(error) => {
480                                warn!(
481                                    ?error,
482                                    bridge_request_url,
483                                    "unable to decode public key for bridge node",
484                                );
485                                return None;
486                            }
487                        }
488                    }
489                    Err(_) => {
490                        return fallback_to_cached_key(&metrics_keys, &url_str, &bridge_name);
491                    }
492                };
493                Some((
494                    metrics_pub_key.clone(),
495                    AllowedPeer {
496                        public_key: metrics_pub_key,
497                        name: bridge_name,
498                    },
499                ))
500            }
501        })
502        .collect()
503        .await;
504
505    results
506}
507
508fn fallback_to_cached_key(
509    metrics_keys: &MetricsPubKeys,
510    url_str: &str,
511    bridge_name: &str,
512) -> Option<(Ed25519PublicKey, AllowedPeer)> {
513    let metrics_keys_read = metrics_keys.read().unwrap();
514    if let Some(cached_key) = metrics_keys_read.get(url_str) {
515        debug!(
516            url_str,
517            "Using cached metrics public key after request failure"
518        );
519        Some((
520            cached_key.clone(),
521            AllowedPeer {
522                public_key: cached_key.clone(),
523                name: bridge_name.to_string(),
524            },
525        ))
526    } else {
527        warn!(
528            url_str,
529            "Failed to fetch public key and no cached key available"
530        );
531        None
532    }
533}
534
535fn append_path_segment(mut url: Url, segment: &str) -> Option<Url> {
536    url.path_segments_mut().ok()?.pop_if_empty().push(segment);
537    Some(url)
538}
539
540#[cfg(test)]
541mod tests {
542    use super::*;
543    use crate::admin::{CertKeyPair, generate_self_cert};
544    use serde::Serialize;
545    use sui_types::base_types::SuiAddress;
546    use sui_types::bridge::{BridgeCommitteeSummary, BridgeSummary, MoveTypeCommitteeMember};
547    use sui_types::sui_system_state::sui_system_state_summary::{
548        SuiSystemStateSummary, SuiValidatorSummary,
549    };
550
551    /// creates a test that binds our proxy use case to the structure in sui_getLatestSuiSystemState
552    /// most of the fields are garbage, but we will send the results of the serde process to a private decode
553    /// function that should always work if the structure is valid for our use
554    #[test]
555    fn depend_on_sui_sui_system_state_summary() {
556        let CertKeyPair(_, client_pub_key) = generate_self_cert("sui".into());
557        // all fields here just satisfy the field types, with exception to active_validators, we use
558        // some of those.
559        let depends_on = SuiSystemStateSummary {
560            active_validators: vec![SuiValidatorSummary {
561                network_pubkey_bytes: Vec::from(client_pub_key.as_bytes()),
562                primary_address: "empty".into(),
563                worker_address: "empty".into(),
564                ..Default::default()
565            }],
566            ..Default::default()
567        };
568
569        #[derive(Debug, Serialize, Deserialize)]
570        struct ResponseBody {
571            result: SuiSystemStateSummary,
572        }
573
574        let r = serde_json::to_string(&ResponseBody { result: depends_on })
575            .expect("expected to serialize ResponseBody{SuiSystemStateSummary}");
576
577        let deserialized = serde_json::from_str::<ResponseBody>(&r)
578            .expect("expected to deserialize ResponseBody{SuiSystemStateSummary}");
579
580        let peers = extract(deserialized.result);
581        assert_eq!(peers.count(), 1, "peers should have been a length of 1");
582    }
583
584    #[tokio::test]
585    async fn test_extract_bridge_invalid_bridge_url() {
586        let summary = BridgeSummary {
587            committee: BridgeCommitteeSummary {
588                members: vec![(
589                    vec![],
590                    MoveTypeCommitteeMember {
591                        sui_address: SuiAddress::ZERO,
592                        http_rest_url: "invalid_bridge_url".as_bytes().to_vec(),
593                        ..Default::default()
594                    },
595                )],
596                ..Default::default()
597            },
598            ..Default::default()
599        };
600
601        let metrics_keys = Arc::new(RwLock::new(HashMap::new()));
602        {
603            let mut cache = metrics_keys.write().unwrap();
604            cache.insert(
605                "invalid_bridge_url".to_string(),
606                Ed25519PublicKey::from_bytes(&[1u8; 32]).unwrap(),
607            );
608        }
609        let result = extract_bridge(summary, Arc::new(BTreeMap::new()), metrics_keys.clone()).await;
610
611        assert_eq!(
612            result.len(),
613            0,
614            "Should not fall back on cache if invalid bridge url is set"
615        );
616    }
617
618    #[tokio::test]
619    async fn test_extract_bridge_interrupted_response() {
620        let summary = BridgeSummary {
621            committee: BridgeCommitteeSummary {
622                members: vec![(
623                    vec![],
624                    MoveTypeCommitteeMember {
625                        sui_address: SuiAddress::ZERO,
626                        http_rest_url: "https://unresponsive_bridge_url".as_bytes().to_vec(),
627                        ..Default::default()
628                    },
629                )],
630                ..Default::default()
631            },
632            ..Default::default()
633        };
634
635        let metrics_keys = Arc::new(RwLock::new(HashMap::new()));
636        {
637            let mut cache = metrics_keys.write().unwrap();
638            cache.insert(
639                "https://unresponsive_bridge_url".to_string(),
640                Ed25519PublicKey::from_bytes(&[1u8; 32]).unwrap(),
641            );
642        }
643        let result = extract_bridge(summary, Arc::new(BTreeMap::new()), metrics_keys.clone()).await;
644
645        assert_eq!(
646            result.len(),
647            1,
648            "Should fall back on cache if invalid response occurs"
649        );
650        let allowed_peer = &result[0].1;
651        assert_eq!(
652            allowed_peer.public_key.as_bytes(),
653            &[1u8; 32],
654            "Should fall back to the cached public key"
655        );
656
657        let cache = metrics_keys.read().unwrap();
658        assert!(
659            cache.contains_key("https://unresponsive_bridge_url"),
660            "Cache should still contain the original key"
661        );
662    }
663
664    #[test]
665    fn test_append_path_segment() {
666        let test_cases = vec![
667            (
668                "https://example.com",
669                "metrics_pub_key",
670                "https://example.com/metrics_pub_key",
671            ),
672            (
673                "https://example.com/api",
674                "metrics_pub_key",
675                "https://example.com/api/metrics_pub_key",
676            ),
677            (
678                "https://example.com/",
679                "metrics_pub_key",
680                "https://example.com/metrics_pub_key",
681            ),
682            (
683                "https://example.com/api/",
684                "metrics_pub_key",
685                "https://example.com/api/metrics_pub_key",
686            ),
687            (
688                "https://example.com:8080",
689                "metrics_pub_key",
690                "https://example.com:8080/metrics_pub_key",
691            ),
692            (
693                "https://example.com?param=value",
694                "metrics_pub_key",
695                "https://example.com/metrics_pub_key?param=value",
696            ),
697            (
698                "https://example.com:8080/api/v1?param=value",
699                "metrics_pub_key",
700                "https://example.com:8080/api/v1/metrics_pub_key?param=value",
701            ),
702        ];
703
704        for (input_url, segment, expected_output) in test_cases {
705            let url = Url::parse(input_url).unwrap();
706            let result = append_path_segment(url, segment);
707            assert!(
708                result.is_some(),
709                "Failed to append segment for URL: {}",
710                input_url
711            );
712            let result_url = result.unwrap();
713            assert_eq!(
714                result_url.as_str(),
715                expected_output,
716                "Unexpected result for input URL: {}",
717                input_url
718            );
719        }
720    }
721}