1use anemo::types::PeerAffinity;
5use anemo::types::PeerInfo;
6use anemo::{Network, Peer, PeerId, Request, Response, types::PeerEvent};
7use fastcrypto::ed25519::{Ed25519PublicKey, Ed25519Signature};
8use futures::StreamExt;
9use mysten_common::debug_fatal;
10use serde::{Deserialize, Serialize};
11use shared_crypto::intent::IntentScope;
12use std::{
13 collections::{BTreeMap, HashMap},
14 path::PathBuf,
15 sync::{Arc, RwLock},
16 time::{Duration, Instant},
17};
18
19use crate::endpoint_manager::{AddressSource, EndpointId, EndpointManager};
20use store::{load_stored_peers, save_stored_peers};
21use sui_config::p2p::{AccessType, DiscoveryConfig, P2pConfig};
22use sui_types::crypto::{NetworkKeyPair, NetworkPublicKey, Signer, ToFromBytes, VerifyingKey};
23use sui_types::digests::Digest;
24use sui_types::message_envelope::{Envelope, Message, VerifiedEnvelope};
25use sui_types::multiaddr::Multiaddr;
26use tap::{Pipe, TapFallible};
27use tokio::sync::broadcast::error::RecvError;
28use tokio::sync::mpsc;
29use tokio::{
30 sync::oneshot,
31 task::{AbortHandle, JoinSet},
32};
33use tracing::{debug, info, trace};
34
/// Timeout applied to every `get_known_peers` request issued by this module.
const TIMEOUT: Duration = Duration::from_millis(1_000);
/// One day expressed in milliseconds; used as the age limit for peer infos.
const ONE_DAY_MILLISECONDS: u64 = 1_000 * 60 * 60 * 24;
/// Exclusive upper bound on `Multiaddr::len()` for an advertised address.
const MAX_ADDRESS_LENGTH: usize = 300;
/// Ingestion processes at most `MAX_PEERS_TO_SEND + 1` entries of one response.
const MAX_PEERS_TO_SEND: usize = 200;
/// Largest number of addresses accepted per peer (per endpoint for V2 infos).
const MAX_ADDRESSES_PER_PEER: usize = 2;
40
41mod generated {
42 include!(concat!(env!("OUT_DIR"), "/sui.Discovery.rs"));
43}
44mod builder;
45mod metrics;
46mod server;
47mod store;
48#[cfg(test)]
49mod tests;
50
51pub use builder::{Builder, UnstartedDiscovery};
52pub use generated::{
53 discovery_client::DiscoveryClient,
54 discovery_server::{Discovery, DiscoveryServer},
55};
56pub use server::{GetKnownPeersRequestV3, GetKnownPeersResponseV2, GetKnownPeersResponseV3};
57
58#[derive(Debug)]
60pub enum DiscoveryMessage {
61 PeerAddressChange {
63 peer_id: PeerId,
64 source: AddressSource,
65 addresses: Vec<anemo::types::Address>,
66 },
67 ReceivedNodeInfo {
69 peer_info: Box<SignedVersionedNodeInfo>,
70 },
71 ConfiguredPeersUpdated,
74 PeerFailureReport { peer_id: PeerId },
76}
77
78#[derive(Clone, Debug)]
81pub struct Handle {
82 pub(super) _shutdown_handle: Arc<oneshot::Sender<()>>,
83 pub(super) sender: Sender,
84}
85
86impl Handle {
87 pub fn sender(&self) -> Sender {
88 self.sender.clone()
89 }
90}
91
92#[derive(Clone, Debug)]
95pub struct Sender {
96 pub(super) sender: mpsc::Sender<DiscoveryMessage>,
97}
98
99impl Sender {
100 pub fn peer_address_change(
101 &self,
102 peer_id: PeerId,
103 source: AddressSource,
104 addresses: Vec<anemo::types::Address>,
105 ) {
106 self.sender
107 .try_send(DiscoveryMessage::PeerAddressChange {
108 peer_id,
109 source,
110 addresses,
111 })
112 .expect("Discovery mailbox should not overflow or be closed")
113 }
114
115 pub fn report_peer_failure(&self, peer_id: PeerId) {
116 let _ = self
117 .sender
118 .try_send(DiscoveryMessage::PeerFailureReport { peer_id });
119 }
120}
121
122use self::metrics::Metrics;
123
124struct State {
126 our_info: Option<SignedNodeInfo>,
127 our_info_v2: Option<SignedVersionedNodeInfo>,
128 connected_peers: HashMap<PeerId, ()>,
129 known_peers: HashMap<PeerId, VerifiedSignedNodeInfo>,
130 known_peers_v2: HashMap<PeerId, VerifiedSignedVersionedNodeInfo>,
131 peer_addresses: HashMap<PeerId, BTreeMap<AddressSource, Vec<anemo::types::Address>>>,
132}
133
134#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
139pub struct NodeInfo {
140 pub peer_id: PeerId,
141 pub addresses: Vec<Multiaddr>,
142
143 pub timestamp_ms: u64,
147
148 pub access_type: AccessType,
150}
151
152impl NodeInfo {
153 fn sign(self, keypair: &NetworkKeyPair) -> SignedNodeInfo {
154 let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
155 let sig = keypair.sign(&msg);
156 SignedNodeInfo::new_from_data_and_sig(self, sig)
157 }
158}
159
160pub type SignedNodeInfo = Envelope<NodeInfo, Ed25519Signature>;
161
162pub type VerifiedSignedNodeInfo = VerifiedEnvelope<NodeInfo, Ed25519Signature>;
163
164#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
165pub struct NodeInfoDigest(Digest);
166
167impl NodeInfoDigest {
168 pub const fn new(digest: [u8; 32]) -> Self {
169 Self(Digest::new(digest))
170 }
171}
172
173impl Message for NodeInfo {
174 type DigestType = NodeInfoDigest;
175 const SCOPE: IntentScope = IntentScope::DiscoveryPeers;
176
177 fn digest(&self) -> Self::DigestType {
178 unreachable!("NodeInfoDigest is not used today")
179 }
180}
181
182#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
185pub struct NodeInfoV2 {
186 pub addresses: BTreeMap<EndpointId, Vec<Multiaddr>>,
187 pub timestamp_ms: u64,
188 pub access_type: AccessType,
189}
190
191impl NodeInfoV2 {
192 pub fn peer_id(&self) -> Option<PeerId> {
195 self.addresses.keys().find_map(|k| match k {
196 EndpointId::P2p(peer_id) => Some(*peer_id),
197 EndpointId::Consensus(_) => None,
198 })
199 }
200
201 pub fn p2p_addresses(&self) -> &[Multiaddr] {
202 self.addresses
203 .iter()
204 .find_map(|(k, v)| match k {
205 EndpointId::P2p(_) => Some(v.as_slice()),
206 EndpointId::Consensus(_) => None,
207 })
208 .unwrap_or(&[])
209 }
210}
211
212#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
214pub enum VersionedNodeInfo {
215 V1(NodeInfo),
216 V2(NodeInfoV2),
217}
218
219impl VersionedNodeInfo {
220 pub fn peer_id(&self) -> Option<PeerId> {
221 match self {
222 VersionedNodeInfo::V1(info) => Some(info.peer_id),
223 VersionedNodeInfo::V2(info) => info.peer_id(),
224 }
225 }
226
227 pub fn timestamp_ms(&self) -> u64 {
228 match self {
229 VersionedNodeInfo::V1(info) => info.timestamp_ms,
230 VersionedNodeInfo::V2(info) => info.timestamp_ms,
231 }
232 }
233
234 pub fn access_type(&self) -> AccessType {
235 match self {
236 VersionedNodeInfo::V1(info) => info.access_type,
237 VersionedNodeInfo::V2(info) => info.access_type,
238 }
239 }
240
241 pub fn p2p_addresses(&self) -> &[Multiaddr] {
242 match self {
243 VersionedNodeInfo::V1(info) => &info.addresses,
244 VersionedNodeInfo::V2(info) => info.p2p_addresses(),
245 }
246 }
247
248 pub fn sign(self, keypair: &NetworkKeyPair) -> SignedVersionedNodeInfo {
249 let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
250 let sig = keypair.sign(&msg);
251 SignedVersionedNodeInfo::new_from_data_and_sig(self, sig)
252 }
253}
254
255#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
256pub struct VersionedNodeInfoDigest(Digest);
257
258impl Message for VersionedNodeInfo {
259 type DigestType = VersionedNodeInfoDigest;
260 const SCOPE: IntentScope = IntentScope::DiscoveryPeers;
261
262 fn digest(&self) -> Self::DigestType {
263 unreachable!("VersionedNodeInfoDigest is not used today")
264 }
265}
266
267pub type SignedVersionedNodeInfo = Envelope<VersionedNodeInfo, Ed25519Signature>;
268pub type VerifiedSignedVersionedNodeInfo = VerifiedEnvelope<VersionedNodeInfo, Ed25519Signature>;
269
270fn verify_versioned_node_info(
280 peer_info: &SignedVersionedNodeInfo,
281) -> Result<VerifiedSignedVersionedNodeInfo, &'static str> {
282 let peer_id = peer_info.peer_id().ok_or("missing P2P peer_id")?;
283
284 let public_key =
285 Ed25519PublicKey::from_bytes(&peer_id.0).map_err(|_| "invalid peer_id public key")?;
286
287 let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
288 public_key
289 .verify(&msg, peer_info.auth_sig())
290 .map_err(|_| "signature verification failed")?;
291
292 match peer_info.data() {
293 VersionedNodeInfo::V1(info) => {
294 if info.addresses.len() > MAX_ADDRESSES_PER_PEER {
295 return Err("too many addresses");
296 }
297 if !info
298 .addresses
299 .iter()
300 .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
301 {
302 return Err("invalid address");
303 }
304 }
305 VersionedNodeInfo::V2(info_v2) => {
306 let mut seen_variants = Vec::new();
308 for endpoint_id in info_v2.addresses.keys() {
309 let variant = std::mem::discriminant(endpoint_id);
310 if seen_variants.contains(&variant) {
311 return Err("duplicate endpoint variant");
312 }
313 seen_variants.push(variant);
314 }
315
316 for (endpoint_id, addrs) in &info_v2.addresses {
317 if addrs.len() > MAX_ADDRESSES_PER_PEER {
318 return Err("too many addresses for endpoint");
319 }
320 if !addrs.iter().all(|addr| addr.len() < MAX_ADDRESS_LENGTH) {
321 return Err("address too long");
322 }
323 if matches!(endpoint_id, EndpointId::P2p(_))
324 && !addrs.iter().all(|addr| addr.to_anemo_address().is_ok())
325 {
326 return Err("invalid P2P address");
327 }
328 }
329
330 let identities_valid = info_v2.addresses.keys().all(|eid| match eid {
331 EndpointId::P2p(_) => true,
332 EndpointId::Consensus(pubkey) => pubkey.as_bytes() == peer_id.0,
333 });
334 if !identities_valid {
335 return Err("non-P2P endpoint identity mismatch");
336 }
337 }
338 }
339
340 Ok(VerifiedSignedVersionedNodeInfo::new_from_verified(
341 peer_info.clone(),
342 ))
343}
344
345struct DiscoveryEventLoop {
346 config: P2pConfig,
347 discovery_config: Arc<DiscoveryConfig>,
348 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
349 unidentified_seed_peers: Vec<anemo::types::Address>,
350 network: Network,
351 keypair: NetworkKeyPair,
352 tasks: JoinSet<()>,
353 pending_dials: HashMap<PeerId, AbortHandle>,
354 dial_seed_peers_task: Option<AbortHandle>,
355 shutdown_handle: oneshot::Receiver<()>,
356 state: Arc<RwLock<State>>,
357 mailbox: mpsc::Receiver<DiscoveryMessage>,
358 mailbox_tx: mpsc::Sender<DiscoveryMessage>,
359 metrics: Metrics,
360 consensus_external_address: Option<Multiaddr>,
361 endpoint_manager: EndpointManager,
362 store_path: Option<PathBuf>,
363 peer_cooldowns: HashMap<PeerId, Instant>,
364}
365
366impl DiscoveryEventLoop {
367 pub async fn start(mut self) {
368 info!("Discovery started");
369
370 self.construct_our_info();
371 self.configure_preferred_peers();
372 self.load_stored_peers_on_startup();
373
374 let mut interval = tokio::time::interval(self.discovery_config.interval_period());
375 let mut peer_events = {
376 let (subscriber, _peers) = self.network.subscribe().unwrap();
377 subscriber
378 };
379
380 loop {
381 tokio::select! {
382 now = interval.tick() => {
383 let now_unix = now_unix();
384 self.handle_tick(now.into_std(), now_unix);
385 }
386 peer_event = peer_events.recv() => {
387 self.handle_peer_event(peer_event);
388 },
389 Some(message) = self.mailbox.recv() => {
390 self.handle_message(message);
391 }
392 Some(task_result) = self.tasks.join_next() => {
393 match task_result {
394 Ok(()) => {},
395 Err(e) => {
396 if e.is_cancelled() {
397 } else if e.is_panic() {
399 std::panic::resume_unwind(e.into_panic());
401 } else {
402 panic!("task failed: {e}");
403 }
404 },
405 };
406 },
407 _ = &mut self.shutdown_handle => {
409 break;
410 }
411 }
412 }
413
414 self.save_stored_peers();
415 info!("Discovery ended");
416 }
417
418 fn handle_message(&mut self, message: DiscoveryMessage) {
419 match message {
420 DiscoveryMessage::PeerAddressChange {
421 peer_id,
422 source,
423 addresses,
424 } => {
425 self.handle_peer_address_change(peer_id, source, addresses);
426 }
427 DiscoveryMessage::ReceivedNodeInfo { peer_info } => {
428 let changed = update_known_peers_versioned(
429 self.state.clone(),
430 self.metrics.clone(),
431 vec![*peer_info],
432 self.configured_peers.clone(),
433 &self.endpoint_manager,
434 );
435 if changed {
436 self.save_stored_peers();
437 }
438 }
439 DiscoveryMessage::ConfiguredPeersUpdated => {
440 self.save_stored_peers();
441 }
442 DiscoveryMessage::PeerFailureReport { peer_id } => {
443 self.handle_peer_failure_report(peer_id);
444 }
445 }
446 }
447
448 fn handle_peer_failure_report(&mut self, peer_id: PeerId) {
449 if self.configured_peers.contains_key(&peer_id) {
450 info!(?peer_id, "ignoring failure report for configured peer");
451 return;
452 }
453 let min_peers = self.discovery_config.min_peers_for_disconnect();
454 let connected_count = self.state.read().unwrap().connected_peers.len();
455 if connected_count < min_peers {
456 info!(
457 ?peer_id,
458 connected_count, min_peers, "skipping disconnect, too few connected peers"
459 );
460 return;
461 }
462 info!(
463 ?peer_id,
464 "peer failure reported, disconnecting and adding cooldown"
465 );
466 let _ = self.network.disconnect(peer_id);
467 self.peer_cooldowns.insert(peer_id, Instant::now());
468 }
469
470 fn construct_our_info(&mut self) {
471 if self.state.read().unwrap().our_info.is_some() {
472 return;
473 }
474
475 let peer_id = self.network.peer_id();
476 let timestamp_ms = now_unix();
477 let access_type = self.discovery_config.access_type();
478
479 let addresses: Vec<Multiaddr> = self
480 .config
481 .external_address
482 .clone()
483 .and_then(|addr| addr.to_anemo_address().ok().map(|_| addr))
484 .into_iter()
485 .collect();
486
487 let our_info = NodeInfo {
488 peer_id,
489 addresses: addresses.clone(),
490 timestamp_ms,
491 access_type,
492 }
493 .sign(&self.keypair);
494
495 let mut addresses_map = BTreeMap::new();
496 addresses_map.insert(EndpointId::P2p(peer_id), addresses);
497 if let Some(consensus_addr) = &self.consensus_external_address {
498 let network_pubkey =
502 NetworkPublicKey::from_bytes(&peer_id.0).expect("PeerId is a valid public key");
503 addresses_map.insert(
504 EndpointId::Consensus(network_pubkey),
505 vec![consensus_addr.clone()],
506 );
507 }
508 let our_info_v2 = VersionedNodeInfo::V2(NodeInfoV2 {
509 addresses: addresses_map,
510 timestamp_ms,
511 access_type,
512 })
513 .sign(&self.keypair);
514
515 let mut state = self.state.write().unwrap();
516 state.our_info = Some(our_info);
517 state.our_info_v2 = Some(our_info_v2);
518 }
519
520 fn configure_preferred_peers(&mut self) {
521 let peers: Vec<_> = self.configured_peers.values().cloned().collect();
522 for peer_info in peers {
523 debug!(?peer_info, "Add configured preferred peer");
524 match peer_info.affinity {
525 PeerAffinity::High => {
526 self.handle_peer_address_change(
527 peer_info.peer_id,
528 AddressSource::Seed,
529 peer_info.address,
530 );
531 }
532 _ => {
533 self.network.known_peers().insert(peer_info);
536 }
537 }
538 }
539 }
540
541 fn update_our_info_timestamp(&mut self, now_unix: u64) {
542 let state = &mut self.state.write().unwrap();
543
544 if let Some(our_info) = &state.our_info {
545 let mut data = our_info.data().clone();
546 data.timestamp_ms = now_unix;
547 state.our_info = Some(data.sign(&self.keypair));
548 }
549
550 if let Some(our_info_v2) = &state.our_info_v2 {
551 let mut data = our_info_v2.data().clone();
552 match &mut data {
553 VersionedNodeInfo::V1(info) => info.timestamp_ms = now_unix,
554 VersionedNodeInfo::V2(info) => info.timestamp_ms = now_unix,
555 }
556 state.our_info_v2 = Some(data.sign(&self.keypair));
557 }
558 }
559
560 fn handle_peer_address_change(
561 &mut self,
562 peer_id: PeerId,
563 source: AddressSource,
564 addresses: Vec<anemo::types::Address>,
565 ) {
566 debug!(
567 ?peer_id,
568 ?source,
569 ?addresses,
570 "Received peer address change"
571 );
572
573 {
575 let mut state = self.state.write().unwrap();
576 let source_map = state.peer_addresses.entry(peer_id).or_default();
577
578 if addresses.is_empty() {
579 source_map.remove(&source);
580 if source_map.is_empty() {
581 state.peer_addresses.remove(&peer_id);
582 }
583 } else {
584 source_map.insert(source, addresses);
585 }
586 }
587
588 self.reconfigure_peer_addresses(peer_id);
590 }
591
592 fn reconfigure_peer_addresses(&mut self, peer_id: PeerId) {
595 let priority_addresses = self
596 .state
597 .read()
598 .unwrap()
599 .peer_addresses
600 .get(&peer_id)
601 .and_then(|sources| sources.first_key_value().map(|(_, addrs)| addrs.clone()))
602 .unwrap_or_default();
603 let current_addresses = self
604 .network
605 .known_peers()
606 .get(&peer_id)
607 .map(|info| info.address.clone())
608 .unwrap_or_default();
609 if priority_addresses != current_addresses {
610 let new_peer_info = PeerInfo {
611 peer_id,
612 affinity: PeerAffinity::High,
613 address: priority_addresses.clone(),
614 };
615
616 self.network.known_peers().insert(new_peer_info);
617
618 if !current_addresses.is_empty() {
620 let _ = self.network.disconnect(peer_id);
621
622 if let Some(address) = priority_addresses.first().cloned() {
623 let network = self.network.clone();
624 self.tasks.spawn(async move {
625 let _ = network.connect_with_peer_id(address, peer_id).await;
627 });
628 }
629 }
630 }
631 }
632
633 fn load_stored_peers_on_startup(&mut self) {
634 let Some(path) = &self.store_path else {
635 return;
636 };
637 let entries = load_stored_peers(path);
638 if entries.is_empty() {
639 return;
640 }
641 info!(
642 count = entries.len(),
643 "Loaded stored peer addresses from {}",
644 path.display()
645 );
646
647 update_known_peers_versioned(
648 self.state.clone(),
649 self.metrics.clone(),
650 entries,
651 self.configured_peers.clone(),
652 &self.endpoint_manager,
653 );
654
655 while let Ok(msg) = self.mailbox.try_recv() {
658 self.handle_message(msg);
659 }
660 }
661
662 fn save_stored_peers(&self) {
663 let Some(path) = &self.store_path else {
664 return;
665 };
666 let state = self.state.read().unwrap();
667 let peers_to_save: Vec<SignedVersionedNodeInfo> = state
668 .known_peers_v2
669 .iter()
670 .filter(|(pid, _)| self.configured_peers.contains_key(pid))
671 .map(|(_, verified)| verified.inner().clone())
672 .collect();
673 drop(state);
674 save_stored_peers(path, &peers_to_save);
675 }
676
677 fn handle_peer_event(&mut self, peer_event: Result<PeerEvent, RecvError>) {
678 match peer_event {
679 Ok(PeerEvent::NewPeer(peer_id)) => {
680 if let Some(peer) = self.network.peer(peer_id) {
681 self.state
682 .write()
683 .unwrap()
684 .connected_peers
685 .insert(peer_id, ());
686
687 self.tasks.spawn(query_peer_for_their_known_peers(
689 peer,
690 self.discovery_config.clone(),
691 self.state.clone(),
692 self.metrics.clone(),
693 self.configured_peers.clone(),
694 self.endpoint_manager.clone(),
695 self.mailbox_sender(),
696 ));
697 }
698 }
699 Ok(PeerEvent::LostPeer(peer_id, _)) => {
700 self.state.write().unwrap().connected_peers.remove(&peer_id);
701 }
702
703 Err(RecvError::Closed) => {
704 panic!("PeerEvent channel shouldn't be able to be closed");
705 }
706
707 Err(RecvError::Lagged(_)) => {
708 trace!("State-Sync fell behind processing PeerEvents");
709 }
710 }
711 }
712
713 fn handle_tick(&mut self, _now: std::time::Instant, now_unix: u64) {
714 self.update_our_info_timestamp(now_unix);
715
716 self.tasks
717 .spawn(query_connected_peers_for_their_known_peers(
718 self.network.clone(),
719 self.discovery_config.clone(),
720 self.state.clone(),
721 self.metrics.clone(),
722 self.configured_peers.clone(),
723 self.endpoint_manager.clone(),
724 self.mailbox_sender(),
725 ));
726
727 let mut culled_configured_peers = Vec::new();
729 {
730 let mut state = self.state.write().unwrap();
731 state
732 .known_peers
733 .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms) < ONE_DAY_MILLISECONDS);
734 state.known_peers_v2.retain(|k, v| {
735 let keep = now_unix.saturating_sub(v.timestamp_ms()) < ONE_DAY_MILLISECONDS;
736 if !keep && self.configured_peers.contains_key(k) {
737 culled_configured_peers.push(*k);
738 }
739 keep
740 });
741 }
742
743 for peer_id in culled_configured_peers {
746 self.endpoint_manager
747 .clear_source(peer_id, AddressSource::Discovery);
748 }
749
750 self.pending_dials.retain(|_k, v| !v.is_finished());
752 if let Some(abort_handle) = &self.dial_seed_peers_task
753 && abort_handle.is_finished()
754 {
755 self.dial_seed_peers_task = None;
756 }
757
758 let cooldown = self.discovery_config.peer_failure_cooldown();
759 self.peer_cooldowns
760 .retain(|_, since| since.elapsed() < cooldown);
761
762 let state = self.state.read().unwrap();
764 let our_peer_id = self.network.peer_id();
765
766 let mut preferred: HashMap<PeerId, NodeInfo> = HashMap::new();
770 let mut cooldown_peers: HashMap<PeerId, NodeInfo> = HashMap::new();
771
772 for (peer_id, info) in state.known_peers.iter() {
773 if *peer_id != our_peer_id
774 && !info.addresses.is_empty()
775 && !state.connected_peers.contains_key(peer_id)
776 && !self.pending_dials.contains_key(peer_id)
777 {
778 if self.peer_cooldowns.contains_key(peer_id) {
779 cooldown_peers.insert(*peer_id, info.data().clone());
780 } else {
781 preferred.insert(*peer_id, info.data().clone());
782 }
783 }
784 }
785 for (peer_id, info) in state.known_peers_v2.iter() {
786 let p2p_addresses = info.p2p_addresses();
787 if *peer_id != our_peer_id
788 && !p2p_addresses.is_empty()
789 && !state.connected_peers.contains_key(peer_id)
790 && !self.pending_dials.contains_key(peer_id)
791 {
792 let node_info = NodeInfo {
793 peer_id: *peer_id,
794 addresses: p2p_addresses.to_vec(),
795 timestamp_ms: info.timestamp_ms(),
796 access_type: info.access_type(),
797 };
798 if self.peer_cooldowns.contains_key(peer_id) {
799 if cooldown_peers
800 .get(peer_id)
801 .is_none_or(|existing| info.timestamp_ms() > existing.timestamp_ms)
802 {
803 cooldown_peers.insert(*peer_id, node_info);
804 }
805 } else if preferred
806 .get(peer_id)
807 .is_none_or(|existing| info.timestamp_ms() > existing.timestamp_ms)
808 {
809 preferred.insert(*peer_id, node_info);
810 }
811 }
812 }
813
814 let number_of_connections = state.connected_peers.len();
815 let number_to_dial = self
816 .discovery_config
817 .target_concurrent_connections()
818 .saturating_sub(number_of_connections);
819
820 use rand::seq::IteratorRandom;
821 let mut rng = rand::thread_rng();
822
823 let mut to_dial: Vec<_> = preferred
824 .into_iter()
825 .choose_multiple(&mut rng, number_to_dial);
826
827 let remaining = number_to_dial.saturating_sub(to_dial.len());
828 to_dial.extend(
829 cooldown_peers
830 .into_iter()
831 .choose_multiple(&mut rng, remaining),
832 );
833
834 for (peer_id, info) in to_dial {
835 let abort_handle = self
836 .tasks
837 .spawn(try_to_connect_to_peer(self.network.clone(), info));
838 self.pending_dials.insert(peer_id, abort_handle);
839 }
840
841 let has_peers_to_dial = || {
844 self.configured_peers
845 .values()
846 .any(|p| p.affinity == PeerAffinity::High)
847 || !self.unidentified_seed_peers.is_empty()
848 };
849 if self.dial_seed_peers_task.is_none()
850 && state.connected_peers.is_empty()
851 && self.pending_dials.is_empty()
852 && has_peers_to_dial()
853 {
854 let abort_handle = self.tasks.spawn(try_to_connect_to_seed_peers(
855 self.network.clone(),
856 self.discovery_config.clone(),
857 self.configured_peers.clone(),
858 self.unidentified_seed_peers.clone(),
859 ));
860
861 self.dial_seed_peers_task = Some(abort_handle);
862 }
863 }
864
865 fn mailbox_sender(&self) -> mpsc::Sender<DiscoveryMessage> {
866 self.mailbox_tx.clone()
867 }
868}
869
870async fn try_to_connect_to_peer(network: Network, info: NodeInfo) {
871 debug!("Connecting to peer {info:?}");
872 for multiaddr in &info.addresses {
873 if let Ok(address) = multiaddr.to_anemo_address() {
874 if network
876 .connect_with_peer_id(address, info.peer_id)
877 .await
878 .tap_err(|e| {
879 debug!(
880 "error dialing {} at address '{}': {e}",
881 info.peer_id.short_display(4),
882 multiaddr
883 )
884 })
885 .is_ok()
886 {
887 return;
888 }
889 }
890 }
891}
892
893async fn try_to_connect_to_seed_peers(
894 network: Network,
895 config: Arc<DiscoveryConfig>,
896 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
897 unidentified_seed_peers: Vec<anemo::types::Address>,
898) {
899 let high_affinity_peers: Vec<_> = configured_peers
900 .values()
901 .filter(|p| p.affinity == PeerAffinity::High)
902 .cloned()
903 .collect();
904 debug!(
905 ?high_affinity_peers,
906 ?unidentified_seed_peers,
907 "Connecting to seed peers"
908 );
909 let network = &network;
910
911 let with_peer_id = high_affinity_peers.into_iter().flat_map(|peer_info| {
913 peer_info
914 .address
915 .into_iter()
916 .map(move |addr| (Some(peer_info.peer_id), addr))
917 });
918 let without_peer_id = unidentified_seed_peers.into_iter().map(|addr| (None, addr));
919 futures::stream::iter(with_peer_id.chain(without_peer_id))
920 .for_each_concurrent(
921 config.target_concurrent_connections(),
922 |(peer_id, address)| async move {
923 let _ = if let Some(peer_id) = peer_id {
925 network
926 .connect_with_peer_id(address.clone(), peer_id)
927 .await
928 .tap_err(|e| {
929 debug!(
930 "error dialing peer {} at '{}': {e}",
931 peer_id.short_display(4),
932 address
933 )
934 })
935 } else {
936 network
937 .connect(address.clone())
938 .await
939 .tap_err(|e| debug!("error dialing address '{}': {e}", address))
940 };
941 },
942 )
943 .await;
944}
945
946async fn query_peer_for_known_peers_v2(peer: Peer) -> Option<Vec<SignedNodeInfo>> {
947 let mut client = DiscoveryClient::new(peer);
948 let request = Request::new(()).with_timeout(TIMEOUT);
949 client
950 .get_known_peers_v2(request)
951 .await
952 .ok()
953 .map(Response::into_inner)
954 .map(
955 |GetKnownPeersResponseV2 {
956 own_info,
957 mut known_peers,
958 }| {
959 if !own_info.addresses.is_empty() {
960 known_peers.push(own_info)
961 }
962 known_peers
963 },
964 )
965}
966
967async fn query_peer_for_their_known_peers(
968 peer: Peer,
969 discovery_config: Arc<DiscoveryConfig>,
970 state: Arc<RwLock<State>>,
971 metrics: Metrics,
972 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
973 endpoint_manager: EndpointManager,
974 mailbox_tx: mpsc::Sender<DiscoveryMessage>,
975) {
976 if discovery_config.use_get_known_peers_v3() {
978 let our_info_v2 = state.read().unwrap().our_info_v2.clone();
979 if let Some(own_info) = our_info_v2 {
980 let peer_for_v3 = peer.clone();
981 let v3_query = async move {
982 let mut client = DiscoveryClient::new(peer_for_v3);
983 let request =
984 Request::new(GetKnownPeersRequestV3 { own_info }).with_timeout(TIMEOUT);
985 client
986 .get_known_peers_v3(request)
987 .await
988 .ok()
989 .map(Response::into_inner)
990 .map(
991 |GetKnownPeersResponseV3 {
992 own_info,
993 mut known_peers,
994 }| {
995 if !own_info.p2p_addresses().is_empty() {
996 known_peers.push(own_info)
997 }
998 known_peers
999 },
1000 )
1001 };
1002
1003 let (found_peers_v2, found_peers_v3) =
1004 tokio::join!(query_peer_for_known_peers_v2(peer), v3_query);
1005
1006 if let Some(found_peers) = found_peers_v2 {
1007 update_known_peers(
1008 state.clone(),
1009 metrics.clone(),
1010 found_peers,
1011 configured_peers.clone(),
1012 );
1013 }
1014 if let Some(found_peers) = found_peers_v3 {
1015 let changed = update_known_peers_versioned(
1016 state,
1017 metrics,
1018 found_peers,
1019 configured_peers,
1020 &endpoint_manager,
1021 );
1022 if changed {
1023 let _ = mailbox_tx.try_send(DiscoveryMessage::ConfiguredPeersUpdated);
1024 }
1025 }
1026 return;
1027 }
1028 }
1029
1030 if let Some(found_peers) = query_peer_for_known_peers_v2(peer).await {
1032 update_known_peers(state, metrics, found_peers, configured_peers);
1033 }
1034}
1035
1036async fn query_connected_peers_for_their_known_peers(
1037 network: Network,
1038 config: Arc<DiscoveryConfig>,
1039 state: Arc<RwLock<State>>,
1040 metrics: Metrics,
1041 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1042 endpoint_manager: EndpointManager,
1043 mailbox_tx: mpsc::Sender<DiscoveryMessage>,
1044) {
1045 use rand::seq::IteratorRandom;
1046
1047 let peers_to_query: Vec<_> = network
1048 .peers()
1049 .into_iter()
1050 .flat_map(|id| network.peer(id))
1051 .choose_multiple(&mut rand::thread_rng(), config.peers_to_query());
1052
1053 let v2_query = {
1055 let peers = peers_to_query.clone();
1056 let peers_to_query_count = config.peers_to_query();
1057 async move {
1058 peers
1059 .into_iter()
1060 .map(DiscoveryClient::new)
1061 .map(|mut client| async move {
1062 let request = Request::new(()).with_timeout(TIMEOUT);
1063 client
1064 .get_known_peers_v2(request)
1065 .await
1066 .ok()
1067 .map(Response::into_inner)
1068 .map(
1069 |GetKnownPeersResponseV2 {
1070 own_info,
1071 mut known_peers,
1072 }| {
1073 if !own_info.addresses.is_empty() {
1074 known_peers.push(own_info)
1075 }
1076 known_peers
1077 },
1078 )
1079 })
1080 .pipe(futures::stream::iter)
1081 .buffer_unordered(peers_to_query_count)
1082 .filter_map(std::future::ready)
1083 .flat_map(futures::stream::iter)
1084 .collect::<Vec<_>>()
1085 .await
1086 }
1087 };
1088
1089 if config.use_get_known_peers_v3() {
1091 let our_info_v2 = state.read().unwrap().our_info_v2.clone();
1092 if let Some(own_info) = our_info_v2 {
1093 let v3_query = {
1094 let peers_to_query_count = config.peers_to_query();
1095 async move {
1096 peers_to_query
1097 .into_iter()
1098 .map(DiscoveryClient::new)
1099 .map(|mut client| {
1100 let own_info = own_info.clone();
1101 async move {
1102 let request = Request::new(GetKnownPeersRequestV3 { own_info })
1103 .with_timeout(TIMEOUT);
1104 client
1105 .get_known_peers_v3(request)
1106 .await
1107 .ok()
1108 .map(Response::into_inner)
1109 .map(
1110 |GetKnownPeersResponseV3 {
1111 own_info,
1112 mut known_peers,
1113 }| {
1114 if !own_info.p2p_addresses().is_empty() {
1115 known_peers.push(own_info)
1116 }
1117 known_peers
1118 },
1119 )
1120 }
1121 })
1122 .pipe(futures::stream::iter)
1123 .buffer_unordered(peers_to_query_count)
1124 .filter_map(std::future::ready)
1125 .flat_map(futures::stream::iter)
1126 .collect::<Vec<_>>()
1127 .await
1128 }
1129 };
1130
1131 let (found_peers_v2, found_peers_v3) = tokio::join!(v2_query, v3_query);
1132
1133 update_known_peers(
1134 state.clone(),
1135 metrics.clone(),
1136 found_peers_v2,
1137 configured_peers.clone(),
1138 );
1139 let changed = update_known_peers_versioned(
1140 state,
1141 metrics,
1142 found_peers_v3,
1143 configured_peers,
1144 &endpoint_manager,
1145 );
1146 if changed {
1147 let _ = mailbox_tx.try_send(DiscoveryMessage::ConfiguredPeersUpdated);
1148 }
1149 return;
1150 }
1151 }
1152
1153 let found_peers_v2 = v2_query.await;
1155 update_known_peers(state, metrics, found_peers_v2, configured_peers);
1156}
1157
1158fn update_known_peers(
1159 state: Arc<RwLock<State>>,
1160 metrics: Metrics,
1161 found_peers: Vec<SignedNodeInfo>,
1162 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1163) {
1164 use std::collections::hash_map::Entry;
1165
1166 let now_unix = now_unix();
1167 let our_peer_id = state.read().unwrap().our_info.clone().unwrap().peer_id;
1168 let known_peers = &mut state.write().unwrap().known_peers;
1169 for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
1171 if peer_info.timestamp_ms > now_unix.saturating_add(30 * 1_000) || now_unix.saturating_sub(peer_info.timestamp_ms) > ONE_DAY_MILLISECONDS
1176 {
1177 continue;
1178 }
1179
1180 if peer_info.peer_id == our_peer_id {
1181 continue;
1182 }
1183
1184 let is_restricted = match peer_info.access_type {
1186 AccessType::Public => false,
1187 AccessType::Private | AccessType::Trusted => true,
1188 };
1189 if is_restricted && !configured_peers.contains_key(&peer_info.peer_id) {
1190 continue;
1191 }
1192
1193 if peer_info.addresses.len() > MAX_ADDRESSES_PER_PEER {
1195 continue;
1196 }
1197
1198 if !peer_info
1200 .addresses
1201 .iter()
1202 .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
1203 {
1204 continue;
1205 }
1206 let Ok(public_key) = Ed25519PublicKey::from_bytes(&peer_info.peer_id.0) else {
1207 debug_fatal!(
1208 "Failed to convert anemo PeerId {:?} to Ed25519PublicKey",
1210 peer_info.peer_id
1211 );
1212 continue;
1213 };
1214 let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
1215 if let Err(e) = public_key.verify(&msg, peer_info.auth_sig()) {
1216 info!(
1217 "Discovery failed to verify signature for NodeInfo for peer {:?}: {e:?}",
1218 peer_info.peer_id
1219 );
1220 continue;
1222 }
1223 let peer = VerifiedSignedNodeInfo::new_from_verified(peer_info);
1224
1225 match known_peers.entry(peer.peer_id) {
1226 Entry::Occupied(mut o) => {
1227 if peer.timestamp_ms > o.get().timestamp_ms {
1228 if o.get().addresses.is_empty() && !peer.addresses.is_empty() {
1229 metrics.inc_num_peers_with_external_address();
1230 }
1231 if !o.get().addresses.is_empty() && peer.addresses.is_empty() {
1232 metrics.dec_num_peers_with_external_address();
1233 }
1234 o.insert(peer);
1235 }
1236 }
1237 Entry::Vacant(v) => {
1238 if !peer.addresses.is_empty() {
1239 metrics.inc_num_peers_with_external_address();
1240 }
1241 v.insert(peer);
1242 }
1243 }
1244 }
1245}
1246
1247fn update_known_peers_versioned(
1249 state: Arc<RwLock<State>>,
1250 metrics: Metrics,
1251 found_peers: Vec<SignedVersionedNodeInfo>,
1252 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1253 endpoint_manager: &EndpointManager,
1254) -> bool {
1255 use std::collections::hash_map::Entry;
1256
1257 let now_unix = now_unix();
1258 let our_peer_id = state
1259 .read()
1260 .unwrap()
1261 .our_info_v2
1262 .as_ref()
1263 .and_then(|info| info.peer_id());
1264 let known_peers_v2 = &mut state.write().unwrap().known_peers_v2;
1265 let mut configured_peer_changed = false;
1266
1267 for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
1268 let timestamp_ms = peer_info.timestamp_ms();
1269
1270 if timestamp_ms > now_unix.saturating_add(30 * 1_000)
1271 || now_unix.saturating_sub(timestamp_ms) > ONE_DAY_MILLISECONDS
1272 {
1273 continue;
1274 }
1275
1276 let Some(peer_id) = peer_info.peer_id() else {
1277 continue;
1278 };
1279
1280 if Some(peer_id) == our_peer_id {
1281 continue;
1282 }
1283
1284 let is_restricted = match peer_info.access_type() {
1285 AccessType::Public => false,
1286 AccessType::Private | AccessType::Trusted => true,
1287 };
1288 if is_restricted && !configured_peers.contains_key(&peer_id) {
1289 continue;
1290 }
1291
1292 let peer = match verify_versioned_node_info(&peer_info) {
1293 Ok(verified) => verified,
1294 Err(reason) => {
1295 info!("Discovery rejecting VersionedNodeInfo for peer {peer_id:?}: {reason}");
1296 continue;
1297 }
1298 };
1299
1300 let is_configured = configured_peers.contains_key(&peer_id);
1302 if is_configured && let VersionedNodeInfo::V2(info_v2) = peer_info.data() {
1303 for (endpoint_id, addrs) in &info_v2.addresses {
1304 if !addrs.is_empty() {
1305 let _ = endpoint_manager.update_endpoint(
1306 endpoint_id.clone(),
1307 AddressSource::Discovery,
1308 addrs.clone(),
1309 );
1310 }
1311 }
1312 }
1313
1314 let peer_p2p_addresses = peer.p2p_addresses();
1315
1316 match known_peers_v2.entry(peer_id) {
1317 Entry::Occupied(mut o) => {
1318 if peer.timestamp_ms() > o.get().timestamp_ms() {
1319 let old_addresses = o.get().p2p_addresses();
1320 if old_addresses.is_empty() && !peer_p2p_addresses.is_empty() {
1321 metrics.inc_num_peers_with_external_address();
1322 }
1323 if !old_addresses.is_empty() && peer_p2p_addresses.is_empty() {
1324 metrics.dec_num_peers_with_external_address();
1325 }
1326 o.insert(peer);
1327 if is_configured {
1328 configured_peer_changed = true;
1329 }
1330 }
1331 }
1332 Entry::Vacant(v) => {
1333 if !peer_p2p_addresses.is_empty() {
1334 metrics.inc_num_peers_with_external_address();
1335 }
1336 v.insert(peer);
1337 if is_configured {
1338 configured_peer_changed = true;
1339 }
1340 }
1341 }
1342 }
1343
1344 configured_peer_changed
1345}
1346
1347pub(super) fn now_unix() -> u64 {
1348 use std::time::{SystemTime, UNIX_EPOCH};
1349
1350 SystemTime::now()
1351 .duration_since(UNIX_EPOCH)
1352 .unwrap()
1353 .as_millis() as u64
1354}