sui_network/discovery/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::types::PeerAffinity;
5use anemo::types::PeerInfo;
6use anemo::{Network, Peer, PeerId, Request, Response, types::PeerEvent};
7use fastcrypto::ed25519::{Ed25519PublicKey, Ed25519Signature};
8use futures::StreamExt;
9use mysten_common::debug_fatal;
10use serde::{Deserialize, Serialize};
11use shared_crypto::intent::IntentScope;
12use std::{
13    collections::HashMap,
14    sync::{Arc, RwLock},
15    time::Duration,
16};
17use sui_config::p2p::{AccessType, DiscoveryConfig, P2pConfig};
18use sui_types::crypto::{NetworkKeyPair, Signer, ToFromBytes, VerifyingKey};
19use sui_types::digests::Digest;
20use sui_types::message_envelope::{Envelope, Message, VerifiedEnvelope};
21use sui_types::multiaddr::Multiaddr;
22use tap::{Pipe, TapFallible};
23use tokio::sync::broadcast::error::RecvError;
24use tokio::sync::mpsc;
25use tokio::{
26    sync::oneshot,
27    task::{AbortHandle, JoinSet},
28};
29use tracing::{debug, info, trace};
30
/// Per-RPC timeout applied to each `get_known_peers_v2` request.
const TIMEOUT: Duration = Duration::from_secs(1);
/// Freshness horizon: known-peer entries older than this are culled on tick
/// and rejected on receipt in `update_known_peers`.
const ONE_DAY_MILLISECONDS: u64 = 24 * 60 * 60 * 1_000;
/// Maximum accepted byte length of a single advertised multiaddr.
const MAX_ADDRESS_LENGTH: usize = 300;
/// Cap on peer records processed from a single discovery response
/// (the serving peer's own info is allowed on top of this — see `update_known_peers`).
const MAX_PEERS_TO_SEND: usize = 200;
/// NodeInfo advertising more than this many addresses is ignored, to bound record size.
const MAX_ADDRESSES_PER_PEER: usize = 2;
36
// RPC bindings for the Discovery service, generated at build time into OUT_DIR.
mod generated {
    include!(concat!(env!("OUT_DIR"), "/sui.Discovery.rs"));
}
mod builder;
mod metrics;
mod server;
#[cfg(test)]
mod tests;

// Public surface of the discovery subsystem: construction, RPC client/server
// types, and the wire response struct.
pub use builder::{Builder, UnstartedDiscovery};
pub use generated::{
    discovery_client::DiscoveryClient,
    discovery_server::{Discovery, DiscoveryServer},
};
pub use server::GetKnownPeersResponseV2;
52
/// Message types for the discovery system mailbox.
///
/// Sent by `Handle` into the event loop's mpsc mailbox and dispatched in
/// `DiscoveryEventLoop::handle_message`.
#[derive(Debug)]
pub enum DiscoveryMessage {
    /// Update the address for a single peer.
    PeerAddressChange(PeerInfo),
}
59
/// A Handle to the Discovery subsystem. The Discovery system will be shut down once all Handles
/// have been dropped.
#[derive(Clone, Debug)]
pub struct Handle {
    // Shared oneshot sender: dropping the last clone closes the channel,
    // which resolves `DiscoveryEventLoop::shutdown_handle` and stops the loop.
    pub(super) _shutdown_handle: Arc<oneshot::Sender<()>>,
    // Mailbox into the event loop; see `DiscoveryMessage`.
    sender: mpsc::Sender<DiscoveryMessage>,
}
67
impl Handle {
    /// Updates the address for a single peer in the p2p network.
    ///
    /// If the peer's address has changed, the discovery system will
    /// forcibly reconnect to the peer at the new address.
    ///
    /// # Panics
    ///
    /// Panics if the discovery mailbox is full or closed — callers rely on
    /// the event loop draining the mailbox promptly.
    pub fn peer_address_change(&self, peer_info: PeerInfo) {
        self.sender
            .try_send(DiscoveryMessage::PeerAddressChange(peer_info))
            .expect("Discovery mailbox should not overflow or be closed")
    }
}
79
80use self::metrics::Metrics;
81
/// The internal discovery state shared between the main event loop and the request handler
struct State {
    // Our own signed advertisement; populated by `construct_our_info` at startup
    // and re-signed with a fresh timestamp on every tick.
    our_info: Option<SignedNodeInfo>,
    // Peers we currently hold a connection to (a map with unit values, used as a set).
    connected_peers: HashMap<PeerId, ()>,
    // Signature-verified records for every peer learned via discovery.
    known_peers: HashMap<PeerId, VerifiedSignedNodeInfo>,
}
88
/// The information necessary to dial another peer.
///
/// `NodeInfo` contains all the information that is shared with other nodes via the discovery
/// service to advertise how a node can be reached.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct NodeInfo {
    pub peer_id: PeerId,
    // Dialable multiaddrs for this peer; capped at MAX_ADDRESSES_PER_PEER by receivers.
    pub addresses: Vec<Multiaddr>,

    /// Creation time.
    ///
    /// This is used to determine which of two NodeInfo's from the same PeerId should be retained.
    pub timestamp_ms: u64,

    /// See docstring for `AccessType`.
    pub access_type: AccessType,
}
106
107impl NodeInfo {
108    fn sign(self, keypair: &NetworkKeyPair) -> SignedNodeInfo {
109        let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
110        let sig = keypair.sign(&msg);
111        SignedNodeInfo::new_from_data_and_sig(self, sig)
112    }
113}
114
/// A `NodeInfo` paired with an Ed25519 signature that has not yet been verified.
pub type SignedNodeInfo = Envelope<NodeInfo, Ed25519Signature>;

/// A `SignedNodeInfo` whose signature has been checked against the peer's public key.
pub type VerifiedSignedNodeInfo = VerifiedEnvelope<NodeInfo, Ed25519Signature>;

/// Digest newtype required by the `Message` trait impl below; not computed in practice.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct NodeInfoDigest(Digest);

impl NodeInfoDigest {
    pub const fn new(digest: [u8; 32]) -> Self {
        Self(Digest::new(digest))
    }
}
127
impl Message for NodeInfo {
    type DigestType = NodeInfoDigest;
    const SCOPE: IntentScope = IntentScope::DiscoveryPeers;

    /// Never called: discovery signs and verifies the raw BCS bytes of
    /// `NodeInfo` directly (see `NodeInfo::sign` / `update_known_peers`)
    /// rather than going through a digest.
    fn digest(&self) -> Self::DigestType {
        unreachable!("NodeInfoDigest is not used today")
    }
}
136
/// Background task driving peer discovery: advertises our own `NodeInfo`,
/// queries peers for theirs, and dials eligible peers to maintain connectivity.
struct DiscoveryEventLoop {
    config: P2pConfig,
    discovery_config: Arc<DiscoveryConfig>,
    // Statically configured peers, inserted as anemo known peers at startup.
    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
    // Seed addresses configured without an associated PeerId.
    unidentified_seed_peers: Vec<anemo::types::Address>,
    network: Network,
    // Key used to sign our advertised `NodeInfo`.
    keypair: NetworkKeyPair,
    // All spawned background work (queries, dials); joined in the event loop.
    tasks: JoinSet<()>,
    // In-flight dial tasks keyed by target peer, to avoid duplicate dials.
    pending_dials: HashMap<PeerId, AbortHandle>,
    // Handle to the at-most-one seed-peer dialing task, if currently running.
    dial_seed_peers_task: Option<AbortHandle>,
    // Resolves once every `Handle` is dropped, terminating the loop.
    shutdown_handle: oneshot::Receiver<()>,
    // Shared with the RPC server and spawned query tasks.
    state: Arc<RwLock<State>>,
    // Receives `DiscoveryMessage`s sent via `Handle`.
    mailbox: mpsc::Receiver<DiscoveryMessage>,
    metrics: Metrics,
}
152
impl DiscoveryEventLoop {
    /// Runs the discovery event loop until the shutdown channel resolves.
    ///
    /// Drives four event sources concurrently: the periodic discovery tick,
    /// peer connect/disconnect events from anemo, the external mailbox, and
    /// completion of previously spawned background tasks.
    pub async fn start(mut self) {
        info!("Discovery started");

        self.construct_our_info();
        self.configure_preferred_peers();

        let mut interval = tokio::time::interval(self.discovery_config.interval_period());
        let mut peer_events = {
            let (subscriber, _peers) = self.network.subscribe().unwrap();
            subscriber
        };

        loop {
            tokio::select! {
                now = interval.tick() => {
                    let now_unix = now_unix();
                    self.handle_tick(now.into_std(), now_unix);
                }
                peer_event = peer_events.recv() => {
                    self.handle_peer_event(peer_event);
                },
                Some(message) = self.mailbox.recv() => {
                    self.handle_message(message);
                }
                Some(task_result) = self.tasks.join_next() => {
                    match task_result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // avoid crashing on ungraceful shutdown
                            } else if e.is_panic() {
                                // propagate panics.
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("task failed: {e}");
                            }
                        },
                    };
                },
                // Once the shutdown notification resolves we can terminate the event loop
                _ = &mut self.shutdown_handle => {
                    break;
                }
            }
        }

        info!("Discovery ended");
    }

    /// Dispatches a mailbox message to its handler.
    fn handle_message(&mut self, message: DiscoveryMessage) {
        match message {
            DiscoveryMessage::PeerAddressChange(peer_info) => {
                self.handle_peer_address_change(peer_info);
            }
        }
    }

    /// Builds and signs our own `NodeInfo` advertisement, once.
    ///
    /// The advertised address list contains the configured external address
    /// only if it converts to a valid anemo address; otherwise it is empty.
    fn construct_our_info(&mut self) {
        if self.state.read().unwrap().our_info.is_some() {
            return;
        }

        let address = self
            .config
            .external_address
            .clone()
            .and_then(|addr| addr.to_anemo_address().ok().map(|_| addr))
            .into_iter()
            .collect();
        let our_info = NodeInfo {
            peer_id: self.network.peer_id(),
            addresses: address,
            timestamp_ms: now_unix(),
            access_type: self.discovery_config.access_type(),
        }
        .sign(&self.keypair);

        self.state.write().unwrap().our_info = Some(our_info);
    }

    /// Registers all statically configured peers with anemo's known-peer set.
    fn configure_preferred_peers(&mut self) {
        for peer_info in self.configured_peers.values() {
            debug!(?peer_info, "Add configured preferred peer");
            self.network.known_peers().insert(peer_info.clone());
        }
    }

    /// Re-signs our advertisement with a fresh timestamp so other nodes keep
    /// treating it as the newest record for our PeerId.
    fn update_our_info_timestamp(&mut self, now_unix: u64) {
        let state = &mut self.state.write().unwrap();
        if let Some(our_info) = &state.our_info {
            let mut data = our_info.data().clone();
            data.timestamp_ms = now_unix;
            state.our_info = Some(data.sign(&self.keypair));
        }
    }

    /// Applies an externally supplied address update for one peer.
    fn handle_peer_address_change(&mut self, peer_info: PeerInfo) {
        debug!(?peer_info, "Add committee member as preferred peer.");
        if let Some(old_peer_info) = self.network.known_peers().insert(peer_info.clone())
            && old_peer_info.address != peer_info.address
        {
            // Forcibly reconnect to the peer if its address(es) changed.
            // If no valid new address is provided, we will remain disconnected.
            let _ = self.network.disconnect(peer_info.peer_id);
            if let Some(address) = peer_info.address.first().cloned() {
                let network = self.network.clone();
                self.tasks.spawn(async move {
                    // If this fails, ConnectionManager will retry.
                    let _ = network
                        .connect_with_peer_id(address, peer_info.peer_id)
                        .await;
                });
            }
        }
    }

    /// Keeps `state.connected_peers` in sync with anemo peer events and
    /// eagerly queries newly connected peers for their known peers.
    fn handle_peer_event(&mut self, peer_event: Result<PeerEvent, RecvError>) {
        match peer_event {
            Ok(PeerEvent::NewPeer(peer_id)) => {
                if let Some(peer) = self.network.peer(peer_id) {
                    self.state
                        .write()
                        .unwrap()
                        .connected_peers
                        .insert(peer_id, ());

                    // Query the new node for any peers
                    self.tasks.spawn(query_peer_for_their_known_peers(
                        peer,
                        self.state.clone(),
                        self.metrics.clone(),
                        self.configured_peers.clone(),
                    ));
                }
            }
            Ok(PeerEvent::LostPeer(peer_id, _)) => {
                self.state.write().unwrap().connected_peers.remove(&peer_id);
            }

            Err(RecvError::Closed) => {
                panic!("PeerEvent channel shouldn't be able to be closed");
            }

            Err(RecvError::Lagged(_)) => {
                // NOTE(review): message says "State-Sync" but this is the discovery
                // loop — looks like a copy-paste from state-sync; string left as-is.
                trace!("State-Sync fell behind processing PeerEvents");
            }
        }
    }

    /// Periodic maintenance: refresh our advertisement, query connected peers,
    /// cull stale records, and spawn new dials toward the connection target.
    fn handle_tick(&mut self, _now: std::time::Instant, now_unix: u64) {
        self.update_our_info_timestamp(now_unix);

        self.tasks
            .spawn(query_connected_peers_for_their_known_peers(
                self.network.clone(),
                self.discovery_config.clone(),
                self.state.clone(),
                self.metrics.clone(),
                self.configured_peers.clone(),
            ));

        // Cull old peers older than a day
        self.state
            .write()
            .unwrap()
            .known_peers
            .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms) < ONE_DAY_MILLISECONDS);

        // Clean out the pending_dials
        self.pending_dials.retain(|_k, v| !v.is_finished());
        if let Some(abort_handle) = &self.dial_seed_peers_task
            && abort_handle.is_finished()
        {
            self.dial_seed_peers_task = None;
        }

        // Spawn some dials
        // (Read lock is held for the rest of this fn; spawned tasks only take
        // locks asynchronously later, so this cannot self-deadlock.)
        let state = self.state.read().unwrap();
        let eligible = state
            .known_peers
            .clone()
            .into_iter()
            .filter(|(peer_id, info)| {
                peer_id != &self.network.peer_id() &&
                !info.addresses.is_empty() // Peer has addresses we can dial
                && !state.connected_peers.contains_key(peer_id) // We're not already connected
                && !self.pending_dials.contains_key(peer_id) // There is no pending dial to this node
            })
            .collect::<Vec<_>>();

        // No need to connect to any more peers if we're already connected to a bunch
        let number_of_connections = state.connected_peers.len();
        let number_to_dial = std::cmp::min(
            eligible.len(),
            self.discovery_config
                .target_concurrent_connections()
                .saturating_sub(number_of_connections),
        );

        // randomize the order
        for (peer_id, info) in rand::seq::SliceRandom::choose_multiple(
            eligible.as_slice(),
            &mut rand::thread_rng(),
            number_to_dial,
        ) {
            let abort_handle = self.tasks.spawn(try_to_connect_to_peer(
                self.network.clone(),
                info.data().to_owned(),
            ));
            self.pending_dials.insert(*peer_id, abort_handle);
        }

        // If we aren't connected to anything and we aren't presently trying to connect to anyone
        // we need to try the configured peers with High affinity (seed peers)
        let has_peers_to_dial = || {
            self.configured_peers
                .values()
                .any(|p| p.affinity == PeerAffinity::High)
                || !self.unidentified_seed_peers.is_empty()
        };
        if self.dial_seed_peers_task.is_none()
            && state.connected_peers.is_empty()
            && self.pending_dials.is_empty()
            && has_peers_to_dial()
        {
            let abort_handle = self.tasks.spawn(try_to_connect_to_seed_peers(
                self.network.clone(),
                self.discovery_config.clone(),
                self.configured_peers.clone(),
                self.unidentified_seed_peers.clone(),
            ));

            self.dial_seed_peers_task = Some(abort_handle);
        }
    }
}
390
391async fn try_to_connect_to_peer(network: Network, info: NodeInfo) {
392    debug!("Connecting to peer {info:?}");
393    for multiaddr in &info.addresses {
394        if let Ok(address) = multiaddr.to_anemo_address() {
395            // Ignore the result and just log the error if there is one
396            if network
397                .connect_with_peer_id(address, info.peer_id)
398                .await
399                .tap_err(|e| {
400                    debug!(
401                        "error dialing {} at address '{}': {e}",
402                        info.peer_id.short_display(4),
403                        multiaddr
404                    )
405                })
406                .is_ok()
407            {
408                return;
409            }
410        }
411    }
412}
413
/// Dials every high-affinity configured peer and every unidentified seed
/// address concurrently (bounded by `target_concurrent_connections`).
///
/// All dial errors are logged at debug level and otherwise ignored; this is
/// a best-effort bootstrap used when we have no connections at all.
async fn try_to_connect_to_seed_peers(
    network: Network,
    config: Arc<DiscoveryConfig>,
    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
    unidentified_seed_peers: Vec<anemo::types::Address>,
) {
    let high_affinity_peers: Vec<_> = configured_peers
        .values()
        .filter(|p| p.affinity == PeerAffinity::High)
        .cloned()
        .collect();
    debug!(
        ?high_affinity_peers,
        ?unidentified_seed_peers,
        "Connecting to seed peers"
    );
    // Borrow so the `for_each_concurrent` closures can share the network.
    let network = &network;

    // Attempt connection to all high-affinity and seed peers.
    // Identified peers contribute one (peer_id, addr) pair per address.
    let with_peer_id = high_affinity_peers.into_iter().flat_map(|peer_info| {
        peer_info
            .address
            .into_iter()
            .map(move |addr| (Some(peer_info.peer_id), addr))
    });
    let without_peer_id = unidentified_seed_peers.into_iter().map(|addr| (None, addr));
    futures::stream::iter(with_peer_id.chain(without_peer_id))
        .for_each_concurrent(
            config.target_concurrent_connections(),
            |(peer_id, address)| async move {
                // Ignore the result and just log the error if there is one
                let _ = if let Some(peer_id) = peer_id {
                    network
                        .connect_with_peer_id(address.clone(), peer_id)
                        .await
                        .tap_err(|e| {
                            debug!(
                                "error dialing peer {} at '{}': {e}",
                                peer_id.short_display(4),
                                address
                            )
                        })
                } else {
                    network
                        .connect(address.clone())
                        .await
                        .tap_err(|e| debug!("error dialing address '{}': {e}", address))
                };
            },
        )
        .await;
}
466
467async fn query_peer_for_their_known_peers(
468    peer: Peer,
469    state: Arc<RwLock<State>>,
470    metrics: Metrics,
471    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
472) {
473    let mut client = DiscoveryClient::new(peer);
474
475    let request = Request::new(()).with_timeout(TIMEOUT);
476    let found_peers = client
477        .get_known_peers_v2(request)
478        .await
479        .ok()
480        .map(Response::into_inner)
481        .map(
482            |GetKnownPeersResponseV2 {
483                 own_info,
484                 mut known_peers,
485             }| {
486                if !own_info.addresses.is_empty() {
487                    known_peers.push(own_info)
488                }
489                known_peers
490            },
491        );
492    if let Some(found_peers) = found_peers {
493        update_known_peers(state, metrics, found_peers, configured_peers);
494    }
495}
496
497async fn query_connected_peers_for_their_known_peers(
498    network: Network,
499    config: Arc<DiscoveryConfig>,
500    state: Arc<RwLock<State>>,
501    metrics: Metrics,
502    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
503) {
504    use rand::seq::IteratorRandom;
505
506    let peers_to_query = network
507        .peers()
508        .into_iter()
509        .flat_map(|id| network.peer(id))
510        .choose_multiple(&mut rand::thread_rng(), config.peers_to_query());
511
512    let found_peers = peers_to_query
513        .into_iter()
514        .map(DiscoveryClient::new)
515        .map(|mut client| async move {
516            let request = Request::new(()).with_timeout(TIMEOUT);
517            client
518                .get_known_peers_v2(request)
519                .await
520                .ok()
521                .map(Response::into_inner)
522                .map(
523                    |GetKnownPeersResponseV2 {
524                         own_info,
525                         mut known_peers,
526                     }| {
527                        if !own_info.addresses.is_empty() {
528                            known_peers.push(own_info)
529                        }
530                        known_peers
531                    },
532                )
533        })
534        .pipe(futures::stream::iter)
535        .buffer_unordered(config.peers_to_query())
536        .filter_map(std::future::ready)
537        .flat_map(futures::stream::iter)
538        .collect::<Vec<_>>()
539        .await;
540
541    update_known_peers(state, metrics, found_peers, configured_peers);
542}
543
/// Validates `found_peers` and merges the survivors into `state.known_peers`.
///
/// Each record must: be fresh (not more than 30s in the future, not older
/// than a day), not be our own, be allowed by its `AccessType`, advertise a
/// bounded number of valid anemo addresses, and carry a valid Ed25519
/// signature over its BCS bytes. For a peer we already track, the record
/// with the newer timestamp wins; metrics count peers with a non-empty
/// address list.
fn update_known_peers(
    state: Arc<RwLock<State>>,
    metrics: Metrics,
    found_peers: Vec<SignedNodeInfo>,
    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
) {
    use std::collections::hash_map::Entry;

    let now_unix = now_unix();
    // `our_info` is set by `construct_our_info` before any query tasks are
    // spawned, so the unwrap holds — TODO confirm no other caller precedes it.
    let our_peer_id = state.read().unwrap().our_info.clone().unwrap().peer_id;
    let known_peers = &mut state.write().unwrap().known_peers;
    // only take the first MAX_PEERS_TO_SEND peers
    for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
        // +1 to account for the "own_info" of the serving peer
        // Skip peers whose timestamp is too far in the future from our clock
        // or that are too old
        if peer_info.timestamp_ms > now_unix.saturating_add(30 * 1_000) // 30 seconds
            || now_unix.saturating_sub(peer_info.timestamp_ms) > ONE_DAY_MILLISECONDS
        {
            continue;
        }

        // Never store a record claiming to be us.
        if peer_info.peer_id == our_peer_id {
            continue;
        }

        // If Peer is Private, and not in our configured peers, skip it.
        if peer_info.access_type == AccessType::Private
            && !configured_peers.contains_key(&peer_info.peer_id)
        {
            continue;
        }

        // Skip entries that have too many addresses as a means to cap the size of a node's info
        if peer_info.addresses.len() > MAX_ADDRESSES_PER_PEER {
            continue;
        }

        // verify that all addresses provided are valid anemo addresses
        if !peer_info
            .addresses
            .iter()
            .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
        {
            continue;
        }
        // The anemo PeerId is the Ed25519 public key bytes, so this conversion
        // is expected to always succeed.
        let Ok(public_key) = Ed25519PublicKey::from_bytes(&peer_info.peer_id.0) else {
            debug_fatal!(
                // This should never happen.
                "Failed to convert anemo PeerId {:?} to Ed25519PublicKey",
                peer_info.peer_id
            );
            continue;
        };
        // Verify the author's signature over the BCS bytes (matches NodeInfo::sign).
        let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
        if let Err(e) = public_key.verify(&msg, peer_info.auth_sig()) {
            info!(
                "Discovery failed to verify signature for NodeInfo for peer {:?}: {e:?}",
                peer_info.peer_id
            );
            // TODO: consider denylisting the source of bad NodeInfo from future requests.
            continue;
        }
        let peer = VerifiedSignedNodeInfo::new_from_verified(peer_info);

        match known_peers.entry(peer.peer_id) {
            Entry::Occupied(mut o) => {
                // Replace only when strictly newer, adjusting the
                // "has external address" gauge on empty<->non-empty flips.
                if peer.timestamp_ms > o.get().timestamp_ms {
                    if o.get().addresses.is_empty() && !peer.addresses.is_empty() {
                        metrics.inc_num_peers_with_external_address();
                    }
                    if !o.get().addresses.is_empty() && peer.addresses.is_empty() {
                        metrics.dec_num_peers_with_external_address();
                    }
                    o.insert(peer);
                }
            }
            Entry::Vacant(v) => {
                if !peer.addresses.is_empty() {
                    metrics.inc_num_peers_with_external_address();
                }
                v.insert(peer);
            }
        }
    }
}
630
/// Current wall-clock time in milliseconds since the unix epoch.
///
/// Truncates the `u128` millisecond count to `u64`, which holds for roughly
/// half a billion years. Panics only if the system clock reads before 1970.
fn now_unix() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_millis() as u64
}