sui_network/discovery/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::types::PeerAffinity;
5use anemo::types::PeerInfo;
6use anemo::{Network, Peer, PeerId, Request, Response, types::PeerEvent};
7use fastcrypto::ed25519::{Ed25519PublicKey, Ed25519Signature};
8use futures::StreamExt;
9use mysten_common::debug_fatal;
10use serde::{Deserialize, Serialize};
11use shared_crypto::intent::IntentScope;
12use std::{
13    collections::{BTreeMap, HashMap, HashSet},
14    path::PathBuf,
15    sync::{Arc, RwLock},
16    time::{Duration, Instant},
17};
18
19use crate::endpoint_manager::{AddressSource, EndpointId, EndpointManager};
20use store::{load_stored_peers, save_stored_peers};
21use sui_config::p2p::{AccessType, DiscoveryConfig, P2pConfig};
22use sui_types::crypto::{NetworkKeyPair, NetworkPublicKey, Signer, ToFromBytes, VerifyingKey};
23use sui_types::digests::Digest;
24use sui_types::message_envelope::{Envelope, Message, VerifiedEnvelope};
25use sui_types::multiaddr::Multiaddr;
26use tap::{Pipe, TapFallible};
27use tokio::sync::broadcast::error::RecvError;
28use tokio::sync::mpsc;
29use tokio::{
30    sync::oneshot,
31    task::{AbortHandle, JoinSet},
32};
33use tracing::{debug, info, trace};
34
35const TIMEOUT: Duration = Duration::from_secs(1);
36const ONE_DAY_MILLISECONDS: u64 = 24 * 60 * 60 * 1_000;
37const MAX_ADDRESS_LENGTH: usize = 300;
38const MAX_PEERS_TO_SEND: usize = 200;
39const MAX_ADDRESSES_PER_PEER: usize = 2;
40
41mod generated {
42    include!(concat!(env!("OUT_DIR"), "/sui.Discovery.rs"));
43}
44mod builder;
45mod metrics;
46mod server;
47mod store;
48#[cfg(test)]
49mod tests;
50
51pub use builder::{Builder, UnstartedDiscovery};
52pub use generated::{
53    discovery_client::DiscoveryClient,
54    discovery_server::{Discovery, DiscoveryServer},
55};
56pub use server::{GetKnownPeersRequestV3, GetKnownPeersResponseV2, GetKnownPeersResponseV3};
57
58/// Message types for the discovery system mailbox.
59#[derive(Debug)]
60pub enum DiscoveryMessage {
61    /// An external source (e.g. admin API, node config) updated a peer's address.
62    PeerAddressChange {
63        peer_id: PeerId,
64        source: AddressSource,
65        addresses: Vec<anemo::types::Address>,
66    },
67    /// Node info was received from inbound RPC.
68    ReceivedNodeInfo {
69        peer_info: Box<SignedVersionedNodeInfo>,
70    },
71    /// A spawned peer-query task discovered updated info for a trusted peer,
72    /// signaling the event loop to update the stored peer addresses.
73    TrustedPeersUpdated,
74    /// A peer has been reported as continuously failing, triggering disconnect and cooldown.
75    PeerFailureReport { peer_id: PeerId },
76}
77
78/// A Handle to the Discovery subsystem. The Discovery system will be shut down once all Handles
79/// have been dropped.
80#[derive(Clone, Debug)]
81pub struct Handle {
82    pub(super) _shutdown_handle: Arc<oneshot::Sender<()>>,
83    pub(super) sender: Sender,
84}
85
86impl Handle {
87    pub fn sender(&self) -> Sender {
88        self.sender.clone()
89    }
90}
91
92/// A lightweight handle for sending messages to the discovery event loop
93/// without holding a shutdown reference.
94#[derive(Clone, Debug)]
95pub struct Sender {
96    pub(super) sender: mpsc::Sender<DiscoveryMessage>,
97}
98
99impl Sender {
100    pub fn peer_address_change(
101        &self,
102        peer_id: PeerId,
103        source: AddressSource,
104        addresses: Vec<anemo::types::Address>,
105    ) {
106        self.sender
107            .try_send(DiscoveryMessage::PeerAddressChange {
108                peer_id,
109                source,
110                addresses,
111            })
112            .expect("Discovery mailbox should not overflow or be closed")
113    }
114
115    pub fn report_peer_failure(&self, peer_id: PeerId) {
116        let _ = self
117            .sender
118            .try_send(DiscoveryMessage::PeerFailureReport { peer_id });
119    }
120}
121
122use self::metrics::Metrics;
123
124/// The internal discovery state shared between the main event loop and the request handler
125struct State {
126    our_info: Option<SignedNodeInfo>,
127    our_info_v2: Option<SignedVersionedNodeInfo>,
128    connected_peers: HashMap<PeerId, ()>,
129    known_peers: HashMap<PeerId, VerifiedSignedNodeInfo>,
130    known_peers_v2: HashMap<PeerId, VerifiedSignedVersionedNodeInfo>,
131    peer_addresses: HashMap<PeerId, BTreeMap<AddressSource, Vec<anemo::types::Address>>>,
132}
133
134/// The information necessary to dial another peer.
135///
136/// `NodeInfo` contains all the information that is shared with other nodes via the discovery
137/// service to advertise how a node can be reached.
138#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
139pub struct NodeInfo {
140    pub peer_id: PeerId,
141    pub addresses: Vec<Multiaddr>,
142
143    /// Creation time.
144    ///
145    /// This is used to determine which of two NodeInfo's from the same PeerId should be retained.
146    pub timestamp_ms: u64,
147
148    /// See docstring for `AccessType`.
149    pub access_type: AccessType,
150}
151
152impl NodeInfo {
153    fn sign(self, keypair: &NetworkKeyPair) -> SignedNodeInfo {
154        let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
155        let sig = keypair.sign(&msg);
156        SignedNodeInfo::new_from_data_and_sig(self, sig)
157    }
158}
159
160pub type SignedNodeInfo = Envelope<NodeInfo, Ed25519Signature>;
161
162pub type VerifiedSignedNodeInfo = VerifiedEnvelope<NodeInfo, Ed25519Signature>;
163
164#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
165pub struct NodeInfoDigest(Digest);
166
167impl NodeInfoDigest {
168    pub const fn new(digest: [u8; 32]) -> Self {
169        Self(Digest::new(digest))
170    }
171}
172
173impl Message for NodeInfo {
174    type DigestType = NodeInfoDigest;
175    const SCOPE: IntentScope = IntentScope::DiscoveryPeers;
176
177    fn digest(&self) -> Self::DigestType {
178        unreachable!("NodeInfoDigest is not used today")
179    }
180}
181
182/// NodeInfoV2 supports multiple address types keyed by EndpointId.
183// TODO: Remove support for V1 once V2 is available in all production networks.
184#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
185pub struct NodeInfoV2 {
186    pub addresses: BTreeMap<EndpointId, Vec<Multiaddr>>,
187    pub timestamp_ms: u64,
188    pub access_type: AccessType,
189}
190
191impl NodeInfoV2 {
192    /// Derive the P2P PeerId from the addresses map.
193    /// Returns None if no P2P endpoint is present.
194    pub fn peer_id(&self) -> Option<PeerId> {
195        self.addresses.keys().find_map(|k| match k {
196            EndpointId::P2p(peer_id) => Some(*peer_id),
197            EndpointId::Consensus(_) => None,
198        })
199    }
200
201    pub fn p2p_addresses(&self) -> &[Multiaddr] {
202        self.addresses
203            .iter()
204            .find_map(|(k, v)| match k {
205                EndpointId::P2p(_) => Some(v.as_slice()),
206                EndpointId::Consensus(_) => None,
207            })
208            .unwrap_or(&[])
209    }
210}
211
212/// Versioned wrapper for NodeInfo types.
213#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
214pub enum VersionedNodeInfo {
215    V1(NodeInfo),
216    V2(NodeInfoV2),
217}
218
219impl VersionedNodeInfo {
220    pub fn peer_id(&self) -> Option<PeerId> {
221        match self {
222            VersionedNodeInfo::V1(info) => Some(info.peer_id),
223            VersionedNodeInfo::V2(info) => info.peer_id(),
224        }
225    }
226
227    pub fn timestamp_ms(&self) -> u64 {
228        match self {
229            VersionedNodeInfo::V1(info) => info.timestamp_ms,
230            VersionedNodeInfo::V2(info) => info.timestamp_ms,
231        }
232    }
233
234    pub fn access_type(&self) -> AccessType {
235        match self {
236            VersionedNodeInfo::V1(info) => info.access_type,
237            VersionedNodeInfo::V2(info) => info.access_type,
238        }
239    }
240
241    pub fn p2p_addresses(&self) -> &[Multiaddr] {
242        match self {
243            VersionedNodeInfo::V1(info) => &info.addresses,
244            VersionedNodeInfo::V2(info) => info.p2p_addresses(),
245        }
246    }
247
248    pub fn sign(self, keypair: &NetworkKeyPair) -> SignedVersionedNodeInfo {
249        let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
250        let sig = keypair.sign(&msg);
251        SignedVersionedNodeInfo::new_from_data_and_sig(self, sig)
252    }
253}
254
255#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
256pub struct VersionedNodeInfoDigest(Digest);
257
258impl Message for VersionedNodeInfo {
259    type DigestType = VersionedNodeInfoDigest;
260    const SCOPE: IntentScope = IntentScope::DiscoveryPeers;
261
262    fn digest(&self) -> Self::DigestType {
263        unreachable!("VersionedNodeInfoDigest is not used today")
264    }
265}
266
267pub type SignedVersionedNodeInfo = Envelope<VersionedNodeInfo, Ed25519Signature>;
268pub type VerifiedSignedVersionedNodeInfo = VerifiedEnvelope<VersionedNodeInfo, Ed25519Signature>;
269
270/// Verifies the signature and endpoint identity consistency of a
271/// `SignedVersionedNodeInfo`, returning a `VerifiedSignedVersionedNodeInfo`
272/// on success.
273///
274/// Checks:
275/// 1. The signature is valid for the P2P peer_id embedded in the node info.
276/// 2. Any non-P2P endpoint identities (e.g. `Consensus`) match the P2P
277///    peer_id, preventing a peer from advertising endpoints under another
278///    key.
279fn verify_versioned_node_info(
280    peer_info: &SignedVersionedNodeInfo,
281) -> Result<VerifiedSignedVersionedNodeInfo, &'static str> {
282    let peer_id = peer_info.peer_id().ok_or("missing P2P peer_id")?;
283
284    let public_key =
285        Ed25519PublicKey::from_bytes(&peer_id.0).map_err(|_| "invalid peer_id public key")?;
286
287    let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
288    public_key
289        .verify(&msg, peer_info.auth_sig())
290        .map_err(|_| "signature verification failed")?;
291
292    match peer_info.data() {
293        VersionedNodeInfo::V1(info) => {
294            if info.addresses.len() > MAX_ADDRESSES_PER_PEER {
295                return Err("too many addresses");
296            }
297            if !info
298                .addresses
299                .iter()
300                .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
301            {
302                return Err("invalid address");
303            }
304        }
305        VersionedNodeInfo::V2(info_v2) => {
306            // Each endpoint variant (P2p, Consensus) may appear at most once.
307            let mut seen_variants = Vec::new();
308            for endpoint_id in info_v2.addresses.keys() {
309                let variant = std::mem::discriminant(endpoint_id);
310                if seen_variants.contains(&variant) {
311                    return Err("duplicate endpoint variant");
312                }
313                seen_variants.push(variant);
314            }
315
316            for (endpoint_id, addrs) in &info_v2.addresses {
317                if addrs.len() > MAX_ADDRESSES_PER_PEER {
318                    return Err("too many addresses for endpoint");
319                }
320                if !addrs.iter().all(|addr| addr.len() < MAX_ADDRESS_LENGTH) {
321                    return Err("address too long");
322                }
323                if matches!(endpoint_id, EndpointId::P2p(_))
324                    && !addrs.iter().all(|addr| addr.to_anemo_address().is_ok())
325                {
326                    return Err("invalid P2P address");
327                }
328            }
329
330            let identities_valid = info_v2.addresses.keys().all(|eid| match eid {
331                EndpointId::P2p(_) => true,
332                EndpointId::Consensus(pubkey) => pubkey.as_bytes() == peer_id.0,
333            });
334            if !identities_valid {
335                return Err("non-P2P endpoint identity mismatch");
336            }
337        }
338    }
339
340    Ok(VerifiedSignedVersionedNodeInfo::new_from_verified(
341        peer_info.clone(),
342    ))
343}
344
345struct DiscoveryEventLoop {
346    config: P2pConfig,
347    discovery_config: Arc<DiscoveryConfig>,
348    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
349    chain_peers: Arc<RwLock<HashSet<PeerId>>>,
350    unidentified_seed_peers: Vec<anemo::types::Address>,
351    network: Network,
352    keypair: NetworkKeyPair,
353    tasks: JoinSet<()>,
354    pending_dials: HashMap<PeerId, AbortHandle>,
355    dial_seed_peers_task: Option<AbortHandle>,
356    shutdown_handle: oneshot::Receiver<()>,
357    state: Arc<RwLock<State>>,
358    mailbox: mpsc::Receiver<DiscoveryMessage>,
359    mailbox_tx: mpsc::Sender<DiscoveryMessage>,
360    metrics: Metrics,
361    consensus_external_address: Option<Multiaddr>,
362    endpoint_manager: EndpointManager,
363    store_path: Option<PathBuf>,
364    peer_cooldowns: HashMap<PeerId, Instant>,
365}
366
367impl DiscoveryEventLoop {
368    pub async fn start(mut self) {
369        info!("Discovery started");
370
371        self.construct_our_info();
372        self.configure_preferred_peers();
373        self.load_stored_peers_on_startup();
374
375        let mut interval = tokio::time::interval(self.discovery_config.interval_period());
376        let mut peer_events = {
377            let (subscriber, _peers) = self.network.subscribe().unwrap();
378            subscriber
379        };
380
381        loop {
382            tokio::select! {
383                now = interval.tick() => {
384                    let now_unix = now_unix();
385                    self.handle_tick(now.into_std(), now_unix);
386                }
387                peer_event = peer_events.recv() => {
388                    self.handle_peer_event(peer_event);
389                },
390                Some(message) = self.mailbox.recv() => {
391                    self.handle_message(message);
392                }
393                Some(task_result) = self.tasks.join_next() => {
394                    match task_result {
395                        Ok(()) => {},
396                        Err(e) => {
397                            if e.is_cancelled() {
398                                // avoid crashing on ungraceful shutdown
399                            } else if e.is_panic() {
400                                // propagate panics.
401                                std::panic::resume_unwind(e.into_panic());
402                            } else {
403                                panic!("task failed: {e}");
404                            }
405                        },
406                    };
407                },
408                // Once the shutdown notification resolves we can terminate the event loop
409                _ = &mut self.shutdown_handle => {
410                    break;
411                }
412            }
413        }
414
415        self.save_stored_peers();
416        info!("Discovery ended");
417    }
418
419    fn handle_message(&mut self, message: DiscoveryMessage) {
420        match message {
421            DiscoveryMessage::PeerAddressChange {
422                peer_id,
423                source,
424                addresses,
425            } => {
426                self.handle_peer_address_change(peer_id, source, addresses);
427            }
428            DiscoveryMessage::ReceivedNodeInfo { peer_info } => {
429                let changed = update_known_peers_versioned(
430                    self.state.clone(),
431                    vec![*peer_info],
432                    self.configured_peers.clone(),
433                    &self.chain_peers,
434                    &self.endpoint_manager,
435                );
436                if changed {
437                    self.save_stored_peers();
438                }
439            }
440            DiscoveryMessage::TrustedPeersUpdated => {
441                self.save_stored_peers();
442            }
443            DiscoveryMessage::PeerFailureReport { peer_id } => {
444                self.handle_peer_failure_report(peer_id);
445            }
446        }
447    }
448
449    fn handle_peer_failure_report(&mut self, peer_id: PeerId) {
450        if self.is_trusted_peer(&peer_id) {
451            info!(?peer_id, "ignoring failure report for trusted peer");
452            return;
453        }
454        let min_peers = self.discovery_config.min_peers_for_disconnect();
455        let connected_count = self.state.read().unwrap().connected_peers.len();
456        if connected_count < min_peers {
457            info!(
458                ?peer_id,
459                connected_count, min_peers, "skipping disconnect, too few connected peers"
460            );
461            return;
462        }
463        info!(
464            ?peer_id,
465            "peer failure reported, disconnecting and adding cooldown"
466        );
467        let _ = self.network.disconnect(peer_id);
468        self.peer_cooldowns.insert(peer_id, Instant::now());
469    }
470
471    fn construct_our_info(&mut self) {
472        if self.state.read().unwrap().our_info.is_some() {
473            return;
474        }
475
476        let peer_id = self.network.peer_id();
477        let timestamp_ms = now_unix();
478        let access_type = self.discovery_config.access_type();
479
480        let addresses: Vec<Multiaddr> = self
481            .config
482            .external_address
483            .clone()
484            .and_then(|addr| addr.to_anemo_address().ok().map(|_| addr))
485            .into_iter()
486            .collect();
487
488        let our_info = NodeInfo {
489            peer_id,
490            addresses: addresses.clone(),
491            timestamp_ms,
492            access_type,
493        }
494        .sign(&self.keypair);
495
496        let mut addresses_map = BTreeMap::new();
497        addresses_map.insert(EndpointId::P2p(peer_id), addresses);
498        if let Some(consensus_addr) = &self.consensus_external_address {
499            // Populates `Consensus` EndpointId from our P2P (anemo) `PeerId`.
500            // This is safe because both the P2P and consensus networks use the same
501            // ed25519 network keypair. Both originate from `NodeConfig::network_key_pair()`.
502            let network_pubkey =
503                NetworkPublicKey::from_bytes(&peer_id.0).expect("PeerId is a valid public key");
504            addresses_map.insert(
505                EndpointId::Consensus(network_pubkey),
506                vec![consensus_addr.clone()],
507            );
508        }
509        let our_info_v2 = VersionedNodeInfo::V2(NodeInfoV2 {
510            addresses: addresses_map,
511            timestamp_ms,
512            access_type,
513        })
514        .sign(&self.keypair);
515
516        let mut state = self.state.write().unwrap();
517        state.our_info = Some(our_info);
518        state.our_info_v2 = Some(our_info_v2);
519    }
520
521    fn configure_preferred_peers(&mut self) {
522        let peers: Vec<_> = self.configured_peers.values().cloned().collect();
523        for peer_info in peers {
524            debug!(?peer_info, "Add configured preferred peer");
525            match peer_info.affinity {
526                PeerAffinity::High => {
527                    self.handle_peer_address_change(
528                        peer_info.peer_id,
529                        AddressSource::Seed,
530                        peer_info.address,
531                    );
532                }
533                _ => {
534                    // Allowed peers accept inbound but aren't actively dialed,
535                    // so insert them directly without address priority tracking.
536                    self.network.known_peers().insert(peer_info);
537                }
538            }
539        }
540    }
541
542    fn update_our_info_timestamp(&mut self, now_unix: u64) {
543        let state = &mut self.state.write().unwrap();
544
545        if let Some(our_info) = &state.our_info {
546            let mut data = our_info.data().clone();
547            data.timestamp_ms = now_unix;
548            state.our_info = Some(data.sign(&self.keypair));
549        }
550
551        if let Some(our_info_v2) = &state.our_info_v2 {
552            let mut data = our_info_v2.data().clone();
553            match &mut data {
554                VersionedNodeInfo::V1(info) => info.timestamp_ms = now_unix,
555                VersionedNodeInfo::V2(info) => info.timestamp_ms = now_unix,
556            }
557            state.our_info_v2 = Some(data.sign(&self.keypair));
558        }
559    }
560
561    fn handle_peer_address_change(
562        &mut self,
563        peer_id: PeerId,
564        source: AddressSource,
565        addresses: Vec<anemo::types::Address>,
566    ) {
567        debug!(
568            ?peer_id,
569            ?source,
570            ?addresses,
571            "Received peer address change"
572        );
573
574        // Update set of trusted peers from on-chain validator configs.
575        if source == AddressSource::Chain {
576            if addresses.is_empty() {
577                self.chain_peers.write().unwrap().remove(&peer_id);
578            } else {
579                self.chain_peers.write().unwrap().insert(peer_id);
580            }
581        }
582
583        // Update stored addresses.
584        {
585            let mut state = self.state.write().unwrap();
586            let source_map = state.peer_addresses.entry(peer_id).or_default();
587
588            if addresses.is_empty() {
589                source_map.remove(&source);
590                if source_map.is_empty() {
591                    state.peer_addresses.remove(&peer_id);
592                }
593            } else {
594                source_map.insert(source, addresses);
595            }
596
597            // When a Chain address arrives, check if there are cached Discovery
598            // P2P addresses in known_peers_v2 that should take priority. This
599            // handles the startup race where cached NodeInfo is loaded before
600            // chain_peers is populated (so update_known_peers_versioned couldn't
601            // forward the Discovery addresses at load time).
602            if source == AddressSource::Chain
603                && !state
604                    .peer_addresses
605                    .get(&peer_id)
606                    .is_some_and(|s| s.contains_key(&AddressSource::Discovery))
607                && let Some(addrs) = state
608                    .known_peers_v2
609                    .get(&peer_id)
610                    .and_then(|info| match info.data() {
611                        VersionedNodeInfo::V2(v2) => Some(v2.p2p_addresses()),
612                        _ => None,
613                    })
614            {
615                let anemo_addrs: Vec<_> = addrs
616                    .iter()
617                    .filter_map(|a| a.to_anemo_address().ok())
618                    .collect();
619                if !anemo_addrs.is_empty() {
620                    state
621                        .peer_addresses
622                        .entry(peer_id)
623                        .or_default()
624                        .insert(AddressSource::Discovery, anemo_addrs);
625                }
626            }
627        }
628
629        // Check if we should use the updated addresses.
630        self.reconfigure_peer_addresses(peer_id);
631    }
632
633    /// Reads the highest-priority addresses from `peer_addresses` for the given peer
634    /// and updates `network.known_peers()` if they differ from the current addresses.
635    fn reconfigure_peer_addresses(&mut self, peer_id: PeerId) {
636        let priority = self
637            .state
638            .read()
639            .unwrap()
640            .peer_addresses
641            .get(&peer_id)
642            .and_then(|sources| {
643                sources
644                    .first_key_value()
645                    .map(|(source, addrs)| (*source, addrs.clone()))
646            });
647
648        // Record the active P2P address source for trusted peers (one gauge per
649        // peer, value-encoded). Untrusted peers are skipped to bound metric
650        // cardinality. A `None` priority (no installed source) records 0.
651        if self.is_trusted_peer(&peer_id) {
652            self.metrics.set_active_p2p_address_source(
653                &peer_id.to_string(),
654                priority.as_ref().map(|(source, _)| *source),
655            );
656        }
657
658        let priority_addresses = priority.map(|(_, addrs)| addrs).unwrap_or_default();
659        let current_addresses = self
660            .network
661            .known_peers()
662            .get(&peer_id)
663            .map(|info| info.address.clone())
664            .unwrap_or_default();
665        if priority_addresses != current_addresses {
666            let new_peer_info = PeerInfo {
667                peer_id,
668                affinity: PeerAffinity::High,
669                address: priority_addresses.clone(),
670            };
671
672            self.network.known_peers().insert(new_peer_info);
673
674            // Override existing connection if there might be one.
675            if !current_addresses.is_empty() {
676                let _ = self.network.disconnect(peer_id);
677
678                if let Some(address) = priority_addresses.first().cloned() {
679                    let network = self.network.clone();
680                    self.tasks.spawn(async move {
681                        // If this fails, ConnectionManager will retry.
682                        let _ = network.connect_with_peer_id(address, peer_id).await;
683                    });
684                }
685            }
686        }
687    }
688
689    fn load_stored_peers_on_startup(&mut self) {
690        let Some(path) = &self.store_path else {
691            return;
692        };
693        let entries = load_stored_peers(path);
694        if entries.is_empty() {
695            return;
696        }
697        info!(
698            count = entries.len(),
699            "Loaded stored peer addresses from {}",
700            path.display()
701        );
702
703        update_known_peers_versioned(
704            self.state.clone(),
705            entries,
706            self.configured_peers.clone(),
707            &self.chain_peers,
708            &self.endpoint_manager,
709        );
710
711        // update_known_peers_versioned forwards addresses through the mailbox.
712        // Drain it now so addresses are applied before the event loop starts.
713        while let Ok(msg) = self.mailbox.try_recv() {
714            self.handle_message(msg);
715        }
716    }
717
718    fn save_stored_peers(&self) {
719        let Some(path) = &self.store_path else {
720            return;
721        };
722        let state = self.state.read().unwrap();
723        let peers_to_save: Vec<SignedVersionedNodeInfo> = state
724            .known_peers_v2
725            .iter()
726            .filter(|(pid, _)| self.is_trusted_peer(pid))
727            .map(|(_, verified)| verified.inner().clone())
728            .collect();
729        drop(state);
730        save_stored_peers(path, &peers_to_save);
731    }
732
733    fn handle_peer_event(&mut self, peer_event: Result<PeerEvent, RecvError>) {
734        match peer_event {
735            Ok(PeerEvent::NewPeer(peer_id)) => {
736                if let Some(peer) = self.network.peer(peer_id) {
737                    self.state
738                        .write()
739                        .unwrap()
740                        .connected_peers
741                        .insert(peer_id, ());
742
743                    // Query the new node for any peers
744                    self.tasks.spawn(query_peer_for_their_known_peers(
745                        peer,
746                        self.discovery_config.clone(),
747                        self.state.clone(),
748                        self.configured_peers.clone(),
749                        self.chain_peers.clone(),
750                        self.endpoint_manager.clone(),
751                        self.mailbox_sender(),
752                    ));
753                }
754            }
755            Ok(PeerEvent::LostPeer(peer_id, _)) => {
756                self.state.write().unwrap().connected_peers.remove(&peer_id);
757            }
758
759            Err(RecvError::Closed) => {
760                panic!("PeerEvent channel shouldn't be able to be closed");
761            }
762
763            Err(RecvError::Lagged(_)) => {
764                trace!("State-Sync fell behind processing PeerEvents");
765            }
766        }
767    }
768
769    fn handle_tick(&mut self, _now: std::time::Instant, now_unix: u64) {
770        self.update_our_info_timestamp(now_unix);
771
772        self.tasks
773            .spawn(query_connected_peers_for_their_known_peers(
774                self.network.clone(),
775                self.discovery_config.clone(),
776                self.state.clone(),
777                self.configured_peers.clone(),
778                self.chain_peers.clone(),
779                self.endpoint_manager.clone(),
780                self.mailbox_sender(),
781            ));
782
783        // Cull old peers older than a day
784        let mut culled_trusted_peers = Vec::new();
785        {
786            let mut state = self.state.write().unwrap();
787            state
788                .known_peers
789                .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms) < ONE_DAY_MILLISECONDS);
790            state.known_peers_v2.retain(|k, v| {
791                let keep = now_unix.saturating_sub(v.timestamp_ms()) < ONE_DAY_MILLISECONDS;
792                if !keep && self.is_trusted_peer(k) {
793                    culled_trusted_peers.push(*k);
794                }
795                keep
796            });
797        }
798
799        // Clear Discovery-sourced addresses for trusted peers whose
800        // signed info expired, allowing fallback to lower-priority sources.
801        for peer_id in culled_trusted_peers {
802            self.endpoint_manager
803                .clear_source(peer_id, AddressSource::Discovery);
804        }
805
806        // Clean out the pending_dials
807        self.pending_dials.retain(|_k, v| !v.is_finished());
808        if let Some(abort_handle) = &self.dial_seed_peers_task
809            && abort_handle.is_finished()
810        {
811            self.dial_seed_peers_task = None;
812        }
813
814        let cooldown = self.discovery_config.peer_failure_cooldown();
815        self.peer_cooldowns
816            .retain(|_, since| since.elapsed() < cooldown);
817
818        // Spawn some dials
819        let state = self.state.read().unwrap();
820        let our_peer_id = self.network.peer_id();
821
822        // Recompute the number of distinct peers with an external address from the
823        // deduped union of both known-peer maps. With v3 enabled a peer can appear
824        // in both maps, so a HashSet avoids double-counting; recomputing each tick
825        // (rather than inc/dec) also avoids drift when expired entries are culled.
826        let mut peers_with_external_address: HashSet<PeerId> = HashSet::new();
827        for (peer_id, info) in state.known_peers.iter() {
828            if !info.addresses.is_empty() {
829                peers_with_external_address.insert(*peer_id);
830            }
831        }
832        for (peer_id, info) in state.known_peers_v2.iter() {
833            if !info.p2p_addresses().is_empty() {
834                peers_with_external_address.insert(*peer_id);
835            }
836        }
837        self.metrics
838            .set_num_peers_with_external_address(peers_with_external_address.len() as i64);
839
840        // Collect eligible peers from both known_peers (V2) and known_peers_v2 (V3),
841        // preferring fresher timestamps when a peer appears in both maps.
842        // Partition into preferred (not on cooldown) and cooldown peers.
843        let mut preferred: HashMap<PeerId, NodeInfo> = HashMap::new();
844        let mut cooldown_peers: HashMap<PeerId, NodeInfo> = HashMap::new();
845
846        for (peer_id, info) in state.known_peers.iter() {
847            if *peer_id != our_peer_id
848                && !info.addresses.is_empty()
849                && !state.connected_peers.contains_key(peer_id)
850                && !self.pending_dials.contains_key(peer_id)
851                && !state.peer_addresses.contains_key(peer_id)
852            {
853                if self.peer_cooldowns.contains_key(peer_id) {
854                    cooldown_peers.insert(*peer_id, info.data().clone());
855                } else {
856                    preferred.insert(*peer_id, info.data().clone());
857                }
858            }
859        }
860        for (peer_id, info) in state.known_peers_v2.iter() {
861            let p2p_addresses = info.p2p_addresses();
862            if *peer_id != our_peer_id
863                && !p2p_addresses.is_empty()
864                && !state.connected_peers.contains_key(peer_id)
865                && !self.pending_dials.contains_key(peer_id)
866                && !state.peer_addresses.contains_key(peer_id)
867            {
868                let node_info = NodeInfo {
869                    peer_id: *peer_id,
870                    addresses: p2p_addresses.to_vec(),
871                    timestamp_ms: info.timestamp_ms(),
872                    access_type: info.access_type(),
873                };
874                if self.peer_cooldowns.contains_key(peer_id) {
875                    if cooldown_peers
876                        .get(peer_id)
877                        .is_none_or(|existing| info.timestamp_ms() > existing.timestamp_ms)
878                    {
879                        cooldown_peers.insert(*peer_id, node_info);
880                    }
881                } else if preferred
882                    .get(peer_id)
883                    .is_none_or(|existing| info.timestamp_ms() > existing.timestamp_ms)
884                {
885                    preferred.insert(*peer_id, node_info);
886                }
887            }
888        }
889
890        let number_of_connections = state.connected_peers.len();
891        let number_to_dial = self
892            .discovery_config
893            .target_concurrent_connections()
894            .saturating_sub(number_of_connections);
895
896        use rand::seq::IteratorRandom;
897        let mut rng = rand::thread_rng();
898
899        let mut to_dial: Vec<_> = preferred
900            .into_iter()
901            .choose_multiple(&mut rng, number_to_dial);
902
903        let remaining = number_to_dial.saturating_sub(to_dial.len());
904        to_dial.extend(
905            cooldown_peers
906                .into_iter()
907                .choose_multiple(&mut rng, remaining),
908        );
909
910        for (peer_id, info) in to_dial {
911            let abort_handle = self
912                .tasks
913                .spawn(try_to_connect_to_peer(self.network.clone(), info));
914            self.pending_dials.insert(peer_id, abort_handle);
915        }
916
917        // If we aren't connected to anything and we aren't presently trying to connect to anyone
918        // we need to try the configured peers with High affinity (seed peers)
919        let has_peers_to_dial = || {
920            self.configured_peers
921                .values()
922                .any(|p| p.affinity == PeerAffinity::High)
923                || !self.unidentified_seed_peers.is_empty()
924        };
925        if self.dial_seed_peers_task.is_none()
926            && state.connected_peers.is_empty()
927            && self.pending_dials.is_empty()
928            && has_peers_to_dial()
929        {
930            let abort_handle = self.tasks.spawn(try_to_connect_to_seed_peers(
931                self.network.clone(),
932                self.discovery_config.clone(),
933                self.configured_peers.clone(),
934                self.unidentified_seed_peers.clone(),
935            ));
936
937            self.dial_seed_peers_task = Some(abort_handle);
938        }
939    }
940
941    fn is_trusted_peer(&self, peer_id: &PeerId) -> bool {
942        is_trusted_peer(peer_id, &self.configured_peers, &self.chain_peers)
943    }
944
945    fn mailbox_sender(&self) -> mpsc::Sender<DiscoveryMessage> {
946        self.mailbox_tx.clone()
947    }
948}
949
950async fn try_to_connect_to_peer(network: Network, info: NodeInfo) {
951    debug!("Connecting to peer {info:?}");
952    for multiaddr in &info.addresses {
953        if let Ok(address) = multiaddr.to_anemo_address() {
954            // Ignore the result and just log the error if there is one
955            if network
956                .connect_with_peer_id(address, info.peer_id)
957                .await
958                .tap_err(|e| {
959                    debug!(
960                        "error dialing {} at address '{}': {e}",
961                        info.peer_id.short_display(4),
962                        multiaddr
963                    )
964                })
965                .is_ok()
966            {
967                return;
968            }
969        }
970    }
971}
972
973async fn try_to_connect_to_seed_peers(
974    network: Network,
975    config: Arc<DiscoveryConfig>,
976    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
977    unidentified_seed_peers: Vec<anemo::types::Address>,
978) {
979    let high_affinity_peers: Vec<_> = configured_peers
980        .values()
981        .filter(|p| p.affinity == PeerAffinity::High)
982        .cloned()
983        .collect();
984    debug!(
985        ?high_affinity_peers,
986        ?unidentified_seed_peers,
987        "Connecting to seed peers"
988    );
989    let network = &network;
990
991    // Attempt connection to all high-affinity and seed peers.
992    let with_peer_id = high_affinity_peers.into_iter().flat_map(|peer_info| {
993        peer_info
994            .address
995            .into_iter()
996            .map(move |addr| (Some(peer_info.peer_id), addr))
997    });
998    let without_peer_id = unidentified_seed_peers.into_iter().map(|addr| (None, addr));
999    futures::stream::iter(with_peer_id.chain(without_peer_id))
1000        .for_each_concurrent(
1001            config.target_concurrent_connections(),
1002            |(peer_id, address)| async move {
1003                // Ignore the result and just log the error if there is one
1004                let _ = if let Some(peer_id) = peer_id {
1005                    network
1006                        .connect_with_peer_id(address.clone(), peer_id)
1007                        .await
1008                        .tap_err(|e| {
1009                            debug!(
1010                                "error dialing peer {} at '{}': {e}",
1011                                peer_id.short_display(4),
1012                                address
1013                            )
1014                        })
1015                } else {
1016                    network
1017                        .connect(address.clone())
1018                        .await
1019                        .tap_err(|e| debug!("error dialing address '{}': {e}", address))
1020                };
1021            },
1022        )
1023        .await;
1024}
1025
1026async fn query_peer_for_known_peers_v2(peer: Peer) -> Option<Vec<SignedNodeInfo>> {
1027    let mut client = DiscoveryClient::new(peer);
1028    let request = Request::new(()).with_timeout(TIMEOUT);
1029    client
1030        .get_known_peers_v2(request)
1031        .await
1032        .ok()
1033        .map(Response::into_inner)
1034        .map(
1035            |GetKnownPeersResponseV2 {
1036                 own_info,
1037                 mut known_peers,
1038             }| {
1039                if !own_info.addresses.is_empty() {
1040                    known_peers.push(own_info)
1041                }
1042                known_peers
1043            },
1044        )
1045}
1046
1047async fn query_peer_for_their_known_peers(
1048    peer: Peer,
1049    discovery_config: Arc<DiscoveryConfig>,
1050    state: Arc<RwLock<State>>,
1051    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1052    chain_peers: Arc<RwLock<HashSet<PeerId>>>,
1053    endpoint_manager: EndpointManager,
1054    mailbox_tx: mpsc::Sender<DiscoveryMessage>,
1055) {
1056    // Query V3 concurrently with V2 when enabled
1057    if discovery_config.use_get_known_peers_v3() {
1058        let our_info_v2 = state.read().unwrap().our_info_v2.clone();
1059        if let Some(own_info) = our_info_v2 {
1060            let peer_for_v3 = peer.clone();
1061            let v3_query = async move {
1062                let mut client = DiscoveryClient::new(peer_for_v3);
1063                let request =
1064                    Request::new(GetKnownPeersRequestV3 { own_info }).with_timeout(TIMEOUT);
1065                client
1066                    .get_known_peers_v3(request)
1067                    .await
1068                    .ok()
1069                    .map(Response::into_inner)
1070                    .map(
1071                        |GetKnownPeersResponseV3 {
1072                             own_info,
1073                             mut known_peers,
1074                         }| {
1075                            if !own_info.p2p_addresses().is_empty() {
1076                                known_peers.push(own_info)
1077                            }
1078                            known_peers
1079                        },
1080                    )
1081            };
1082
1083            let (found_peers_v2, found_peers_v3) =
1084                tokio::join!(query_peer_for_known_peers_v2(peer), v3_query);
1085
1086            if let Some(found_peers) = found_peers_v2 {
1087                update_known_peers(
1088                    state.clone(),
1089                    found_peers,
1090                    configured_peers.clone(),
1091                    &chain_peers,
1092                );
1093            }
1094            if let Some(found_peers) = found_peers_v3 {
1095                let changed = update_known_peers_versioned(
1096                    state,
1097                    found_peers,
1098                    configured_peers,
1099                    &chain_peers,
1100                    &endpoint_manager,
1101                );
1102                if changed {
1103                    let _ = mailbox_tx.try_send(DiscoveryMessage::TrustedPeersUpdated);
1104                }
1105            }
1106            return;
1107        }
1108    }
1109
1110    // V3 not enabled or our_info_v2 not available - just run V2
1111    if let Some(found_peers) = query_peer_for_known_peers_v2(peer).await {
1112        update_known_peers(state, found_peers, configured_peers, &chain_peers);
1113    }
1114}
1115
1116async fn query_connected_peers_for_their_known_peers(
1117    network: Network,
1118    config: Arc<DiscoveryConfig>,
1119    state: Arc<RwLock<State>>,
1120    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1121    chain_peers: Arc<RwLock<HashSet<PeerId>>>,
1122    endpoint_manager: EndpointManager,
1123    mailbox_tx: mpsc::Sender<DiscoveryMessage>,
1124) {
1125    use rand::seq::IteratorRandom;
1126
1127    let peers_to_query: Vec<_> = network
1128        .peers()
1129        .into_iter()
1130        .flat_map(|id| network.peer(id))
1131        .choose_multiple(&mut rand::thread_rng(), config.peers_to_query());
1132
1133    // Query V2 to keep known_peers populated for V2 clients
1134    let v2_query = {
1135        let peers = peers_to_query.clone();
1136        let peers_to_query_count = config.peers_to_query();
1137        async move {
1138            peers
1139                .into_iter()
1140                .map(DiscoveryClient::new)
1141                .map(|mut client| async move {
1142                    let request = Request::new(()).with_timeout(TIMEOUT);
1143                    client
1144                        .get_known_peers_v2(request)
1145                        .await
1146                        .ok()
1147                        .map(Response::into_inner)
1148                        .map(
1149                            |GetKnownPeersResponseV2 {
1150                                 own_info,
1151                                 mut known_peers,
1152                             }| {
1153                                if !own_info.addresses.is_empty() {
1154                                    known_peers.push(own_info)
1155                                }
1156                                known_peers
1157                            },
1158                        )
1159                })
1160                .pipe(futures::stream::iter)
1161                .buffer_unordered(peers_to_query_count)
1162                .filter_map(std::future::ready)
1163                .flat_map(futures::stream::iter)
1164                .collect::<Vec<_>>()
1165                .await
1166        }
1167    };
1168
1169    // Additionally query V3 when enabled, concurrently with V2
1170    if config.use_get_known_peers_v3() {
1171        let our_info_v2 = state.read().unwrap().our_info_v2.clone();
1172        if let Some(own_info) = our_info_v2 {
1173            let v3_query = {
1174                let peers_to_query_count = config.peers_to_query();
1175                async move {
1176                    peers_to_query
1177                        .into_iter()
1178                        .map(DiscoveryClient::new)
1179                        .map(|mut client| {
1180                            let own_info = own_info.clone();
1181                            async move {
1182                                let request = Request::new(GetKnownPeersRequestV3 { own_info })
1183                                    .with_timeout(TIMEOUT);
1184                                client
1185                                    .get_known_peers_v3(request)
1186                                    .await
1187                                    .ok()
1188                                    .map(Response::into_inner)
1189                                    .map(
1190                                        |GetKnownPeersResponseV3 {
1191                                             own_info,
1192                                             mut known_peers,
1193                                         }| {
1194                                            if !own_info.p2p_addresses().is_empty() {
1195                                                known_peers.push(own_info)
1196                                            }
1197                                            known_peers
1198                                        },
1199                                    )
1200                            }
1201                        })
1202                        .pipe(futures::stream::iter)
1203                        .buffer_unordered(peers_to_query_count)
1204                        .filter_map(std::future::ready)
1205                        .flat_map(futures::stream::iter)
1206                        .collect::<Vec<_>>()
1207                        .await
1208                }
1209            };
1210
1211            let (found_peers_v2, found_peers_v3) = tokio::join!(v2_query, v3_query);
1212
1213            update_known_peers(
1214                state.clone(),
1215                found_peers_v2,
1216                configured_peers.clone(),
1217                &chain_peers,
1218            );
1219            let changed = update_known_peers_versioned(
1220                state,
1221                found_peers_v3,
1222                configured_peers,
1223                &chain_peers,
1224                &endpoint_manager,
1225            );
1226            if changed {
1227                let _ = mailbox_tx.try_send(DiscoveryMessage::TrustedPeersUpdated);
1228            }
1229            return;
1230        }
1231    }
1232
1233    // V3 not enabled or our_info_v2 not available - just run V2
1234    let found_peers_v2 = v2_query.await;
1235    update_known_peers(state, found_peers_v2, configured_peers, &chain_peers);
1236}
1237
1238fn update_known_peers(
1239    state: Arc<RwLock<State>>,
1240    found_peers: Vec<SignedNodeInfo>,
1241    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1242    chain_peers: &Arc<RwLock<HashSet<PeerId>>>,
1243) {
1244    use std::collections::hash_map::Entry;
1245
1246    let now_unix = now_unix();
1247    let our_peer_id = state.read().unwrap().our_info.clone().unwrap().peer_id;
1248    let known_peers = &mut state.write().unwrap().known_peers;
1249    // only take the first MAX_PEERS_TO_SEND peers
1250    for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
1251        // +1 to account for the "own_info" of the serving peer
1252        // Skip peers whose timestamp is too far in the future from our clock
1253        // or that are too old
1254        if peer_info.timestamp_ms > now_unix.saturating_add(30 * 1_000) // 30 seconds
1255            || now_unix.saturating_sub(peer_info.timestamp_ms) > ONE_DAY_MILLISECONDS
1256        {
1257            continue;
1258        }
1259
1260        if peer_info.peer_id == our_peer_id {
1261            continue;
1262        }
1263
1264        let is_restricted = match peer_info.access_type {
1265            AccessType::Public => false,
1266            AccessType::Private | AccessType::Trusted => true,
1267        };
1268        if is_restricted && !is_trusted_peer(&peer_info.peer_id, &configured_peers, chain_peers) {
1269            continue;
1270        }
1271
1272        // Skip entries that have too many addresses as a means to cap the size of a node's info
1273        if peer_info.addresses.len() > MAX_ADDRESSES_PER_PEER {
1274            continue;
1275        }
1276
1277        // verify that all addresses provided are valid anemo addresses
1278        if !peer_info
1279            .addresses
1280            .iter()
1281            .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
1282        {
1283            continue;
1284        }
1285        let Ok(public_key) = Ed25519PublicKey::from_bytes(&peer_info.peer_id.0) else {
1286            debug_fatal!(
1287                // This should never happen.
1288                "Failed to convert anemo PeerId {:?} to Ed25519PublicKey",
1289                peer_info.peer_id
1290            );
1291            continue;
1292        };
1293        let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
1294        if let Err(e) = public_key.verify(&msg, peer_info.auth_sig()) {
1295            info!(
1296                "Discovery failed to verify signature for NodeInfo for peer {:?}: {e:?}",
1297                peer_info.peer_id
1298            );
1299            // TODO: consider denylisting the source of bad NodeInfo from future requests.
1300            continue;
1301        }
1302        let peer = VerifiedSignedNodeInfo::new_from_verified(peer_info);
1303
1304        match known_peers.entry(peer.peer_id) {
1305            Entry::Occupied(mut o) => {
1306                if peer.timestamp_ms > o.get().timestamp_ms {
1307                    o.insert(peer);
1308                }
1309            }
1310            Entry::Vacant(v) => {
1311                v.insert(peer);
1312            }
1313        }
1314    }
1315}
1316
1317/// Returns true if any trusted peer was inserted or updated.
1318fn update_known_peers_versioned(
1319    state: Arc<RwLock<State>>,
1320    found_peers: Vec<SignedVersionedNodeInfo>,
1321    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1322    chain_peers: &Arc<RwLock<HashSet<PeerId>>>,
1323    endpoint_manager: &EndpointManager,
1324) -> bool {
1325    use std::collections::hash_map::Entry;
1326
1327    let now_unix = now_unix();
1328    let our_peer_id = state
1329        .read()
1330        .unwrap()
1331        .our_info_v2
1332        .as_ref()
1333        .and_then(|info| info.peer_id());
1334    let known_peers_v2 = &mut state.write().unwrap().known_peers_v2;
1335    let mut trusted_peer_changed = false;
1336
1337    for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
1338        let timestamp_ms = peer_info.timestamp_ms();
1339
1340        if timestamp_ms > now_unix.saturating_add(30 * 1_000)
1341            || now_unix.saturating_sub(timestamp_ms) > ONE_DAY_MILLISECONDS
1342        {
1343            continue;
1344        }
1345
1346        let Some(peer_id) = peer_info.peer_id() else {
1347            continue;
1348        };
1349
1350        if Some(peer_id) == our_peer_id {
1351            continue;
1352        }
1353
1354        let is_restricted = match peer_info.access_type() {
1355            AccessType::Public => false,
1356            AccessType::Private | AccessType::Trusted => true,
1357        };
1358        let is_trusted = is_trusted_peer(&peer_id, &configured_peers, chain_peers);
1359        if is_restricted && !is_trusted {
1360            continue;
1361        }
1362
1363        let peer = match verify_versioned_node_info(&peer_info) {
1364            Ok(verified) => verified,
1365            Err(reason) => {
1366                info!("Discovery rejecting VersionedNodeInfo for peer {peer_id:?}: {reason}");
1367                continue;
1368            }
1369        };
1370
1371        // Forward discovered addresses for trusted peers to EndpointManager.
1372        if is_trusted && let VersionedNodeInfo::V2(info_v2) = peer_info.data() {
1373            for (endpoint_id, addrs) in &info_v2.addresses {
1374                if !addrs.is_empty() {
1375                    let _ = endpoint_manager.update_endpoint(
1376                        endpoint_id.clone(),
1377                        AddressSource::Discovery,
1378                        addrs.clone(),
1379                    );
1380                }
1381            }
1382        }
1383
1384        match known_peers_v2.entry(peer_id) {
1385            Entry::Occupied(mut o) => {
1386                if peer.timestamp_ms() > o.get().timestamp_ms() {
1387                    o.insert(peer);
1388                    if is_trusted {
1389                        trusted_peer_changed = true;
1390                    }
1391                }
1392            }
1393            Entry::Vacant(v) => {
1394                v.insert(peer);
1395                if is_trusted {
1396                    trusted_peer_changed = true;
1397                }
1398            }
1399        }
1400    }
1401
1402    trusted_peer_changed
1403}
1404
1405/// A trusted peer is one that appears in the static configured_peers (seed/allowlisted)
1406/// or was dynamically added via on-chain validator info (chain_peers).
1407pub(super) fn is_trusted_peer(
1408    peer_id: &PeerId,
1409    configured_peers: &HashMap<PeerId, PeerInfo>,
1410    chain_peers: &Arc<RwLock<HashSet<PeerId>>>,
1411) -> bool {
1412    configured_peers.contains_key(peer_id) || chain_peers.read().unwrap().contains(peer_id)
1413}
1414
1415pub(super) fn now_unix() -> u64 {
1416    use std::time::{SystemTime, UNIX_EPOCH};
1417
1418    SystemTime::now()
1419        .duration_since(UNIX_EPOCH)
1420        .unwrap()
1421        .as_millis() as u64
1422}