1use anemo::types::PeerAffinity;
5use anemo::types::PeerInfo;
6use anemo::{Network, Peer, PeerId, Request, Response, types::PeerEvent};
7use fastcrypto::ed25519::{Ed25519PublicKey, Ed25519Signature};
8use futures::StreamExt;
9use mysten_common::debug_fatal;
10use serde::{Deserialize, Serialize};
11use shared_crypto::intent::IntentScope;
12use std::{
13 collections::{BTreeMap, HashMap, HashSet},
14 path::PathBuf,
15 sync::{Arc, RwLock},
16 time::{Duration, Instant},
17};
18
19use crate::endpoint_manager::{AddressSource, EndpointId, EndpointManager};
20use store::{load_stored_peers, save_stored_peers};
21use sui_config::p2p::{AccessType, DiscoveryConfig, P2pConfig};
22use sui_types::crypto::{NetworkKeyPair, NetworkPublicKey, Signer, ToFromBytes, VerifyingKey};
23use sui_types::digests::Digest;
24use sui_types::message_envelope::{Envelope, Message, VerifiedEnvelope};
25use sui_types::multiaddr::Multiaddr;
26use tap::{Pipe, TapFallible};
27use tokio::sync::broadcast::error::RecvError;
28use tokio::sync::mpsc;
29use tokio::{
30 sync::oneshot,
31 task::{AbortHandle, JoinSet},
32};
33use tracing::{debug, info, trace};
34
/// Timeout applied to each outgoing `GetKnownPeers` request.
const TIMEOUT: Duration = Duration::from_millis(1_000);
/// Node info whose timestamp is older than this (relative to now) is dropped/culled.
const ONE_DAY_MILLISECONDS: u64 = 1_000 * 60 * 60 * 24;
/// Every advertised address must be strictly shorter than this (`Multiaddr::len()`).
const MAX_ADDRESS_LENGTH: usize = 300;
/// Cap on peers in a known-peers list; received lists are truncated to this plus one
/// (the responder's own info). Presumably also used by the server side — confirm.
const MAX_PEERS_TO_SEND: usize = 200;
/// Maximum number of addresses accepted per peer (per endpoint for V2 node info).
const MAX_ADDRESSES_PER_PEER: usize = 2;
40
// Code emitted by the build script into `OUT_DIR`; provides the `Discovery`
// client/server types re-exported by this module.
mod generated {
    include!(concat!(env!("OUT_DIR"), "/sui.Discovery.rs"));
}
44mod builder;
45mod metrics;
46mod server;
47mod store;
48#[cfg(test)]
49mod tests;
50
51pub use builder::{Builder, UnstartedDiscovery};
52pub use generated::{
53 discovery_client::DiscoveryClient,
54 discovery_server::{Discovery, DiscoveryServer},
55};
56pub use server::{GetKnownPeersRequestV3, GetKnownPeersResponseV2, GetKnownPeersResponseV3};
57
/// Messages delivered to the discovery event loop through its mailbox.
#[derive(Debug)]
pub enum DiscoveryMessage {
    /// `source` now reports `addresses` for `peer_id`; an empty list withdraws that
    /// source for the peer.
    PeerAddressChange {
        peer_id: PeerId,
        source: AddressSource,
        addresses: Vec<anemo::types::Address>,
    },
    /// A signed node info received from a peer, to be merged into the known peers.
    ReceivedNodeInfo {
        peer_info: Box<SignedVersionedNodeInfo>,
    },
    /// Known info about trusted peers changed; the persisted peer store is re-saved.
    TrustedPeersUpdated,
    /// Communication with `peer_id` failed; the loop may disconnect it and put it on
    /// cooldown.
    PeerFailureReport { peer_id: PeerId },
}
77
/// Handle to a running discovery event loop.
///
/// `_shutdown_handle` is never read; the loop exits when its paired
/// `oneshot::Receiver` resolves, which presumably happens once the last clone of this
/// handle is dropped (TODO confirm against the builder).
#[derive(Clone, Debug)]
pub struct Handle {
    pub(super) _shutdown_handle: Arc<oneshot::Sender<()>>,
    pub(super) sender: Sender,
}
85
86impl Handle {
87 pub fn sender(&self) -> Sender {
88 self.sender.clone()
89 }
90}
91
/// Cloneable sending side of the discovery event loop's mailbox.
#[derive(Clone, Debug)]
pub struct Sender {
    pub(super) sender: mpsc::Sender<DiscoveryMessage>,
}
98
99impl Sender {
100 pub fn peer_address_change(
101 &self,
102 peer_id: PeerId,
103 source: AddressSource,
104 addresses: Vec<anemo::types::Address>,
105 ) {
106 self.sender
107 .try_send(DiscoveryMessage::PeerAddressChange {
108 peer_id,
109 source,
110 addresses,
111 })
112 .expect("Discovery mailbox should not overflow or be closed")
113 }
114
115 pub fn report_peer_failure(&self, peer_id: PeerId) {
116 let _ = self
117 .sender
118 .try_send(DiscoveryMessage::PeerFailureReport { peer_id });
119 }
120}
121
122use self::metrics::Metrics;
123
/// Discovery state shared (behind `Arc<RwLock<_>>`) by the event loop and its tasks.
struct State {
    /// Our signed V1 node info; built at startup, re-signed with a fresh timestamp each tick.
    our_info: Option<SignedNodeInfo>,
    /// Our signed versioned (V2) node info; includes a consensus endpoint when configured.
    our_info_v2: Option<SignedVersionedNodeInfo>,
    /// Connected peers, used as a set: inserted on `NewPeer`, removed on `LostPeer`.
    connected_peers: HashMap<PeerId, ()>,
    /// Verified V1 node info learned from peers; entries older than one day are culled.
    known_peers: HashMap<PeerId, VerifiedSignedNodeInfo>,
    /// Verified versioned node info learned from peers; also culled after one day.
    known_peers_v2: HashMap<PeerId, VerifiedSignedVersionedNodeInfo>,
    /// Explicitly managed addresses per peer, keyed by source. The first entry in
    /// `AddressSource` order is the one installed into the network's known peers.
    peer_addresses: HashMap<PeerId, BTreeMap<AddressSource, Vec<anemo::types::Address>>>,
}
133
/// V1 self-description a node advertises to its peers.
///
/// The signature covers the BCS encoding of this struct, so field order is part of the
/// wire/signing format and must not change.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct NodeInfo {
    pub peer_id: PeerId,
    pub addresses: Vec<Multiaddr>,

    /// Creation time in Unix milliseconds; newer info replaces older info.
    pub timestamp_ms: u64,

    /// Non-`Public` info is only accepted from trusted peers.
    pub access_type: AccessType,
}
151
152impl NodeInfo {
153 fn sign(self, keypair: &NetworkKeyPair) -> SignedNodeInfo {
154 let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
155 let sig = keypair.sign(&msg);
156 SignedNodeInfo::new_from_data_and_sig(self, sig)
157 }
158}
159
/// [`NodeInfo`] together with its Ed25519 signature (not yet verified).
pub type SignedNodeInfo = Envelope<NodeInfo, Ed25519Signature>;

/// [`NodeInfo`] whose signature has been checked against its `peer_id`.
pub type VerifiedSignedNodeInfo = VerifiedEnvelope<NodeInfo, Ed25519Signature>;
163
/// Digest type required by [`Message`]; never actually computed for node info.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct NodeInfoDigest(Digest);

impl NodeInfoDigest {
    pub const fn new(digest: [u8; 32]) -> Self {
        Self(Digest::new(digest))
    }
}

/// Lets [`NodeInfo`] be carried in an [`Envelope`] under the `DiscoveryPeers` intent scope.
impl Message for NodeInfo {
    type DigestType = NodeInfoDigest;
    const SCOPE: IntentScope = IntentScope::DiscoveryPeers;

    /// Panics: nothing in discovery requests a digest of node info.
    fn digest(&self) -> Self::DigestType {
        unreachable!("NodeInfoDigest is not used today")
    }
}
181
/// V2 self-description: addresses grouped per endpoint (P2P and, optionally, consensus).
///
/// Signed via its BCS encoding inside [`VersionedNodeInfo`], so field order is part of
/// the signing format.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct NodeInfoV2 {
    pub addresses: BTreeMap<EndpointId, Vec<Multiaddr>>,
    /// Creation time in Unix milliseconds.
    pub timestamp_ms: u64,
    pub access_type: AccessType,
}
190
191impl NodeInfoV2 {
192 pub fn peer_id(&self) -> Option<PeerId> {
195 self.addresses.keys().find_map(|k| match k {
196 EndpointId::P2p(peer_id) => Some(*peer_id),
197 EndpointId::Consensus(_) => None,
198 })
199 }
200
201 pub fn p2p_addresses(&self) -> &[Multiaddr] {
202 self.addresses
203 .iter()
204 .find_map(|(k, v)| match k {
205 EndpointId::P2p(_) => Some(v.as_slice()),
206 EndpointId::Consensus(_) => None,
207 })
208 .unwrap_or(&[])
209 }
210}
211
/// Node info in either format. Variant order determines the BCS tag and therefore the
/// signed bytes, so variants must only ever be appended.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum VersionedNodeInfo {
    V1(NodeInfo),
    V2(NodeInfoV2),
}
218
219impl VersionedNodeInfo {
220 pub fn peer_id(&self) -> Option<PeerId> {
221 match self {
222 VersionedNodeInfo::V1(info) => Some(info.peer_id),
223 VersionedNodeInfo::V2(info) => info.peer_id(),
224 }
225 }
226
227 pub fn timestamp_ms(&self) -> u64 {
228 match self {
229 VersionedNodeInfo::V1(info) => info.timestamp_ms,
230 VersionedNodeInfo::V2(info) => info.timestamp_ms,
231 }
232 }
233
234 pub fn access_type(&self) -> AccessType {
235 match self {
236 VersionedNodeInfo::V1(info) => info.access_type,
237 VersionedNodeInfo::V2(info) => info.access_type,
238 }
239 }
240
241 pub fn p2p_addresses(&self) -> &[Multiaddr] {
242 match self {
243 VersionedNodeInfo::V1(info) => &info.addresses,
244 VersionedNodeInfo::V2(info) => info.p2p_addresses(),
245 }
246 }
247
248 pub fn sign(self, keypair: &NetworkKeyPair) -> SignedVersionedNodeInfo {
249 let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
250 let sig = keypair.sign(&msg);
251 SignedVersionedNodeInfo::new_from_data_and_sig(self, sig)
252 }
253}
254
/// Digest type required by [`Message`]; never actually computed.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct VersionedNodeInfoDigest(Digest);

/// Lets [`VersionedNodeInfo`] be carried in an [`Envelope`] under the `DiscoveryPeers` scope.
impl Message for VersionedNodeInfo {
    type DigestType = VersionedNodeInfoDigest;
    const SCOPE: IntentScope = IntentScope::DiscoveryPeers;

    /// Panics: nothing in discovery requests a digest of versioned node info.
    fn digest(&self) -> Self::DigestType {
        unreachable!("VersionedNodeInfoDigest is not used today")
    }
}

/// [`VersionedNodeInfo`] with its (unverified) Ed25519 signature.
pub type SignedVersionedNodeInfo = Envelope<VersionedNodeInfo, Ed25519Signature>;
/// [`VersionedNodeInfo`] whose signature and structure passed `verify_versioned_node_info`.
pub type VerifiedSignedVersionedNodeInfo = VerifiedEnvelope<VersionedNodeInfo, Ed25519Signature>;
269
270fn verify_versioned_node_info(
280 peer_info: &SignedVersionedNodeInfo,
281) -> Result<VerifiedSignedVersionedNodeInfo, &'static str> {
282 let peer_id = peer_info.peer_id().ok_or("missing P2P peer_id")?;
283
284 let public_key =
285 Ed25519PublicKey::from_bytes(&peer_id.0).map_err(|_| "invalid peer_id public key")?;
286
287 let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
288 public_key
289 .verify(&msg, peer_info.auth_sig())
290 .map_err(|_| "signature verification failed")?;
291
292 match peer_info.data() {
293 VersionedNodeInfo::V1(info) => {
294 if info.addresses.len() > MAX_ADDRESSES_PER_PEER {
295 return Err("too many addresses");
296 }
297 if !info
298 .addresses
299 .iter()
300 .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
301 {
302 return Err("invalid address");
303 }
304 }
305 VersionedNodeInfo::V2(info_v2) => {
306 let mut seen_variants = Vec::new();
308 for endpoint_id in info_v2.addresses.keys() {
309 let variant = std::mem::discriminant(endpoint_id);
310 if seen_variants.contains(&variant) {
311 return Err("duplicate endpoint variant");
312 }
313 seen_variants.push(variant);
314 }
315
316 for (endpoint_id, addrs) in &info_v2.addresses {
317 if addrs.len() > MAX_ADDRESSES_PER_PEER {
318 return Err("too many addresses for endpoint");
319 }
320 if !addrs.iter().all(|addr| addr.len() < MAX_ADDRESS_LENGTH) {
321 return Err("address too long");
322 }
323 if matches!(endpoint_id, EndpointId::P2p(_))
324 && !addrs.iter().all(|addr| addr.to_anemo_address().is_ok())
325 {
326 return Err("invalid P2P address");
327 }
328 }
329
330 let identities_valid = info_v2.addresses.keys().all(|eid| match eid {
331 EndpointId::P2p(_) => true,
332 EndpointId::Consensus(pubkey) => pubkey.as_bytes() == peer_id.0,
333 });
334 if !identities_valid {
335 return Err("non-P2P endpoint identity mismatch");
336 }
337 }
338 }
339
340 Ok(VerifiedSignedVersionedNodeInfo::new_from_verified(
341 peer_info.clone(),
342 ))
343}
344
/// Owns all discovery work for one node; driven by `DiscoveryEventLoop::start`.
struct DiscoveryEventLoop {
    config: P2pConfig,
    discovery_config: Arc<DiscoveryConfig>,
    /// Peers from configuration; `High`-affinity ones are treated as seeds.
    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
    /// Peers currently reported with a non-empty `AddressSource::Chain` address list.
    chain_peers: Arc<RwLock<HashSet<PeerId>>>,
    /// Seed addresses without a known peer id.
    unidentified_seed_peers: Vec<anemo::types::Address>,
    network: Network,
    /// Key used to sign our own node info.
    keypair: NetworkKeyPair,
    /// Background tasks; panics are re-raised by the loop.
    tasks: JoinSet<()>,
    /// In-flight dial tasks, keyed by target peer.
    pending_dials: HashMap<PeerId, AbortHandle>,
    /// In-flight seed-dialing task, if any.
    dial_seed_peers_task: Option<AbortHandle>,
    /// Resolves when the loop should shut down.
    shutdown_handle: oneshot::Receiver<()>,
    state: Arc<RwLock<State>>,
    mailbox: mpsc::Receiver<DiscoveryMessage>,
    /// Sender side of `mailbox`, cloned into spawned tasks.
    mailbox_tx: mpsc::Sender<DiscoveryMessage>,
    metrics: Metrics,
    /// Advertised as the `Consensus` endpoint in our V2 node info when set.
    consensus_external_address: Option<Multiaddr>,
    endpoint_manager: EndpointManager,
    /// File where trusted peers' V2 node info is persisted, if enabled.
    store_path: Option<PathBuf>,
    /// Time of the most recent failure report per peer.
    peer_cooldowns: HashMap<PeerId, Instant>,
}
366
367impl DiscoveryEventLoop {
368 pub async fn start(mut self) {
369 info!("Discovery started");
370
371 self.construct_our_info();
372 self.configure_preferred_peers();
373 self.load_stored_peers_on_startup();
374
375 let mut interval = tokio::time::interval(self.discovery_config.interval_period());
376 let mut peer_events = {
377 let (subscriber, _peers) = self.network.subscribe().unwrap();
378 subscriber
379 };
380
381 loop {
382 tokio::select! {
383 now = interval.tick() => {
384 let now_unix = now_unix();
385 self.handle_tick(now.into_std(), now_unix);
386 }
387 peer_event = peer_events.recv() => {
388 self.handle_peer_event(peer_event);
389 },
390 Some(message) = self.mailbox.recv() => {
391 self.handle_message(message);
392 }
393 Some(task_result) = self.tasks.join_next() => {
394 match task_result {
395 Ok(()) => {},
396 Err(e) => {
397 if e.is_cancelled() {
398 } else if e.is_panic() {
400 std::panic::resume_unwind(e.into_panic());
402 } else {
403 panic!("task failed: {e}");
404 }
405 },
406 };
407 },
408 _ = &mut self.shutdown_handle => {
410 break;
411 }
412 }
413 }
414
415 self.save_stored_peers();
416 info!("Discovery ended");
417 }
418
419 fn handle_message(&mut self, message: DiscoveryMessage) {
420 match message {
421 DiscoveryMessage::PeerAddressChange {
422 peer_id,
423 source,
424 addresses,
425 } => {
426 self.handle_peer_address_change(peer_id, source, addresses);
427 }
428 DiscoveryMessage::ReceivedNodeInfo { peer_info } => {
429 let changed = update_known_peers_versioned(
430 self.state.clone(),
431 vec![*peer_info],
432 self.configured_peers.clone(),
433 &self.chain_peers,
434 &self.endpoint_manager,
435 );
436 if changed {
437 self.save_stored_peers();
438 }
439 }
440 DiscoveryMessage::TrustedPeersUpdated => {
441 self.save_stored_peers();
442 }
443 DiscoveryMessage::PeerFailureReport { peer_id } => {
444 self.handle_peer_failure_report(peer_id);
445 }
446 }
447 }
448
449 fn handle_peer_failure_report(&mut self, peer_id: PeerId) {
450 if self.is_trusted_peer(&peer_id) {
451 info!(?peer_id, "ignoring failure report for trusted peer");
452 return;
453 }
454 let min_peers = self.discovery_config.min_peers_for_disconnect();
455 let connected_count = self.state.read().unwrap().connected_peers.len();
456 if connected_count < min_peers {
457 info!(
458 ?peer_id,
459 connected_count, min_peers, "skipping disconnect, too few connected peers"
460 );
461 return;
462 }
463 info!(
464 ?peer_id,
465 "peer failure reported, disconnecting and adding cooldown"
466 );
467 let _ = self.network.disconnect(peer_id);
468 self.peer_cooldowns.insert(peer_id, Instant::now());
469 }
470
471 fn construct_our_info(&mut self) {
472 if self.state.read().unwrap().our_info.is_some() {
473 return;
474 }
475
476 let peer_id = self.network.peer_id();
477 let timestamp_ms = now_unix();
478 let access_type = self.discovery_config.access_type();
479
480 let addresses: Vec<Multiaddr> = self
481 .config
482 .external_address
483 .clone()
484 .and_then(|addr| addr.to_anemo_address().ok().map(|_| addr))
485 .into_iter()
486 .collect();
487
488 let our_info = NodeInfo {
489 peer_id,
490 addresses: addresses.clone(),
491 timestamp_ms,
492 access_type,
493 }
494 .sign(&self.keypair);
495
496 let mut addresses_map = BTreeMap::new();
497 addresses_map.insert(EndpointId::P2p(peer_id), addresses);
498 if let Some(consensus_addr) = &self.consensus_external_address {
499 let network_pubkey =
503 NetworkPublicKey::from_bytes(&peer_id.0).expect("PeerId is a valid public key");
504 addresses_map.insert(
505 EndpointId::Consensus(network_pubkey),
506 vec![consensus_addr.clone()],
507 );
508 }
509 let our_info_v2 = VersionedNodeInfo::V2(NodeInfoV2 {
510 addresses: addresses_map,
511 timestamp_ms,
512 access_type,
513 })
514 .sign(&self.keypair);
515
516 let mut state = self.state.write().unwrap();
517 state.our_info = Some(our_info);
518 state.our_info_v2 = Some(our_info_v2);
519 }
520
521 fn configure_preferred_peers(&mut self) {
522 let peers: Vec<_> = self.configured_peers.values().cloned().collect();
523 for peer_info in peers {
524 debug!(?peer_info, "Add configured preferred peer");
525 match peer_info.affinity {
526 PeerAffinity::High => {
527 self.handle_peer_address_change(
528 peer_info.peer_id,
529 AddressSource::Seed,
530 peer_info.address,
531 );
532 }
533 _ => {
534 self.network.known_peers().insert(peer_info);
537 }
538 }
539 }
540 }
541
542 fn update_our_info_timestamp(&mut self, now_unix: u64) {
543 let state = &mut self.state.write().unwrap();
544
545 if let Some(our_info) = &state.our_info {
546 let mut data = our_info.data().clone();
547 data.timestamp_ms = now_unix;
548 state.our_info = Some(data.sign(&self.keypair));
549 }
550
551 if let Some(our_info_v2) = &state.our_info_v2 {
552 let mut data = our_info_v2.data().clone();
553 match &mut data {
554 VersionedNodeInfo::V1(info) => info.timestamp_ms = now_unix,
555 VersionedNodeInfo::V2(info) => info.timestamp_ms = now_unix,
556 }
557 state.our_info_v2 = Some(data.sign(&self.keypair));
558 }
559 }
560
561 fn handle_peer_address_change(
562 &mut self,
563 peer_id: PeerId,
564 source: AddressSource,
565 addresses: Vec<anemo::types::Address>,
566 ) {
567 debug!(
568 ?peer_id,
569 ?source,
570 ?addresses,
571 "Received peer address change"
572 );
573
574 if source == AddressSource::Chain {
576 if addresses.is_empty() {
577 self.chain_peers.write().unwrap().remove(&peer_id);
578 } else {
579 self.chain_peers.write().unwrap().insert(peer_id);
580 }
581 }
582
583 {
585 let mut state = self.state.write().unwrap();
586 let source_map = state.peer_addresses.entry(peer_id).or_default();
587
588 if addresses.is_empty() {
589 source_map.remove(&source);
590 if source_map.is_empty() {
591 state.peer_addresses.remove(&peer_id);
592 }
593 } else {
594 source_map.insert(source, addresses);
595 }
596
597 if source == AddressSource::Chain
603 && !state
604 .peer_addresses
605 .get(&peer_id)
606 .is_some_and(|s| s.contains_key(&AddressSource::Discovery))
607 && let Some(addrs) = state
608 .known_peers_v2
609 .get(&peer_id)
610 .and_then(|info| match info.data() {
611 VersionedNodeInfo::V2(v2) => Some(v2.p2p_addresses()),
612 _ => None,
613 })
614 {
615 let anemo_addrs: Vec<_> = addrs
616 .iter()
617 .filter_map(|a| a.to_anemo_address().ok())
618 .collect();
619 if !anemo_addrs.is_empty() {
620 state
621 .peer_addresses
622 .entry(peer_id)
623 .or_default()
624 .insert(AddressSource::Discovery, anemo_addrs);
625 }
626 }
627 }
628
629 self.reconfigure_peer_addresses(peer_id);
631 }
632
633 fn reconfigure_peer_addresses(&mut self, peer_id: PeerId) {
636 let priority = self
637 .state
638 .read()
639 .unwrap()
640 .peer_addresses
641 .get(&peer_id)
642 .and_then(|sources| {
643 sources
644 .first_key_value()
645 .map(|(source, addrs)| (*source, addrs.clone()))
646 });
647
648 if self.is_trusted_peer(&peer_id) {
652 self.metrics.set_active_p2p_address_source(
653 &peer_id.to_string(),
654 priority.as_ref().map(|(source, _)| *source),
655 );
656 }
657
658 let priority_addresses = priority.map(|(_, addrs)| addrs).unwrap_or_default();
659 let current_addresses = self
660 .network
661 .known_peers()
662 .get(&peer_id)
663 .map(|info| info.address.clone())
664 .unwrap_or_default();
665 if priority_addresses != current_addresses {
666 let new_peer_info = PeerInfo {
667 peer_id,
668 affinity: PeerAffinity::High,
669 address: priority_addresses.clone(),
670 };
671
672 self.network.known_peers().insert(new_peer_info);
673
674 if !current_addresses.is_empty() {
676 let _ = self.network.disconnect(peer_id);
677
678 if let Some(address) = priority_addresses.first().cloned() {
679 let network = self.network.clone();
680 self.tasks.spawn(async move {
681 let _ = network.connect_with_peer_id(address, peer_id).await;
683 });
684 }
685 }
686 }
687 }
688
689 fn load_stored_peers_on_startup(&mut self) {
690 let Some(path) = &self.store_path else {
691 return;
692 };
693 let entries = load_stored_peers(path);
694 if entries.is_empty() {
695 return;
696 }
697 info!(
698 count = entries.len(),
699 "Loaded stored peer addresses from {}",
700 path.display()
701 );
702
703 update_known_peers_versioned(
704 self.state.clone(),
705 entries,
706 self.configured_peers.clone(),
707 &self.chain_peers,
708 &self.endpoint_manager,
709 );
710
711 while let Ok(msg) = self.mailbox.try_recv() {
714 self.handle_message(msg);
715 }
716 }
717
718 fn save_stored_peers(&self) {
719 let Some(path) = &self.store_path else {
720 return;
721 };
722 let state = self.state.read().unwrap();
723 let peers_to_save: Vec<SignedVersionedNodeInfo> = state
724 .known_peers_v2
725 .iter()
726 .filter(|(pid, _)| self.is_trusted_peer(pid))
727 .map(|(_, verified)| verified.inner().clone())
728 .collect();
729 drop(state);
730 save_stored_peers(path, &peers_to_save);
731 }
732
733 fn handle_peer_event(&mut self, peer_event: Result<PeerEvent, RecvError>) {
734 match peer_event {
735 Ok(PeerEvent::NewPeer(peer_id)) => {
736 if let Some(peer) = self.network.peer(peer_id) {
737 self.state
738 .write()
739 .unwrap()
740 .connected_peers
741 .insert(peer_id, ());
742
743 self.tasks.spawn(query_peer_for_their_known_peers(
745 peer,
746 self.discovery_config.clone(),
747 self.state.clone(),
748 self.configured_peers.clone(),
749 self.chain_peers.clone(),
750 self.endpoint_manager.clone(),
751 self.mailbox_sender(),
752 ));
753 }
754 }
755 Ok(PeerEvent::LostPeer(peer_id, _)) => {
756 self.state.write().unwrap().connected_peers.remove(&peer_id);
757 }
758
759 Err(RecvError::Closed) => {
760 panic!("PeerEvent channel shouldn't be able to be closed");
761 }
762
763 Err(RecvError::Lagged(_)) => {
764 trace!("State-Sync fell behind processing PeerEvents");
765 }
766 }
767 }
768
769 fn handle_tick(&mut self, _now: std::time::Instant, now_unix: u64) {
770 self.update_our_info_timestamp(now_unix);
771
772 self.tasks
773 .spawn(query_connected_peers_for_their_known_peers(
774 self.network.clone(),
775 self.discovery_config.clone(),
776 self.state.clone(),
777 self.configured_peers.clone(),
778 self.chain_peers.clone(),
779 self.endpoint_manager.clone(),
780 self.mailbox_sender(),
781 ));
782
783 let mut culled_trusted_peers = Vec::new();
785 {
786 let mut state = self.state.write().unwrap();
787 state
788 .known_peers
789 .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms) < ONE_DAY_MILLISECONDS);
790 state.known_peers_v2.retain(|k, v| {
791 let keep = now_unix.saturating_sub(v.timestamp_ms()) < ONE_DAY_MILLISECONDS;
792 if !keep && self.is_trusted_peer(k) {
793 culled_trusted_peers.push(*k);
794 }
795 keep
796 });
797 }
798
799 for peer_id in culled_trusted_peers {
802 self.endpoint_manager
803 .clear_source(peer_id, AddressSource::Discovery);
804 }
805
806 self.pending_dials.retain(|_k, v| !v.is_finished());
808 if let Some(abort_handle) = &self.dial_seed_peers_task
809 && abort_handle.is_finished()
810 {
811 self.dial_seed_peers_task = None;
812 }
813
814 let cooldown = self.discovery_config.peer_failure_cooldown();
815 self.peer_cooldowns
816 .retain(|_, since| since.elapsed() < cooldown);
817
818 let state = self.state.read().unwrap();
820 let our_peer_id = self.network.peer_id();
821
822 let mut peers_with_external_address: HashSet<PeerId> = HashSet::new();
827 for (peer_id, info) in state.known_peers.iter() {
828 if !info.addresses.is_empty() {
829 peers_with_external_address.insert(*peer_id);
830 }
831 }
832 for (peer_id, info) in state.known_peers_v2.iter() {
833 if !info.p2p_addresses().is_empty() {
834 peers_with_external_address.insert(*peer_id);
835 }
836 }
837 self.metrics
838 .set_num_peers_with_external_address(peers_with_external_address.len() as i64);
839
840 let mut preferred: HashMap<PeerId, NodeInfo> = HashMap::new();
844 let mut cooldown_peers: HashMap<PeerId, NodeInfo> = HashMap::new();
845
846 for (peer_id, info) in state.known_peers.iter() {
847 if *peer_id != our_peer_id
848 && !info.addresses.is_empty()
849 && !state.connected_peers.contains_key(peer_id)
850 && !self.pending_dials.contains_key(peer_id)
851 && !state.peer_addresses.contains_key(peer_id)
852 {
853 if self.peer_cooldowns.contains_key(peer_id) {
854 cooldown_peers.insert(*peer_id, info.data().clone());
855 } else {
856 preferred.insert(*peer_id, info.data().clone());
857 }
858 }
859 }
860 for (peer_id, info) in state.known_peers_v2.iter() {
861 let p2p_addresses = info.p2p_addresses();
862 if *peer_id != our_peer_id
863 && !p2p_addresses.is_empty()
864 && !state.connected_peers.contains_key(peer_id)
865 && !self.pending_dials.contains_key(peer_id)
866 && !state.peer_addresses.contains_key(peer_id)
867 {
868 let node_info = NodeInfo {
869 peer_id: *peer_id,
870 addresses: p2p_addresses.to_vec(),
871 timestamp_ms: info.timestamp_ms(),
872 access_type: info.access_type(),
873 };
874 if self.peer_cooldowns.contains_key(peer_id) {
875 if cooldown_peers
876 .get(peer_id)
877 .is_none_or(|existing| info.timestamp_ms() > existing.timestamp_ms)
878 {
879 cooldown_peers.insert(*peer_id, node_info);
880 }
881 } else if preferred
882 .get(peer_id)
883 .is_none_or(|existing| info.timestamp_ms() > existing.timestamp_ms)
884 {
885 preferred.insert(*peer_id, node_info);
886 }
887 }
888 }
889
890 let number_of_connections = state.connected_peers.len();
891 let number_to_dial = self
892 .discovery_config
893 .target_concurrent_connections()
894 .saturating_sub(number_of_connections);
895
896 use rand::seq::IteratorRandom;
897 let mut rng = rand::thread_rng();
898
899 let mut to_dial: Vec<_> = preferred
900 .into_iter()
901 .choose_multiple(&mut rng, number_to_dial);
902
903 let remaining = number_to_dial.saturating_sub(to_dial.len());
904 to_dial.extend(
905 cooldown_peers
906 .into_iter()
907 .choose_multiple(&mut rng, remaining),
908 );
909
910 for (peer_id, info) in to_dial {
911 let abort_handle = self
912 .tasks
913 .spawn(try_to_connect_to_peer(self.network.clone(), info));
914 self.pending_dials.insert(peer_id, abort_handle);
915 }
916
917 let has_peers_to_dial = || {
920 self.configured_peers
921 .values()
922 .any(|p| p.affinity == PeerAffinity::High)
923 || !self.unidentified_seed_peers.is_empty()
924 };
925 if self.dial_seed_peers_task.is_none()
926 && state.connected_peers.is_empty()
927 && self.pending_dials.is_empty()
928 && has_peers_to_dial()
929 {
930 let abort_handle = self.tasks.spawn(try_to_connect_to_seed_peers(
931 self.network.clone(),
932 self.discovery_config.clone(),
933 self.configured_peers.clone(),
934 self.unidentified_seed_peers.clone(),
935 ));
936
937 self.dial_seed_peers_task = Some(abort_handle);
938 }
939 }
940
941 fn is_trusted_peer(&self, peer_id: &PeerId) -> bool {
942 is_trusted_peer(peer_id, &self.configured_peers, &self.chain_peers)
943 }
944
945 fn mailbox_sender(&self) -> mpsc::Sender<DiscoveryMessage> {
946 self.mailbox_tx.clone()
947 }
948}
949
950async fn try_to_connect_to_peer(network: Network, info: NodeInfo) {
951 debug!("Connecting to peer {info:?}");
952 for multiaddr in &info.addresses {
953 if let Ok(address) = multiaddr.to_anemo_address() {
954 if network
956 .connect_with_peer_id(address, info.peer_id)
957 .await
958 .tap_err(|e| {
959 debug!(
960 "error dialing {} at address '{}': {e}",
961 info.peer_id.short_display(4),
962 multiaddr
963 )
964 })
965 .is_ok()
966 {
967 return;
968 }
969 }
970 }
971}
972
973async fn try_to_connect_to_seed_peers(
974 network: Network,
975 config: Arc<DiscoveryConfig>,
976 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
977 unidentified_seed_peers: Vec<anemo::types::Address>,
978) {
979 let high_affinity_peers: Vec<_> = configured_peers
980 .values()
981 .filter(|p| p.affinity == PeerAffinity::High)
982 .cloned()
983 .collect();
984 debug!(
985 ?high_affinity_peers,
986 ?unidentified_seed_peers,
987 "Connecting to seed peers"
988 );
989 let network = &network;
990
991 let with_peer_id = high_affinity_peers.into_iter().flat_map(|peer_info| {
993 peer_info
994 .address
995 .into_iter()
996 .map(move |addr| (Some(peer_info.peer_id), addr))
997 });
998 let without_peer_id = unidentified_seed_peers.into_iter().map(|addr| (None, addr));
999 futures::stream::iter(with_peer_id.chain(without_peer_id))
1000 .for_each_concurrent(
1001 config.target_concurrent_connections(),
1002 |(peer_id, address)| async move {
1003 let _ = if let Some(peer_id) = peer_id {
1005 network
1006 .connect_with_peer_id(address.clone(), peer_id)
1007 .await
1008 .tap_err(|e| {
1009 debug!(
1010 "error dialing peer {} at '{}': {e}",
1011 peer_id.short_display(4),
1012 address
1013 )
1014 })
1015 } else {
1016 network
1017 .connect(address.clone())
1018 .await
1019 .tap_err(|e| debug!("error dialing address '{}': {e}", address))
1020 };
1021 },
1022 )
1023 .await;
1024}
1025
1026async fn query_peer_for_known_peers_v2(peer: Peer) -> Option<Vec<SignedNodeInfo>> {
1027 let mut client = DiscoveryClient::new(peer);
1028 let request = Request::new(()).with_timeout(TIMEOUT);
1029 client
1030 .get_known_peers_v2(request)
1031 .await
1032 .ok()
1033 .map(Response::into_inner)
1034 .map(
1035 |GetKnownPeersResponseV2 {
1036 own_info,
1037 mut known_peers,
1038 }| {
1039 if !own_info.addresses.is_empty() {
1040 known_peers.push(own_info)
1041 }
1042 known_peers
1043 },
1044 )
1045}
1046
1047async fn query_peer_for_their_known_peers(
1048 peer: Peer,
1049 discovery_config: Arc<DiscoveryConfig>,
1050 state: Arc<RwLock<State>>,
1051 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1052 chain_peers: Arc<RwLock<HashSet<PeerId>>>,
1053 endpoint_manager: EndpointManager,
1054 mailbox_tx: mpsc::Sender<DiscoveryMessage>,
1055) {
1056 if discovery_config.use_get_known_peers_v3() {
1058 let our_info_v2 = state.read().unwrap().our_info_v2.clone();
1059 if let Some(own_info) = our_info_v2 {
1060 let peer_for_v3 = peer.clone();
1061 let v3_query = async move {
1062 let mut client = DiscoveryClient::new(peer_for_v3);
1063 let request =
1064 Request::new(GetKnownPeersRequestV3 { own_info }).with_timeout(TIMEOUT);
1065 client
1066 .get_known_peers_v3(request)
1067 .await
1068 .ok()
1069 .map(Response::into_inner)
1070 .map(
1071 |GetKnownPeersResponseV3 {
1072 own_info,
1073 mut known_peers,
1074 }| {
1075 if !own_info.p2p_addresses().is_empty() {
1076 known_peers.push(own_info)
1077 }
1078 known_peers
1079 },
1080 )
1081 };
1082
1083 let (found_peers_v2, found_peers_v3) =
1084 tokio::join!(query_peer_for_known_peers_v2(peer), v3_query);
1085
1086 if let Some(found_peers) = found_peers_v2 {
1087 update_known_peers(
1088 state.clone(),
1089 found_peers,
1090 configured_peers.clone(),
1091 &chain_peers,
1092 );
1093 }
1094 if let Some(found_peers) = found_peers_v3 {
1095 let changed = update_known_peers_versioned(
1096 state,
1097 found_peers,
1098 configured_peers,
1099 &chain_peers,
1100 &endpoint_manager,
1101 );
1102 if changed {
1103 let _ = mailbox_tx.try_send(DiscoveryMessage::TrustedPeersUpdated);
1104 }
1105 }
1106 return;
1107 }
1108 }
1109
1110 if let Some(found_peers) = query_peer_for_known_peers_v2(peer).await {
1112 update_known_peers(state, found_peers, configured_peers, &chain_peers);
1113 }
1114}
1115
1116async fn query_connected_peers_for_their_known_peers(
1117 network: Network,
1118 config: Arc<DiscoveryConfig>,
1119 state: Arc<RwLock<State>>,
1120 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1121 chain_peers: Arc<RwLock<HashSet<PeerId>>>,
1122 endpoint_manager: EndpointManager,
1123 mailbox_tx: mpsc::Sender<DiscoveryMessage>,
1124) {
1125 use rand::seq::IteratorRandom;
1126
1127 let peers_to_query: Vec<_> = network
1128 .peers()
1129 .into_iter()
1130 .flat_map(|id| network.peer(id))
1131 .choose_multiple(&mut rand::thread_rng(), config.peers_to_query());
1132
1133 let v2_query = {
1135 let peers = peers_to_query.clone();
1136 let peers_to_query_count = config.peers_to_query();
1137 async move {
1138 peers
1139 .into_iter()
1140 .map(DiscoveryClient::new)
1141 .map(|mut client| async move {
1142 let request = Request::new(()).with_timeout(TIMEOUT);
1143 client
1144 .get_known_peers_v2(request)
1145 .await
1146 .ok()
1147 .map(Response::into_inner)
1148 .map(
1149 |GetKnownPeersResponseV2 {
1150 own_info,
1151 mut known_peers,
1152 }| {
1153 if !own_info.addresses.is_empty() {
1154 known_peers.push(own_info)
1155 }
1156 known_peers
1157 },
1158 )
1159 })
1160 .pipe(futures::stream::iter)
1161 .buffer_unordered(peers_to_query_count)
1162 .filter_map(std::future::ready)
1163 .flat_map(futures::stream::iter)
1164 .collect::<Vec<_>>()
1165 .await
1166 }
1167 };
1168
1169 if config.use_get_known_peers_v3() {
1171 let our_info_v2 = state.read().unwrap().our_info_v2.clone();
1172 if let Some(own_info) = our_info_v2 {
1173 let v3_query = {
1174 let peers_to_query_count = config.peers_to_query();
1175 async move {
1176 peers_to_query
1177 .into_iter()
1178 .map(DiscoveryClient::new)
1179 .map(|mut client| {
1180 let own_info = own_info.clone();
1181 async move {
1182 let request = Request::new(GetKnownPeersRequestV3 { own_info })
1183 .with_timeout(TIMEOUT);
1184 client
1185 .get_known_peers_v3(request)
1186 .await
1187 .ok()
1188 .map(Response::into_inner)
1189 .map(
1190 |GetKnownPeersResponseV3 {
1191 own_info,
1192 mut known_peers,
1193 }| {
1194 if !own_info.p2p_addresses().is_empty() {
1195 known_peers.push(own_info)
1196 }
1197 known_peers
1198 },
1199 )
1200 }
1201 })
1202 .pipe(futures::stream::iter)
1203 .buffer_unordered(peers_to_query_count)
1204 .filter_map(std::future::ready)
1205 .flat_map(futures::stream::iter)
1206 .collect::<Vec<_>>()
1207 .await
1208 }
1209 };
1210
1211 let (found_peers_v2, found_peers_v3) = tokio::join!(v2_query, v3_query);
1212
1213 update_known_peers(
1214 state.clone(),
1215 found_peers_v2,
1216 configured_peers.clone(),
1217 &chain_peers,
1218 );
1219 let changed = update_known_peers_versioned(
1220 state,
1221 found_peers_v3,
1222 configured_peers,
1223 &chain_peers,
1224 &endpoint_manager,
1225 );
1226 if changed {
1227 let _ = mailbox_tx.try_send(DiscoveryMessage::TrustedPeersUpdated);
1228 }
1229 return;
1230 }
1231 }
1232
1233 let found_peers_v2 = v2_query.await;
1235 update_known_peers(state, found_peers_v2, configured_peers, &chain_peers);
1236}
1237
1238fn update_known_peers(
1239 state: Arc<RwLock<State>>,
1240 found_peers: Vec<SignedNodeInfo>,
1241 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1242 chain_peers: &Arc<RwLock<HashSet<PeerId>>>,
1243) {
1244 use std::collections::hash_map::Entry;
1245
1246 let now_unix = now_unix();
1247 let our_peer_id = state.read().unwrap().our_info.clone().unwrap().peer_id;
1248 let known_peers = &mut state.write().unwrap().known_peers;
1249 for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
1251 if peer_info.timestamp_ms > now_unix.saturating_add(30 * 1_000) || now_unix.saturating_sub(peer_info.timestamp_ms) > ONE_DAY_MILLISECONDS
1256 {
1257 continue;
1258 }
1259
1260 if peer_info.peer_id == our_peer_id {
1261 continue;
1262 }
1263
1264 let is_restricted = match peer_info.access_type {
1265 AccessType::Public => false,
1266 AccessType::Private | AccessType::Trusted => true,
1267 };
1268 if is_restricted && !is_trusted_peer(&peer_info.peer_id, &configured_peers, chain_peers) {
1269 continue;
1270 }
1271
1272 if peer_info.addresses.len() > MAX_ADDRESSES_PER_PEER {
1274 continue;
1275 }
1276
1277 if !peer_info
1279 .addresses
1280 .iter()
1281 .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
1282 {
1283 continue;
1284 }
1285 let Ok(public_key) = Ed25519PublicKey::from_bytes(&peer_info.peer_id.0) else {
1286 debug_fatal!(
1287 "Failed to convert anemo PeerId {:?} to Ed25519PublicKey",
1289 peer_info.peer_id
1290 );
1291 continue;
1292 };
1293 let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
1294 if let Err(e) = public_key.verify(&msg, peer_info.auth_sig()) {
1295 info!(
1296 "Discovery failed to verify signature for NodeInfo for peer {:?}: {e:?}",
1297 peer_info.peer_id
1298 );
1299 continue;
1301 }
1302 let peer = VerifiedSignedNodeInfo::new_from_verified(peer_info);
1303
1304 match known_peers.entry(peer.peer_id) {
1305 Entry::Occupied(mut o) => {
1306 if peer.timestamp_ms > o.get().timestamp_ms {
1307 o.insert(peer);
1308 }
1309 }
1310 Entry::Vacant(v) => {
1311 v.insert(peer);
1312 }
1313 }
1314 }
1315}
1316
1317fn update_known_peers_versioned(
1319 state: Arc<RwLock<State>>,
1320 found_peers: Vec<SignedVersionedNodeInfo>,
1321 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1322 chain_peers: &Arc<RwLock<HashSet<PeerId>>>,
1323 endpoint_manager: &EndpointManager,
1324) -> bool {
1325 use std::collections::hash_map::Entry;
1326
1327 let now_unix = now_unix();
1328 let our_peer_id = state
1329 .read()
1330 .unwrap()
1331 .our_info_v2
1332 .as_ref()
1333 .and_then(|info| info.peer_id());
1334 let known_peers_v2 = &mut state.write().unwrap().known_peers_v2;
1335 let mut trusted_peer_changed = false;
1336
1337 for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
1338 let timestamp_ms = peer_info.timestamp_ms();
1339
1340 if timestamp_ms > now_unix.saturating_add(30 * 1_000)
1341 || now_unix.saturating_sub(timestamp_ms) > ONE_DAY_MILLISECONDS
1342 {
1343 continue;
1344 }
1345
1346 let Some(peer_id) = peer_info.peer_id() else {
1347 continue;
1348 };
1349
1350 if Some(peer_id) == our_peer_id {
1351 continue;
1352 }
1353
1354 let is_restricted = match peer_info.access_type() {
1355 AccessType::Public => false,
1356 AccessType::Private | AccessType::Trusted => true,
1357 };
1358 let is_trusted = is_trusted_peer(&peer_id, &configured_peers, chain_peers);
1359 if is_restricted && !is_trusted {
1360 continue;
1361 }
1362
1363 let peer = match verify_versioned_node_info(&peer_info) {
1364 Ok(verified) => verified,
1365 Err(reason) => {
1366 info!("Discovery rejecting VersionedNodeInfo for peer {peer_id:?}: {reason}");
1367 continue;
1368 }
1369 };
1370
1371 if is_trusted && let VersionedNodeInfo::V2(info_v2) = peer_info.data() {
1373 for (endpoint_id, addrs) in &info_v2.addresses {
1374 if !addrs.is_empty() {
1375 let _ = endpoint_manager.update_endpoint(
1376 endpoint_id.clone(),
1377 AddressSource::Discovery,
1378 addrs.clone(),
1379 );
1380 }
1381 }
1382 }
1383
1384 match known_peers_v2.entry(peer_id) {
1385 Entry::Occupied(mut o) => {
1386 if peer.timestamp_ms() > o.get().timestamp_ms() {
1387 o.insert(peer);
1388 if is_trusted {
1389 trusted_peer_changed = true;
1390 }
1391 }
1392 }
1393 Entry::Vacant(v) => {
1394 v.insert(peer);
1395 if is_trusted {
1396 trusted_peer_changed = true;
1397 }
1398 }
1399 }
1400 }
1401
1402 trusted_peer_changed
1403}
1404
1405pub(super) fn is_trusted_peer(
1408 peer_id: &PeerId,
1409 configured_peers: &HashMap<PeerId, PeerInfo>,
1410 chain_peers: &Arc<RwLock<HashSet<PeerId>>>,
1411) -> bool {
1412 configured_peers.contains_key(peer_id) || chain_peers.read().unwrap().contains(peer_id)
1413}
1414
1415pub(super) fn now_unix() -> u64 {
1416 use std::time::{SystemTime, UNIX_EPOCH};
1417
1418 SystemTime::now()
1419 .duration_since(UNIX_EPOCH)
1420 .unwrap()
1421 .as_millis() as u64
1422}