sui_network/discovery/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::types::PeerInfo;
5use anemo::{Network, Peer, PeerId, Request, Response, types::PeerEvent};
6use fastcrypto::ed25519::{Ed25519PublicKey, Ed25519Signature};
7use futures::StreamExt;
8use mysten_common::debug_fatal;
9use serde::{Deserialize, Serialize};
10use shared_crypto::intent::IntentScope;
11use std::{
12    collections::HashMap,
13    sync::{Arc, RwLock},
14    time::Duration,
15};
16use sui_config::p2p::{AccessType, DiscoveryConfig, P2pConfig, SeedPeer};
17use sui_types::crypto::{NetworkKeyPair, Signer, ToFromBytes, VerifyingKey};
18use sui_types::digests::Digest;
19use sui_types::message_envelope::{Envelope, Message, VerifiedEnvelope};
20use sui_types::multiaddr::Multiaddr;
21use tap::{Pipe, TapFallible};
22use tokio::sync::broadcast::error::RecvError;
23use tokio::sync::watch;
24use tokio::{
25    sync::oneshot,
26    task::{AbortHandle, JoinSet},
27};
28use tracing::{debug, info, trace};
29
/// Deadline applied to every `GetKnownPeersV2` request sent by discovery.
const TIMEOUT: Duration = Duration::from_millis(1_000);
/// Node infos whose timestamp is this old (or older) are culled from, and not admitted into,
/// the known-peers set.
const ONE_DAY_MILLISECONDS: u64 = 86_400 * 1_000;
/// Exclusive upper bound on the length of any advertised address.
const MAX_ADDRESS_LENGTH: usize = 300;
/// Cap on known-peer entries taken from a single response (the responder's own info is
/// allowed on top of this).
const MAX_PEERS_TO_SEND: usize = 200;
/// Node infos advertising more addresses than this are rejected.
const MAX_ADDRESSES_PER_PEER: usize = 2;
35
/// Discovery RPC client/server code emitted into `OUT_DIR` by the build script
/// (`sui.Discovery.rs`); `DiscoveryClient`, `Discovery` and `DiscoveryServer` are
/// re-exported from it below.
mod generated {
    include!(concat!(env!("OUT_DIR"), "/sui.Discovery.rs"));
}
39mod builder;
40mod metrics;
41mod server;
42#[cfg(test)]
43mod tests;
44
45pub use builder::{Builder, Handle, UnstartedDiscovery};
46pub use generated::{
47    discovery_client::DiscoveryClient,
48    discovery_server::{Discovery, DiscoveryServer},
49};
50pub use server::GetKnownPeersResponseV2;
51
52use self::metrics::Metrics;
53
/// The internal discovery state shared between the main event loop and the request handler
struct State {
    /// Our own signed node info. Set once when the event loop starts and re-signed with a
    /// fresh timestamp on every tick.
    our_info: Option<SignedNodeInfo>,
    /// Peers we currently hold a connection to, maintained from the network's `PeerEvent`s.
    /// Used as a set: the value is always `()`.
    connected_peers: HashMap<PeerId, ()>,
    /// Signature-verified node infos learned from other peers, keyed by peer id. Entries
    /// older than a day are culled on every tick.
    known_peers: HashMap<PeerId, VerifiedSignedNodeInfo>,
}
60
/// The information necessary to dial another peer.
///
/// `NodeInfo` contains all the information that is shared with other nodes via the discovery
/// service to advertise how a node can be reached.
///
/// NOTE: `NodeInfo::sign` signs the BCS encoding of this struct, and BCS encodes fields in
/// declaration order, so reordering or inserting fields changes what is signed.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct NodeInfo {
    /// Identity of the node. Its bytes are interpreted as the node's Ed25519 public key when
    /// verifying signatures on received node infos.
    pub peer_id: PeerId,
    /// Addresses the node can be dialed at. May be empty, in which case the node is known
    /// but not dialable.
    pub addresses: Vec<Multiaddr>,

    /// Creation time.
    ///
    /// This is used to determine which of two NodeInfo's from the same PeerId should be retained.
    pub timestamp_ms: u64,

    /// See docstring for `AccessType`.
    pub access_type: AccessType,
}
78
79impl NodeInfo {
80    fn sign(self, keypair: &NetworkKeyPair) -> SignedNodeInfo {
81        let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
82        let sig = keypair.sign(&msg);
83        SignedNodeInfo::new_from_data_and_sig(self, sig)
84    }
85}
86
/// A [`NodeInfo`] paired with an Ed25519 signature over its BCS encoding, as produced by
/// `NodeInfo::sign` with the node's network key.
pub type SignedNodeInfo = Envelope<NodeInfo, Ed25519Signature>;

/// A [`SignedNodeInfo`] whose signature has been checked. In this module it is only built
/// after the signature verifies against the public key encoded in the node's `peer_id`.
pub type VerifiedSignedNodeInfo = VerifiedEnvelope<NodeInfo, Ed25519Signature>;
90
91#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
92pub struct NodeInfoDigest(Digest);
93
94impl NodeInfoDigest {
95    pub const fn new(digest: [u8; 32]) -> Self {
96        Self(Digest::new(digest))
97    }
98}
99
/// Allows `NodeInfo` to be carried in an [`Envelope`]. `SCOPE` declares the
/// `DiscoveryPeers` intent scope; note that `NodeInfo::sign` signs the raw BCS bytes directly
/// and does not use it.
impl Message for NodeInfo {
    type DigestType = NodeInfoDigest;
    const SCOPE: IntentScope = IntentScope::DiscoveryPeers;

    /// Not implemented: panics if called.
    // NOTE(review): nothing in this module calls `digest()`; confirm no `Envelope` helper
    // reaches it for `NodeInfo` before relying on that.
    fn digest(&self) -> Self::DigestType {
        unreachable!("NodeInfoDigest is not used today")
    }
}
108
/// Notification that new trusted peers (e.g. committee members) are available. The discovery
/// event loop inserts every entry of `new_peers` into the network's known peers.
#[derive(Clone, Debug, Default)]
pub struct TrustedPeerChangeEvent {
    pub new_peers: Vec<PeerInfo>,
}
113
/// Owner of the discovery background loop; see `DiscoveryEventLoop::start`.
struct DiscoveryEventLoop {
    /// P2P configuration; provides our external address and the seed peers.
    config: P2pConfig,
    /// Discovery settings (tick interval, connection target, peers to query, access type,
    /// allowlisted peers).
    discovery_config: Arc<DiscoveryConfig>,
    /// Allowlisted peers keyed by peer id; `Private` node infos from peers not in this map are
    /// ignored. Presumably built from `discovery_config.allowlisted_peers` — TODO confirm in
    /// the builder.
    allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
    network: Network,
    /// Key used to sign our own `NodeInfo`.
    keypair: NetworkKeyPair,
    /// Background tasks (queries and dials); their results are observed by the event loop.
    tasks: JoinSet<()>,
    /// In-flight dials keyed by the peer being dialed; finished entries are pruned each tick.
    pending_dials: HashMap<PeerId, AbortHandle>,
    /// The in-flight seed-peer dialing task, if any; at most one is spawned at a time.
    dial_seed_peers_task: Option<AbortHandle>,
    /// Resolves when the event loop should stop.
    shutdown_handle: oneshot::Receiver<()>,
    /// Discovery state shared with the request handler.
    state: Arc<RwLock<State>>,
    /// Delivers trusted-peer (committee) updates.
    trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
    metrics: Metrics,
}
128
129impl DiscoveryEventLoop {
130    pub async fn start(mut self) {
131        info!("Discovery started");
132
133        self.construct_our_info();
134        self.configure_preferred_peers();
135
136        let mut interval = tokio::time::interval(self.discovery_config.interval_period());
137        let mut peer_events = {
138            let (subscriber, _peers) = self.network.subscribe().unwrap();
139            subscriber
140        };
141
142        loop {
143            tokio::select! {
144                now = interval.tick() => {
145                    let now_unix = now_unix();
146                    self.handle_tick(now.into_std(), now_unix);
147                }
148                peer_event = peer_events.recv() => {
149                    self.handle_peer_event(peer_event);
150                },
151                Ok(()) = self.trusted_peer_change_rx.changed() => {
152                    let event: TrustedPeerChangeEvent = self.trusted_peer_change_rx.borrow_and_update().clone();
153                    self.handle_trusted_peer_change_event(event);
154                }
155                Some(task_result) = self.tasks.join_next() => {
156                    match task_result {
157                        Ok(()) => {},
158                        Err(e) => {
159                            if e.is_cancelled() {
160                                // avoid crashing on ungraceful shutdown
161                            } else if e.is_panic() {
162                                // propagate panics.
163                                std::panic::resume_unwind(e.into_panic());
164                            } else {
165                                panic!("task failed: {e}");
166                            }
167                        },
168                    };
169                },
170                // Once the shutdown notification resolves we can terminate the event loop
171                _ = &mut self.shutdown_handle => {
172                    break;
173                }
174            }
175        }
176
177        info!("Discovery ended");
178    }
179
180    fn construct_our_info(&mut self) {
181        if self.state.read().unwrap().our_info.is_some() {
182            return;
183        }
184
185        let address = self
186            .config
187            .external_address
188            .clone()
189            .and_then(|addr| addr.to_anemo_address().ok().map(|_| addr))
190            .into_iter()
191            .collect();
192        let our_info = NodeInfo {
193            peer_id: self.network.peer_id(),
194            addresses: address,
195            timestamp_ms: now_unix(),
196            access_type: self.discovery_config.access_type(),
197        }
198        .sign(&self.keypair);
199
200        self.state.write().unwrap().our_info = Some(our_info);
201    }
202
203    fn configure_preferred_peers(&mut self) {
204        for (peer_id, address) in self
205            .discovery_config
206            .allowlisted_peers
207            .iter()
208            .map(|sp| (sp.peer_id, sp.address.clone()))
209            .chain(self.config.seed_peers.iter().filter_map(|ap| {
210                ap.peer_id
211                    .map(|peer_id| (peer_id, Some(ap.address.clone())))
212            }))
213        {
214            let anemo_address = if let Some(address) = address {
215                let Ok(address) = address.to_anemo_address() else {
216                    debug!(p2p_address=?address, "Can't convert p2p address to anemo address");
217                    continue;
218                };
219                Some(address)
220            } else {
221                None
222            };
223
224            // TODO: once we have `PeerAffinity::Allowlisted` we should update allowlisted peers'
225            // affinity.
226            let peer_info = anemo::types::PeerInfo {
227                peer_id,
228                affinity: anemo::types::PeerAffinity::High,
229                address: anemo_address.into_iter().collect(),
230            };
231            debug!(?peer_info, "Add configured preferred peer");
232            self.network.known_peers().insert(peer_info);
233        }
234    }
235
236    fn update_our_info_timestamp(&mut self, now_unix: u64) {
237        let state = &mut self.state.write().unwrap();
238        if let Some(our_info) = &state.our_info {
239            let mut data = our_info.data().clone();
240            data.timestamp_ms = now_unix;
241            state.our_info = Some(data.sign(&self.keypair));
242        }
243    }
244
245    // TODO: we don't boot out old committee member yets, however we may want to do this
246    // in the future along with other network management work.
247    fn handle_trusted_peer_change_event(
248        &mut self,
249        trusted_peer_change_event: TrustedPeerChangeEvent,
250    ) {
251        for peer_info in trusted_peer_change_event.new_peers {
252            debug!(?peer_info, "Add committee member as preferred peer.");
253            self.network.known_peers().insert(peer_info);
254        }
255    }
256
257    fn handle_peer_event(&mut self, peer_event: Result<PeerEvent, RecvError>) {
258        match peer_event {
259            Ok(PeerEvent::NewPeer(peer_id)) => {
260                if let Some(peer) = self.network.peer(peer_id) {
261                    self.state
262                        .write()
263                        .unwrap()
264                        .connected_peers
265                        .insert(peer_id, ());
266
267                    // Query the new node for any peers
268                    self.tasks.spawn(query_peer_for_their_known_peers(
269                        peer,
270                        self.state.clone(),
271                        self.metrics.clone(),
272                        self.allowlisted_peers.clone(),
273                    ));
274                }
275            }
276            Ok(PeerEvent::LostPeer(peer_id, _)) => {
277                self.state.write().unwrap().connected_peers.remove(&peer_id);
278            }
279
280            Err(RecvError::Closed) => {
281                panic!("PeerEvent channel shouldn't be able to be closed");
282            }
283
284            Err(RecvError::Lagged(_)) => {
285                trace!("State-Sync fell behind processing PeerEvents");
286            }
287        }
288    }
289
290    fn handle_tick(&mut self, _now: std::time::Instant, now_unix: u64) {
291        self.update_our_info_timestamp(now_unix);
292
293        self.tasks
294            .spawn(query_connected_peers_for_their_known_peers(
295                self.network.clone(),
296                self.discovery_config.clone(),
297                self.state.clone(),
298                self.metrics.clone(),
299                self.allowlisted_peers.clone(),
300            ));
301
302        // Cull old peers older than a day
303        self.state
304            .write()
305            .unwrap()
306            .known_peers
307            .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms) < ONE_DAY_MILLISECONDS);
308
309        // Clean out the pending_dials
310        self.pending_dials.retain(|_k, v| !v.is_finished());
311        if let Some(abort_handle) = &self.dial_seed_peers_task
312            && abort_handle.is_finished()
313        {
314            self.dial_seed_peers_task = None;
315        }
316
317        // Spawn some dials
318        let state = self.state.read().unwrap();
319        let eligible = state
320            .known_peers
321            .clone()
322            .into_iter()
323            .filter(|(peer_id, info)| {
324                peer_id != &self.network.peer_id() &&
325                !info.addresses.is_empty() // Peer has addresses we can dial
326                && !state.connected_peers.contains_key(peer_id) // We're not already connected
327                && !self.pending_dials.contains_key(peer_id) // There is no pending dial to this node
328            })
329            .collect::<Vec<_>>();
330
331        // No need to connect to any more peers if we're already connected to a bunch
332        let number_of_connections = state.connected_peers.len();
333        let number_to_dial = std::cmp::min(
334            eligible.len(),
335            self.discovery_config
336                .target_concurrent_connections()
337                .saturating_sub(number_of_connections),
338        );
339
340        // randomize the order
341        for (peer_id, info) in rand::seq::SliceRandom::choose_multiple(
342            eligible.as_slice(),
343            &mut rand::thread_rng(),
344            number_to_dial,
345        ) {
346            let abort_handle = self.tasks.spawn(try_to_connect_to_peer(
347                self.network.clone(),
348                info.data().to_owned(),
349            ));
350            self.pending_dials.insert(*peer_id, abort_handle);
351        }
352
353        // If we aren't connected to anything and we aren't presently trying to connect to anyone
354        // we need to try the seed peers
355        if self.dial_seed_peers_task.is_none()
356            && state.connected_peers.is_empty()
357            && self.pending_dials.is_empty()
358            && !self.config.seed_peers.is_empty()
359        {
360            let abort_handle = self.tasks.spawn(try_to_connect_to_seed_peers(
361                self.network.clone(),
362                self.discovery_config.clone(),
363                self.config.seed_peers.clone(),
364            ));
365
366            self.dial_seed_peers_task = Some(abort_handle);
367        }
368    }
369}
370
371async fn try_to_connect_to_peer(network: Network, info: NodeInfo) {
372    debug!("Connecting to peer {info:?}");
373    for multiaddr in &info.addresses {
374        if let Ok(address) = multiaddr.to_anemo_address() {
375            // Ignore the result and just log the error if there is one
376            if network
377                .connect_with_peer_id(address, info.peer_id)
378                .await
379                .tap_err(|e| {
380                    debug!(
381                        "error dialing {} at address '{}': {e}",
382                        info.peer_id.short_display(4),
383                        multiaddr
384                    )
385                })
386                .is_ok()
387            {
388                return;
389            }
390        }
391    }
392}
393
394async fn try_to_connect_to_seed_peers(
395    network: Network,
396    config: Arc<DiscoveryConfig>,
397    seed_peers: Vec<SeedPeer>,
398) {
399    debug!(?seed_peers, "Connecting to seed peers");
400    let network = &network;
401
402    futures::stream::iter(seed_peers.into_iter().filter_map(|seed| {
403        seed.address
404            .to_anemo_address()
405            .ok()
406            .map(|address| (seed, address))
407    }))
408    .for_each_concurrent(
409        config.target_concurrent_connections(),
410        |(seed, address)| async move {
411            // Ignore the result and just log the error  if there is one
412            let _ = if let Some(peer_id) = seed.peer_id {
413                network.connect_with_peer_id(address, peer_id).await
414            } else {
415                network.connect(address).await
416            }
417            .tap_err(|e| debug!("error dialing multiaddr '{}': {e}", seed.address));
418        },
419    )
420    .await;
421}
422
423async fn query_peer_for_their_known_peers(
424    peer: Peer,
425    state: Arc<RwLock<State>>,
426    metrics: Metrics,
427    allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
428) {
429    let mut client = DiscoveryClient::new(peer);
430
431    let request = Request::new(()).with_timeout(TIMEOUT);
432    let found_peers = client
433        .get_known_peers_v2(request)
434        .await
435        .ok()
436        .map(Response::into_inner)
437        .map(
438            |GetKnownPeersResponseV2 {
439                 own_info,
440                 mut known_peers,
441             }| {
442                if !own_info.addresses.is_empty() {
443                    known_peers.push(own_info)
444                }
445                known_peers
446            },
447        );
448    if let Some(found_peers) = found_peers {
449        update_known_peers(state, metrics, found_peers, allowlisted_peers);
450    }
451}
452
453async fn query_connected_peers_for_their_known_peers(
454    network: Network,
455    config: Arc<DiscoveryConfig>,
456    state: Arc<RwLock<State>>,
457    metrics: Metrics,
458    allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
459) {
460    use rand::seq::IteratorRandom;
461
462    let peers_to_query = network
463        .peers()
464        .into_iter()
465        .flat_map(|id| network.peer(id))
466        .choose_multiple(&mut rand::thread_rng(), config.peers_to_query());
467
468    let found_peers = peers_to_query
469        .into_iter()
470        .map(DiscoveryClient::new)
471        .map(|mut client| async move {
472            let request = Request::new(()).with_timeout(TIMEOUT);
473            client
474                .get_known_peers_v2(request)
475                .await
476                .ok()
477                .map(Response::into_inner)
478                .map(
479                    |GetKnownPeersResponseV2 {
480                         own_info,
481                         mut known_peers,
482                     }| {
483                        if !own_info.addresses.is_empty() {
484                            known_peers.push(own_info)
485                        }
486                        known_peers
487                    },
488                )
489        })
490        .pipe(futures::stream::iter)
491        .buffer_unordered(config.peers_to_query())
492        .filter_map(std::future::ready)
493        .flat_map(futures::stream::iter)
494        .collect::<Vec<_>>()
495        .await;
496
497    update_known_peers(state, metrics, found_peers, allowlisted_peers);
498}
499
500fn update_known_peers(
501    state: Arc<RwLock<State>>,
502    metrics: Metrics,
503    found_peers: Vec<SignedNodeInfo>,
504    allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
505) {
506    use std::collections::hash_map::Entry;
507
508    let now_unix = now_unix();
509    let our_peer_id = state.read().unwrap().our_info.clone().unwrap().peer_id;
510    let known_peers = &mut state.write().unwrap().known_peers;
511    // only take the first MAX_PEERS_TO_SEND peers
512    for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
513        // +1 to account for the "own_info" of the serving peer
514        // Skip peers whose timestamp is too far in the future from our clock
515        // or that are too old
516        if peer_info.timestamp_ms > now_unix.saturating_add(30 * 1_000) // 30 seconds
517            || now_unix.saturating_sub(peer_info.timestamp_ms) > ONE_DAY_MILLISECONDS
518        {
519            continue;
520        }
521
522        if peer_info.peer_id == our_peer_id {
523            continue;
524        }
525
526        // If Peer is Private, and not in our allowlist, skip it.
527        if peer_info.access_type == AccessType::Private
528            && !allowlisted_peers.contains_key(&peer_info.peer_id)
529        {
530            continue;
531        }
532
533        // Skip entries that have too many addresses as a means to cap the size of a node's info
534        if peer_info.addresses.len() > MAX_ADDRESSES_PER_PEER {
535            continue;
536        }
537
538        // verify that all addresses provided are valid anemo addresses
539        if !peer_info
540            .addresses
541            .iter()
542            .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
543        {
544            continue;
545        }
546        let Ok(public_key) = Ed25519PublicKey::from_bytes(&peer_info.peer_id.0) else {
547            debug_fatal!(
548                // This should never happen.
549                "Failed to convert anemo PeerId {:?} to Ed25519PublicKey",
550                peer_info.peer_id
551            );
552            continue;
553        };
554        let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
555        if let Err(e) = public_key.verify(&msg, peer_info.auth_sig()) {
556            info!(
557                "Discovery failed to verify signature for NodeInfo for peer {:?}: {e:?}",
558                peer_info.peer_id
559            );
560            // TODO: consider denylisting the source of bad NodeInfo from future requests.
561            continue;
562        }
563        let peer = VerifiedSignedNodeInfo::new_from_verified(peer_info);
564
565        match known_peers.entry(peer.peer_id) {
566            Entry::Occupied(mut o) => {
567                if peer.timestamp_ms > o.get().timestamp_ms {
568                    if o.get().addresses.is_empty() && !peer.addresses.is_empty() {
569                        metrics.inc_num_peers_with_external_address();
570                    }
571                    if !o.get().addresses.is_empty() && peer.addresses.is_empty() {
572                        metrics.dec_num_peers_with_external_address();
573                    }
574                    o.insert(peer);
575                }
576            }
577            Entry::Vacant(v) => {
578                if !peer.addresses.is_empty() {
579                    metrics.inc_num_peers_with_external_address();
580                }
581                v.insert(peer);
582            }
583        }
584    }
585}
586
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn now_unix() -> u64 {
    let elapsed = std::time::UNIX_EPOCH.elapsed().unwrap();
    // `as_millis` yields a u128; u64 milliseconds span far beyond any realistic clock value,
    // so the narrowing cast is intentional.
    elapsed.as_millis() as u64
}