sui_network/discovery/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::types::PeerAffinity;
5use anemo::types::PeerInfo;
6use anemo::{Network, Peer, PeerId, Request, Response, types::PeerEvent};
7use fastcrypto::ed25519::{Ed25519PublicKey, Ed25519Signature};
8use futures::StreamExt;
9use mysten_common::debug_fatal;
10use serde::{Deserialize, Serialize};
11use shared_crypto::intent::IntentScope;
12use std::{
13    collections::{BTreeMap, HashMap, HashSet},
14    path::PathBuf,
15    sync::{Arc, RwLock},
16    time::{Duration, Instant},
17};
18
19use crate::endpoint_manager::{AddressSource, EndpointId, EndpointManager};
20use store::{load_stored_peers, save_stored_peers};
21use sui_config::p2p::{AccessType, DiscoveryConfig, P2pConfig};
22use sui_types::crypto::{NetworkKeyPair, NetworkPublicKey, Signer, ToFromBytes, VerifyingKey};
23use sui_types::digests::Digest;
24use sui_types::message_envelope::{Envelope, Message, VerifiedEnvelope};
25use sui_types::multiaddr::Multiaddr;
26use tap::{Pipe, TapFallible};
27use tokio::sync::broadcast::error::RecvError;
28use tokio::sync::mpsc;
29use tokio::{
30    sync::oneshot,
31    task::{AbortHandle, JoinSet},
32};
33use tracing::{debug, info, trace};
34
35const TIMEOUT: Duration = Duration::from_secs(1);
36const ONE_DAY_MILLISECONDS: u64 = 24 * 60 * 60 * 1_000;
37const MAX_ADDRESS_LENGTH: usize = 300;
38const MAX_PEERS_TO_SEND: usize = 200;
39const MAX_ADDRESSES_PER_PEER: usize = 2;
40
41mod generated {
42    include!(concat!(env!("OUT_DIR"), "/sui.Discovery.rs"));
43}
44mod builder;
45mod metrics;
46mod server;
47mod store;
48#[cfg(test)]
49mod tests;
50
51pub use builder::{Builder, UnstartedDiscovery};
52pub use generated::{
53    discovery_client::DiscoveryClient,
54    discovery_server::{Discovery, DiscoveryServer},
55};
56pub use server::{GetKnownPeersRequestV3, GetKnownPeersResponseV2, GetKnownPeersResponseV3};
57
58/// Message types for the discovery system mailbox.
59#[derive(Debug)]
60pub enum DiscoveryMessage {
61    /// An external source (e.g. admin API, node config) updated a peer's address.
62    PeerAddressChange {
63        peer_id: PeerId,
64        source: AddressSource,
65        addresses: Vec<anemo::types::Address>,
66    },
67    /// Node info was received from inbound RPC.
68    ReceivedNodeInfo {
69        peer_info: Box<SignedVersionedNodeInfo>,
70    },
71    /// A spawned peer-query task discovered updated info for a trusted peer,
72    /// signaling the event loop to update the stored peer addresses.
73    TrustedPeersUpdated,
74    /// A peer has been reported as continuously failing, triggering disconnect and cooldown.
75    PeerFailureReport { peer_id: PeerId },
76}
77
78/// A Handle to the Discovery subsystem. The Discovery system will be shut down once all Handles
79/// have been dropped.
80#[derive(Clone, Debug)]
81pub struct Handle {
82    pub(super) _shutdown_handle: Arc<oneshot::Sender<()>>,
83    pub(super) sender: Sender,
84}
85
86impl Handle {
87    pub fn sender(&self) -> Sender {
88        self.sender.clone()
89    }
90}
91
92/// A lightweight handle for sending messages to the discovery event loop
93/// without holding a shutdown reference.
94#[derive(Clone, Debug)]
95pub struct Sender {
96    pub(super) sender: mpsc::Sender<DiscoveryMessage>,
97}
98
99impl Sender {
100    pub fn peer_address_change(
101        &self,
102        peer_id: PeerId,
103        source: AddressSource,
104        addresses: Vec<anemo::types::Address>,
105    ) {
106        self.sender
107            .try_send(DiscoveryMessage::PeerAddressChange {
108                peer_id,
109                source,
110                addresses,
111            })
112            .expect("Discovery mailbox should not overflow or be closed")
113    }
114
115    pub fn report_peer_failure(&self, peer_id: PeerId) {
116        let _ = self
117            .sender
118            .try_send(DiscoveryMessage::PeerFailureReport { peer_id });
119    }
120}
121
122use self::metrics::Metrics;
123
124/// The internal discovery state shared between the main event loop and the request handler
125struct State {
126    our_info: Option<SignedNodeInfo>,
127    our_info_v2: Option<SignedVersionedNodeInfo>,
128    connected_peers: HashMap<PeerId, ()>,
129    known_peers: HashMap<PeerId, VerifiedSignedNodeInfo>,
130    known_peers_v2: HashMap<PeerId, VerifiedSignedVersionedNodeInfo>,
131    peer_addresses: HashMap<PeerId, BTreeMap<AddressSource, Vec<anemo::types::Address>>>,
132}
133
134/// The information necessary to dial another peer.
135///
136/// `NodeInfo` contains all the information that is shared with other nodes via the discovery
137/// service to advertise how a node can be reached.
138#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
139pub struct NodeInfo {
140    pub peer_id: PeerId,
141    pub addresses: Vec<Multiaddr>,
142
143    /// Creation time.
144    ///
145    /// This is used to determine which of two NodeInfo's from the same PeerId should be retained.
146    pub timestamp_ms: u64,
147
148    /// See docstring for `AccessType`.
149    pub access_type: AccessType,
150}
151
152impl NodeInfo {
153    fn sign(self, keypair: &NetworkKeyPair) -> SignedNodeInfo {
154        let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
155        let sig = keypair.sign(&msg);
156        SignedNodeInfo::new_from_data_and_sig(self, sig)
157    }
158}
159
160pub type SignedNodeInfo = Envelope<NodeInfo, Ed25519Signature>;
161
162pub type VerifiedSignedNodeInfo = VerifiedEnvelope<NodeInfo, Ed25519Signature>;
163
164#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
165pub struct NodeInfoDigest(Digest);
166
167impl NodeInfoDigest {
168    pub const fn new(digest: [u8; 32]) -> Self {
169        Self(Digest::new(digest))
170    }
171}
172
173impl Message for NodeInfo {
174    type DigestType = NodeInfoDigest;
175    const SCOPE: IntentScope = IntentScope::DiscoveryPeers;
176
177    fn digest(&self) -> Self::DigestType {
178        unreachable!("NodeInfoDigest is not used today")
179    }
180}
181
182/// NodeInfoV2 supports multiple address types keyed by EndpointId.
183// TODO: Remove support for V1 once V2 is available in all production networks.
184#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
185pub struct NodeInfoV2 {
186    pub addresses: BTreeMap<EndpointId, Vec<Multiaddr>>,
187    pub timestamp_ms: u64,
188    pub access_type: AccessType,
189}
190
191impl NodeInfoV2 {
192    /// Derive the P2P PeerId from the addresses map.
193    /// Returns None if no P2P endpoint is present.
194    pub fn peer_id(&self) -> Option<PeerId> {
195        self.addresses.keys().find_map(|k| match k {
196            EndpointId::P2p(peer_id) => Some(*peer_id),
197            EndpointId::Consensus(_) => None,
198        })
199    }
200
201    pub fn p2p_addresses(&self) -> &[Multiaddr] {
202        self.addresses
203            .iter()
204            .find_map(|(k, v)| match k {
205                EndpointId::P2p(_) => Some(v.as_slice()),
206                EndpointId::Consensus(_) => None,
207            })
208            .unwrap_or(&[])
209    }
210}
211
212/// Versioned wrapper for NodeInfo types.
213#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
214pub enum VersionedNodeInfo {
215    V1(NodeInfo),
216    V2(NodeInfoV2),
217}
218
219impl VersionedNodeInfo {
220    pub fn peer_id(&self) -> Option<PeerId> {
221        match self {
222            VersionedNodeInfo::V1(info) => Some(info.peer_id),
223            VersionedNodeInfo::V2(info) => info.peer_id(),
224        }
225    }
226
227    pub fn timestamp_ms(&self) -> u64 {
228        match self {
229            VersionedNodeInfo::V1(info) => info.timestamp_ms,
230            VersionedNodeInfo::V2(info) => info.timestamp_ms,
231        }
232    }
233
234    pub fn access_type(&self) -> AccessType {
235        match self {
236            VersionedNodeInfo::V1(info) => info.access_type,
237            VersionedNodeInfo::V2(info) => info.access_type,
238        }
239    }
240
241    pub fn p2p_addresses(&self) -> &[Multiaddr] {
242        match self {
243            VersionedNodeInfo::V1(info) => &info.addresses,
244            VersionedNodeInfo::V2(info) => info.p2p_addresses(),
245        }
246    }
247
248    pub fn sign(self, keypair: &NetworkKeyPair) -> SignedVersionedNodeInfo {
249        let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
250        let sig = keypair.sign(&msg);
251        SignedVersionedNodeInfo::new_from_data_and_sig(self, sig)
252    }
253}
254
255#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
256pub struct VersionedNodeInfoDigest(Digest);
257
258impl Message for VersionedNodeInfo {
259    type DigestType = VersionedNodeInfoDigest;
260    const SCOPE: IntentScope = IntentScope::DiscoveryPeers;
261
262    fn digest(&self) -> Self::DigestType {
263        unreachable!("VersionedNodeInfoDigest is not used today")
264    }
265}
266
267pub type SignedVersionedNodeInfo = Envelope<VersionedNodeInfo, Ed25519Signature>;
268pub type VerifiedSignedVersionedNodeInfo = VerifiedEnvelope<VersionedNodeInfo, Ed25519Signature>;
269
270/// Verifies the signature and endpoint identity consistency of a
271/// `SignedVersionedNodeInfo`, returning a `VerifiedSignedVersionedNodeInfo`
272/// on success.
273///
274/// Checks:
275/// 1. The signature is valid for the P2P peer_id embedded in the node info.
276/// 2. Any non-P2P endpoint identities (e.g. `Consensus`) match the P2P
277///    peer_id, preventing a peer from advertising endpoints under another
278///    key.
279fn verify_versioned_node_info(
280    peer_info: &SignedVersionedNodeInfo,
281) -> Result<VerifiedSignedVersionedNodeInfo, &'static str> {
282    let peer_id = peer_info.peer_id().ok_or("missing P2P peer_id")?;
283
284    let public_key =
285        Ed25519PublicKey::from_bytes(&peer_id.0).map_err(|_| "invalid peer_id public key")?;
286
287    let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
288    public_key
289        .verify(&msg, peer_info.auth_sig())
290        .map_err(|_| "signature verification failed")?;
291
292    match peer_info.data() {
293        VersionedNodeInfo::V1(info) => {
294            if info.addresses.len() > MAX_ADDRESSES_PER_PEER {
295                return Err("too many addresses");
296            }
297            if !info
298                .addresses
299                .iter()
300                .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
301            {
302                return Err("invalid address");
303            }
304        }
305        VersionedNodeInfo::V2(info_v2) => {
306            // Each endpoint variant (P2p, Consensus) may appear at most once.
307            let mut seen_variants = Vec::new();
308            for endpoint_id in info_v2.addresses.keys() {
309                let variant = std::mem::discriminant(endpoint_id);
310                if seen_variants.contains(&variant) {
311                    return Err("duplicate endpoint variant");
312                }
313                seen_variants.push(variant);
314            }
315
316            for (endpoint_id, addrs) in &info_v2.addresses {
317                if addrs.len() > MAX_ADDRESSES_PER_PEER {
318                    return Err("too many addresses for endpoint");
319                }
320                if !addrs.iter().all(|addr| addr.len() < MAX_ADDRESS_LENGTH) {
321                    return Err("address too long");
322                }
323                if matches!(endpoint_id, EndpointId::P2p(_))
324                    && !addrs.iter().all(|addr| addr.to_anemo_address().is_ok())
325                {
326                    return Err("invalid P2P address");
327                }
328            }
329
330            let identities_valid = info_v2.addresses.keys().all(|eid| match eid {
331                EndpointId::P2p(_) => true,
332                EndpointId::Consensus(pubkey) => pubkey.as_bytes() == peer_id.0,
333            });
334            if !identities_valid {
335                return Err("non-P2P endpoint identity mismatch");
336            }
337        }
338    }
339
340    Ok(VerifiedSignedVersionedNodeInfo::new_from_verified(
341        peer_info.clone(),
342    ))
343}
344
345struct DiscoveryEventLoop {
346    config: P2pConfig,
347    discovery_config: Arc<DiscoveryConfig>,
348    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
349    chain_peers: Arc<RwLock<HashSet<PeerId>>>,
350    unidentified_seed_peers: Vec<anemo::types::Address>,
351    network: Network,
352    keypair: NetworkKeyPair,
353    tasks: JoinSet<()>,
354    pending_dials: HashMap<PeerId, AbortHandle>,
355    dial_seed_peers_task: Option<AbortHandle>,
356    shutdown_handle: oneshot::Receiver<()>,
357    state: Arc<RwLock<State>>,
358    mailbox: mpsc::Receiver<DiscoveryMessage>,
359    mailbox_tx: mpsc::Sender<DiscoveryMessage>,
360    metrics: Metrics,
361    consensus_external_address: Option<Multiaddr>,
362    endpoint_manager: EndpointManager,
363    store_path: Option<PathBuf>,
364    peer_cooldowns: HashMap<PeerId, Instant>,
365}
366
367impl DiscoveryEventLoop {
368    pub async fn start(mut self) {
369        info!("Discovery started");
370
371        self.construct_our_info();
372        self.configure_preferred_peers();
373        self.load_stored_peers_on_startup();
374
375        let mut interval = tokio::time::interval(self.discovery_config.interval_period());
376        let mut peer_events = {
377            let (subscriber, _peers) = self.network.subscribe().unwrap();
378            subscriber
379        };
380
381        loop {
382            tokio::select! {
383                now = interval.tick() => {
384                    let now_unix = now_unix();
385                    self.handle_tick(now.into_std(), now_unix);
386                }
387                peer_event = peer_events.recv() => {
388                    self.handle_peer_event(peer_event);
389                },
390                Some(message) = self.mailbox.recv() => {
391                    self.handle_message(message);
392                }
393                Some(task_result) = self.tasks.join_next() => {
394                    match task_result {
395                        Ok(()) => {},
396                        Err(e) => {
397                            if e.is_cancelled() {
398                                // avoid crashing on ungraceful shutdown
399                            } else if e.is_panic() {
400                                // propagate panics.
401                                std::panic::resume_unwind(e.into_panic());
402                            } else {
403                                panic!("task failed: {e}");
404                            }
405                        },
406                    };
407                },
408                // Once the shutdown notification resolves we can terminate the event loop
409                _ = &mut self.shutdown_handle => {
410                    break;
411                }
412            }
413        }
414
415        self.save_stored_peers();
416        info!("Discovery ended");
417    }
418
419    fn handle_message(&mut self, message: DiscoveryMessage) {
420        match message {
421            DiscoveryMessage::PeerAddressChange {
422                peer_id,
423                source,
424                addresses,
425            } => {
426                self.handle_peer_address_change(peer_id, source, addresses);
427            }
428            DiscoveryMessage::ReceivedNodeInfo { peer_info } => {
429                let changed = update_known_peers_versioned(
430                    self.state.clone(),
431                    self.metrics.clone(),
432                    vec![*peer_info],
433                    self.configured_peers.clone(),
434                    &self.chain_peers,
435                    &self.endpoint_manager,
436                );
437                if changed {
438                    self.save_stored_peers();
439                }
440            }
441            DiscoveryMessage::TrustedPeersUpdated => {
442                self.save_stored_peers();
443            }
444            DiscoveryMessage::PeerFailureReport { peer_id } => {
445                self.handle_peer_failure_report(peer_id);
446            }
447        }
448    }
449
450    fn handle_peer_failure_report(&mut self, peer_id: PeerId) {
451        if self.is_trusted_peer(&peer_id) {
452            info!(?peer_id, "ignoring failure report for trusted peer");
453            return;
454        }
455        let min_peers = self.discovery_config.min_peers_for_disconnect();
456        let connected_count = self.state.read().unwrap().connected_peers.len();
457        if connected_count < min_peers {
458            info!(
459                ?peer_id,
460                connected_count, min_peers, "skipping disconnect, too few connected peers"
461            );
462            return;
463        }
464        info!(
465            ?peer_id,
466            "peer failure reported, disconnecting and adding cooldown"
467        );
468        let _ = self.network.disconnect(peer_id);
469        self.peer_cooldowns.insert(peer_id, Instant::now());
470    }
471
472    fn construct_our_info(&mut self) {
473        if self.state.read().unwrap().our_info.is_some() {
474            return;
475        }
476
477        let peer_id = self.network.peer_id();
478        let timestamp_ms = now_unix();
479        let access_type = self.discovery_config.access_type();
480
481        let addresses: Vec<Multiaddr> = self
482            .config
483            .external_address
484            .clone()
485            .and_then(|addr| addr.to_anemo_address().ok().map(|_| addr))
486            .into_iter()
487            .collect();
488
489        let our_info = NodeInfo {
490            peer_id,
491            addresses: addresses.clone(),
492            timestamp_ms,
493            access_type,
494        }
495        .sign(&self.keypair);
496
497        let mut addresses_map = BTreeMap::new();
498        addresses_map.insert(EndpointId::P2p(peer_id), addresses);
499        if let Some(consensus_addr) = &self.consensus_external_address {
500            // Populates `Consensus` EndpointId from our P2P (anemo) `PeerId`.
501            // This is safe because both the P2P and consensus networks use the same
502            // ed25519 network keypair. Both originate from `NodeConfig::network_key_pair()`.
503            let network_pubkey =
504                NetworkPublicKey::from_bytes(&peer_id.0).expect("PeerId is a valid public key");
505            addresses_map.insert(
506                EndpointId::Consensus(network_pubkey),
507                vec![consensus_addr.clone()],
508            );
509        }
510        let our_info_v2 = VersionedNodeInfo::V2(NodeInfoV2 {
511            addresses: addresses_map,
512            timestamp_ms,
513            access_type,
514        })
515        .sign(&self.keypair);
516
517        let mut state = self.state.write().unwrap();
518        state.our_info = Some(our_info);
519        state.our_info_v2 = Some(our_info_v2);
520    }
521
522    fn configure_preferred_peers(&mut self) {
523        let peers: Vec<_> = self.configured_peers.values().cloned().collect();
524        for peer_info in peers {
525            debug!(?peer_info, "Add configured preferred peer");
526            match peer_info.affinity {
527                PeerAffinity::High => {
528                    self.handle_peer_address_change(
529                        peer_info.peer_id,
530                        AddressSource::Seed,
531                        peer_info.address,
532                    );
533                }
534                _ => {
535                    // Allowed peers accept inbound but aren't actively dialed,
536                    // so insert them directly without address priority tracking.
537                    self.network.known_peers().insert(peer_info);
538                }
539            }
540        }
541    }
542
543    fn update_our_info_timestamp(&mut self, now_unix: u64) {
544        let state = &mut self.state.write().unwrap();
545
546        if let Some(our_info) = &state.our_info {
547            let mut data = our_info.data().clone();
548            data.timestamp_ms = now_unix;
549            state.our_info = Some(data.sign(&self.keypair));
550        }
551
552        if let Some(our_info_v2) = &state.our_info_v2 {
553            let mut data = our_info_v2.data().clone();
554            match &mut data {
555                VersionedNodeInfo::V1(info) => info.timestamp_ms = now_unix,
556                VersionedNodeInfo::V2(info) => info.timestamp_ms = now_unix,
557            }
558            state.our_info_v2 = Some(data.sign(&self.keypair));
559        }
560    }
561
562    fn handle_peer_address_change(
563        &mut self,
564        peer_id: PeerId,
565        source: AddressSource,
566        addresses: Vec<anemo::types::Address>,
567    ) {
568        debug!(
569            ?peer_id,
570            ?source,
571            ?addresses,
572            "Received peer address change"
573        );
574
575        // Update set of trusted peers from on-chain validator configs.
576        if source == AddressSource::Chain {
577            if addresses.is_empty() {
578                self.chain_peers.write().unwrap().remove(&peer_id);
579            } else {
580                self.chain_peers.write().unwrap().insert(peer_id);
581            }
582        }
583
584        // Update stored addresses.
585        {
586            let mut state = self.state.write().unwrap();
587            let source_map = state.peer_addresses.entry(peer_id).or_default();
588
589            if addresses.is_empty() {
590                source_map.remove(&source);
591                if source_map.is_empty() {
592                    state.peer_addresses.remove(&peer_id);
593                }
594            } else {
595                source_map.insert(source, addresses);
596            }
597
598            // When a Chain address arrives, check if there are cached Discovery
599            // P2P addresses in known_peers_v2 that should take priority. This
600            // handles the startup race where cached NodeInfo is loaded before
601            // chain_peers is populated (so update_known_peers_versioned couldn't
602            // forward the Discovery addresses at load time).
603            if source == AddressSource::Chain
604                && !state
605                    .peer_addresses
606                    .get(&peer_id)
607                    .is_some_and(|s| s.contains_key(&AddressSource::Discovery))
608                && let Some(addrs) = state
609                    .known_peers_v2
610                    .get(&peer_id)
611                    .and_then(|info| match info.data() {
612                        VersionedNodeInfo::V2(v2) => Some(v2.p2p_addresses()),
613                        _ => None,
614                    })
615            {
616                let anemo_addrs: Vec<_> = addrs
617                    .iter()
618                    .filter_map(|a| a.to_anemo_address().ok())
619                    .collect();
620                if !anemo_addrs.is_empty() {
621                    state
622                        .peer_addresses
623                        .entry(peer_id)
624                        .or_default()
625                        .insert(AddressSource::Discovery, anemo_addrs);
626                }
627            }
628        }
629
630        // Check if we should use the updated addresses.
631        self.reconfigure_peer_addresses(peer_id);
632    }
633
634    /// Reads the highest-priority addresses from `peer_addresses` for the given peer
635    /// and updates `network.known_peers()` if they differ from the current addresses.
636    fn reconfigure_peer_addresses(&mut self, peer_id: PeerId) {
637        let priority_addresses = self
638            .state
639            .read()
640            .unwrap()
641            .peer_addresses
642            .get(&peer_id)
643            .and_then(|sources| sources.first_key_value().map(|(_, addrs)| addrs.clone()))
644            .unwrap_or_default();
645        let current_addresses = self
646            .network
647            .known_peers()
648            .get(&peer_id)
649            .map(|info| info.address.clone())
650            .unwrap_or_default();
651        if priority_addresses != current_addresses {
652            let new_peer_info = PeerInfo {
653                peer_id,
654                affinity: PeerAffinity::High,
655                address: priority_addresses.clone(),
656            };
657
658            self.network.known_peers().insert(new_peer_info);
659
660            // Override existing connection if there might be one.
661            if !current_addresses.is_empty() {
662                let _ = self.network.disconnect(peer_id);
663
664                if let Some(address) = priority_addresses.first().cloned() {
665                    let network = self.network.clone();
666                    self.tasks.spawn(async move {
667                        // If this fails, ConnectionManager will retry.
668                        let _ = network.connect_with_peer_id(address, peer_id).await;
669                    });
670                }
671            }
672        }
673    }
674
675    fn load_stored_peers_on_startup(&mut self) {
676        let Some(path) = &self.store_path else {
677            return;
678        };
679        let entries = load_stored_peers(path);
680        if entries.is_empty() {
681            return;
682        }
683        info!(
684            count = entries.len(),
685            "Loaded stored peer addresses from {}",
686            path.display()
687        );
688
689        update_known_peers_versioned(
690            self.state.clone(),
691            self.metrics.clone(),
692            entries,
693            self.configured_peers.clone(),
694            &self.chain_peers,
695            &self.endpoint_manager,
696        );
697
698        // update_known_peers_versioned forwards addresses through the mailbox.
699        // Drain it now so addresses are applied before the event loop starts.
700        while let Ok(msg) = self.mailbox.try_recv() {
701            self.handle_message(msg);
702        }
703    }
704
705    fn save_stored_peers(&self) {
706        let Some(path) = &self.store_path else {
707            return;
708        };
709        let state = self.state.read().unwrap();
710        let peers_to_save: Vec<SignedVersionedNodeInfo> = state
711            .known_peers_v2
712            .iter()
713            .filter(|(pid, _)| self.is_trusted_peer(pid))
714            .map(|(_, verified)| verified.inner().clone())
715            .collect();
716        drop(state);
717        save_stored_peers(path, &peers_to_save);
718    }
719
720    fn handle_peer_event(&mut self, peer_event: Result<PeerEvent, RecvError>) {
721        match peer_event {
722            Ok(PeerEvent::NewPeer(peer_id)) => {
723                if let Some(peer) = self.network.peer(peer_id) {
724                    self.state
725                        .write()
726                        .unwrap()
727                        .connected_peers
728                        .insert(peer_id, ());
729
730                    // Query the new node for any peers
731                    self.tasks.spawn(query_peer_for_their_known_peers(
732                        peer,
733                        self.discovery_config.clone(),
734                        self.state.clone(),
735                        self.metrics.clone(),
736                        self.configured_peers.clone(),
737                        self.chain_peers.clone(),
738                        self.endpoint_manager.clone(),
739                        self.mailbox_sender(),
740                    ));
741                }
742            }
743            Ok(PeerEvent::LostPeer(peer_id, _)) => {
744                self.state.write().unwrap().connected_peers.remove(&peer_id);
745            }
746
747            Err(RecvError::Closed) => {
748                panic!("PeerEvent channel shouldn't be able to be closed");
749            }
750
751            Err(RecvError::Lagged(_)) => {
752                trace!("State-Sync fell behind processing PeerEvents");
753            }
754        }
755    }
756
757    fn handle_tick(&mut self, _now: std::time::Instant, now_unix: u64) {
758        self.update_our_info_timestamp(now_unix);
759
760        self.tasks
761            .spawn(query_connected_peers_for_their_known_peers(
762                self.network.clone(),
763                self.discovery_config.clone(),
764                self.state.clone(),
765                self.metrics.clone(),
766                self.configured_peers.clone(),
767                self.chain_peers.clone(),
768                self.endpoint_manager.clone(),
769                self.mailbox_sender(),
770            ));
771
772        // Cull old peers older than a day
773        let mut culled_trusted_peers = Vec::new();
774        {
775            let mut state = self.state.write().unwrap();
776            state
777                .known_peers
778                .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms) < ONE_DAY_MILLISECONDS);
779            state.known_peers_v2.retain(|k, v| {
780                let keep = now_unix.saturating_sub(v.timestamp_ms()) < ONE_DAY_MILLISECONDS;
781                if !keep && self.is_trusted_peer(k) {
782                    culled_trusted_peers.push(*k);
783                }
784                keep
785            });
786        }
787
788        // Clear Discovery-sourced addresses for trusted peers whose
789        // signed info expired, allowing fallback to lower-priority sources.
790        for peer_id in culled_trusted_peers {
791            self.endpoint_manager
792                .clear_source(peer_id, AddressSource::Discovery);
793        }
794
795        // Clean out the pending_dials
796        self.pending_dials.retain(|_k, v| !v.is_finished());
797        if let Some(abort_handle) = &self.dial_seed_peers_task
798            && abort_handle.is_finished()
799        {
800            self.dial_seed_peers_task = None;
801        }
802
803        let cooldown = self.discovery_config.peer_failure_cooldown();
804        self.peer_cooldowns
805            .retain(|_, since| since.elapsed() < cooldown);
806
807        // Spawn some dials
808        let state = self.state.read().unwrap();
809        let our_peer_id = self.network.peer_id();
810
811        // Collect eligible peers from both known_peers (V2) and known_peers_v2 (V3),
812        // preferring fresher timestamps when a peer appears in both maps.
813        // Partition into preferred (not on cooldown) and cooldown peers.
814        let mut preferred: HashMap<PeerId, NodeInfo> = HashMap::new();
815        let mut cooldown_peers: HashMap<PeerId, NodeInfo> = HashMap::new();
816
817        for (peer_id, info) in state.known_peers.iter() {
818            if *peer_id != our_peer_id
819                && !info.addresses.is_empty()
820                && !state.connected_peers.contains_key(peer_id)
821                && !self.pending_dials.contains_key(peer_id)
822                && !state.peer_addresses.contains_key(peer_id)
823            {
824                if self.peer_cooldowns.contains_key(peer_id) {
825                    cooldown_peers.insert(*peer_id, info.data().clone());
826                } else {
827                    preferred.insert(*peer_id, info.data().clone());
828                }
829            }
830        }
831        for (peer_id, info) in state.known_peers_v2.iter() {
832            let p2p_addresses = info.p2p_addresses();
833            if *peer_id != our_peer_id
834                && !p2p_addresses.is_empty()
835                && !state.connected_peers.contains_key(peer_id)
836                && !self.pending_dials.contains_key(peer_id)
837                && !state.peer_addresses.contains_key(peer_id)
838            {
839                let node_info = NodeInfo {
840                    peer_id: *peer_id,
841                    addresses: p2p_addresses.to_vec(),
842                    timestamp_ms: info.timestamp_ms(),
843                    access_type: info.access_type(),
844                };
845                if self.peer_cooldowns.contains_key(peer_id) {
846                    if cooldown_peers
847                        .get(peer_id)
848                        .is_none_or(|existing| info.timestamp_ms() > existing.timestamp_ms)
849                    {
850                        cooldown_peers.insert(*peer_id, node_info);
851                    }
852                } else if preferred
853                    .get(peer_id)
854                    .is_none_or(|existing| info.timestamp_ms() > existing.timestamp_ms)
855                {
856                    preferred.insert(*peer_id, node_info);
857                }
858            }
859        }
860
861        let number_of_connections = state.connected_peers.len();
862        let number_to_dial = self
863            .discovery_config
864            .target_concurrent_connections()
865            .saturating_sub(number_of_connections);
866
867        use rand::seq::IteratorRandom;
868        let mut rng = rand::thread_rng();
869
870        let mut to_dial: Vec<_> = preferred
871            .into_iter()
872            .choose_multiple(&mut rng, number_to_dial);
873
874        let remaining = number_to_dial.saturating_sub(to_dial.len());
875        to_dial.extend(
876            cooldown_peers
877                .into_iter()
878                .choose_multiple(&mut rng, remaining),
879        );
880
881        for (peer_id, info) in to_dial {
882            let abort_handle = self
883                .tasks
884                .spawn(try_to_connect_to_peer(self.network.clone(), info));
885            self.pending_dials.insert(peer_id, abort_handle);
886        }
887
888        // If we aren't connected to anything and we aren't presently trying to connect to anyone
889        // we need to try the configured peers with High affinity (seed peers)
890        let has_peers_to_dial = || {
891            self.configured_peers
892                .values()
893                .any(|p| p.affinity == PeerAffinity::High)
894                || !self.unidentified_seed_peers.is_empty()
895        };
896        if self.dial_seed_peers_task.is_none()
897            && state.connected_peers.is_empty()
898            && self.pending_dials.is_empty()
899            && has_peers_to_dial()
900        {
901            let abort_handle = self.tasks.spawn(try_to_connect_to_seed_peers(
902                self.network.clone(),
903                self.discovery_config.clone(),
904                self.configured_peers.clone(),
905                self.unidentified_seed_peers.clone(),
906            ));
907
908            self.dial_seed_peers_task = Some(abort_handle);
909        }
910    }
911
912    fn is_trusted_peer(&self, peer_id: &PeerId) -> bool {
913        is_trusted_peer(peer_id, &self.configured_peers, &self.chain_peers)
914    }
915
916    fn mailbox_sender(&self) -> mpsc::Sender<DiscoveryMessage> {
917        self.mailbox_tx.clone()
918    }
919}
920
921async fn try_to_connect_to_peer(network: Network, info: NodeInfo) {
922    debug!("Connecting to peer {info:?}");
923    for multiaddr in &info.addresses {
924        if let Ok(address) = multiaddr.to_anemo_address() {
925            // Ignore the result and just log the error if there is one
926            if network
927                .connect_with_peer_id(address, info.peer_id)
928                .await
929                .tap_err(|e| {
930                    debug!(
931                        "error dialing {} at address '{}': {e}",
932                        info.peer_id.short_display(4),
933                        multiaddr
934                    )
935                })
936                .is_ok()
937            {
938                return;
939            }
940        }
941    }
942}
943
944async fn try_to_connect_to_seed_peers(
945    network: Network,
946    config: Arc<DiscoveryConfig>,
947    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
948    unidentified_seed_peers: Vec<anemo::types::Address>,
949) {
950    let high_affinity_peers: Vec<_> = configured_peers
951        .values()
952        .filter(|p| p.affinity == PeerAffinity::High)
953        .cloned()
954        .collect();
955    debug!(
956        ?high_affinity_peers,
957        ?unidentified_seed_peers,
958        "Connecting to seed peers"
959    );
960    let network = &network;
961
962    // Attempt connection to all high-affinity and seed peers.
963    let with_peer_id = high_affinity_peers.into_iter().flat_map(|peer_info| {
964        peer_info
965            .address
966            .into_iter()
967            .map(move |addr| (Some(peer_info.peer_id), addr))
968    });
969    let without_peer_id = unidentified_seed_peers.into_iter().map(|addr| (None, addr));
970    futures::stream::iter(with_peer_id.chain(without_peer_id))
971        .for_each_concurrent(
972            config.target_concurrent_connections(),
973            |(peer_id, address)| async move {
974                // Ignore the result and just log the error if there is one
975                let _ = if let Some(peer_id) = peer_id {
976                    network
977                        .connect_with_peer_id(address.clone(), peer_id)
978                        .await
979                        .tap_err(|e| {
980                            debug!(
981                                "error dialing peer {} at '{}': {e}",
982                                peer_id.short_display(4),
983                                address
984                            )
985                        })
986                } else {
987                    network
988                        .connect(address.clone())
989                        .await
990                        .tap_err(|e| debug!("error dialing address '{}': {e}", address))
991                };
992            },
993        )
994        .await;
995}
996
997async fn query_peer_for_known_peers_v2(peer: Peer) -> Option<Vec<SignedNodeInfo>> {
998    let mut client = DiscoveryClient::new(peer);
999    let request = Request::new(()).with_timeout(TIMEOUT);
1000    client
1001        .get_known_peers_v2(request)
1002        .await
1003        .ok()
1004        .map(Response::into_inner)
1005        .map(
1006            |GetKnownPeersResponseV2 {
1007                 own_info,
1008                 mut known_peers,
1009             }| {
1010                if !own_info.addresses.is_empty() {
1011                    known_peers.push(own_info)
1012                }
1013                known_peers
1014            },
1015        )
1016}
1017
1018async fn query_peer_for_their_known_peers(
1019    peer: Peer,
1020    discovery_config: Arc<DiscoveryConfig>,
1021    state: Arc<RwLock<State>>,
1022    metrics: Metrics,
1023    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1024    chain_peers: Arc<RwLock<HashSet<PeerId>>>,
1025    endpoint_manager: EndpointManager,
1026    mailbox_tx: mpsc::Sender<DiscoveryMessage>,
1027) {
1028    // Query V3 concurrently with V2 when enabled
1029    if discovery_config.use_get_known_peers_v3() {
1030        let our_info_v2 = state.read().unwrap().our_info_v2.clone();
1031        if let Some(own_info) = our_info_v2 {
1032            let peer_for_v3 = peer.clone();
1033            let v3_query = async move {
1034                let mut client = DiscoveryClient::new(peer_for_v3);
1035                let request =
1036                    Request::new(GetKnownPeersRequestV3 { own_info }).with_timeout(TIMEOUT);
1037                client
1038                    .get_known_peers_v3(request)
1039                    .await
1040                    .ok()
1041                    .map(Response::into_inner)
1042                    .map(
1043                        |GetKnownPeersResponseV3 {
1044                             own_info,
1045                             mut known_peers,
1046                         }| {
1047                            if !own_info.p2p_addresses().is_empty() {
1048                                known_peers.push(own_info)
1049                            }
1050                            known_peers
1051                        },
1052                    )
1053            };
1054
1055            let (found_peers_v2, found_peers_v3) =
1056                tokio::join!(query_peer_for_known_peers_v2(peer), v3_query);
1057
1058            if let Some(found_peers) = found_peers_v2 {
1059                update_known_peers(
1060                    state.clone(),
1061                    metrics.clone(),
1062                    found_peers,
1063                    configured_peers.clone(),
1064                    &chain_peers,
1065                );
1066            }
1067            if let Some(found_peers) = found_peers_v3 {
1068                let changed = update_known_peers_versioned(
1069                    state,
1070                    metrics,
1071                    found_peers,
1072                    configured_peers,
1073                    &chain_peers,
1074                    &endpoint_manager,
1075                );
1076                if changed {
1077                    let _ = mailbox_tx.try_send(DiscoveryMessage::TrustedPeersUpdated);
1078                }
1079            }
1080            return;
1081        }
1082    }
1083
1084    // V3 not enabled or our_info_v2 not available - just run V2
1085    if let Some(found_peers) = query_peer_for_known_peers_v2(peer).await {
1086        update_known_peers(state, metrics, found_peers, configured_peers, &chain_peers);
1087    }
1088}
1089
1090async fn query_connected_peers_for_their_known_peers(
1091    network: Network,
1092    config: Arc<DiscoveryConfig>,
1093    state: Arc<RwLock<State>>,
1094    metrics: Metrics,
1095    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1096    chain_peers: Arc<RwLock<HashSet<PeerId>>>,
1097    endpoint_manager: EndpointManager,
1098    mailbox_tx: mpsc::Sender<DiscoveryMessage>,
1099) {
1100    use rand::seq::IteratorRandom;
1101
1102    let peers_to_query: Vec<_> = network
1103        .peers()
1104        .into_iter()
1105        .flat_map(|id| network.peer(id))
1106        .choose_multiple(&mut rand::thread_rng(), config.peers_to_query());
1107
1108    // Query V2 to keep known_peers populated for V2 clients
1109    let v2_query = {
1110        let peers = peers_to_query.clone();
1111        let peers_to_query_count = config.peers_to_query();
1112        async move {
1113            peers
1114                .into_iter()
1115                .map(DiscoveryClient::new)
1116                .map(|mut client| async move {
1117                    let request = Request::new(()).with_timeout(TIMEOUT);
1118                    client
1119                        .get_known_peers_v2(request)
1120                        .await
1121                        .ok()
1122                        .map(Response::into_inner)
1123                        .map(
1124                            |GetKnownPeersResponseV2 {
1125                                 own_info,
1126                                 mut known_peers,
1127                             }| {
1128                                if !own_info.addresses.is_empty() {
1129                                    known_peers.push(own_info)
1130                                }
1131                                known_peers
1132                            },
1133                        )
1134                })
1135                .pipe(futures::stream::iter)
1136                .buffer_unordered(peers_to_query_count)
1137                .filter_map(std::future::ready)
1138                .flat_map(futures::stream::iter)
1139                .collect::<Vec<_>>()
1140                .await
1141        }
1142    };
1143
1144    // Additionally query V3 when enabled, concurrently with V2
1145    if config.use_get_known_peers_v3() {
1146        let our_info_v2 = state.read().unwrap().our_info_v2.clone();
1147        if let Some(own_info) = our_info_v2 {
1148            let v3_query = {
1149                let peers_to_query_count = config.peers_to_query();
1150                async move {
1151                    peers_to_query
1152                        .into_iter()
1153                        .map(DiscoveryClient::new)
1154                        .map(|mut client| {
1155                            let own_info = own_info.clone();
1156                            async move {
1157                                let request = Request::new(GetKnownPeersRequestV3 { own_info })
1158                                    .with_timeout(TIMEOUT);
1159                                client
1160                                    .get_known_peers_v3(request)
1161                                    .await
1162                                    .ok()
1163                                    .map(Response::into_inner)
1164                                    .map(
1165                                        |GetKnownPeersResponseV3 {
1166                                             own_info,
1167                                             mut known_peers,
1168                                         }| {
1169                                            if !own_info.p2p_addresses().is_empty() {
1170                                                known_peers.push(own_info)
1171                                            }
1172                                            known_peers
1173                                        },
1174                                    )
1175                            }
1176                        })
1177                        .pipe(futures::stream::iter)
1178                        .buffer_unordered(peers_to_query_count)
1179                        .filter_map(std::future::ready)
1180                        .flat_map(futures::stream::iter)
1181                        .collect::<Vec<_>>()
1182                        .await
1183                }
1184            };
1185
1186            let (found_peers_v2, found_peers_v3) = tokio::join!(v2_query, v3_query);
1187
1188            update_known_peers(
1189                state.clone(),
1190                metrics.clone(),
1191                found_peers_v2,
1192                configured_peers.clone(),
1193                &chain_peers,
1194            );
1195            let changed = update_known_peers_versioned(
1196                state,
1197                metrics,
1198                found_peers_v3,
1199                configured_peers,
1200                &chain_peers,
1201                &endpoint_manager,
1202            );
1203            if changed {
1204                let _ = mailbox_tx.try_send(DiscoveryMessage::TrustedPeersUpdated);
1205            }
1206            return;
1207        }
1208    }
1209
1210    // V3 not enabled or our_info_v2 not available - just run V2
1211    let found_peers_v2 = v2_query.await;
1212    update_known_peers(
1213        state,
1214        metrics,
1215        found_peers_v2,
1216        configured_peers,
1217        &chain_peers,
1218    );
1219}
1220
1221fn update_known_peers(
1222    state: Arc<RwLock<State>>,
1223    metrics: Metrics,
1224    found_peers: Vec<SignedNodeInfo>,
1225    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1226    chain_peers: &Arc<RwLock<HashSet<PeerId>>>,
1227) {
1228    use std::collections::hash_map::Entry;
1229
1230    let now_unix = now_unix();
1231    let our_peer_id = state.read().unwrap().our_info.clone().unwrap().peer_id;
1232    let known_peers = &mut state.write().unwrap().known_peers;
1233    // only take the first MAX_PEERS_TO_SEND peers
1234    for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
1235        // +1 to account for the "own_info" of the serving peer
1236        // Skip peers whose timestamp is too far in the future from our clock
1237        // or that are too old
1238        if peer_info.timestamp_ms > now_unix.saturating_add(30 * 1_000) // 30 seconds
1239            || now_unix.saturating_sub(peer_info.timestamp_ms) > ONE_DAY_MILLISECONDS
1240        {
1241            continue;
1242        }
1243
1244        if peer_info.peer_id == our_peer_id {
1245            continue;
1246        }
1247
1248        let is_restricted = match peer_info.access_type {
1249            AccessType::Public => false,
1250            AccessType::Private | AccessType::Trusted => true,
1251        };
1252        if is_restricted && !is_trusted_peer(&peer_info.peer_id, &configured_peers, chain_peers) {
1253            continue;
1254        }
1255
1256        // Skip entries that have too many addresses as a means to cap the size of a node's info
1257        if peer_info.addresses.len() > MAX_ADDRESSES_PER_PEER {
1258            continue;
1259        }
1260
1261        // verify that all addresses provided are valid anemo addresses
1262        if !peer_info
1263            .addresses
1264            .iter()
1265            .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
1266        {
1267            continue;
1268        }
1269        let Ok(public_key) = Ed25519PublicKey::from_bytes(&peer_info.peer_id.0) else {
1270            debug_fatal!(
1271                // This should never happen.
1272                "Failed to convert anemo PeerId {:?} to Ed25519PublicKey",
1273                peer_info.peer_id
1274            );
1275            continue;
1276        };
1277        let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
1278        if let Err(e) = public_key.verify(&msg, peer_info.auth_sig()) {
1279            info!(
1280                "Discovery failed to verify signature for NodeInfo for peer {:?}: {e:?}",
1281                peer_info.peer_id
1282            );
1283            // TODO: consider denylisting the source of bad NodeInfo from future requests.
1284            continue;
1285        }
1286        let peer = VerifiedSignedNodeInfo::new_from_verified(peer_info);
1287
1288        match known_peers.entry(peer.peer_id) {
1289            Entry::Occupied(mut o) => {
1290                if peer.timestamp_ms > o.get().timestamp_ms {
1291                    if o.get().addresses.is_empty() && !peer.addresses.is_empty() {
1292                        metrics.inc_num_peers_with_external_address();
1293                    }
1294                    if !o.get().addresses.is_empty() && peer.addresses.is_empty() {
1295                        metrics.dec_num_peers_with_external_address();
1296                    }
1297                    o.insert(peer);
1298                }
1299            }
1300            Entry::Vacant(v) => {
1301                if !peer.addresses.is_empty() {
1302                    metrics.inc_num_peers_with_external_address();
1303                }
1304                v.insert(peer);
1305            }
1306        }
1307    }
1308}
1309
1310/// Returns true if any trusted peer was inserted or updated.
1311fn update_known_peers_versioned(
1312    state: Arc<RwLock<State>>,
1313    metrics: Metrics,
1314    found_peers: Vec<SignedVersionedNodeInfo>,
1315    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1316    chain_peers: &Arc<RwLock<HashSet<PeerId>>>,
1317    endpoint_manager: &EndpointManager,
1318) -> bool {
1319    use std::collections::hash_map::Entry;
1320
1321    let now_unix = now_unix();
1322    let our_peer_id = state
1323        .read()
1324        .unwrap()
1325        .our_info_v2
1326        .as_ref()
1327        .and_then(|info| info.peer_id());
1328    let known_peers_v2 = &mut state.write().unwrap().known_peers_v2;
1329    let mut trusted_peer_changed = false;
1330
1331    for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
1332        let timestamp_ms = peer_info.timestamp_ms();
1333
1334        if timestamp_ms > now_unix.saturating_add(30 * 1_000)
1335            || now_unix.saturating_sub(timestamp_ms) > ONE_DAY_MILLISECONDS
1336        {
1337            continue;
1338        }
1339
1340        let Some(peer_id) = peer_info.peer_id() else {
1341            continue;
1342        };
1343
1344        if Some(peer_id) == our_peer_id {
1345            continue;
1346        }
1347
1348        let is_restricted = match peer_info.access_type() {
1349            AccessType::Public => false,
1350            AccessType::Private | AccessType::Trusted => true,
1351        };
1352        let is_trusted = is_trusted_peer(&peer_id, &configured_peers, chain_peers);
1353        if is_restricted && !is_trusted {
1354            continue;
1355        }
1356
1357        let peer = match verify_versioned_node_info(&peer_info) {
1358            Ok(verified) => verified,
1359            Err(reason) => {
1360                info!("Discovery rejecting VersionedNodeInfo for peer {peer_id:?}: {reason}");
1361                continue;
1362            }
1363        };
1364
1365        // Forward discovered addresses for trusted peers to EndpointManager.
1366        if is_trusted && let VersionedNodeInfo::V2(info_v2) = peer_info.data() {
1367            for (endpoint_id, addrs) in &info_v2.addresses {
1368                if !addrs.is_empty() {
1369                    let _ = endpoint_manager.update_endpoint(
1370                        endpoint_id.clone(),
1371                        AddressSource::Discovery,
1372                        addrs.clone(),
1373                    );
1374                }
1375            }
1376        }
1377
1378        let peer_p2p_addresses = peer.p2p_addresses();
1379
1380        match known_peers_v2.entry(peer_id) {
1381            Entry::Occupied(mut o) => {
1382                if peer.timestamp_ms() > o.get().timestamp_ms() {
1383                    let old_addresses = o.get().p2p_addresses();
1384                    if old_addresses.is_empty() && !peer_p2p_addresses.is_empty() {
1385                        metrics.inc_num_peers_with_external_address();
1386                    }
1387                    if !old_addresses.is_empty() && peer_p2p_addresses.is_empty() {
1388                        metrics.dec_num_peers_with_external_address();
1389                    }
1390                    o.insert(peer);
1391                    if is_trusted {
1392                        trusted_peer_changed = true;
1393                    }
1394                }
1395            }
1396            Entry::Vacant(v) => {
1397                if !peer_p2p_addresses.is_empty() {
1398                    metrics.inc_num_peers_with_external_address();
1399                }
1400                v.insert(peer);
1401                if is_trusted {
1402                    trusted_peer_changed = true;
1403                }
1404            }
1405        }
1406    }
1407
1408    trusted_peer_changed
1409}
1410
1411/// A trusted peer is one that appears in the static configured_peers (seed/allowlisted)
1412/// or was dynamically added via on-chain validator info (chain_peers).
1413pub(super) fn is_trusted_peer(
1414    peer_id: &PeerId,
1415    configured_peers: &HashMap<PeerId, PeerInfo>,
1416    chain_peers: &Arc<RwLock<HashSet<PeerId>>>,
1417) -> bool {
1418    configured_peers.contains_key(peer_id) || chain_peers.read().unwrap().contains(peer_id)
1419}
1420
1421pub(super) fn now_unix() -> u64 {
1422    use std::time::{SystemTime, UNIX_EPOCH};
1423
1424    SystemTime::now()
1425        .duration_since(UNIX_EPOCH)
1426        .unwrap()
1427        .as_millis() as u64
1428}