sui_network/discovery/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::types::PeerAffinity;
5use anemo::types::PeerInfo;
6use anemo::{Network, Peer, PeerId, Request, Response, types::PeerEvent};
7use fastcrypto::ed25519::{Ed25519PublicKey, Ed25519Signature};
8use futures::StreamExt;
9use mysten_common::debug_fatal;
10use serde::{Deserialize, Serialize};
11use shared_crypto::intent::IntentScope;
12use std::{
13    collections::{BTreeMap, HashMap},
14    path::PathBuf,
15    sync::{Arc, RwLock},
16    time::{Duration, Instant},
17};
18
19use crate::endpoint_manager::{AddressSource, EndpointId, EndpointManager};
20use store::{load_stored_peers, save_stored_peers};
21use sui_config::p2p::{AccessType, DiscoveryConfig, P2pConfig};
22use sui_types::crypto::{NetworkKeyPair, NetworkPublicKey, Signer, ToFromBytes, VerifyingKey};
23use sui_types::digests::Digest;
24use sui_types::message_envelope::{Envelope, Message, VerifiedEnvelope};
25use sui_types::multiaddr::Multiaddr;
26use tap::{Pipe, TapFallible};
27use tokio::sync::broadcast::error::RecvError;
28use tokio::sync::mpsc;
29use tokio::{
30    sync::oneshot,
31    task::{AbortHandle, JoinSet},
32};
33use tracing::{debug, info, trace};
34
35const TIMEOUT: Duration = Duration::from_secs(1);
36const ONE_DAY_MILLISECONDS: u64 = 24 * 60 * 60 * 1_000;
37const MAX_ADDRESS_LENGTH: usize = 300;
38const MAX_PEERS_TO_SEND: usize = 200;
39const MAX_ADDRESSES_PER_PEER: usize = 2;
40
41mod generated {
42    include!(concat!(env!("OUT_DIR"), "/sui.Discovery.rs"));
43}
44mod builder;
45mod metrics;
46mod server;
47mod store;
48#[cfg(test)]
49mod tests;
50
51pub use builder::{Builder, UnstartedDiscovery};
52pub use generated::{
53    discovery_client::DiscoveryClient,
54    discovery_server::{Discovery, DiscoveryServer},
55};
56pub use server::{GetKnownPeersRequestV3, GetKnownPeersResponseV2, GetKnownPeersResponseV3};
57
/// Message types for the discovery system mailbox.
#[derive(Debug)]
pub enum DiscoveryMessage {
    /// An external source (e.g. admin API, node config) updated a peer's address.
    PeerAddressChange {
        peer_id: PeerId,
        /// Origin of the update; competing sources for the same peer are ranked
        /// by the ordering of `AddressSource` (see `State::peer_addresses`).
        source: AddressSource,
        /// Replacement address set for `source`; an empty vec clears the
        /// addresses previously recorded for that source.
        addresses: Vec<anemo::types::Address>,
    },
    /// Node info was received from inbound RPC.
    ///
    /// Boxed to keep the enum variant small relative to the other variants.
    ReceivedNodeInfo {
        peer_info: Box<SignedVersionedNodeInfo>,
    },
    /// A spawned peer-query task discovered updated info for a configured peer,
    /// signaling the event loop to update the stored peer addresses.
    ConfiguredPeersUpdated,
    /// A peer has been reported as continuously failing, triggering disconnect and cooldown.
    PeerFailureReport { peer_id: PeerId },
}
77
/// A Handle to the Discovery subsystem. The Discovery system will be shut down once all Handles
/// have been dropped.
#[derive(Clone, Debug)]
pub struct Handle {
    // When the last clone of this Arc drops, the oneshot sender is dropped,
    // which the event loop observes via `shutdown_handle` and terminates.
    pub(super) _shutdown_handle: Arc<oneshot::Sender<()>>,
    // Mailbox sender handed out via `Handle::sender()`.
    pub(super) sender: Sender,
}

impl Handle {
    /// Returns a lightweight mailbox sender that does not keep the Discovery
    /// subsystem alive (it carries no shutdown reference).
    pub fn sender(&self) -> Sender {
        self.sender.clone()
    }
}

/// A lightweight handle for sending messages to the discovery event loop
/// without holding a shutdown reference.
#[derive(Clone, Debug)]
pub struct Sender {
    pub(super) sender: mpsc::Sender<DiscoveryMessage>,
}
98
99impl Sender {
100    pub fn peer_address_change(
101        &self,
102        peer_id: PeerId,
103        source: AddressSource,
104        addresses: Vec<anemo::types::Address>,
105    ) {
106        self.sender
107            .try_send(DiscoveryMessage::PeerAddressChange {
108                peer_id,
109                source,
110                addresses,
111            })
112            .expect("Discovery mailbox should not overflow or be closed")
113    }
114
115    pub fn report_peer_failure(&self, peer_id: PeerId) {
116        let _ = self
117            .sender
118            .try_send(DiscoveryMessage::PeerFailureReport { peer_id });
119    }
120}
121
122use self::metrics::Metrics;
123
/// The internal discovery state shared between the main event loop and the request handler
struct State {
    // Our signed V1 node info, served to peers speaking the legacy protocol.
    our_info: Option<SignedNodeInfo>,
    // Our signed versioned node info (V2 payload inside a versioned wrapper).
    our_info_v2: Option<SignedVersionedNodeInfo>,
    // Set of currently connected peers; the unit value makes this a set with
    // HashMap's API.
    connected_peers: HashMap<PeerId, ()>,
    // Verified V1 node info learned from other peers.
    known_peers: HashMap<PeerId, VerifiedSignedNodeInfo>,
    // Verified versioned node info learned from other peers.
    known_peers_v2: HashMap<PeerId, VerifiedSignedVersionedNodeInfo>,
    // Per-peer candidate addresses keyed by source. The BTreeMap's smallest
    // `AddressSource` key is treated as highest priority (see
    // `reconfigure_peer_addresses`, which uses `first_key_value`).
    peer_addresses: HashMap<PeerId, BTreeMap<AddressSource, Vec<anemo::types::Address>>>,
}
133
/// The information necessary to dial another peer.
///
/// `NodeInfo` contains all the information that is shared with other nodes via the discovery
/// service to advertise how a node can be reached.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct NodeInfo {
    pub peer_id: PeerId,
    /// Addresses the peer can be reached at. Capped at
    /// `MAX_ADDRESSES_PER_PEER` when received from the network
    /// (enforced in `verify_versioned_node_info`).
    pub addresses: Vec<Multiaddr>,

    /// Creation time.
    ///
    /// This is used to determine which of two NodeInfo's from the same PeerId should be retained.
    pub timestamp_ms: u64,

    /// See docstring for `AccessType`.
    pub access_type: AccessType,
}
151
152impl NodeInfo {
153    fn sign(self, keypair: &NetworkKeyPair) -> SignedNodeInfo {
154        let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
155        let sig = keypair.sign(&msg);
156        SignedNodeInfo::new_from_data_and_sig(self, sig)
157    }
158}
159
/// A `NodeInfo` wrapped with the ed25519 signature produced by `NodeInfo::sign`.
pub type SignedNodeInfo = Envelope<NodeInfo, Ed25519Signature>;

/// A `SignedNodeInfo` whose signature has already been verified.
pub type VerifiedSignedNodeInfo = VerifiedEnvelope<NodeInfo, Ed25519Signature>;

/// Digest newtype required by the `Message` trait impl for `NodeInfo`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct NodeInfoDigest(Digest);

impl NodeInfoDigest {
    /// Wraps raw 32-byte digest bytes.
    pub const fn new(digest: [u8; 32]) -> Self {
        Self(Digest::new(digest))
    }
}

impl Message for NodeInfo {
    type DigestType = NodeInfoDigest;
    const SCOPE: IntentScope = IntentScope::DiscoveryPeers;

    // Discovery signs/verifies the full BCS bytes directly (see `sign` and
    // `verify_versioned_node_info`), so the digest is never computed.
    fn digest(&self) -> Self::DigestType {
        unreachable!("NodeInfoDigest is not used today")
    }
}
181
/// NodeInfoV2 supports multiple address types keyed by EndpointId.
// TODO: Remove support for V1 once V2 is available in all production networks.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct NodeInfoV2 {
    /// Addresses per endpoint. Each `EndpointId` variant (P2p, Consensus) is
    /// allowed at most once; enforced in `verify_versioned_node_info`.
    pub addresses: BTreeMap<EndpointId, Vec<Multiaddr>>,
    /// Creation time in unix ms; fresher info wins when the same peer appears
    /// in multiple maps.
    pub timestamp_ms: u64,
    /// See docstring for `AccessType`.
    pub access_type: AccessType,
}
190
191impl NodeInfoV2 {
192    /// Derive the P2P PeerId from the addresses map.
193    /// Returns None if no P2P endpoint is present.
194    pub fn peer_id(&self) -> Option<PeerId> {
195        self.addresses.keys().find_map(|k| match k {
196            EndpointId::P2p(peer_id) => Some(*peer_id),
197            EndpointId::Consensus(_) => None,
198        })
199    }
200
201    pub fn p2p_addresses(&self) -> &[Multiaddr] {
202        self.addresses
203            .iter()
204            .find_map(|(k, v)| match k {
205                EndpointId::P2p(_) => Some(v.as_slice()),
206                EndpointId::Consensus(_) => None,
207            })
208            .unwrap_or(&[])
209    }
210}
211
/// Versioned wrapper for NodeInfo types.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum VersionedNodeInfo {
    /// Legacy single-endpoint (P2P-only) node info.
    V1(NodeInfo),
    /// Multi-endpoint node info keyed by `EndpointId`.
    V2(NodeInfoV2),
}
218
219impl VersionedNodeInfo {
220    pub fn peer_id(&self) -> Option<PeerId> {
221        match self {
222            VersionedNodeInfo::V1(info) => Some(info.peer_id),
223            VersionedNodeInfo::V2(info) => info.peer_id(),
224        }
225    }
226
227    pub fn timestamp_ms(&self) -> u64 {
228        match self {
229            VersionedNodeInfo::V1(info) => info.timestamp_ms,
230            VersionedNodeInfo::V2(info) => info.timestamp_ms,
231        }
232    }
233
234    pub fn access_type(&self) -> AccessType {
235        match self {
236            VersionedNodeInfo::V1(info) => info.access_type,
237            VersionedNodeInfo::V2(info) => info.access_type,
238        }
239    }
240
241    pub fn p2p_addresses(&self) -> &[Multiaddr] {
242        match self {
243            VersionedNodeInfo::V1(info) => &info.addresses,
244            VersionedNodeInfo::V2(info) => info.p2p_addresses(),
245        }
246    }
247
248    pub fn sign(self, keypair: &NetworkKeyPair) -> SignedVersionedNodeInfo {
249        let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
250        let sig = keypair.sign(&msg);
251        SignedVersionedNodeInfo::new_from_data_and_sig(self, sig)
252    }
253}
254
/// Digest newtype required by the `Message` trait impl for `VersionedNodeInfo`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct VersionedNodeInfoDigest(Digest);

impl Message for VersionedNodeInfo {
    type DigestType = VersionedNodeInfoDigest;
    const SCOPE: IntentScope = IntentScope::DiscoveryPeers;

    // Signing and verification operate on the full BCS bytes of the value
    // (see `sign` / `verify_versioned_node_info`), so the digest is unused.
    fn digest(&self) -> Self::DigestType {
        unreachable!("VersionedNodeInfoDigest is not used today")
    }
}

/// A `VersionedNodeInfo` bundled with its ed25519 signature.
pub type SignedVersionedNodeInfo = Envelope<VersionedNodeInfo, Ed25519Signature>;
/// A `SignedVersionedNodeInfo` that passed `verify_versioned_node_info`.
pub type VerifiedSignedVersionedNodeInfo = VerifiedEnvelope<VersionedNodeInfo, Ed25519Signature>;
269
/// Verifies the signature and endpoint identity consistency of a
/// `SignedVersionedNodeInfo`, returning a `VerifiedSignedVersionedNodeInfo`
/// on success.
///
/// Checks:
/// 1. The signature is valid for the P2P peer_id embedded in the node info.
/// 2. Any non-P2P endpoint identities (e.g. `Consensus`) match the P2P
///    peer_id, preventing a peer from advertising endpoints under another
///    key.
///
/// Also enforces structural limits (`MAX_ADDRESSES_PER_PEER`,
/// `MAX_ADDRESS_LENGTH`) to bound what untrusted peers can advertise.
fn verify_versioned_node_info(
    peer_info: &SignedVersionedNodeInfo,
) -> Result<VerifiedSignedVersionedNodeInfo, &'static str> {
    // Without a P2P endpoint there is no identity to verify against.
    let peer_id = peer_info.peer_id().ok_or("missing P2P peer_id")?;

    // The anemo PeerId bytes double as the ed25519 public key.
    let public_key =
        Ed25519PublicKey::from_bytes(&peer_id.0).map_err(|_| "invalid peer_id public key")?;

    // The signature covers the BCS bytes of the (unsigned) node info payload.
    let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
    public_key
        .verify(&msg, peer_info.auth_sig())
        .map_err(|_| "signature verification failed")?;

    match peer_info.data() {
        VersionedNodeInfo::V1(info) => {
            if info.addresses.len() > MAX_ADDRESSES_PER_PEER {
                return Err("too many addresses");
            }
            // V1 addresses are always P2P, so they must be anemo-dialable too.
            if !info
                .addresses
                .iter()
                .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
            {
                return Err("invalid address");
            }
        }
        VersionedNodeInfo::V2(info_v2) => {
            // Each endpoint variant (P2p, Consensus) may appear at most once.
            // The BTreeMap only dedups full keys, so e.g. two `Consensus`
            // entries with different public keys would otherwise be allowed.
            let mut seen_variants = Vec::new();
            for endpoint_id in info_v2.addresses.keys() {
                let variant = std::mem::discriminant(endpoint_id);
                if seen_variants.contains(&variant) {
                    return Err("duplicate endpoint variant");
                }
                seen_variants.push(variant);
            }

            for (endpoint_id, addrs) in &info_v2.addresses {
                if addrs.len() > MAX_ADDRESSES_PER_PEER {
                    return Err("too many addresses for endpoint");
                }
                if !addrs.iter().all(|addr| addr.len() < MAX_ADDRESS_LENGTH) {
                    return Err("address too long");
                }
                // Only P2P addresses must be anemo-dialable; other endpoints'
                // addresses are not validated against anemo here.
                if matches!(endpoint_id, EndpointId::P2p(_))
                    && !addrs.iter().all(|addr| addr.to_anemo_address().is_ok())
                {
                    return Err("invalid P2P address");
                }
            }

            // A peer may only advertise non-P2P endpoints under its own key.
            let identities_valid = info_v2.addresses.keys().all(|eid| match eid {
                EndpointId::P2p(_) => true,
                EndpointId::Consensus(pubkey) => pubkey.as_bytes() == peer_id.0,
            });
            if !identities_valid {
                return Err("non-P2P endpoint identity mismatch");
            }
        }
    }

    Ok(VerifiedSignedVersionedNodeInfo::new_from_verified(
        peer_info.clone(),
    ))
}
344
/// Owner of the discovery event loop: configuration, shared state, background
/// tasks, and the channels that drive peer discovery.
struct DiscoveryEventLoop {
    config: P2pConfig,
    discovery_config: Arc<DiscoveryConfig>,
    // Statically configured peers; exempt from failure-report disconnects and
    // the only peers persisted by `save_stored_peers`.
    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
    // Seed addresses configured without a known PeerId.
    unidentified_seed_peers: Vec<anemo::types::Address>,
    network: Network,
    keypair: NetworkKeyPair,
    // Background work (peer queries, dial attempts); joined in the event loop.
    tasks: JoinSet<()>,
    // In-flight dial attempts, used to avoid dialing the same peer twice.
    pending_dials: HashMap<PeerId, AbortHandle>,
    // At most one seed-dial task at a time.
    dial_seed_peers_task: Option<AbortHandle>,
    // Resolves (errors) once all `Handle`s — and thus the oneshot sender — drop.
    shutdown_handle: oneshot::Receiver<()>,
    state: Arc<RwLock<State>>,
    mailbox: mpsc::Receiver<DiscoveryMessage>,
    // Retained so spawned tasks can be handed a sender into our own mailbox.
    mailbox_tx: mpsc::Sender<DiscoveryMessage>,
    metrics: Metrics,
    consensus_external_address: Option<Multiaddr>,
    endpoint_manager: EndpointManager,
    // Where known peers are persisted across restarts; None disables persistence.
    store_path: Option<PathBuf>,
    // Peers recently reported as failing; deprioritized as dial targets until
    // the cooldown expires (see `handle_tick`).
    peer_cooldowns: HashMap<PeerId, Instant>,
}
365
366impl DiscoveryEventLoop {
    /// Runs the discovery event loop until shutdown.
    ///
    /// On entry: signs our own node info, registers configured peers, and
    /// replays peer info persisted by a previous run. The loop then
    /// multiplexes periodic ticks, network peer events, mailbox messages,
    /// and completed background tasks.
    pub async fn start(mut self) {
        info!("Discovery started");

        self.construct_our_info();
        self.configure_preferred_peers();
        self.load_stored_peers_on_startup();

        let mut interval = tokio::time::interval(self.discovery_config.interval_period());
        let mut peer_events = {
            let (subscriber, _peers) = self.network.subscribe().unwrap();
            subscriber
        };

        loop {
            tokio::select! {
                now = interval.tick() => {
                    let now_unix = now_unix();
                    self.handle_tick(now.into_std(), now_unix);
                }
                peer_event = peer_events.recv() => {
                    self.handle_peer_event(peer_event);
                },
                Some(message) = self.mailbox.recv() => {
                    self.handle_message(message);
                }
                Some(task_result) = self.tasks.join_next() => {
                    match task_result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // avoid crashing on ungraceful shutdown
                            } else if e.is_panic() {
                                // propagate panics.
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("task failed: {e}");
                            }
                        },
                    };
                },
                // Once the shutdown notification resolves we can terminate the event loop
                _ = &mut self.shutdown_handle => {
                    break;
                }
            }
        }

        // Persist known peers one last time so they survive the restart.
        self.save_stored_peers();
        info!("Discovery ended");
    }
417
    /// Dispatches a single mailbox message to the appropriate handler.
    fn handle_message(&mut self, message: DiscoveryMessage) {
        match message {
            DiscoveryMessage::PeerAddressChange {
                peer_id,
                source,
                addresses,
            } => {
                self.handle_peer_address_change(peer_id, source, addresses);
            }
            DiscoveryMessage::ReceivedNodeInfo { peer_info } => {
                // Merge the inbound info into known peers; persist only when
                // the known-peer set actually changed.
                let changed = update_known_peers_versioned(
                    self.state.clone(),
                    self.metrics.clone(),
                    vec![*peer_info],
                    self.configured_peers.clone(),
                    &self.endpoint_manager,
                );
                if changed {
                    self.save_stored_peers();
                }
            }
            DiscoveryMessage::ConfiguredPeersUpdated => {
                self.save_stored_peers();
            }
            DiscoveryMessage::PeerFailureReport { peer_id } => {
                self.handle_peer_failure_report(peer_id);
            }
        }
    }
447
    /// Handles a report that `peer_id` has been continuously failing.
    ///
    /// Configured peers are never disconnected. Otherwise, unless we are
    /// already below the minimum connected-peer threshold, the peer is
    /// disconnected and placed on a cooldown that deprioritizes it as a dial
    /// target (see `handle_tick`).
    fn handle_peer_failure_report(&mut self, peer_id: PeerId) {
        if self.configured_peers.contains_key(&peer_id) {
            info!(?peer_id, "ignoring failure report for configured peer");
            return;
        }
        let min_peers = self.discovery_config.min_peers_for_disconnect();
        let connected_count = self.state.read().unwrap().connected_peers.len();
        if connected_count < min_peers {
            info!(
                ?peer_id,
                connected_count, min_peers, "skipping disconnect, too few connected peers"
            );
            return;
        }
        info!(
            ?peer_id,
            "peer failure reported, disconnecting and adding cooldown"
        );
        let _ = self.network.disconnect(peer_id);
        self.peer_cooldowns.insert(peer_id, Instant::now());
    }
469
    /// Builds and signs our own V1 and V2 node info, once.
    ///
    /// The V1 info carries only the external P2P address; the V2 info is an
    /// address map that additionally carries our consensus endpoint (when
    /// configured), keyed by the same ed25519 identity.
    fn construct_our_info(&mut self) {
        // Idempotent: only the first call populates state.
        if self.state.read().unwrap().our_info.is_some() {
            return;
        }

        let peer_id = self.network.peer_id();
        let timestamp_ms = now_unix();
        let access_type = self.discovery_config.access_type();

        // Advertise the configured external address only if it converts to a
        // valid anemo address; otherwise advertise no addresses at all.
        let addresses: Vec<Multiaddr> = self
            .config
            .external_address
            .clone()
            .and_then(|addr| addr.to_anemo_address().ok().map(|_| addr))
            .into_iter()
            .collect();

        let our_info = NodeInfo {
            peer_id,
            addresses: addresses.clone(),
            timestamp_ms,
            access_type,
        }
        .sign(&self.keypair);

        let mut addresses_map = BTreeMap::new();
        addresses_map.insert(EndpointId::P2p(peer_id), addresses);
        if let Some(consensus_addr) = &self.consensus_external_address {
            // Populates `Consensus` EndpointId from our P2P (anemo) `PeerId`.
            // This is safe because both the P2P and consensus networks use the same
            // ed25519 network keypair. Both originate from `NodeConfig::network_key_pair()`.
            let network_pubkey =
                NetworkPublicKey::from_bytes(&peer_id.0).expect("PeerId is a valid public key");
            addresses_map.insert(
                EndpointId::Consensus(network_pubkey),
                vec![consensus_addr.clone()],
            );
        }
        let our_info_v2 = VersionedNodeInfo::V2(NodeInfoV2 {
            addresses: addresses_map,
            timestamp_ms,
            access_type,
        })
        .sign(&self.keypair);

        let mut state = self.state.write().unwrap();
        state.our_info = Some(our_info);
        state.our_info_v2 = Some(our_info_v2);
    }
519
520    fn configure_preferred_peers(&mut self) {
521        let peers: Vec<_> = self.configured_peers.values().cloned().collect();
522        for peer_info in peers {
523            debug!(?peer_info, "Add configured preferred peer");
524            match peer_info.affinity {
525                PeerAffinity::High => {
526                    self.handle_peer_address_change(
527                        peer_info.peer_id,
528                        AddressSource::Seed,
529                        peer_info.address,
530                    );
531                }
532                _ => {
533                    // Allowed peers accept inbound but aren't actively dialed,
534                    // so insert them directly without address priority tracking.
535                    self.network.known_peers().insert(peer_info);
536                }
537            }
538        }
539    }
540
    /// Re-signs our node info (both V1 and versioned) with a fresh timestamp,
    /// so peers prefer it over stale copies (fresher timestamp wins).
    fn update_our_info_timestamp(&mut self, now_unix: u64) {
        let state = &mut self.state.write().unwrap();

        if let Some(our_info) = &state.our_info {
            let mut data = our_info.data().clone();
            data.timestamp_ms = now_unix;
            state.our_info = Some(data.sign(&self.keypair));
        }

        if let Some(our_info_v2) = &state.our_info_v2 {
            let mut data = our_info_v2.data().clone();
            match &mut data {
                VersionedNodeInfo::V1(info) => info.timestamp_ms = now_unix,
                VersionedNodeInfo::V2(info) => info.timestamp_ms = now_unix,
            }
            state.our_info_v2 = Some(data.sign(&self.keypair));
        }
    }
559
    /// Records an address update for `peer_id` from a given source and then
    /// re-evaluates which addresses to actually use for that peer.
    fn handle_peer_address_change(
        &mut self,
        peer_id: PeerId,
        source: AddressSource,
        addresses: Vec<anemo::types::Address>,
    ) {
        debug!(
            ?peer_id,
            ?source,
            ?addresses,
            "Received peer address change"
        );

        // Update stored addresses.
        {
            let mut state = self.state.write().unwrap();
            let source_map = state.peer_addresses.entry(peer_id).or_default();

            // An empty address list clears this source's entry; drop the whole
            // per-peer map once no source has any addresses left.
            if addresses.is_empty() {
                source_map.remove(&source);
                if source_map.is_empty() {
                    state.peer_addresses.remove(&peer_id);
                }
            } else {
                source_map.insert(source, addresses);
            }
        }

        // Check if we should use the updated addresses.
        self.reconfigure_peer_addresses(peer_id);
    }
591
    /// Reads the highest-priority addresses from `peer_addresses` for the given peer
    /// and updates `network.known_peers()` if they differ from the current addresses.
    fn reconfigure_peer_addresses(&mut self, peer_id: PeerId) {
        // BTreeMap ordering encodes priority: the smallest `AddressSource`
        // key is considered highest priority.
        let priority_addresses = self
            .state
            .read()
            .unwrap()
            .peer_addresses
            .get(&peer_id)
            .and_then(|sources| sources.first_key_value().map(|(_, addrs)| addrs.clone()))
            .unwrap_or_default();
        let current_addresses = self
            .network
            .known_peers()
            .get(&peer_id)
            .map(|info| info.address.clone())
            .unwrap_or_default();
        if priority_addresses != current_addresses {
            let new_peer_info = PeerInfo {
                peer_id,
                affinity: PeerAffinity::High,
                address: priority_addresses.clone(),
            };

            self.network.known_peers().insert(new_peer_info);

            // Override existing connection if there might be one.
            if !current_addresses.is_empty() {
                let _ = self.network.disconnect(peer_id);

                // Immediately redial on the new top-priority address rather
                // than waiting for the connection manager's retry cycle.
                if let Some(address) = priority_addresses.first().cloned() {
                    let network = self.network.clone();
                    self.tasks.spawn(async move {
                        // If this fails, ConnectionManager will retry.
                        let _ = network.connect_with_peer_id(address, peer_id).await;
                    });
                }
            }
        }
    }
632
    /// Replays peer info persisted by a previous run so the node can reconnect
    /// quickly after a restart. No-op when persistence is disabled or empty.
    fn load_stored_peers_on_startup(&mut self) {
        let Some(path) = &self.store_path else {
            return;
        };
        let entries = load_stored_peers(path);
        if entries.is_empty() {
            return;
        }
        info!(
            count = entries.len(),
            "Loaded stored peer addresses from {}",
            path.display()
        );

        update_known_peers_versioned(
            self.state.clone(),
            self.metrics.clone(),
            entries,
            self.configured_peers.clone(),
            &self.endpoint_manager,
        );

        // update_known_peers_versioned forwards addresses through the mailbox.
        // Drain it now so addresses are applied before the event loop starts.
        while let Ok(msg) = self.mailbox.try_recv() {
            self.handle_message(msg);
        }
    }
661
662    fn save_stored_peers(&self) {
663        let Some(path) = &self.store_path else {
664            return;
665        };
666        let state = self.state.read().unwrap();
667        let peers_to_save: Vec<SignedVersionedNodeInfo> = state
668            .known_peers_v2
669            .iter()
670            .filter(|(pid, _)| self.configured_peers.contains_key(pid))
671            .map(|(_, verified)| verified.inner().clone())
672            .collect();
673        drop(state);
674        save_stored_peers(path, &peers_to_save);
675    }
676
677    fn handle_peer_event(&mut self, peer_event: Result<PeerEvent, RecvError>) {
678        match peer_event {
679            Ok(PeerEvent::NewPeer(peer_id)) => {
680                if let Some(peer) = self.network.peer(peer_id) {
681                    self.state
682                        .write()
683                        .unwrap()
684                        .connected_peers
685                        .insert(peer_id, ());
686
687                    // Query the new node for any peers
688                    self.tasks.spawn(query_peer_for_their_known_peers(
689                        peer,
690                        self.discovery_config.clone(),
691                        self.state.clone(),
692                        self.metrics.clone(),
693                        self.configured_peers.clone(),
694                        self.endpoint_manager.clone(),
695                        self.mailbox_sender(),
696                    ));
697                }
698            }
699            Ok(PeerEvent::LostPeer(peer_id, _)) => {
700                self.state.write().unwrap().connected_peers.remove(&peer_id);
701            }
702
703            Err(RecvError::Closed) => {
704                panic!("PeerEvent channel shouldn't be able to be closed");
705            }
706
707            Err(RecvError::Lagged(_)) => {
708                trace!("State-Sync fell behind processing PeerEvents");
709            }
710        }
711    }
712
    /// Periodic maintenance: refreshes our signed info, queries connected
    /// peers, expires stale known-peer entries and cooldowns, and spawns new
    /// dials up to the configured connection target.
    fn handle_tick(&mut self, _now: std::time::Instant, now_unix: u64) {
        self.update_our_info_timestamp(now_unix);

        self.tasks
            .spawn(query_connected_peers_for_their_known_peers(
                self.network.clone(),
                self.discovery_config.clone(),
                self.state.clone(),
                self.metrics.clone(),
                self.configured_peers.clone(),
                self.endpoint_manager.clone(),
                self.mailbox_sender(),
            ));

        // Cull old peers older than a day
        let mut culled_configured_peers = Vec::new();
        {
            let mut state = self.state.write().unwrap();
            state
                .known_peers
                .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms) < ONE_DAY_MILLISECONDS);
            state.known_peers_v2.retain(|k, v| {
                let keep = now_unix.saturating_sub(v.timestamp_ms()) < ONE_DAY_MILLISECONDS;
                if !keep && self.configured_peers.contains_key(k) {
                    culled_configured_peers.push(*k);
                }
                keep
            });
        }

        // Clear Discovery-sourced addresses for configured peers whose
        // signed info expired, allowing fallback to lower-priority sources.
        for peer_id in culled_configured_peers {
            self.endpoint_manager
                .clear_source(peer_id, AddressSource::Discovery);
        }

        // Clean out the pending_dials
        self.pending_dials.retain(|_k, v| !v.is_finished());
        if let Some(abort_handle) = &self.dial_seed_peers_task
            && abort_handle.is_finished()
        {
            self.dial_seed_peers_task = None;
        }

        // Expire cooldowns so those peers become regular dial targets again.
        let cooldown = self.discovery_config.peer_failure_cooldown();
        self.peer_cooldowns
            .retain(|_, since| since.elapsed() < cooldown);

        // Spawn some dials
        let state = self.state.read().unwrap();
        let our_peer_id = self.network.peer_id();

        // Collect eligible peers from both known_peers (V2) and known_peers_v2 (V3),
        // preferring fresher timestamps when a peer appears in both maps.
        // Partition into preferred (not on cooldown) and cooldown peers.
        let mut preferred: HashMap<PeerId, NodeInfo> = HashMap::new();
        let mut cooldown_peers: HashMap<PeerId, NodeInfo> = HashMap::new();

        for (peer_id, info) in state.known_peers.iter() {
            // Skip ourselves, address-less entries, and anything already
            // connected or being dialed.
            if *peer_id != our_peer_id
                && !info.addresses.is_empty()
                && !state.connected_peers.contains_key(peer_id)
                && !self.pending_dials.contains_key(peer_id)
            {
                if self.peer_cooldowns.contains_key(peer_id) {
                    cooldown_peers.insert(*peer_id, info.data().clone());
                } else {
                    preferred.insert(*peer_id, info.data().clone());
                }
            }
        }
        for (peer_id, info) in state.known_peers_v2.iter() {
            let p2p_addresses = info.p2p_addresses();
            if *peer_id != our_peer_id
                && !p2p_addresses.is_empty()
                && !state.connected_peers.contains_key(peer_id)
                && !self.pending_dials.contains_key(peer_id)
            {
                // Project the versioned info down to a V1 NodeInfo for dialing.
                let node_info = NodeInfo {
                    peer_id: *peer_id,
                    addresses: p2p_addresses.to_vec(),
                    timestamp_ms: info.timestamp_ms(),
                    access_type: info.access_type(),
                };
                if self.peer_cooldowns.contains_key(peer_id) {
                    if cooldown_peers
                        .get(peer_id)
                        .is_none_or(|existing| info.timestamp_ms() > existing.timestamp_ms)
                    {
                        cooldown_peers.insert(*peer_id, node_info);
                    }
                } else if preferred
                    .get(peer_id)
                    .is_none_or(|existing| info.timestamp_ms() > existing.timestamp_ms)
                {
                    preferred.insert(*peer_id, node_info);
                }
            }
        }

        let number_of_connections = state.connected_peers.len();
        let number_to_dial = self
            .discovery_config
            .target_concurrent_connections()
            .saturating_sub(number_of_connections);

        use rand::seq::IteratorRandom;
        let mut rng = rand::thread_rng();

        // Dial random non-cooldown peers first; fall back to cooldown peers
        // only to fill the remaining dial slots.
        let mut to_dial: Vec<_> = preferred
            .into_iter()
            .choose_multiple(&mut rng, number_to_dial);

        let remaining = number_to_dial.saturating_sub(to_dial.len());
        to_dial.extend(
            cooldown_peers
                .into_iter()
                .choose_multiple(&mut rng, remaining),
        );

        for (peer_id, info) in to_dial {
            let abort_handle = self
                .tasks
                .spawn(try_to_connect_to_peer(self.network.clone(), info));
            self.pending_dials.insert(peer_id, abort_handle);
        }

        // If we aren't connected to anything and we aren't presently trying to connect to anyone
        // we need to try the configured peers with High affinity (seed peers)
        let has_peers_to_dial = || {
            self.configured_peers
                .values()
                .any(|p| p.affinity == PeerAffinity::High)
                || !self.unidentified_seed_peers.is_empty()
        };
        if self.dial_seed_peers_task.is_none()
            && state.connected_peers.is_empty()
            && self.pending_dials.is_empty()
            && has_peers_to_dial()
        {
            let abort_handle = self.tasks.spawn(try_to_connect_to_seed_peers(
                self.network.clone(),
                self.discovery_config.clone(),
                self.configured_peers.clone(),
                self.unidentified_seed_peers.clone(),
            ));

            self.dial_seed_peers_task = Some(abort_handle);
        }
    }
864
865    fn mailbox_sender(&self) -> mpsc::Sender<DiscoveryMessage> {
866        self.mailbox_tx.clone()
867    }
868}
869
870async fn try_to_connect_to_peer(network: Network, info: NodeInfo) {
871    debug!("Connecting to peer {info:?}");
872    for multiaddr in &info.addresses {
873        if let Ok(address) = multiaddr.to_anemo_address() {
874            // Ignore the result and just log the error if there is one
875            if network
876                .connect_with_peer_id(address, info.peer_id)
877                .await
878                .tap_err(|e| {
879                    debug!(
880                        "error dialing {} at address '{}': {e}",
881                        info.peer_id.short_display(4),
882                        multiaddr
883                    )
884                })
885                .is_ok()
886            {
887                return;
888            }
889        }
890    }
891}
892
893async fn try_to_connect_to_seed_peers(
894    network: Network,
895    config: Arc<DiscoveryConfig>,
896    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
897    unidentified_seed_peers: Vec<anemo::types::Address>,
898) {
899    let high_affinity_peers: Vec<_> = configured_peers
900        .values()
901        .filter(|p| p.affinity == PeerAffinity::High)
902        .cloned()
903        .collect();
904    debug!(
905        ?high_affinity_peers,
906        ?unidentified_seed_peers,
907        "Connecting to seed peers"
908    );
909    let network = &network;
910
911    // Attempt connection to all high-affinity and seed peers.
912    let with_peer_id = high_affinity_peers.into_iter().flat_map(|peer_info| {
913        peer_info
914            .address
915            .into_iter()
916            .map(move |addr| (Some(peer_info.peer_id), addr))
917    });
918    let without_peer_id = unidentified_seed_peers.into_iter().map(|addr| (None, addr));
919    futures::stream::iter(with_peer_id.chain(without_peer_id))
920        .for_each_concurrent(
921            config.target_concurrent_connections(),
922            |(peer_id, address)| async move {
923                // Ignore the result and just log the error if there is one
924                let _ = if let Some(peer_id) = peer_id {
925                    network
926                        .connect_with_peer_id(address.clone(), peer_id)
927                        .await
928                        .tap_err(|e| {
929                            debug!(
930                                "error dialing peer {} at '{}': {e}",
931                                peer_id.short_display(4),
932                                address
933                            )
934                        })
935                } else {
936                    network
937                        .connect(address.clone())
938                        .await
939                        .tap_err(|e| debug!("error dialing address '{}': {e}", address))
940                };
941            },
942        )
943        .await;
944}
945
946async fn query_peer_for_known_peers_v2(peer: Peer) -> Option<Vec<SignedNodeInfo>> {
947    let mut client = DiscoveryClient::new(peer);
948    let request = Request::new(()).with_timeout(TIMEOUT);
949    client
950        .get_known_peers_v2(request)
951        .await
952        .ok()
953        .map(Response::into_inner)
954        .map(
955            |GetKnownPeersResponseV2 {
956                 own_info,
957                 mut known_peers,
958             }| {
959                if !own_info.addresses.is_empty() {
960                    known_peers.push(own_info)
961                }
962                known_peers
963            },
964        )
965}
966
967async fn query_peer_for_their_known_peers(
968    peer: Peer,
969    discovery_config: Arc<DiscoveryConfig>,
970    state: Arc<RwLock<State>>,
971    metrics: Metrics,
972    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
973    endpoint_manager: EndpointManager,
974    mailbox_tx: mpsc::Sender<DiscoveryMessage>,
975) {
976    // Query V3 concurrently with V2 when enabled
977    if discovery_config.use_get_known_peers_v3() {
978        let our_info_v2 = state.read().unwrap().our_info_v2.clone();
979        if let Some(own_info) = our_info_v2 {
980            let peer_for_v3 = peer.clone();
981            let v3_query = async move {
982                let mut client = DiscoveryClient::new(peer_for_v3);
983                let request =
984                    Request::new(GetKnownPeersRequestV3 { own_info }).with_timeout(TIMEOUT);
985                client
986                    .get_known_peers_v3(request)
987                    .await
988                    .ok()
989                    .map(Response::into_inner)
990                    .map(
991                        |GetKnownPeersResponseV3 {
992                             own_info,
993                             mut known_peers,
994                         }| {
995                            if !own_info.p2p_addresses().is_empty() {
996                                known_peers.push(own_info)
997                            }
998                            known_peers
999                        },
1000                    )
1001            };
1002
1003            let (found_peers_v2, found_peers_v3) =
1004                tokio::join!(query_peer_for_known_peers_v2(peer), v3_query);
1005
1006            if let Some(found_peers) = found_peers_v2 {
1007                update_known_peers(
1008                    state.clone(),
1009                    metrics.clone(),
1010                    found_peers,
1011                    configured_peers.clone(),
1012                );
1013            }
1014            if let Some(found_peers) = found_peers_v3 {
1015                let changed = update_known_peers_versioned(
1016                    state,
1017                    metrics,
1018                    found_peers,
1019                    configured_peers,
1020                    &endpoint_manager,
1021                );
1022                if changed {
1023                    let _ = mailbox_tx.try_send(DiscoveryMessage::ConfiguredPeersUpdated);
1024                }
1025            }
1026            return;
1027        }
1028    }
1029
1030    // V3 not enabled or our_info_v2 not available - just run V2
1031    if let Some(found_peers) = query_peer_for_known_peers_v2(peer).await {
1032        update_known_peers(state, metrics, found_peers, configured_peers);
1033    }
1034}
1035
/// Queries a random sample of currently connected peers for the peers they
/// know about and merges the results into local discovery state.
///
/// V2 is always queried to keep `known_peers` populated for V2-only clients.
/// When `use_get_known_peers_v3()` is enabled and our own signed V2 info is
/// available, V3 is queried concurrently with V2; if any configured peer's
/// entry changed as a result, a `ConfiguredPeersUpdated` message is sent
/// (best-effort) to the discovery mailbox.
async fn query_connected_peers_for_their_known_peers(
    network: Network,
    config: Arc<DiscoveryConfig>,
    state: Arc<RwLock<State>>,
    metrics: Metrics,
    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
    endpoint_manager: EndpointManager,
    mailbox_tx: mpsc::Sender<DiscoveryMessage>,
) {
    use rand::seq::IteratorRandom;

    // Sample up to `peers_to_query()` live peers at random from the network.
    let peers_to_query: Vec<_> = network
        .peers()
        .into_iter()
        .flat_map(|id| network.peer(id))
        .choose_multiple(&mut rand::thread_rng(), config.peers_to_query());

    // Query V2 to keep known_peers populated for V2 clients
    // (built lazily here; awaited either joined with V3 below or alone in the
    // fallback path at the bottom).
    let v2_query = {
        let peers = peers_to_query.clone();
        let peers_to_query_count = config.peers_to_query();
        async move {
            peers
                .into_iter()
                .map(DiscoveryClient::new)
                .map(|mut client| async move {
                    let request = Request::new(()).with_timeout(TIMEOUT);
                    client
                        .get_known_peers_v2(request)
                        .await
                        .ok()
                        .map(Response::into_inner)
                        .map(
                            |GetKnownPeersResponseV2 {
                                 own_info,
                                 mut known_peers,
                             }| {
                                // Include the responder itself when it
                                // advertises at least one address.
                                if !own_info.addresses.is_empty() {
                                    known_peers.push(own_info)
                                }
                                known_peers
                            },
                        )
                })
                .pipe(futures::stream::iter)
                .buffer_unordered(peers_to_query_count)
                .filter_map(std::future::ready)
                .flat_map(futures::stream::iter)
                .collect::<Vec<_>>()
                .await
        }
    };

    // Additionally query V3 when enabled, concurrently with V2
    if config.use_get_known_peers_v3() {
        // Our own signed V2 info is required to issue a V3 request.
        let our_info_v2 = state.read().unwrap().our_info_v2.clone();
        if let Some(own_info) = our_info_v2 {
            let v3_query = {
                let peers_to_query_count = config.peers_to_query();
                async move {
                    peers_to_query
                        .into_iter()
                        .map(DiscoveryClient::new)
                        .map(|mut client| {
                            // Each request carries its own clone of our info.
                            let own_info = own_info.clone();
                            async move {
                                let request = Request::new(GetKnownPeersRequestV3 { own_info })
                                    .with_timeout(TIMEOUT);
                                client
                                    .get_known_peers_v3(request)
                                    .await
                                    .ok()
                                    .map(Response::into_inner)
                                    .map(
                                        |GetKnownPeersResponseV3 {
                                             own_info,
                                             mut known_peers,
                                         }| {
                                            if !own_info.p2p_addresses().is_empty() {
                                                known_peers.push(own_info)
                                            }
                                            known_peers
                                        },
                                    )
                            }
                        })
                        .pipe(futures::stream::iter)
                        .buffer_unordered(peers_to_query_count)
                        .filter_map(std::future::ready)
                        .flat_map(futures::stream::iter)
                        .collect::<Vec<_>>()
                        .await
                }
            };

            // Run both protocol versions concurrently.
            let (found_peers_v2, found_peers_v3) = tokio::join!(v2_query, v3_query);

            update_known_peers(
                state.clone(),
                metrics.clone(),
                found_peers_v2,
                configured_peers.clone(),
            );
            let changed = update_known_peers_versioned(
                state,
                metrics,
                found_peers_v3,
                configured_peers,
                &endpoint_manager,
            );
            if changed {
                // Best-effort notification; dropped if the mailbox is full.
                let _ = mailbox_tx.try_send(DiscoveryMessage::ConfiguredPeersUpdated);
            }
            return;
        }
    }

    // V3 not enabled or our_info_v2 not available - just run V2
    let found_peers_v2 = v2_query.await;
    update_known_peers(state, metrics, found_peers_v2, configured_peers);
}
1157
1158fn update_known_peers(
1159    state: Arc<RwLock<State>>,
1160    metrics: Metrics,
1161    found_peers: Vec<SignedNodeInfo>,
1162    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1163) {
1164    use std::collections::hash_map::Entry;
1165
1166    let now_unix = now_unix();
1167    let our_peer_id = state.read().unwrap().our_info.clone().unwrap().peer_id;
1168    let known_peers = &mut state.write().unwrap().known_peers;
1169    // only take the first MAX_PEERS_TO_SEND peers
1170    for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
1171        // +1 to account for the "own_info" of the serving peer
1172        // Skip peers whose timestamp is too far in the future from our clock
1173        // or that are too old
1174        if peer_info.timestamp_ms > now_unix.saturating_add(30 * 1_000) // 30 seconds
1175            || now_unix.saturating_sub(peer_info.timestamp_ms) > ONE_DAY_MILLISECONDS
1176        {
1177            continue;
1178        }
1179
1180        if peer_info.peer_id == our_peer_id {
1181            continue;
1182        }
1183
1184        // If Peer is Private or Trusted, and not in our configured peers, skip it.
1185        let is_restricted = match peer_info.access_type {
1186            AccessType::Public => false,
1187            AccessType::Private | AccessType::Trusted => true,
1188        };
1189        if is_restricted && !configured_peers.contains_key(&peer_info.peer_id) {
1190            continue;
1191        }
1192
1193        // Skip entries that have too many addresses as a means to cap the size of a node's info
1194        if peer_info.addresses.len() > MAX_ADDRESSES_PER_PEER {
1195            continue;
1196        }
1197
1198        // verify that all addresses provided are valid anemo addresses
1199        if !peer_info
1200            .addresses
1201            .iter()
1202            .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
1203        {
1204            continue;
1205        }
1206        let Ok(public_key) = Ed25519PublicKey::from_bytes(&peer_info.peer_id.0) else {
1207            debug_fatal!(
1208                // This should never happen.
1209                "Failed to convert anemo PeerId {:?} to Ed25519PublicKey",
1210                peer_info.peer_id
1211            );
1212            continue;
1213        };
1214        let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
1215        if let Err(e) = public_key.verify(&msg, peer_info.auth_sig()) {
1216            info!(
1217                "Discovery failed to verify signature for NodeInfo for peer {:?}: {e:?}",
1218                peer_info.peer_id
1219            );
1220            // TODO: consider denylisting the source of bad NodeInfo from future requests.
1221            continue;
1222        }
1223        let peer = VerifiedSignedNodeInfo::new_from_verified(peer_info);
1224
1225        match known_peers.entry(peer.peer_id) {
1226            Entry::Occupied(mut o) => {
1227                if peer.timestamp_ms > o.get().timestamp_ms {
1228                    if o.get().addresses.is_empty() && !peer.addresses.is_empty() {
1229                        metrics.inc_num_peers_with_external_address();
1230                    }
1231                    if !o.get().addresses.is_empty() && peer.addresses.is_empty() {
1232                        metrics.dec_num_peers_with_external_address();
1233                    }
1234                    o.insert(peer);
1235                }
1236            }
1237            Entry::Vacant(v) => {
1238                if !peer.addresses.is_empty() {
1239                    metrics.inc_num_peers_with_external_address();
1240                }
1241                v.insert(peer);
1242            }
1243        }
1244    }
1245}
1246
/// Merges `found_peers` (from a V3 discovery response) into
/// `state.known_peers_v2`, applying the same freshness, self, and
/// access-type filtering as `update_known_peers`, with signature checks
/// delegated to `verify_versioned_node_info`. Discovered addresses for
/// configured peers are additionally forwarded to the `EndpointManager`.
///
/// Returns true if any configured peer was inserted or updated.
fn update_known_peers_versioned(
    state: Arc<RwLock<State>>,
    metrics: Metrics,
    found_peers: Vec<SignedVersionedNodeInfo>,
    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
    endpoint_manager: &EndpointManager,
) -> bool {
    use std::collections::hash_map::Entry;

    let now_unix = now_unix();
    let our_peer_id = state
        .read()
        .unwrap()
        .our_info_v2
        .as_ref()
        .and_then(|info| info.peer_id());
    let known_peers_v2 = &mut state.write().unwrap().known_peers_v2;
    let mut configured_peer_changed = false;

    // +1 to account for the "own_info" of the serving peer.
    for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
        let timestamp_ms = peer_info.timestamp_ms();

        // Skip entries more than 30s in the future or more than a day old.
        if timestamp_ms > now_unix.saturating_add(30 * 1_000)
            || now_unix.saturating_sub(timestamp_ms) > ONE_DAY_MILLISECONDS
        {
            continue;
        }

        let Some(peer_id) = peer_info.peer_id() else {
            continue;
        };

        // Never record ourselves.
        if Some(peer_id) == our_peer_id {
            continue;
        }

        // Private/Trusted peers are only kept if we have them configured.
        let is_restricted = match peer_info.access_type() {
            AccessType::Public => false,
            AccessType::Private | AccessType::Trusted => true,
        };
        if is_restricted && !configured_peers.contains_key(&peer_id) {
            continue;
        }

        let peer = match verify_versioned_node_info(&peer_info) {
            Ok(verified) => verified,
            Err(reason) => {
                info!("Discovery rejecting VersionedNodeInfo for peer {peer_id:?}: {reason}");
                continue;
            }
        };

        // Forward discovered addresses for configured peers to EndpointManager.
        let is_configured = configured_peers.contains_key(&peer_id);
        if is_configured && let VersionedNodeInfo::V2(info_v2) = peer_info.data() {
            for (endpoint_id, addrs) in &info_v2.addresses {
                if !addrs.is_empty() {
                    // Best-effort: update errors are intentionally ignored.
                    let _ = endpoint_manager.update_endpoint(
                        endpoint_id.clone(),
                        AddressSource::Discovery,
                        addrs.clone(),
                    );
                }
            }
        }

        let peer_p2p_addresses = peer.p2p_addresses();

        match known_peers_v2.entry(peer_id) {
            Entry::Occupied(mut o) => {
                // Only a strictly newer entry replaces an existing one; keep
                // the "peers with external address" metric in sync.
                if peer.timestamp_ms() > o.get().timestamp_ms() {
                    let old_addresses = o.get().p2p_addresses();
                    if old_addresses.is_empty() && !peer_p2p_addresses.is_empty() {
                        metrics.inc_num_peers_with_external_address();
                    }
                    if !old_addresses.is_empty() && peer_p2p_addresses.is_empty() {
                        metrics.dec_num_peers_with_external_address();
                    }
                    o.insert(peer);
                    if is_configured {
                        configured_peer_changed = true;
                    }
                }
            }
            Entry::Vacant(v) => {
                if !peer_p2p_addresses.is_empty() {
                    metrics.inc_num_peers_with_external_address();
                }
                v.insert(peer);
                if is_configured {
                    configured_peer_changed = true;
                }
            }
        }
    }

    configured_peer_changed
}
1346
1347pub(super) fn now_unix() -> u64 {
1348    use std::time::{SystemTime, UNIX_EPOCH};
1349
1350    SystemTime::now()
1351        .duration_since(UNIX_EPOCH)
1352        .unwrap()
1353        .as_millis() as u64
1354}