sui_network/discovery/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::types::PeerAffinity;
5use anemo::types::PeerInfo;
6use anemo::{Network, Peer, PeerId, Request, Response, types::PeerEvent};
7use fastcrypto::ed25519::{Ed25519PublicKey, Ed25519Signature};
8use futures::StreamExt;
9use mysten_common::debug_fatal;
10use serde::{Deserialize, Serialize};
11use shared_crypto::intent::IntentScope;
12use std::{
13    collections::{BTreeMap, HashMap},
14    sync::{Arc, RwLock},
15    time::Duration,
16};
17
18use crate::endpoint_manager::{AddressSource, EndpointId, EndpointManager};
19use sui_config::p2p::{AccessType, DiscoveryConfig, P2pConfig};
20use sui_types::crypto::{NetworkKeyPair, NetworkPublicKey, Signer, ToFromBytes, VerifyingKey};
21use sui_types::digests::Digest;
22use sui_types::message_envelope::{Envelope, Message, VerifiedEnvelope};
23use sui_types::multiaddr::Multiaddr;
24use tap::{Pipe, TapFallible};
25use tokio::sync::broadcast::error::RecvError;
26use tokio::sync::mpsc;
27use tokio::{
28    sync::oneshot,
29    task::{AbortHandle, JoinSet},
30};
31use tracing::{debug, info, trace};
32
/// Timeout applied to individual discovery RPC requests.
const TIMEOUT: Duration = Duration::from_secs(1);
/// Known-peer records with a timestamp older than this are culled on each tick.
const ONE_DAY_MILLISECONDS: u64 = 24 * 60 * 60 * 1_000;
/// Maximum byte length accepted for a single advertised address.
const MAX_ADDRESS_LENGTH: usize = 300;
/// Cap on the number of peer records included in a known-peers response
/// (presumably enforced by the `server` module — not visible in this chunk).
const MAX_PEERS_TO_SEND: usize = 200;
/// Maximum number of addresses a peer may advertise (per endpoint for V2 infos).
const MAX_ADDRESSES_PER_PEER: usize = 2;
38
// RPC client/server bindings for the Discovery service, generated at build
// time into $OUT_DIR by the build script.
mod generated {
    include!(concat!(env!("OUT_DIR"), "/sui.Discovery.rs"));
}
mod builder;
mod metrics;
mod server;
#[cfg(test)]
mod tests;
47
48pub use builder::{Builder, UnstartedDiscovery};
49pub use generated::{
50    discovery_client::DiscoveryClient,
51    discovery_server::{Discovery, DiscoveryServer},
52};
53pub use server::{GetKnownPeersRequestV3, GetKnownPeersResponseV2, GetKnownPeersResponseV3};
54
/// Message types for the discovery system mailbox.
#[derive(Debug)]
pub enum DiscoveryMessage {
    /// A peer's dialable addresses changed for a given source; the event loop
    /// updates its per-peer address overrides and may reconnect.
    PeerAddressChange {
        peer_id: PeerId,
        source: AddressSource,
        addresses: Vec<anemo::types::Address>,
    },
    /// A signed, versioned node info received out-of-band; merged into the
    /// known-peers state after verification. Boxed to keep the enum small.
    ReceivedNodeInfo {
        peer_info: Box<SignedVersionedNodeInfo>,
    },
}
67
/// A Handle to the Discovery subsystem. The Discovery system will be shut down once all Handles
/// have been dropped.
#[derive(Clone, Debug)]
pub struct Handle {
    // Dropping the last clone of this Arc drops the oneshot sender, which
    // resolves the event loop's shutdown receiver and terminates it.
    pub(super) _shutdown_handle: Arc<oneshot::Sender<()>>,
    // Mailbox sender for submitting `DiscoveryMessage`s to the event loop.
    pub(super) sender: Sender,
}
75
76impl Handle {
77    pub fn sender(&self) -> Sender {
78        self.sender.clone()
79    }
80}
81
/// A lightweight handle for sending messages to the discovery event loop
/// without holding a shutdown reference.
#[derive(Clone, Debug)]
pub struct Sender {
    // Bounded channel into `DiscoveryEventLoop::mailbox`.
    pub(super) sender: mpsc::Sender<DiscoveryMessage>,
}
88
89impl Sender {
90    pub fn peer_address_change(
91        &self,
92        peer_id: PeerId,
93        source: AddressSource,
94        addresses: Vec<anemo::types::Address>,
95    ) {
96        self.sender
97            .try_send(DiscoveryMessage::PeerAddressChange {
98                peer_id,
99                source,
100                addresses,
101            })
102            .expect("Discovery mailbox should not overflow or be closed")
103    }
104}
105
106use self::metrics::Metrics;
107
/// The internal discovery state shared between the main event loop and the request handler
struct State {
    // Our signed V1 advertisement; populated by `construct_our_info`.
    our_info: Option<SignedNodeInfo>,
    // Our signed versioned (V2) advertisement; populated alongside `our_info`.
    our_info_v2: Option<SignedVersionedNodeInfo>,
    // Currently connected peers; value is unused, the map acts as a set.
    connected_peers: HashMap<PeerId, ()>,
    // Verified V1 node infos learned from peers, keyed by peer id.
    known_peers: HashMap<PeerId, VerifiedSignedNodeInfo>,
    // Verified versioned node infos learned from peers, keyed by peer id.
    known_peers_v2: HashMap<PeerId, VerifiedSignedVersionedNodeInfo>,
    // Per-peer address overrides keyed by source; the smallest `AddressSource`
    // key wins when reconfiguring the network (see `handle_peer_address_change`).
    peer_address_overrides: HashMap<PeerId, BTreeMap<AddressSource, Vec<anemo::types::Address>>>,
}
117
/// The information necessary to dial another peer.
///
/// `NodeInfo` contains all the information that is shared with other nodes via the discovery
/// service to advertise how a node can be reached.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct NodeInfo {
    pub peer_id: PeerId,
    pub addresses: Vec<Multiaddr>,

    /// Creation time.
    ///
    /// This is used to determine which of two NodeInfo's from the same PeerId should be retained.
    pub timestamp_ms: u64,

    /// See docstring for `AccessType`.
    pub access_type: AccessType,
}
135
136impl NodeInfo {
137    fn sign(self, keypair: &NetworkKeyPair) -> SignedNodeInfo {
138        let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
139        let sig = keypair.sign(&msg);
140        SignedNodeInfo::new_from_data_and_sig(self, sig)
141    }
142}
143
/// A `NodeInfo` paired with an ed25519 signature over its BCS serialization.
pub type SignedNodeInfo = Envelope<NodeInfo, Ed25519Signature>;

/// A `SignedNodeInfo` whose signature has been verified.
pub type VerifiedSignedNodeInfo = VerifiedEnvelope<NodeInfo, Ed25519Signature>;
147
/// Digest type for `NodeInfo`, required by the `Message` trait.
/// Never actually computed today (see `Message::digest` below).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct NodeInfoDigest(Digest);

impl NodeInfoDigest {
    /// Wrap a raw 32-byte digest.
    pub const fn new(digest: [u8; 32]) -> Self {
        Self(Digest::new(digest))
    }
}
156
impl Message for NodeInfo {
    type DigestType = NodeInfoDigest;
    const SCOPE: IntentScope = IntentScope::DiscoveryPeers;

    // Signatures are verified over the BCS-serialized data directly, so a
    // digest is never needed.
    fn digest(&self) -> Self::DigestType {
        unreachable!("NodeInfoDigest is not used today")
    }
}
165
/// NodeInfoV2 supports multiple address types keyed by EndpointId.
// TODO: Remove support for V1 once V2 is available in all production networks.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct NodeInfoV2 {
    // Advertised addresses per endpoint; the P2P entry's key carries the PeerId.
    pub addresses: BTreeMap<EndpointId, Vec<Multiaddr>>,
    // Creation time, unix milliseconds; newer wins when merging duplicates.
    pub timestamp_ms: u64,
    /// See docstring for `AccessType`.
    pub access_type: AccessType,
}
174
175impl NodeInfoV2 {
176    /// Derive the P2P PeerId from the addresses map.
177    /// Returns None if no P2P endpoint is present.
178    pub fn peer_id(&self) -> Option<PeerId> {
179        self.addresses.keys().find_map(|k| match k {
180            EndpointId::P2p(peer_id) => Some(*peer_id),
181            EndpointId::Consensus(_) => None,
182        })
183    }
184
185    pub fn p2p_addresses(&self) -> &[Multiaddr] {
186        self.addresses
187            .iter()
188            .find_map(|(k, v)| match k {
189                EndpointId::P2p(_) => Some(v.as_slice()),
190                EndpointId::Consensus(_) => None,
191            })
192            .unwrap_or(&[])
193    }
194}
195
/// Versioned wrapper for NodeInfo types.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum VersionedNodeInfo {
    /// Legacy single-address-list format.
    V1(NodeInfo),
    /// Multi-endpoint format keyed by `EndpointId`.
    V2(NodeInfoV2),
}
202
203impl VersionedNodeInfo {
204    pub fn peer_id(&self) -> Option<PeerId> {
205        match self {
206            VersionedNodeInfo::V1(info) => Some(info.peer_id),
207            VersionedNodeInfo::V2(info) => info.peer_id(),
208        }
209    }
210
211    pub fn timestamp_ms(&self) -> u64 {
212        match self {
213            VersionedNodeInfo::V1(info) => info.timestamp_ms,
214            VersionedNodeInfo::V2(info) => info.timestamp_ms,
215        }
216    }
217
218    pub fn access_type(&self) -> AccessType {
219        match self {
220            VersionedNodeInfo::V1(info) => info.access_type,
221            VersionedNodeInfo::V2(info) => info.access_type,
222        }
223    }
224
225    pub fn p2p_addresses(&self) -> &[Multiaddr] {
226        match self {
227            VersionedNodeInfo::V1(info) => &info.addresses,
228            VersionedNodeInfo::V2(info) => info.p2p_addresses(),
229        }
230    }
231
232    pub fn sign(self, keypair: &NetworkKeyPair) -> SignedVersionedNodeInfo {
233        let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
234        let sig = keypair.sign(&msg);
235        SignedVersionedNodeInfo::new_from_data_and_sig(self, sig)
236    }
237}
238
/// Digest type for `VersionedNodeInfo`, required by the `Message` trait.
/// Never actually computed today (see `Message::digest` below).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct VersionedNodeInfoDigest(Digest);
241
impl Message for VersionedNodeInfo {
    type DigestType = VersionedNodeInfoDigest;
    const SCOPE: IntentScope = IntentScope::DiscoveryPeers;

    // Signatures are verified over the BCS-serialized data directly, so a
    // digest is never needed.
    fn digest(&self) -> Self::DigestType {
        unreachable!("VersionedNodeInfoDigest is not used today")
    }
}
250
/// A `VersionedNodeInfo` paired with an ed25519 signature over its BCS serialization.
pub type SignedVersionedNodeInfo = Envelope<VersionedNodeInfo, Ed25519Signature>;
/// A `SignedVersionedNodeInfo` whose signature and endpoint identities have been verified.
pub type VerifiedSignedVersionedNodeInfo = VerifiedEnvelope<VersionedNodeInfo, Ed25519Signature>;
253
/// Verifies the signature and endpoint identity consistency of a
/// `SignedVersionedNodeInfo`, returning a `VerifiedSignedVersionedNodeInfo`
/// on success.
///
/// Checks:
/// 1. The signature is valid for the P2P peer_id embedded in the node info.
/// 2. Any non-P2P endpoint identities (e.g. `Consensus`) match the P2P
///    peer_id, preventing a peer from advertising endpoints under another
///    key.
fn verify_versioned_node_info(
    peer_info: &SignedVersionedNodeInfo,
) -> Result<VerifiedSignedVersionedNodeInfo, &'static str> {
    // The PeerId doubles as the ed25519 public key that must have produced
    // the signature.
    let peer_id = peer_info.peer_id().ok_or("missing P2P peer_id")?;

    let public_key =
        Ed25519PublicKey::from_bytes(&peer_id.0).map_err(|_| "invalid peer_id public key")?;

    // The signature covers the BCS serialization of the inner data (matches
    // how `VersionedNodeInfo::sign` produced it).
    let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
    public_key
        .verify(&msg, peer_info.auth_sig())
        .map_err(|_| "signature verification failed")?;

    // Version-specific sanity limits on the advertised addresses.
    match peer_info.data() {
        VersionedNodeInfo::V1(info) => {
            if info.addresses.len() > MAX_ADDRESSES_PER_PEER {
                return Err("too many addresses");
            }
            if !info
                .addresses
                .iter()
                .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
            {
                return Err("invalid address");
            }
        }
        VersionedNodeInfo::V2(info_v2) => {
            // Each endpoint variant (P2p, Consensus) may appear at most once.
            // BTreeMap keys are unique, but two keys of the same variant with
            // different payloads would otherwise be allowed.
            let mut seen_variants = Vec::new();
            for endpoint_id in info_v2.addresses.keys() {
                let variant = std::mem::discriminant(endpoint_id);
                if seen_variants.contains(&variant) {
                    return Err("duplicate endpoint variant");
                }
                seen_variants.push(variant);
            }

            for (endpoint_id, addrs) in &info_v2.addresses {
                if addrs.len() > MAX_ADDRESSES_PER_PEER {
                    return Err("too many addresses for endpoint");
                }
                if !addrs.iter().all(|addr| addr.len() < MAX_ADDRESS_LENGTH) {
                    return Err("address too long");
                }
                // Only P2P addresses must be convertible to anemo addresses;
                // other endpoints (e.g. consensus) are not dialed via anemo.
                if matches!(endpoint_id, EndpointId::P2p(_))
                    && !addrs.iter().all(|addr| addr.to_anemo_address().is_ok())
                {
                    return Err("invalid P2P address");
                }
            }

            // Non-P2P endpoint identities must be the same key as the P2P
            // peer_id, so a peer cannot advertise endpoints for another node.
            let identities_valid = info_v2.addresses.keys().all(|eid| match eid {
                EndpointId::P2p(_) => true,
                EndpointId::Consensus(pubkey) => pubkey.as_bytes() == peer_id.0,
            });
            if !identities_valid {
                return Err("non-P2P endpoint identity mismatch");
            }
        }
    }

    Ok(VerifiedSignedVersionedNodeInfo::new_from_verified(
        peer_info.clone(),
    ))
}
328
/// The discovery event loop: advertises our own node info, learns peers from
/// others, and dials eligible peers. Driven by `DiscoveryEventLoop::start`.
struct DiscoveryEventLoop {
    config: P2pConfig,
    discovery_config: Arc<DiscoveryConfig>,
    // Statically configured peers (always inserted into anemo's known peers).
    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
    // Seed addresses configured without a PeerId; dialed with `connect`.
    unidentified_seed_peers: Vec<anemo::types::Address>,
    network: Network,
    keypair: NetworkKeyPair,
    // All background work spawned by the loop; joined in `start`.
    tasks: JoinSet<()>,
    // In-flight dial attempts, used to avoid duplicate dials per peer.
    pending_dials: HashMap<PeerId, AbortHandle>,
    // At most one in-flight "dial all seed peers" task.
    dial_seed_peers_task: Option<AbortHandle>,
    shutdown_handle: oneshot::Receiver<()>,
    state: Arc<RwLock<State>>,
    // Receiving end of the `Sender` mailbox.
    mailbox: mpsc::Receiver<DiscoveryMessage>,
    metrics: Metrics,
    // Our consensus endpoint address, advertised in the V2 node info when set.
    consensus_external_address: Option<Multiaddr>,
    endpoint_manager: EndpointManager,
}
346
impl DiscoveryEventLoop {
    /// Run the event loop until the shutdown handle resolves (i.e. all
    /// `Handle`s have been dropped). Multiplexes the periodic tick, network
    /// peer events, the mailbox, and completion of spawned tasks.
    pub async fn start(mut self) {
        info!("Discovery started");

        self.construct_our_info();
        self.configure_preferred_peers();

        let mut interval = tokio::time::interval(self.discovery_config.interval_period());
        let mut peer_events = {
            let (subscriber, _peers) = self.network.subscribe().unwrap();
            subscriber
        };

        loop {
            tokio::select! {
                now = interval.tick() => {
                    let now_unix = now_unix();
                    self.handle_tick(now.into_std(), now_unix);
                }
                peer_event = peer_events.recv() => {
                    self.handle_peer_event(peer_event);
                },
                Some(message) = self.mailbox.recv() => {
                    self.handle_message(message);
                }
                Some(task_result) = self.tasks.join_next() => {
                    match task_result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // avoid crashing on ungraceful shutdown
                            } else if e.is_panic() {
                                // propagate panics.
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("task failed: {e}");
                            }
                        },
                    };
                },
                // Once the shutdown notification resolves we can terminate the event loop
                _ = &mut self.shutdown_handle => {
                    break;
                }
            }
        }

        info!("Discovery ended");
    }

    /// Dispatch a single message received on the discovery mailbox.
    fn handle_message(&mut self, message: DiscoveryMessage) {
        match message {
            DiscoveryMessage::PeerAddressChange {
                peer_id,
                source,
                addresses,
            } => {
                self.handle_peer_address_change(peer_id, source, addresses);
            }
            DiscoveryMessage::ReceivedNodeInfo { peer_info } => {
                // Merge the externally received node info into known peers
                // (verification happens inside `update_known_peers_versioned`).
                update_known_peers_versioned(
                    self.state.clone(),
                    self.metrics.clone(),
                    vec![*peer_info],
                    self.configured_peers.clone(),
                    &self.endpoint_manager,
                );
            }
        }
    }

    /// Build and sign our own V1 and V2 node-info advertisements and store
    /// them in shared state. No-op if already constructed.
    fn construct_our_info(&mut self) {
        if self.state.read().unwrap().our_info.is_some() {
            return;
        }

        let peer_id = self.network.peer_id();
        let timestamp_ms = now_unix();
        let access_type = self.discovery_config.access_type();

        // Advertise the configured external address only if it converts to a
        // valid anemo address; otherwise advertise nothing.
        let addresses: Vec<Multiaddr> = self
            .config
            .external_address
            .clone()
            .and_then(|addr| addr.to_anemo_address().ok().map(|_| addr))
            .into_iter()
            .collect();

        let our_info = NodeInfo {
            peer_id,
            addresses: addresses.clone(),
            timestamp_ms,
            access_type,
        }
        .sign(&self.keypair);

        let mut addresses_map = BTreeMap::new();
        addresses_map.insert(EndpointId::P2p(peer_id), addresses);
        if let Some(consensus_addr) = &self.consensus_external_address {
            // Populates `Consensus` EndpointId from our P2P (anemo) `PeerId`.
            // This is safe because both the P2P and consensus networks use the same
            // ed25519 network keypair. Both originate from `NodeConfig::network_key_pair()`.
            let network_pubkey =
                NetworkPublicKey::from_bytes(&peer_id.0).expect("PeerId is a valid public key");
            addresses_map.insert(
                EndpointId::Consensus(network_pubkey),
                vec![consensus_addr.clone()],
            );
        }
        let our_info_v2 = VersionedNodeInfo::V2(NodeInfoV2 {
            addresses: addresses_map,
            timestamp_ms,
            access_type,
        })
        .sign(&self.keypair);

        let mut state = self.state.write().unwrap();
        state.our_info = Some(our_info);
        state.our_info_v2 = Some(our_info_v2);
    }

    /// Register all statically configured peers with anemo's known-peers set.
    fn configure_preferred_peers(&mut self) {
        for peer_info in self.configured_peers.values() {
            debug!(?peer_info, "Add configured preferred peer");
            self.network.known_peers().insert(peer_info.clone());
        }
    }

    /// Re-sign our V1 and V2 advertisements with a fresh timestamp so peers
    /// prefer our latest info.
    fn update_our_info_timestamp(&mut self, now_unix: u64) {
        let state = &mut self.state.write().unwrap();

        if let Some(our_info) = &state.our_info {
            let mut data = our_info.data().clone();
            data.timestamp_ms = now_unix;
            state.our_info = Some(data.sign(&self.keypair));
        }

        if let Some(our_info_v2) = &state.our_info_v2 {
            let mut data = our_info_v2.data().clone();
            match &mut data {
                VersionedNodeInfo::V1(info) => info.timestamp_ms = now_unix,
                VersionedNodeInfo::V2(info) => info.timestamp_ms = now_unix,
            }
            state.our_info_v2 = Some(data.sign(&self.keypair));
        }
    }

    /// Record an address override for `peer_id` from the given source, then
    /// reconfigure anemo and reconnect if the highest-priority addresses
    /// changed. An empty `addresses` vec clears the override for `source`.
    fn handle_peer_address_change(
        &mut self,
        peer_id: PeerId,
        source: AddressSource,
        addresses: Vec<anemo::types::Address>,
    ) {
        debug!(
            ?peer_id,
            ?source,
            ?addresses,
            "Received peer address change"
        );

        // Update stored addresses.
        {
            let mut state = self.state.write().unwrap();
            let source_map = state.peer_address_overrides.entry(peer_id).or_default();

            if addresses.is_empty() {
                source_map.remove(&source);
                if source_map.is_empty() {
                    state.peer_address_overrides.remove(&peer_id);
                }
            } else {
                source_map.insert(source, addresses);
            }
        }

        // Reconfigure network if priority addresses changed.
        // Priority = the entry with the smallest `AddressSource` key.
        let priority_addresses = self
            .state
            .read()
            .unwrap()
            .peer_address_overrides
            .get(&peer_id)
            .and_then(|sources| sources.first_key_value().map(|(_, addrs)| addrs.clone()))
            .unwrap_or_default();
        let current_addresses = self
            .network
            .known_peers()
            .get(&peer_id)
            .map(|info| info.address.clone())
            .unwrap_or_default();
        if priority_addresses != current_addresses {
            let new_peer_info = PeerInfo {
                peer_id,
                affinity: PeerAffinity::High,
                address: priority_addresses.clone(),
            };

            // Drop the current connection so the peer is re-dialed at the
            // new addresses.
            self.network.known_peers().insert(new_peer_info);
            let _ = self.network.disconnect(peer_id);

            if let Some(address) = priority_addresses.first().cloned() {
                let network = self.network.clone();
                self.tasks.spawn(async move {
                    // If this fails, ConnectionManager will retry.
                    let _ = network.connect_with_peer_id(address, peer_id).await;
                });
            }
        }
    }

    /// Maintain `connected_peers` from anemo peer events; on a new peer,
    /// immediately query it for its known peers.
    fn handle_peer_event(&mut self, peer_event: Result<PeerEvent, RecvError>) {
        match peer_event {
            Ok(PeerEvent::NewPeer(peer_id)) => {
                if let Some(peer) = self.network.peer(peer_id) {
                    self.state
                        .write()
                        .unwrap()
                        .connected_peers
                        .insert(peer_id, ());

                    // Query the new node for any peers
                    self.tasks.spawn(query_peer_for_their_known_peers(
                        peer,
                        self.discovery_config.clone(),
                        self.state.clone(),
                        self.metrics.clone(),
                        self.configured_peers.clone(),
                        self.endpoint_manager.clone(),
                    ));
                }
            }
            Ok(PeerEvent::LostPeer(peer_id, _)) => {
                self.state.write().unwrap().connected_peers.remove(&peer_id);
            }

            Err(RecvError::Closed) => {
                panic!("PeerEvent channel shouldn't be able to be closed");
            }

            Err(RecvError::Lagged(_)) => {
                trace!("State-Sync fell behind processing PeerEvents");
            }
        }
    }

    /// Periodic maintenance: refresh our advertisement timestamp, query
    /// connected peers, cull stale records, and dial newly eligible peers
    /// (falling back to seed peers when fully disconnected).
    fn handle_tick(&mut self, _now: std::time::Instant, now_unix: u64) {
        self.update_our_info_timestamp(now_unix);

        self.tasks
            .spawn(query_connected_peers_for_their_known_peers(
                self.network.clone(),
                self.discovery_config.clone(),
                self.state.clone(),
                self.metrics.clone(),
                self.configured_peers.clone(),
                self.endpoint_manager.clone(),
            ));

        // Cull old peers older than a day
        {
            let mut state = self.state.write().unwrap();
            state
                .known_peers
                .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms) < ONE_DAY_MILLISECONDS);
            state
                .known_peers_v2
                .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms()) < ONE_DAY_MILLISECONDS);
        }

        // Clean out the pending_dials
        self.pending_dials.retain(|_k, v| !v.is_finished());
        if let Some(abort_handle) = &self.dial_seed_peers_task
            && abort_handle.is_finished()
        {
            self.dial_seed_peers_task = None;
        }

        // Spawn some dials
        let state = self.state.read().unwrap();
        let our_peer_id = self.network.peer_id();

        // Collect eligible peers from both known_peers (V2) and known_peers_v2 (V3),
        // preferring fresher timestamps when a peer appears in both maps.
        let mut eligible: HashMap<PeerId, NodeInfo> = HashMap::new();

        for (peer_id, info) in state.known_peers.iter() {
            if *peer_id != our_peer_id
                && !info.addresses.is_empty()
                && !state.connected_peers.contains_key(peer_id)
                && !self.pending_dials.contains_key(peer_id)
            {
                eligible.insert(*peer_id, info.data().clone());
            }
        }
        for (peer_id, info) in state.known_peers_v2.iter() {
            let p2p_addresses = info.p2p_addresses();
            if *peer_id != our_peer_id
                && !p2p_addresses.is_empty()
                && !state.connected_peers.contains_key(peer_id)
                && !self.pending_dials.contains_key(peer_id)
                && eligible
                    .get(peer_id)
                    .is_none_or(|existing| info.timestamp_ms() > existing.timestamp_ms)
            {
                // Flatten the versioned record into a V1-shaped NodeInfo so
                // both maps feed a single dialing path.
                eligible.insert(
                    *peer_id,
                    NodeInfo {
                        peer_id: *peer_id,
                        addresses: p2p_addresses.to_vec(),
                        timestamp_ms: info.timestamp_ms(),
                        access_type: info.access_type(),
                    },
                );
            }
        }

        // No need to connect to any more peers if we're already connected to a bunch
        let number_of_connections = state.connected_peers.len();
        let number_to_dial = std::cmp::min(
            eligible.len(),
            self.discovery_config
                .target_concurrent_connections()
                .saturating_sub(number_of_connections),
        );

        // randomize the order
        use rand::seq::IteratorRandom;
        for (peer_id, info) in eligible
            .into_iter()
            .choose_multiple(&mut rand::thread_rng(), number_to_dial)
        {
            let abort_handle = self
                .tasks
                .spawn(try_to_connect_to_peer(self.network.clone(), info));
            self.pending_dials.insert(peer_id, abort_handle);
        }

        // If we aren't connected to anything and we aren't presently trying to connect to anyone
        // we need to try the configured peers with High affinity (seed peers)
        let has_peers_to_dial = || {
            self.configured_peers
                .values()
                .any(|p| p.affinity == PeerAffinity::High)
                || !self.unidentified_seed_peers.is_empty()
        };
        if self.dial_seed_peers_task.is_none()
            && state.connected_peers.is_empty()
            && self.pending_dials.is_empty()
            && has_peers_to_dial()
        {
            let abort_handle = self.tasks.spawn(try_to_connect_to_seed_peers(
                self.network.clone(),
                self.discovery_config.clone(),
                self.configured_peers.clone(),
                self.unidentified_seed_peers.clone(),
            ));

            self.dial_seed_peers_task = Some(abort_handle);
        }
    }
}
708
709async fn try_to_connect_to_peer(network: Network, info: NodeInfo) {
710    debug!("Connecting to peer {info:?}");
711    for multiaddr in &info.addresses {
712        if let Ok(address) = multiaddr.to_anemo_address() {
713            // Ignore the result and just log the error if there is one
714            if network
715                .connect_with_peer_id(address, info.peer_id)
716                .await
717                .tap_err(|e| {
718                    debug!(
719                        "error dialing {} at address '{}': {e}",
720                        info.peer_id.short_display(4),
721                        multiaddr
722                    )
723                })
724                .is_ok()
725            {
726                return;
727            }
728        }
729    }
730}
731
/// Dial every high-affinity configured peer and every unidentified seed
/// address, with bounded concurrency. Used when we have no connections and no
/// pending dials. All failures are logged at debug level and otherwise ignored.
async fn try_to_connect_to_seed_peers(
    network: Network,
    config: Arc<DiscoveryConfig>,
    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
    unidentified_seed_peers: Vec<anemo::types::Address>,
) {
    let high_affinity_peers: Vec<_> = configured_peers
        .values()
        .filter(|p| p.affinity == PeerAffinity::High)
        .cloned()
        .collect();
    debug!(
        ?high_affinity_peers,
        ?unidentified_seed_peers,
        "Connecting to seed peers"
    );
    // Borrow so the `for_each_concurrent` closures can share the network.
    let network = &network;

    // Attempt connection to all high-affinity and seed peers.
    // Peers with a known PeerId are dialed at each of their addresses.
    let with_peer_id = high_affinity_peers.into_iter().flat_map(|peer_info| {
        peer_info
            .address
            .into_iter()
            .map(move |addr| (Some(peer_info.peer_id), addr))
    });
    let without_peer_id = unidentified_seed_peers.into_iter().map(|addr| (None, addr));
    futures::stream::iter(with_peer_id.chain(without_peer_id))
        .for_each_concurrent(
            config.target_concurrent_connections(),
            |(peer_id, address)| async move {
                // Ignore the result and just log the error if there is one
                let _ = if let Some(peer_id) = peer_id {
                    network
                        .connect_with_peer_id(address.clone(), peer_id)
                        .await
                        .tap_err(|e| {
                            debug!(
                                "error dialing peer {} at '{}': {e}",
                                peer_id.short_display(4),
                                address
                            )
                        })
                } else {
                    network
                        .connect(address.clone())
                        .await
                        .tap_err(|e| debug!("error dialing address '{}': {e}", address))
                };
            },
        )
        .await;
}
784
785async fn query_peer_for_known_peers_v2(peer: Peer) -> Option<Vec<SignedNodeInfo>> {
786    let mut client = DiscoveryClient::new(peer);
787    let request = Request::new(()).with_timeout(TIMEOUT);
788    client
789        .get_known_peers_v2(request)
790        .await
791        .ok()
792        .map(Response::into_inner)
793        .map(
794            |GetKnownPeersResponseV2 {
795                 own_info,
796                 mut known_peers,
797             }| {
798                if !own_info.addresses.is_empty() {
799                    known_peers.push(own_info)
800                }
801                known_peers
802            },
803        )
804}
805
/// Query a single peer for its known peers and merge the results into shared
/// state. When V3 is enabled and our versioned info is available, V2 and V3
/// are queried concurrently (V2 keeps `known_peers` populated for older
/// clients); otherwise only V2 is queried.
async fn query_peer_for_their_known_peers(
    peer: Peer,
    discovery_config: Arc<DiscoveryConfig>,
    state: Arc<RwLock<State>>,
    metrics: Metrics,
    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
    endpoint_manager: EndpointManager,
) {
    // Query V3 concurrently with V2 when enabled
    if discovery_config.use_get_known_peers_v3() {
        let our_info_v2 = state.read().unwrap().our_info_v2.clone();
        if let Some(own_info) = our_info_v2 {
            let peer_for_v3 = peer.clone();
            let v3_query = async move {
                let mut client = DiscoveryClient::new(peer_for_v3);
                // V3 is a push-pull exchange: we send our own info along with
                // the request.
                let request =
                    Request::new(GetKnownPeersRequestV3 { own_info }).with_timeout(TIMEOUT);
                client
                    .get_known_peers_v3(request)
                    .await
                    .ok()
                    .map(Response::into_inner)
                    .map(
                        |GetKnownPeersResponseV3 {
                             own_info,
                             mut known_peers,
                         }| {
                            // Include the responder's own info when it
                            // advertises at least one P2P address.
                            if !own_info.p2p_addresses().is_empty() {
                                known_peers.push(own_info)
                            }
                            known_peers
                        },
                    )
            };

            let (found_peers_v2, found_peers_v3) =
                tokio::join!(query_peer_for_known_peers_v2(peer), v3_query);

            if let Some(found_peers) = found_peers_v2 {
                update_known_peers(
                    state.clone(),
                    metrics.clone(),
                    found_peers,
                    configured_peers.clone(),
                );
            }
            if let Some(found_peers) = found_peers_v3 {
                update_known_peers_versioned(
                    state,
                    metrics,
                    found_peers,
                    configured_peers,
                    &endpoint_manager,
                );
            }
            return;
        }
    }

    // V3 not enabled or our_info_v2 not available - just run V2
    if let Some(found_peers) = query_peer_for_known_peers_v2(peer).await {
        update_known_peers(state, metrics, found_peers, configured_peers);
    }
}
870
871async fn query_connected_peers_for_their_known_peers(
872    network: Network,
873    config: Arc<DiscoveryConfig>,
874    state: Arc<RwLock<State>>,
875    metrics: Metrics,
876    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
877    endpoint_manager: EndpointManager,
878) {
879    use rand::seq::IteratorRandom;
880
881    let peers_to_query: Vec<_> = network
882        .peers()
883        .into_iter()
884        .flat_map(|id| network.peer(id))
885        .choose_multiple(&mut rand::thread_rng(), config.peers_to_query());
886
887    // Query V2 to keep known_peers populated for V2 clients
888    let v2_query = {
889        let peers = peers_to_query.clone();
890        let peers_to_query_count = config.peers_to_query();
891        async move {
892            peers
893                .into_iter()
894                .map(DiscoveryClient::new)
895                .map(|mut client| async move {
896                    let request = Request::new(()).with_timeout(TIMEOUT);
897                    client
898                        .get_known_peers_v2(request)
899                        .await
900                        .ok()
901                        .map(Response::into_inner)
902                        .map(
903                            |GetKnownPeersResponseV2 {
904                                 own_info,
905                                 mut known_peers,
906                             }| {
907                                if !own_info.addresses.is_empty() {
908                                    known_peers.push(own_info)
909                                }
910                                known_peers
911                            },
912                        )
913                })
914                .pipe(futures::stream::iter)
915                .buffer_unordered(peers_to_query_count)
916                .filter_map(std::future::ready)
917                .flat_map(futures::stream::iter)
918                .collect::<Vec<_>>()
919                .await
920        }
921    };
922
923    // Additionally query V3 when enabled, concurrently with V2
924    if config.use_get_known_peers_v3() {
925        let our_info_v2 = state.read().unwrap().our_info_v2.clone();
926        if let Some(own_info) = our_info_v2 {
927            let v3_query = {
928                let peers_to_query_count = config.peers_to_query();
929                async move {
930                    peers_to_query
931                        .into_iter()
932                        .map(DiscoveryClient::new)
933                        .map(|mut client| {
934                            let own_info = own_info.clone();
935                            async move {
936                                let request = Request::new(GetKnownPeersRequestV3 { own_info })
937                                    .with_timeout(TIMEOUT);
938                                client
939                                    .get_known_peers_v3(request)
940                                    .await
941                                    .ok()
942                                    .map(Response::into_inner)
943                                    .map(
944                                        |GetKnownPeersResponseV3 {
945                                             own_info,
946                                             mut known_peers,
947                                         }| {
948                                            if !own_info.p2p_addresses().is_empty() {
949                                                known_peers.push(own_info)
950                                            }
951                                            known_peers
952                                        },
953                                    )
954                            }
955                        })
956                        .pipe(futures::stream::iter)
957                        .buffer_unordered(peers_to_query_count)
958                        .filter_map(std::future::ready)
959                        .flat_map(futures::stream::iter)
960                        .collect::<Vec<_>>()
961                        .await
962                }
963            };
964
965            let (found_peers_v2, found_peers_v3) = tokio::join!(v2_query, v3_query);
966
967            update_known_peers(
968                state.clone(),
969                metrics.clone(),
970                found_peers_v2,
971                configured_peers.clone(),
972            );
973            update_known_peers_versioned(
974                state,
975                metrics,
976                found_peers_v3,
977                configured_peers,
978                &endpoint_manager,
979            );
980            return;
981        }
982    }
983
984    // V3 not enabled or our_info_v2 not available - just run V2
985    let found_peers_v2 = v2_query.await;
986    update_known_peers(state, metrics, found_peers_v2, configured_peers);
987}
988
989fn update_known_peers(
990    state: Arc<RwLock<State>>,
991    metrics: Metrics,
992    found_peers: Vec<SignedNodeInfo>,
993    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
994) {
995    use std::collections::hash_map::Entry;
996
997    let now_unix = now_unix();
998    let our_peer_id = state.read().unwrap().our_info.clone().unwrap().peer_id;
999    let known_peers = &mut state.write().unwrap().known_peers;
1000    // only take the first MAX_PEERS_TO_SEND peers
1001    for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
1002        // +1 to account for the "own_info" of the serving peer
1003        // Skip peers whose timestamp is too far in the future from our clock
1004        // or that are too old
1005        if peer_info.timestamp_ms > now_unix.saturating_add(30 * 1_000) // 30 seconds
1006            || now_unix.saturating_sub(peer_info.timestamp_ms) > ONE_DAY_MILLISECONDS
1007        {
1008            continue;
1009        }
1010
1011        if peer_info.peer_id == our_peer_id {
1012            continue;
1013        }
1014
1015        // If Peer is Private or Trusted, and not in our configured peers, skip it.
1016        let is_restricted = match peer_info.access_type {
1017            AccessType::Public => false,
1018            AccessType::Private | AccessType::Trusted => true,
1019        };
1020        if is_restricted && !configured_peers.contains_key(&peer_info.peer_id) {
1021            continue;
1022        }
1023
1024        // Skip entries that have too many addresses as a means to cap the size of a node's info
1025        if peer_info.addresses.len() > MAX_ADDRESSES_PER_PEER {
1026            continue;
1027        }
1028
1029        // verify that all addresses provided are valid anemo addresses
1030        if !peer_info
1031            .addresses
1032            .iter()
1033            .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
1034        {
1035            continue;
1036        }
1037        let Ok(public_key) = Ed25519PublicKey::from_bytes(&peer_info.peer_id.0) else {
1038            debug_fatal!(
1039                // This should never happen.
1040                "Failed to convert anemo PeerId {:?} to Ed25519PublicKey",
1041                peer_info.peer_id
1042            );
1043            continue;
1044        };
1045        let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
1046        if let Err(e) = public_key.verify(&msg, peer_info.auth_sig()) {
1047            info!(
1048                "Discovery failed to verify signature for NodeInfo for peer {:?}: {e:?}",
1049                peer_info.peer_id
1050            );
1051            // TODO: consider denylisting the source of bad NodeInfo from future requests.
1052            continue;
1053        }
1054        let peer = VerifiedSignedNodeInfo::new_from_verified(peer_info);
1055
1056        match known_peers.entry(peer.peer_id) {
1057            Entry::Occupied(mut o) => {
1058                if peer.timestamp_ms > o.get().timestamp_ms {
1059                    if o.get().addresses.is_empty() && !peer.addresses.is_empty() {
1060                        metrics.inc_num_peers_with_external_address();
1061                    }
1062                    if !o.get().addresses.is_empty() && peer.addresses.is_empty() {
1063                        metrics.dec_num_peers_with_external_address();
1064                    }
1065                    o.insert(peer);
1066                }
1067            }
1068            Entry::Vacant(v) => {
1069                if !peer.addresses.is_empty() {
1070                    metrics.inc_num_peers_with_external_address();
1071                }
1072                v.insert(peer);
1073            }
1074        }
1075    }
1076}
1077
/// Merges versioned (V3) peer records into `state.known_peers_v2`.
///
/// Applies the same freshness and access-type filters as the V2 path, then
/// validates each record via `verify_versioned_node_info`. For peers present
/// in our configured set, any non-P2P addresses in a V2 payload are forwarded
/// to the `EndpointManager` (best effort; errors ignored). The
/// `num_peers_with_external_address` gauge tracks entries gaining or losing
/// P2P addresses.
fn update_known_peers_versioned(
    state: Arc<RwLock<State>>,
    metrics: Metrics,
    found_peers: Vec<SignedVersionedNodeInfo>,
    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
    endpoint_manager: &EndpointManager,
) {
    use std::collections::hash_map::Entry;

    let now_unix = now_unix();
    // None when our_info_v2 is unset or carries no peer id; the self-skip
    // below then never fires.
    let our_peer_id = state
        .read()
        .unwrap()
        .our_info_v2
        .as_ref()
        .and_then(|info| info.peer_id());
    let known_peers_v2 = &mut state.write().unwrap().known_peers_v2;

    // +1 accounts for the serving peer's own_info appended to its response.
    for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
        let timestamp_ms = peer_info.timestamp_ms();

        // Reject records dated more than 30s in the future or older than a day.
        if timestamp_ms > now_unix.saturating_add(30 * 1_000)
            || now_unix.saturating_sub(timestamp_ms) > ONE_DAY_MILLISECONDS
        {
            continue;
        }

        // Records without a resolvable peer id are unusable.
        let Some(peer_id) = peer_info.peer_id() else {
            continue;
        };

        // Never record ourselves.
        if Some(peer_id) == our_peer_id {
            continue;
        }

        // Private/Trusted peers are only accepted when explicitly configured.
        let is_restricted = match peer_info.access_type() {
            AccessType::Public => false,
            AccessType::Private | AccessType::Trusted => true,
        };
        if is_restricted && !configured_peers.contains_key(&peer_id) {
            continue;
        }

        // Authenticate; rejected records are logged and skipped.
        let peer = match verify_versioned_node_info(&peer_info) {
            Ok(verified) => verified,
            Err(reason) => {
                info!("Discovery rejecting VersionedNodeInfo for peer {peer_id:?}: {reason}");
                continue;
            }
        };

        // Forward non-P2P addresses from configured peers to EndpointManager.
        // Best effort: update_endpoint errors are intentionally ignored.
        if configured_peers.contains_key(&peer_id)
            && let VersionedNodeInfo::V2(info_v2) = peer_info.data()
        {
            for (endpoint_id, addrs) in &info_v2.addresses {
                if !matches!(endpoint_id, EndpointId::P2p(_)) && !addrs.is_empty() {
                    let _ = endpoint_manager.update_endpoint(
                        endpoint_id.clone(),
                        AddressSource::Discovery,
                        addrs.clone(),
                    );
                }
            }
        }

        let peer_p2p_addresses = peer.p2p_addresses();

        // Keep only the freshest record per peer id, maintaining the
        // external-address gauge as P2P addresses appear or disappear.
        match known_peers_v2.entry(peer_id) {
            Entry::Occupied(mut o) => {
                if peer.timestamp_ms() > o.get().timestamp_ms() {
                    let old_addresses = o.get().p2p_addresses();
                    if old_addresses.is_empty() && !peer_p2p_addresses.is_empty() {
                        metrics.inc_num_peers_with_external_address();
                    }
                    if !old_addresses.is_empty() && peer_p2p_addresses.is_empty() {
                        metrics.dec_num_peers_with_external_address();
                    }
                    o.insert(peer);
                }
            }
            Entry::Vacant(v) => {
                if !peer_p2p_addresses.is_empty() {
                    metrics.inc_num_peers_with_external_address();
                }
                v.insert(peer);
            }
        }
    }
}
1168
1169pub(super) fn now_unix() -> u64 {
1170    use std::time::{SystemTime, UNIX_EPOCH};
1171
1172    SystemTime::now()
1173        .duration_since(UNIX_EPOCH)
1174        .unwrap()
1175        .as_millis() as u64
1176}