1use anemo::types::PeerAffinity;
5use anemo::types::PeerInfo;
6use anemo::{Network, Peer, PeerId, Request, Response, types::PeerEvent};
7use fastcrypto::ed25519::{Ed25519PublicKey, Ed25519Signature};
8use futures::StreamExt;
9use mysten_common::debug_fatal;
10use serde::{Deserialize, Serialize};
11use shared_crypto::intent::IntentScope;
12use std::{
13 collections::{BTreeMap, HashMap},
14 sync::{Arc, RwLock},
15 time::Duration,
16};
17
18use crate::endpoint_manager::{AddressSource, EndpointId, EndpointManager};
19use sui_config::p2p::{AccessType, DiscoveryConfig, P2pConfig};
20use sui_types::crypto::{NetworkKeyPair, NetworkPublicKey, Signer, ToFromBytes, VerifyingKey};
21use sui_types::digests::Digest;
22use sui_types::message_envelope::{Envelope, Message, VerifiedEnvelope};
23use sui_types::multiaddr::Multiaddr;
24use tap::{Pipe, TapFallible};
25use tokio::sync::broadcast::error::RecvError;
26use tokio::sync::mpsc;
27use tokio::{
28 sync::oneshot,
29 task::{AbortHandle, JoinSet},
30};
31use tracing::{debug, info, trace};
32
/// Timeout attached to every discovery RPC issued by this module.
const TIMEOUT: Duration = Duration::from_millis(1_000);
/// Twenty-four hours in milliseconds: peer infos older than this are rejected on receipt
/// and pruned from the known-peer maps on every tick.
const ONE_DAY_MILLISECONDS: u64 = 1_000 * 60 * 60 * 24;
/// Exclusive upper bound on `Multiaddr::len` for any address accepted from a peer.
const MAX_ADDRESS_LENGTH: usize = 300;
/// Cap on peer records processed per response; receivers take one more than this to allow
/// for the responder's own info, which the querier appends to the list.
const MAX_PEERS_TO_SEND: usize = 200;
/// Maximum addresses accepted per peer (V1 infos) or per endpoint (V2 infos).
const MAX_ADDRESSES_PER_PEER: usize = 2;
38
39mod generated {
40 include!(concat!(env!("OUT_DIR"), "/sui.Discovery.rs"));
41}
42mod builder;
43mod metrics;
44mod server;
45#[cfg(test)]
46mod tests;
47
48pub use builder::{Builder, UnstartedDiscovery};
49pub use generated::{
50 discovery_client::DiscoveryClient,
51 discovery_server::{Discovery, DiscoveryServer},
52};
53pub use server::{GetKnownPeersRequestV3, GetKnownPeersResponseV2, GetKnownPeersResponseV3};
54
/// Messages delivered to the discovery event loop through its mailbox.
#[derive(Debug)]
pub enum DiscoveryMessage {
    /// Address override for `peer_id` supplied by `source`. An empty `addresses` list
    /// removes the override previously supplied by that source.
    PeerAddressChange {
        peer_id: PeerId,
        source: AddressSource,
        addresses: Vec<anemo::types::Address>,
    },
    /// A signed node info obtained outside the periodic queries (sender not visible here —
    /// presumably the discovery server); it is merged via `update_known_peers_versioned`.
    ReceivedNodeInfo {
        peer_info: Box<SignedVersionedNodeInfo>,
    },
}
67
/// Handle to a running discovery instance.
///
/// The shutdown sender is held behind an `Arc` and never used directly. The event loop exits
/// when its shutdown receiver resolves; for a tokio oneshot channel that includes the sender
/// being dropped — presumably when the last `Handle` clone goes away (pairing is set up by the
/// builder, not visible here).
#[derive(Clone, Debug)]
pub struct Handle {
    pub(super) _shutdown_handle: Arc<oneshot::Sender<()>>,
    pub(super) sender: Sender,
}
75
76impl Handle {
77 pub fn sender(&self) -> Sender {
78 self.sender.clone()
79 }
80}
81
/// Cloneable handle for posting [`DiscoveryMessage`]s to the discovery event loop's mailbox.
#[derive(Clone, Debug)]
pub struct Sender {
    pub(super) sender: mpsc::Sender<DiscoveryMessage>,
}
88
89impl Sender {
90 pub fn peer_address_change(
91 &self,
92 peer_id: PeerId,
93 source: AddressSource,
94 addresses: Vec<anemo::types::Address>,
95 ) {
96 self.sender
97 .try_send(DiscoveryMessage::PeerAddressChange {
98 peer_id,
99 source,
100 addresses,
101 })
102 .expect("Discovery mailbox should not overflow or be closed")
103 }
104}
105
106use self::metrics::Metrics;
107
/// Mutable discovery state shared behind an `Arc<RwLock<_>>` between the event loop and the
/// tasks it spawns (and presumably the RPC server in `server.rs` — not visible here).
struct State {
    /// Our own signed V1 node info; built once at startup and re-signed with a fresh
    /// timestamp on every tick.
    our_info: Option<SignedNodeInfo>,
    /// Our own signed versioned (V2) node info, maintained alongside `our_info`.
    our_info_v2: Option<SignedVersionedNodeInfo>,
    /// Peers currently connected according to anemo peer events (used as a set).
    connected_peers: HashMap<PeerId, ()>,
    /// Verified V1 infos learned from other peers; the newest timestamp wins and entries
    /// older than a day are pruned on each tick.
    known_peers: HashMap<PeerId, VerifiedSignedNodeInfo>,
    /// Verified versioned infos learned from other peers, with the same freshness rules.
    known_peers_v2: HashMap<PeerId, VerifiedSignedVersionedNodeInfo>,
    /// Per-peer address overrides keyed by the source that supplied them; the first
    /// (lowest-ordered) source in each map takes precedence.
    peer_address_overrides: HashMap<PeerId, BTreeMap<AddressSource, Vec<anemo::types::Address>>>,
}
117
/// First-generation node info that a node advertises about itself.
///
/// `NodeInfo::sign` signs the BCS encoding of this struct, so field order is part of the
/// signed format.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct NodeInfo {
    /// Network identity; its bytes are the Ed25519 key that verifies the signature.
    pub peer_id: PeerId,
    /// Dialable addresses. Receivers accept at most `MAX_ADDRESSES_PER_PEER`, each shorter
    /// than `MAX_ADDRESS_LENGTH` and convertible to an anemo address.
    pub addresses: Vec<Multiaddr>,

    /// Unix time in milliseconds at which the info was (re-)signed; receivers drop infos
    /// more than 30 seconds in the future or more than a day old.
    pub timestamp_ms: u64,

    /// Visibility of this node: a restricted (`Private`/`Trusted`) info is accepted only if
    /// the node it describes is one of the receiver's configured peers.
    pub access_type: AccessType,
}
135
136impl NodeInfo {
137 fn sign(self, keypair: &NetworkKeyPair) -> SignedNodeInfo {
138 let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
139 let sig = keypair.sign(&msg);
140 SignedNodeInfo::new_from_data_and_sig(self, sig)
141 }
142}
143
/// A [`NodeInfo`] bundled with the Ed25519 signature produced by `NodeInfo::sign`.
pub type SignedNodeInfo = Envelope<NodeInfo, Ed25519Signature>;

/// A [`SignedNodeInfo`] marked as verified; in this file it is only constructed after the
/// signature check in `update_known_peers`.
pub type VerifiedSignedNodeInfo = VerifiedEnvelope<NodeInfo, Ed25519Signature>;
147
148#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
149pub struct NodeInfoDigest(Digest);
150
151impl NodeInfoDigest {
152 pub const fn new(digest: [u8; 32]) -> Self {
153 Self(Digest::new(digest))
154 }
155}
156
157impl Message for NodeInfo {
158 type DigestType = NodeInfoDigest;
159 const SCOPE: IntentScope = IntentScope::DiscoveryPeers;
160
161 fn digest(&self) -> Self::DigestType {
162 unreachable!("NodeInfoDigest is not used today")
163 }
164}
165
/// Second-generation node info that groups addresses per endpoint.
///
/// The `EndpointId::P2p` key carries the node's peer id; `verify_versioned_node_info`
/// requires at most one key per `EndpointId` variant. Field order is part of the BCS
/// encoding that gets signed.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct NodeInfoV2 {
    /// Addresses per endpoint (P2P, consensus, ...).
    pub addresses: BTreeMap<EndpointId, Vec<Multiaddr>>,
    /// Unix time in milliseconds at which the info was (re-)signed.
    pub timestamp_ms: u64,
    /// Visibility of this node; same semantics as `NodeInfo::access_type`.
    pub access_type: AccessType,
}
174
175impl NodeInfoV2 {
176 pub fn peer_id(&self) -> Option<PeerId> {
179 self.addresses.keys().find_map(|k| match k {
180 EndpointId::P2p(peer_id) => Some(*peer_id),
181 EndpointId::Consensus(_) => None,
182 })
183 }
184
185 pub fn p2p_addresses(&self) -> &[Multiaddr] {
186 self.addresses
187 .iter()
188 .find_map(|(k, v)| match k {
189 EndpointId::P2p(_) => Some(v.as_slice()),
190 EndpointId::Consensus(_) => None,
191 })
192 .unwrap_or(&[])
193 }
194}
195
/// Node info in either wire format.
///
/// `VersionedNodeInfo::sign` signs the BCS encoding of this enum, so variant order is part
/// of the signed format.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum VersionedNodeInfo {
    V1(NodeInfo),
    V2(NodeInfoV2),
}
202
203impl VersionedNodeInfo {
204 pub fn peer_id(&self) -> Option<PeerId> {
205 match self {
206 VersionedNodeInfo::V1(info) => Some(info.peer_id),
207 VersionedNodeInfo::V2(info) => info.peer_id(),
208 }
209 }
210
211 pub fn timestamp_ms(&self) -> u64 {
212 match self {
213 VersionedNodeInfo::V1(info) => info.timestamp_ms,
214 VersionedNodeInfo::V2(info) => info.timestamp_ms,
215 }
216 }
217
218 pub fn access_type(&self) -> AccessType {
219 match self {
220 VersionedNodeInfo::V1(info) => info.access_type,
221 VersionedNodeInfo::V2(info) => info.access_type,
222 }
223 }
224
225 pub fn p2p_addresses(&self) -> &[Multiaddr] {
226 match self {
227 VersionedNodeInfo::V1(info) => &info.addresses,
228 VersionedNodeInfo::V2(info) => info.p2p_addresses(),
229 }
230 }
231
232 pub fn sign(self, keypair: &NetworkKeyPair) -> SignedVersionedNodeInfo {
233 let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
234 let sig = keypair.sign(&msg);
235 SignedVersionedNodeInfo::new_from_data_and_sig(self, sig)
236 }
237}
238
239#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
240pub struct VersionedNodeInfoDigest(Digest);
241
242impl Message for VersionedNodeInfo {
243 type DigestType = VersionedNodeInfoDigest;
244 const SCOPE: IntentScope = IntentScope::DiscoveryPeers;
245
246 fn digest(&self) -> Self::DigestType {
247 unreachable!("VersionedNodeInfoDigest is not used today")
248 }
249}
250
/// A [`VersionedNodeInfo`] bundled with the Ed25519 signature produced by `VersionedNodeInfo::sign`.
pub type SignedVersionedNodeInfo = Envelope<VersionedNodeInfo, Ed25519Signature>;
/// A [`SignedVersionedNodeInfo`] that passed `verify_versioned_node_info`.
pub type VerifiedSignedVersionedNodeInfo = VerifiedEnvelope<VersionedNodeInfo, Ed25519Signature>;
253
254fn verify_versioned_node_info(
264 peer_info: &SignedVersionedNodeInfo,
265) -> Result<VerifiedSignedVersionedNodeInfo, &'static str> {
266 let peer_id = peer_info.peer_id().ok_or("missing P2P peer_id")?;
267
268 let public_key =
269 Ed25519PublicKey::from_bytes(&peer_id.0).map_err(|_| "invalid peer_id public key")?;
270
271 let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
272 public_key
273 .verify(&msg, peer_info.auth_sig())
274 .map_err(|_| "signature verification failed")?;
275
276 match peer_info.data() {
277 VersionedNodeInfo::V1(info) => {
278 if info.addresses.len() > MAX_ADDRESSES_PER_PEER {
279 return Err("too many addresses");
280 }
281 if !info
282 .addresses
283 .iter()
284 .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
285 {
286 return Err("invalid address");
287 }
288 }
289 VersionedNodeInfo::V2(info_v2) => {
290 let mut seen_variants = Vec::new();
292 for endpoint_id in info_v2.addresses.keys() {
293 let variant = std::mem::discriminant(endpoint_id);
294 if seen_variants.contains(&variant) {
295 return Err("duplicate endpoint variant");
296 }
297 seen_variants.push(variant);
298 }
299
300 for (endpoint_id, addrs) in &info_v2.addresses {
301 if addrs.len() > MAX_ADDRESSES_PER_PEER {
302 return Err("too many addresses for endpoint");
303 }
304 if !addrs.iter().all(|addr| addr.len() < MAX_ADDRESS_LENGTH) {
305 return Err("address too long");
306 }
307 if matches!(endpoint_id, EndpointId::P2p(_))
308 && !addrs.iter().all(|addr| addr.to_anemo_address().is_ok())
309 {
310 return Err("invalid P2P address");
311 }
312 }
313
314 let identities_valid = info_v2.addresses.keys().all(|eid| match eid {
315 EndpointId::P2p(_) => true,
316 EndpointId::Consensus(pubkey) => pubkey.as_bytes() == peer_id.0,
317 });
318 if !identities_valid {
319 return Err("non-P2P endpoint identity mismatch");
320 }
321 }
322 }
323
324 Ok(VerifiedSignedVersionedNodeInfo::new_from_verified(
325 peer_info.clone(),
326 ))
327}
328
/// State owned by the discovery event loop task (see `DiscoveryEventLoop::start`).
struct DiscoveryEventLoop {
    /// P2P configuration; its `external_address` is advertised in our node info.
    config: P2pConfig,
    /// Discovery tuning: tick interval, target connection count, query fan-out, access
    /// type, and whether to use `get_known_peers_v3`.
    discovery_config: Arc<DiscoveryConfig>,
    /// Configured (preferred) peers, registered with anemo at startup. Restricted infos are
    /// accepted only for peers in this map.
    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
    /// Seed addresses whose peer id is not known in advance.
    unidentified_seed_peers: Vec<anemo::types::Address>,
    network: Network,
    /// Key used to sign our own node info.
    keypair: NetworkKeyPair,
    /// Background tasks (queries and dials); polled in the event loop for panics/failures.
    tasks: JoinSet<()>,
    /// In-flight dial tasks keyed by target peer, used to avoid dialing a peer twice.
    pending_dials: HashMap<PeerId, AbortHandle>,
    /// In-flight seed-dialing task, if any.
    dial_seed_peers_task: Option<AbortHandle>,
    /// Resolving this ends the event loop.
    shutdown_handle: oneshot::Receiver<()>,
    state: Arc<RwLock<State>>,
    mailbox: mpsc::Receiver<DiscoveryMessage>,
    metrics: Metrics,
    /// If set, advertised under `EndpointId::Consensus` in our V2 node info.
    consensus_external_address: Option<Multiaddr>,
    /// Receives non-P2P endpoint addresses learned about configured peers.
    endpoint_manager: EndpointManager,
}
346
347impl DiscoveryEventLoop {
348 pub async fn start(mut self) {
349 info!("Discovery started");
350
351 self.construct_our_info();
352 self.configure_preferred_peers();
353
354 let mut interval = tokio::time::interval(self.discovery_config.interval_period());
355 let mut peer_events = {
356 let (subscriber, _peers) = self.network.subscribe().unwrap();
357 subscriber
358 };
359
360 loop {
361 tokio::select! {
362 now = interval.tick() => {
363 let now_unix = now_unix();
364 self.handle_tick(now.into_std(), now_unix);
365 }
366 peer_event = peer_events.recv() => {
367 self.handle_peer_event(peer_event);
368 },
369 Some(message) = self.mailbox.recv() => {
370 self.handle_message(message);
371 }
372 Some(task_result) = self.tasks.join_next() => {
373 match task_result {
374 Ok(()) => {},
375 Err(e) => {
376 if e.is_cancelled() {
377 } else if e.is_panic() {
379 std::panic::resume_unwind(e.into_panic());
381 } else {
382 panic!("task failed: {e}");
383 }
384 },
385 };
386 },
387 _ = &mut self.shutdown_handle => {
389 break;
390 }
391 }
392 }
393
394 info!("Discovery ended");
395 }
396
397 fn handle_message(&mut self, message: DiscoveryMessage) {
398 match message {
399 DiscoveryMessage::PeerAddressChange {
400 peer_id,
401 source,
402 addresses,
403 } => {
404 self.handle_peer_address_change(peer_id, source, addresses);
405 }
406 DiscoveryMessage::ReceivedNodeInfo { peer_info } => {
407 update_known_peers_versioned(
408 self.state.clone(),
409 self.metrics.clone(),
410 vec![*peer_info],
411 self.configured_peers.clone(),
412 &self.endpoint_manager,
413 );
414 }
415 }
416 }
417
418 fn construct_our_info(&mut self) {
419 if self.state.read().unwrap().our_info.is_some() {
420 return;
421 }
422
423 let peer_id = self.network.peer_id();
424 let timestamp_ms = now_unix();
425 let access_type = self.discovery_config.access_type();
426
427 let addresses: Vec<Multiaddr> = self
428 .config
429 .external_address
430 .clone()
431 .and_then(|addr| addr.to_anemo_address().ok().map(|_| addr))
432 .into_iter()
433 .collect();
434
435 let our_info = NodeInfo {
436 peer_id,
437 addresses: addresses.clone(),
438 timestamp_ms,
439 access_type,
440 }
441 .sign(&self.keypair);
442
443 let mut addresses_map = BTreeMap::new();
444 addresses_map.insert(EndpointId::P2p(peer_id), addresses);
445 if let Some(consensus_addr) = &self.consensus_external_address {
446 let network_pubkey =
450 NetworkPublicKey::from_bytes(&peer_id.0).expect("PeerId is a valid public key");
451 addresses_map.insert(
452 EndpointId::Consensus(network_pubkey),
453 vec![consensus_addr.clone()],
454 );
455 }
456 let our_info_v2 = VersionedNodeInfo::V2(NodeInfoV2 {
457 addresses: addresses_map,
458 timestamp_ms,
459 access_type,
460 })
461 .sign(&self.keypair);
462
463 let mut state = self.state.write().unwrap();
464 state.our_info = Some(our_info);
465 state.our_info_v2 = Some(our_info_v2);
466 }
467
468 fn configure_preferred_peers(&mut self) {
469 for peer_info in self.configured_peers.values() {
470 debug!(?peer_info, "Add configured preferred peer");
471 self.network.known_peers().insert(peer_info.clone());
472 }
473 }
474
475 fn update_our_info_timestamp(&mut self, now_unix: u64) {
476 let state = &mut self.state.write().unwrap();
477
478 if let Some(our_info) = &state.our_info {
479 let mut data = our_info.data().clone();
480 data.timestamp_ms = now_unix;
481 state.our_info = Some(data.sign(&self.keypair));
482 }
483
484 if let Some(our_info_v2) = &state.our_info_v2 {
485 let mut data = our_info_v2.data().clone();
486 match &mut data {
487 VersionedNodeInfo::V1(info) => info.timestamp_ms = now_unix,
488 VersionedNodeInfo::V2(info) => info.timestamp_ms = now_unix,
489 }
490 state.our_info_v2 = Some(data.sign(&self.keypair));
491 }
492 }
493
494 fn handle_peer_address_change(
495 &mut self,
496 peer_id: PeerId,
497 source: AddressSource,
498 addresses: Vec<anemo::types::Address>,
499 ) {
500 debug!(
501 ?peer_id,
502 ?source,
503 ?addresses,
504 "Received peer address change"
505 );
506
507 {
509 let mut state = self.state.write().unwrap();
510 let source_map = state.peer_address_overrides.entry(peer_id).or_default();
511
512 if addresses.is_empty() {
513 source_map.remove(&source);
514 if source_map.is_empty() {
515 state.peer_address_overrides.remove(&peer_id);
516 }
517 } else {
518 source_map.insert(source, addresses);
519 }
520 }
521
522 let priority_addresses = self
524 .state
525 .read()
526 .unwrap()
527 .peer_address_overrides
528 .get(&peer_id)
529 .and_then(|sources| sources.first_key_value().map(|(_, addrs)| addrs.clone()))
530 .unwrap_or_default();
531 let current_addresses = self
532 .network
533 .known_peers()
534 .get(&peer_id)
535 .map(|info| info.address.clone())
536 .unwrap_or_default();
537 if priority_addresses != current_addresses {
538 let new_peer_info = PeerInfo {
539 peer_id,
540 affinity: PeerAffinity::High,
541 address: priority_addresses.clone(),
542 };
543
544 self.network.known_peers().insert(new_peer_info);
545 let _ = self.network.disconnect(peer_id);
546
547 if let Some(address) = priority_addresses.first().cloned() {
548 let network = self.network.clone();
549 self.tasks.spawn(async move {
550 let _ = network.connect_with_peer_id(address, peer_id).await;
552 });
553 }
554 }
555 }
556
557 fn handle_peer_event(&mut self, peer_event: Result<PeerEvent, RecvError>) {
558 match peer_event {
559 Ok(PeerEvent::NewPeer(peer_id)) => {
560 if let Some(peer) = self.network.peer(peer_id) {
561 self.state
562 .write()
563 .unwrap()
564 .connected_peers
565 .insert(peer_id, ());
566
567 self.tasks.spawn(query_peer_for_their_known_peers(
569 peer,
570 self.discovery_config.clone(),
571 self.state.clone(),
572 self.metrics.clone(),
573 self.configured_peers.clone(),
574 self.endpoint_manager.clone(),
575 ));
576 }
577 }
578 Ok(PeerEvent::LostPeer(peer_id, _)) => {
579 self.state.write().unwrap().connected_peers.remove(&peer_id);
580 }
581
582 Err(RecvError::Closed) => {
583 panic!("PeerEvent channel shouldn't be able to be closed");
584 }
585
586 Err(RecvError::Lagged(_)) => {
587 trace!("State-Sync fell behind processing PeerEvents");
588 }
589 }
590 }
591
592 fn handle_tick(&mut self, _now: std::time::Instant, now_unix: u64) {
593 self.update_our_info_timestamp(now_unix);
594
595 self.tasks
596 .spawn(query_connected_peers_for_their_known_peers(
597 self.network.clone(),
598 self.discovery_config.clone(),
599 self.state.clone(),
600 self.metrics.clone(),
601 self.configured_peers.clone(),
602 self.endpoint_manager.clone(),
603 ));
604
605 {
607 let mut state = self.state.write().unwrap();
608 state
609 .known_peers
610 .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms) < ONE_DAY_MILLISECONDS);
611 state
612 .known_peers_v2
613 .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms()) < ONE_DAY_MILLISECONDS);
614 }
615
616 self.pending_dials.retain(|_k, v| !v.is_finished());
618 if let Some(abort_handle) = &self.dial_seed_peers_task
619 && abort_handle.is_finished()
620 {
621 self.dial_seed_peers_task = None;
622 }
623
624 let state = self.state.read().unwrap();
626 let our_peer_id = self.network.peer_id();
627
628 let mut eligible: HashMap<PeerId, NodeInfo> = HashMap::new();
631
632 for (peer_id, info) in state.known_peers.iter() {
633 if *peer_id != our_peer_id
634 && !info.addresses.is_empty()
635 && !state.connected_peers.contains_key(peer_id)
636 && !self.pending_dials.contains_key(peer_id)
637 {
638 eligible.insert(*peer_id, info.data().clone());
639 }
640 }
641 for (peer_id, info) in state.known_peers_v2.iter() {
642 let p2p_addresses = info.p2p_addresses();
643 if *peer_id != our_peer_id
644 && !p2p_addresses.is_empty()
645 && !state.connected_peers.contains_key(peer_id)
646 && !self.pending_dials.contains_key(peer_id)
647 && eligible
648 .get(peer_id)
649 .is_none_or(|existing| info.timestamp_ms() > existing.timestamp_ms)
650 {
651 eligible.insert(
652 *peer_id,
653 NodeInfo {
654 peer_id: *peer_id,
655 addresses: p2p_addresses.to_vec(),
656 timestamp_ms: info.timestamp_ms(),
657 access_type: info.access_type(),
658 },
659 );
660 }
661 }
662
663 let number_of_connections = state.connected_peers.len();
665 let number_to_dial = std::cmp::min(
666 eligible.len(),
667 self.discovery_config
668 .target_concurrent_connections()
669 .saturating_sub(number_of_connections),
670 );
671
672 use rand::seq::IteratorRandom;
674 for (peer_id, info) in eligible
675 .into_iter()
676 .choose_multiple(&mut rand::thread_rng(), number_to_dial)
677 {
678 let abort_handle = self
679 .tasks
680 .spawn(try_to_connect_to_peer(self.network.clone(), info));
681 self.pending_dials.insert(peer_id, abort_handle);
682 }
683
684 let has_peers_to_dial = || {
687 self.configured_peers
688 .values()
689 .any(|p| p.affinity == PeerAffinity::High)
690 || !self.unidentified_seed_peers.is_empty()
691 };
692 if self.dial_seed_peers_task.is_none()
693 && state.connected_peers.is_empty()
694 && self.pending_dials.is_empty()
695 && has_peers_to_dial()
696 {
697 let abort_handle = self.tasks.spawn(try_to_connect_to_seed_peers(
698 self.network.clone(),
699 self.discovery_config.clone(),
700 self.configured_peers.clone(),
701 self.unidentified_seed_peers.clone(),
702 ));
703
704 self.dial_seed_peers_task = Some(abort_handle);
705 }
706 }
707}
708
709async fn try_to_connect_to_peer(network: Network, info: NodeInfo) {
710 debug!("Connecting to peer {info:?}");
711 for multiaddr in &info.addresses {
712 if let Ok(address) = multiaddr.to_anemo_address() {
713 if network
715 .connect_with_peer_id(address, info.peer_id)
716 .await
717 .tap_err(|e| {
718 debug!(
719 "error dialing {} at address '{}': {e}",
720 info.peer_id.short_display(4),
721 multiaddr
722 )
723 })
724 .is_ok()
725 {
726 return;
727 }
728 }
729 }
730}
731
732async fn try_to_connect_to_seed_peers(
733 network: Network,
734 config: Arc<DiscoveryConfig>,
735 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
736 unidentified_seed_peers: Vec<anemo::types::Address>,
737) {
738 let high_affinity_peers: Vec<_> = configured_peers
739 .values()
740 .filter(|p| p.affinity == PeerAffinity::High)
741 .cloned()
742 .collect();
743 debug!(
744 ?high_affinity_peers,
745 ?unidentified_seed_peers,
746 "Connecting to seed peers"
747 );
748 let network = &network;
749
750 let with_peer_id = high_affinity_peers.into_iter().flat_map(|peer_info| {
752 peer_info
753 .address
754 .into_iter()
755 .map(move |addr| (Some(peer_info.peer_id), addr))
756 });
757 let without_peer_id = unidentified_seed_peers.into_iter().map(|addr| (None, addr));
758 futures::stream::iter(with_peer_id.chain(without_peer_id))
759 .for_each_concurrent(
760 config.target_concurrent_connections(),
761 |(peer_id, address)| async move {
762 let _ = if let Some(peer_id) = peer_id {
764 network
765 .connect_with_peer_id(address.clone(), peer_id)
766 .await
767 .tap_err(|e| {
768 debug!(
769 "error dialing peer {} at '{}': {e}",
770 peer_id.short_display(4),
771 address
772 )
773 })
774 } else {
775 network
776 .connect(address.clone())
777 .await
778 .tap_err(|e| debug!("error dialing address '{}': {e}", address))
779 };
780 },
781 )
782 .await;
783}
784
785async fn query_peer_for_known_peers_v2(peer: Peer) -> Option<Vec<SignedNodeInfo>> {
786 let mut client = DiscoveryClient::new(peer);
787 let request = Request::new(()).with_timeout(TIMEOUT);
788 client
789 .get_known_peers_v2(request)
790 .await
791 .ok()
792 .map(Response::into_inner)
793 .map(
794 |GetKnownPeersResponseV2 {
795 own_info,
796 mut known_peers,
797 }| {
798 if !own_info.addresses.is_empty() {
799 known_peers.push(own_info)
800 }
801 known_peers
802 },
803 )
804}
805
806async fn query_peer_for_their_known_peers(
807 peer: Peer,
808 discovery_config: Arc<DiscoveryConfig>,
809 state: Arc<RwLock<State>>,
810 metrics: Metrics,
811 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
812 endpoint_manager: EndpointManager,
813) {
814 if discovery_config.use_get_known_peers_v3() {
816 let our_info_v2 = state.read().unwrap().our_info_v2.clone();
817 if let Some(own_info) = our_info_v2 {
818 let peer_for_v3 = peer.clone();
819 let v3_query = async move {
820 let mut client = DiscoveryClient::new(peer_for_v3);
821 let request =
822 Request::new(GetKnownPeersRequestV3 { own_info }).with_timeout(TIMEOUT);
823 client
824 .get_known_peers_v3(request)
825 .await
826 .ok()
827 .map(Response::into_inner)
828 .map(
829 |GetKnownPeersResponseV3 {
830 own_info,
831 mut known_peers,
832 }| {
833 if !own_info.p2p_addresses().is_empty() {
834 known_peers.push(own_info)
835 }
836 known_peers
837 },
838 )
839 };
840
841 let (found_peers_v2, found_peers_v3) =
842 tokio::join!(query_peer_for_known_peers_v2(peer), v3_query);
843
844 if let Some(found_peers) = found_peers_v2 {
845 update_known_peers(
846 state.clone(),
847 metrics.clone(),
848 found_peers,
849 configured_peers.clone(),
850 );
851 }
852 if let Some(found_peers) = found_peers_v3 {
853 update_known_peers_versioned(
854 state,
855 metrics,
856 found_peers,
857 configured_peers,
858 &endpoint_manager,
859 );
860 }
861 return;
862 }
863 }
864
865 if let Some(found_peers) = query_peer_for_known_peers_v2(peer).await {
867 update_known_peers(state, metrics, found_peers, configured_peers);
868 }
869}
870
871async fn query_connected_peers_for_their_known_peers(
872 network: Network,
873 config: Arc<DiscoveryConfig>,
874 state: Arc<RwLock<State>>,
875 metrics: Metrics,
876 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
877 endpoint_manager: EndpointManager,
878) {
879 use rand::seq::IteratorRandom;
880
881 let peers_to_query: Vec<_> = network
882 .peers()
883 .into_iter()
884 .flat_map(|id| network.peer(id))
885 .choose_multiple(&mut rand::thread_rng(), config.peers_to_query());
886
887 let v2_query = {
889 let peers = peers_to_query.clone();
890 let peers_to_query_count = config.peers_to_query();
891 async move {
892 peers
893 .into_iter()
894 .map(DiscoveryClient::new)
895 .map(|mut client| async move {
896 let request = Request::new(()).with_timeout(TIMEOUT);
897 client
898 .get_known_peers_v2(request)
899 .await
900 .ok()
901 .map(Response::into_inner)
902 .map(
903 |GetKnownPeersResponseV2 {
904 own_info,
905 mut known_peers,
906 }| {
907 if !own_info.addresses.is_empty() {
908 known_peers.push(own_info)
909 }
910 known_peers
911 },
912 )
913 })
914 .pipe(futures::stream::iter)
915 .buffer_unordered(peers_to_query_count)
916 .filter_map(std::future::ready)
917 .flat_map(futures::stream::iter)
918 .collect::<Vec<_>>()
919 .await
920 }
921 };
922
923 if config.use_get_known_peers_v3() {
925 let our_info_v2 = state.read().unwrap().our_info_v2.clone();
926 if let Some(own_info) = our_info_v2 {
927 let v3_query = {
928 let peers_to_query_count = config.peers_to_query();
929 async move {
930 peers_to_query
931 .into_iter()
932 .map(DiscoveryClient::new)
933 .map(|mut client| {
934 let own_info = own_info.clone();
935 async move {
936 let request = Request::new(GetKnownPeersRequestV3 { own_info })
937 .with_timeout(TIMEOUT);
938 client
939 .get_known_peers_v3(request)
940 .await
941 .ok()
942 .map(Response::into_inner)
943 .map(
944 |GetKnownPeersResponseV3 {
945 own_info,
946 mut known_peers,
947 }| {
948 if !own_info.p2p_addresses().is_empty() {
949 known_peers.push(own_info)
950 }
951 known_peers
952 },
953 )
954 }
955 })
956 .pipe(futures::stream::iter)
957 .buffer_unordered(peers_to_query_count)
958 .filter_map(std::future::ready)
959 .flat_map(futures::stream::iter)
960 .collect::<Vec<_>>()
961 .await
962 }
963 };
964
965 let (found_peers_v2, found_peers_v3) = tokio::join!(v2_query, v3_query);
966
967 update_known_peers(
968 state.clone(),
969 metrics.clone(),
970 found_peers_v2,
971 configured_peers.clone(),
972 );
973 update_known_peers_versioned(
974 state,
975 metrics,
976 found_peers_v3,
977 configured_peers,
978 &endpoint_manager,
979 );
980 return;
981 }
982 }
983
984 let found_peers_v2 = v2_query.await;
986 update_known_peers(state, metrics, found_peers_v2, configured_peers);
987}
988
989fn update_known_peers(
990 state: Arc<RwLock<State>>,
991 metrics: Metrics,
992 found_peers: Vec<SignedNodeInfo>,
993 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
994) {
995 use std::collections::hash_map::Entry;
996
997 let now_unix = now_unix();
998 let our_peer_id = state.read().unwrap().our_info.clone().unwrap().peer_id;
999 let known_peers = &mut state.write().unwrap().known_peers;
1000 for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
1002 if peer_info.timestamp_ms > now_unix.saturating_add(30 * 1_000) || now_unix.saturating_sub(peer_info.timestamp_ms) > ONE_DAY_MILLISECONDS
1007 {
1008 continue;
1009 }
1010
1011 if peer_info.peer_id == our_peer_id {
1012 continue;
1013 }
1014
1015 let is_restricted = match peer_info.access_type {
1017 AccessType::Public => false,
1018 AccessType::Private | AccessType::Trusted => true,
1019 };
1020 if is_restricted && !configured_peers.contains_key(&peer_info.peer_id) {
1021 continue;
1022 }
1023
1024 if peer_info.addresses.len() > MAX_ADDRESSES_PER_PEER {
1026 continue;
1027 }
1028
1029 if !peer_info
1031 .addresses
1032 .iter()
1033 .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
1034 {
1035 continue;
1036 }
1037 let Ok(public_key) = Ed25519PublicKey::from_bytes(&peer_info.peer_id.0) else {
1038 debug_fatal!(
1039 "Failed to convert anemo PeerId {:?} to Ed25519PublicKey",
1041 peer_info.peer_id
1042 );
1043 continue;
1044 };
1045 let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
1046 if let Err(e) = public_key.verify(&msg, peer_info.auth_sig()) {
1047 info!(
1048 "Discovery failed to verify signature for NodeInfo for peer {:?}: {e:?}",
1049 peer_info.peer_id
1050 );
1051 continue;
1053 }
1054 let peer = VerifiedSignedNodeInfo::new_from_verified(peer_info);
1055
1056 match known_peers.entry(peer.peer_id) {
1057 Entry::Occupied(mut o) => {
1058 if peer.timestamp_ms > o.get().timestamp_ms {
1059 if o.get().addresses.is_empty() && !peer.addresses.is_empty() {
1060 metrics.inc_num_peers_with_external_address();
1061 }
1062 if !o.get().addresses.is_empty() && peer.addresses.is_empty() {
1063 metrics.dec_num_peers_with_external_address();
1064 }
1065 o.insert(peer);
1066 }
1067 }
1068 Entry::Vacant(v) => {
1069 if !peer.addresses.is_empty() {
1070 metrics.inc_num_peers_with_external_address();
1071 }
1072 v.insert(peer);
1073 }
1074 }
1075 }
1076}
1077
1078fn update_known_peers_versioned(
1079 state: Arc<RwLock<State>>,
1080 metrics: Metrics,
1081 found_peers: Vec<SignedVersionedNodeInfo>,
1082 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1083 endpoint_manager: &EndpointManager,
1084) {
1085 use std::collections::hash_map::Entry;
1086
1087 let now_unix = now_unix();
1088 let our_peer_id = state
1089 .read()
1090 .unwrap()
1091 .our_info_v2
1092 .as_ref()
1093 .and_then(|info| info.peer_id());
1094 let known_peers_v2 = &mut state.write().unwrap().known_peers_v2;
1095
1096 for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
1097 let timestamp_ms = peer_info.timestamp_ms();
1098
1099 if timestamp_ms > now_unix.saturating_add(30 * 1_000)
1100 || now_unix.saturating_sub(timestamp_ms) > ONE_DAY_MILLISECONDS
1101 {
1102 continue;
1103 }
1104
1105 let Some(peer_id) = peer_info.peer_id() else {
1106 continue;
1107 };
1108
1109 if Some(peer_id) == our_peer_id {
1110 continue;
1111 }
1112
1113 let is_restricted = match peer_info.access_type() {
1114 AccessType::Public => false,
1115 AccessType::Private | AccessType::Trusted => true,
1116 };
1117 if is_restricted && !configured_peers.contains_key(&peer_id) {
1118 continue;
1119 }
1120
1121 let peer = match verify_versioned_node_info(&peer_info) {
1122 Ok(verified) => verified,
1123 Err(reason) => {
1124 info!("Discovery rejecting VersionedNodeInfo for peer {peer_id:?}: {reason}");
1125 continue;
1126 }
1127 };
1128
1129 if configured_peers.contains_key(&peer_id)
1131 && let VersionedNodeInfo::V2(info_v2) = peer_info.data()
1132 {
1133 for (endpoint_id, addrs) in &info_v2.addresses {
1134 if !matches!(endpoint_id, EndpointId::P2p(_)) && !addrs.is_empty() {
1135 let _ = endpoint_manager.update_endpoint(
1136 endpoint_id.clone(),
1137 AddressSource::Discovery,
1138 addrs.clone(),
1139 );
1140 }
1141 }
1142 }
1143
1144 let peer_p2p_addresses = peer.p2p_addresses();
1145
1146 match known_peers_v2.entry(peer_id) {
1147 Entry::Occupied(mut o) => {
1148 if peer.timestamp_ms() > o.get().timestamp_ms() {
1149 let old_addresses = o.get().p2p_addresses();
1150 if old_addresses.is_empty() && !peer_p2p_addresses.is_empty() {
1151 metrics.inc_num_peers_with_external_address();
1152 }
1153 if !old_addresses.is_empty() && peer_p2p_addresses.is_empty() {
1154 metrics.dec_num_peers_with_external_address();
1155 }
1156 o.insert(peer);
1157 }
1158 }
1159 Entry::Vacant(v) => {
1160 if !peer_p2p_addresses.is_empty() {
1161 metrics.inc_num_peers_with_external_address();
1162 }
1163 v.insert(peer);
1164 }
1165 }
1166 }
1167}
1168
1169pub(super) fn now_unix() -> u64 {
1170 use std::time::{SystemTime, UNIX_EPOCH};
1171
1172 SystemTime::now()
1173 .duration_since(UNIX_EPOCH)
1174 .unwrap()
1175 .as_millis() as u64
1176}