1use anemo::types::PeerAffinity;
5use anemo::types::PeerInfo;
6use anemo::{Network, Peer, PeerId, Request, Response, types::PeerEvent};
7use fastcrypto::ed25519::{Ed25519PublicKey, Ed25519Signature};
8use futures::StreamExt;
9use mysten_common::debug_fatal;
10use serde::{Deserialize, Serialize};
11use shared_crypto::intent::IntentScope;
12use std::{
13 collections::{BTreeMap, HashMap, HashSet},
14 path::PathBuf,
15 sync::{Arc, RwLock},
16 time::{Duration, Instant},
17};
18
19use crate::endpoint_manager::{AddressSource, EndpointId, EndpointManager};
20use store::{load_stored_peers, save_stored_peers};
21use sui_config::p2p::{AccessType, DiscoveryConfig, P2pConfig};
22use sui_types::crypto::{NetworkKeyPair, NetworkPublicKey, Signer, ToFromBytes, VerifyingKey};
23use sui_types::digests::Digest;
24use sui_types::message_envelope::{Envelope, Message, VerifiedEnvelope};
25use sui_types::multiaddr::Multiaddr;
26use tap::{Pipe, TapFallible};
27use tokio::sync::broadcast::error::RecvError;
28use tokio::sync::mpsc;
29use tokio::{
30 sync::oneshot,
31 task::{AbortHandle, JoinSet},
32};
33use tracing::{debug, info, trace};
34
/// Timeout applied to every outgoing discovery RPC request (`Request::with_timeout`).
const TIMEOUT: Duration = Duration::from_millis(1_000);
/// Node info older than this (relative to now) is rejected or culled from the known-peer maps.
const ONE_DAY_MILLISECONDS: u64 = 86_400_000;
/// Advertised addresses must be strictly shorter than this (length as reported by `Multiaddr::len`).
const MAX_ADDRESS_LENGTH: usize = 300;
/// Cap on the number of peer records accepted from one response (one extra is allowed for the
/// responder's own info); presumably also the server-side cap — confirm in `server`.
const MAX_PEERS_TO_SEND: usize = 200;
/// Maximum number of addresses accepted per peer (per endpoint for V2 node info).
const MAX_ADDRESSES_PER_PEER: usize = 2;
40
// RPC service code generated at build time and included from OUT_DIR/sui.Discovery.rs
// (presumably produced by this crate's build script — confirm in build.rs).
mod generated {
    include!(concat!(env!("OUT_DIR"), "/sui.Discovery.rs"));
}
mod builder;
mod metrics;
mod server;
mod store;
#[cfg(test)]
mod tests;

pub use builder::{Builder, UnstartedDiscovery};
// Client and server types for the Discovery RPC service, re-exported from the generated code.
pub use generated::{
    discovery_client::DiscoveryClient,
    discovery_server::{Discovery, DiscoveryServer},
};
pub use server::{GetKnownPeersRequestV3, GetKnownPeersResponseV2, GetKnownPeersResponseV3};
57
/// Messages delivered to the discovery event loop through its mailbox.
#[derive(Debug)]
pub enum DiscoveryMessage {
    /// Replace the addresses known for `peer_id` from `source`; an empty list removes that
    /// source. A `Chain` source also adds/removes the peer from the chain-peer set.
    PeerAddressChange {
        peer_id: PeerId,
        source: AddressSource,
        addresses: Vec<anemo::types::Address>,
    },
    /// A signed versioned node info to merge into the known peers; the trusted-peer store is
    /// saved if the merge reports a change.
    ReceivedNodeInfo {
        peer_info: Box<SignedVersionedNodeInfo>,
    },
    /// Trusted peers' records changed; triggers a save of the peer store.
    TrustedPeersUpdated,
    /// A peer misbehaved or failed; it may be disconnected and put into a dial cooldown
    /// (trusted peers are exempt).
    PeerFailureReport { peer_id: PeerId },
}
77
/// Handle to a running discovery service.
#[derive(Clone, Debug)]
pub struct Handle {
    // Kept alive (shared via Arc) for the lifetime of the handle; presumably the counterpart of
    // `DiscoveryEventLoop::shutdown_handle`, whose loop exits once this sender is dropped or
    // fires — confirm in builder.
    pub(super) _shutdown_handle: Arc<oneshot::Sender<()>>,
    // Mailbox sender used to post messages to the event loop.
    pub(super) sender: Sender,
}
85
86impl Handle {
87 pub fn sender(&self) -> Sender {
88 self.sender.clone()
89 }
90}
91
/// Cloneable producer side of the discovery event loop's mailbox.
#[derive(Clone, Debug)]
pub struct Sender {
    // Bounded tokio mpsc sender; all methods use `try_send`, so they never block.
    pub(super) sender: mpsc::Sender<DiscoveryMessage>,
}
98
99impl Sender {
100 pub fn peer_address_change(
101 &self,
102 peer_id: PeerId,
103 source: AddressSource,
104 addresses: Vec<anemo::types::Address>,
105 ) {
106 self.sender
107 .try_send(DiscoveryMessage::PeerAddressChange {
108 peer_id,
109 source,
110 addresses,
111 })
112 .expect("Discovery mailbox should not overflow or be closed")
113 }
114
115 pub fn report_peer_failure(&self, peer_id: PeerId) {
116 let _ = self
117 .sender
118 .try_send(DiscoveryMessage::PeerFailureReport { peer_id });
119 }
120}
121
122use self::metrics::Metrics;
123
/// Mutable discovery state shared (behind `Arc<RwLock<_>>`) between the event loop and the
/// tasks it spawns.
struct State {
    // Our own signed V1 info; built once at startup and re-signed with a fresh timestamp each tick.
    our_info: Option<SignedNodeInfo>,
    // Our own signed versioned info (built as V2); re-signed each tick like `our_info`.
    our_info_v2: Option<SignedVersionedNodeInfo>,
    // Currently connected peers (used as a set), maintained from network peer events.
    connected_peers: HashMap<PeerId, ()>,
    // Verified V1 node info learned from other peers.
    known_peers: HashMap<PeerId, VerifiedSignedNodeInfo>,
    // Verified versioned node info learned from other peers or loaded from the store.
    known_peers_v2: HashMap<PeerId, VerifiedSignedVersionedNodeInfo>,
    // Explicitly managed peers: addresses per source. The first (lowest-ordered) source's
    // addresses are installed in the network; such peers are skipped by the random dialer.
    peer_addresses: HashMap<PeerId, BTreeMap<AddressSource, Vec<anemo::types::Address>>>,
}
133
/// V1 self-description a node advertises to discovery peers.
///
/// The BCS encoding of this struct is what gets signed (see `NodeInfo::sign`), and BCS encodes
/// fields in declaration order — do not reorder or insert fields, or signatures from other
/// nodes will no longer verify.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct NodeInfo {
    pub peer_id: PeerId,
    // Addresses other nodes may dial; receivers accept at most MAX_ADDRESSES_PER_PEER.
    pub addresses: Vec<Multiaddr>,

    // Creation time as returned by `now_unix()` (compared against millisecond constants, so
    // presumably milliseconds since the Unix epoch).
    pub timestamp_ms: u64,

    // Public / Private / Trusted; non-public info is only accepted from trusted peers.
    pub access_type: AccessType,
}
151
152impl NodeInfo {
153 fn sign(self, keypair: &NetworkKeyPair) -> SignedNodeInfo {
154 let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
155 let sig = keypair.sign(&msg);
156 SignedNodeInfo::new_from_data_and_sig(self, sig)
157 }
158}
159
/// A [`NodeInfo`] together with its Ed25519 signature (not yet checked).
pub type SignedNodeInfo = Envelope<NodeInfo, Ed25519Signature>;

/// A [`SignedNodeInfo`] whose signature has been verified against its peer id.
pub type VerifiedSignedNodeInfo = VerifiedEnvelope<NodeInfo, Ed25519Signature>;
163
/// Digest type required by the `Message` trait for [`NodeInfo`]; `NodeInfo::digest` is never
/// called, so this is effectively a placeholder.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct NodeInfoDigest(Digest);

impl NodeInfoDigest {
    /// Wraps raw 32 digest bytes.
    pub const fn new(digest: [u8; 32]) -> Self {
        Self(Digest::new(digest))
    }
}
172
173impl Message for NodeInfo {
174 type DigestType = NodeInfoDigest;
175 const SCOPE: IntentScope = IntentScope::DiscoveryPeers;
176
177 fn digest(&self) -> Self::DigestType {
178 unreachable!("NodeInfoDigest is not used today")
179 }
180}
181
/// V2 self-description: addresses keyed by endpoint (P2P, consensus, ...).
///
/// Signed via its BCS encoding inside [`VersionedNodeInfo`]; do not reorder fields.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct NodeInfoV2 {
    // Addresses per endpoint; verification rejects repeated endpoint variants and requires
    // consensus keys to match the P2P peer id.
    pub addresses: BTreeMap<EndpointId, Vec<Multiaddr>>,
    // Same clock/units as `NodeInfo::timestamp_ms`.
    pub timestamp_ms: u64,
    pub access_type: AccessType,
}
190
191impl NodeInfoV2 {
192 pub fn peer_id(&self) -> Option<PeerId> {
195 self.addresses.keys().find_map(|k| match k {
196 EndpointId::P2p(peer_id) => Some(*peer_id),
197 EndpointId::Consensus(_) => None,
198 })
199 }
200
201 pub fn p2p_addresses(&self) -> &[Multiaddr] {
202 self.addresses
203 .iter()
204 .find_map(|(k, v)| match k {
205 EndpointId::P2p(_) => Some(v.as_slice()),
206 EndpointId::Consensus(_) => None,
207 })
208 .unwrap_or(&[])
209 }
210}
211
/// Node info in either wire format. The whole enum (including the variant tag) is
/// BCS-encoded and signed, so variants must not be reordered.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum VersionedNodeInfo {
    V1(NodeInfo),
    V2(NodeInfoV2),
}
218
219impl VersionedNodeInfo {
220 pub fn peer_id(&self) -> Option<PeerId> {
221 match self {
222 VersionedNodeInfo::V1(info) => Some(info.peer_id),
223 VersionedNodeInfo::V2(info) => info.peer_id(),
224 }
225 }
226
227 pub fn timestamp_ms(&self) -> u64 {
228 match self {
229 VersionedNodeInfo::V1(info) => info.timestamp_ms,
230 VersionedNodeInfo::V2(info) => info.timestamp_ms,
231 }
232 }
233
234 pub fn access_type(&self) -> AccessType {
235 match self {
236 VersionedNodeInfo::V1(info) => info.access_type,
237 VersionedNodeInfo::V2(info) => info.access_type,
238 }
239 }
240
241 pub fn p2p_addresses(&self) -> &[Multiaddr] {
242 match self {
243 VersionedNodeInfo::V1(info) => &info.addresses,
244 VersionedNodeInfo::V2(info) => info.p2p_addresses(),
245 }
246 }
247
248 pub fn sign(self, keypair: &NetworkKeyPair) -> SignedVersionedNodeInfo {
249 let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
250 let sig = keypair.sign(&msg);
251 SignedVersionedNodeInfo::new_from_data_and_sig(self, sig)
252 }
253}
254
/// Digest type required by the `Message` trait for [`VersionedNodeInfo`]; never produced.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct VersionedNodeInfoDigest(Digest);
257
258impl Message for VersionedNodeInfo {
259 type DigestType = VersionedNodeInfoDigest;
260 const SCOPE: IntentScope = IntentScope::DiscoveryPeers;
261
262 fn digest(&self) -> Self::DigestType {
263 unreachable!("VersionedNodeInfoDigest is not used today")
264 }
265}
266
/// A [`VersionedNodeInfo`] with its Ed25519 signature (not yet checked).
pub type SignedVersionedNodeInfo = Envelope<VersionedNodeInfo, Ed25519Signature>;
/// A [`SignedVersionedNodeInfo`] that passed `verify_versioned_node_info`.
pub type VerifiedSignedVersionedNodeInfo = VerifiedEnvelope<VersionedNodeInfo, Ed25519Signature>;
269
270fn verify_versioned_node_info(
280 peer_info: &SignedVersionedNodeInfo,
281) -> Result<VerifiedSignedVersionedNodeInfo, &'static str> {
282 let peer_id = peer_info.peer_id().ok_or("missing P2P peer_id")?;
283
284 let public_key =
285 Ed25519PublicKey::from_bytes(&peer_id.0).map_err(|_| "invalid peer_id public key")?;
286
287 let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
288 public_key
289 .verify(&msg, peer_info.auth_sig())
290 .map_err(|_| "signature verification failed")?;
291
292 match peer_info.data() {
293 VersionedNodeInfo::V1(info) => {
294 if info.addresses.len() > MAX_ADDRESSES_PER_PEER {
295 return Err("too many addresses");
296 }
297 if !info
298 .addresses
299 .iter()
300 .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
301 {
302 return Err("invalid address");
303 }
304 }
305 VersionedNodeInfo::V2(info_v2) => {
306 let mut seen_variants = Vec::new();
308 for endpoint_id in info_v2.addresses.keys() {
309 let variant = std::mem::discriminant(endpoint_id);
310 if seen_variants.contains(&variant) {
311 return Err("duplicate endpoint variant");
312 }
313 seen_variants.push(variant);
314 }
315
316 for (endpoint_id, addrs) in &info_v2.addresses {
317 if addrs.len() > MAX_ADDRESSES_PER_PEER {
318 return Err("too many addresses for endpoint");
319 }
320 if !addrs.iter().all(|addr| addr.len() < MAX_ADDRESS_LENGTH) {
321 return Err("address too long");
322 }
323 if matches!(endpoint_id, EndpointId::P2p(_))
324 && !addrs.iter().all(|addr| addr.to_anemo_address().is_ok())
325 {
326 return Err("invalid P2P address");
327 }
328 }
329
330 let identities_valid = info_v2.addresses.keys().all(|eid| match eid {
331 EndpointId::P2p(_) => true,
332 EndpointId::Consensus(pubkey) => pubkey.as_bytes() == peer_id.0,
333 });
334 if !identities_valid {
335 return Err("non-P2P endpoint identity mismatch");
336 }
337 }
338 }
339
340 Ok(VerifiedSignedVersionedNodeInfo::new_from_verified(
341 peer_info.clone(),
342 ))
343}
344
/// Single-owner event loop driving peer discovery (see `DiscoveryEventLoop::start`).
struct DiscoveryEventLoop {
    // P2P config; `external_address` becomes our advertised P2P address.
    config: P2pConfig,
    discovery_config: Arc<DiscoveryConfig>,
    // Peers from configuration; High-affinity ones are registered as `AddressSource::Seed`.
    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
    // Peers currently provided by `AddressSource::Chain`; counted as trusted.
    chain_peers: Arc<RwLock<HashSet<PeerId>>>,
    // Seed addresses without a known peer id; dialed when we have no connections.
    unidentified_seed_peers: Vec<anemo::types::Address>,
    network: Network,
    // Used to sign our own node info.
    keypair: NetworkKeyPair,
    // All background work spawned by the loop; panics here are re-raised in `start`.
    tasks: JoinSet<()>,
    // In-flight dials per peer; finished entries are pruned each tick.
    pending_dials: HashMap<PeerId, AbortHandle>,
    // In-flight seed-peer dialing task, if any.
    dial_seed_peers_task: Option<AbortHandle>,
    // The loop exits when this resolves (signal sent or sender dropped).
    shutdown_handle: oneshot::Receiver<()>,
    state: Arc<RwLock<State>>,
    mailbox: mpsc::Receiver<DiscoveryMessage>,
    // Sender side of `mailbox`, cloned into spawned tasks.
    mailbox_tx: mpsc::Sender<DiscoveryMessage>,
    metrics: Metrics,
    // If set, advertised as our `EndpointId::Consensus` address in V2 node info.
    consensus_external_address: Option<Multiaddr>,
    endpoint_manager: EndpointManager,
    // Where trusted peers' node info is persisted; `None` disables persistence.
    store_path: Option<PathBuf>,
    // When each failure-reported peer was disconnected; expires after `peer_failure_cooldown`.
    peer_cooldowns: HashMap<PeerId, Instant>,
}
366
367impl DiscoveryEventLoop {
368 pub async fn start(mut self) {
369 info!("Discovery started");
370
371 self.construct_our_info();
372 self.configure_preferred_peers();
373 self.load_stored_peers_on_startup();
374
375 let mut interval = tokio::time::interval(self.discovery_config.interval_period());
376 let mut peer_events = {
377 let (subscriber, _peers) = self.network.subscribe().unwrap();
378 subscriber
379 };
380
381 loop {
382 tokio::select! {
383 now = interval.tick() => {
384 let now_unix = now_unix();
385 self.handle_tick(now.into_std(), now_unix);
386 }
387 peer_event = peer_events.recv() => {
388 self.handle_peer_event(peer_event);
389 },
390 Some(message) = self.mailbox.recv() => {
391 self.handle_message(message);
392 }
393 Some(task_result) = self.tasks.join_next() => {
394 match task_result {
395 Ok(()) => {},
396 Err(e) => {
397 if e.is_cancelled() {
398 } else if e.is_panic() {
400 std::panic::resume_unwind(e.into_panic());
402 } else {
403 panic!("task failed: {e}");
404 }
405 },
406 };
407 },
408 _ = &mut self.shutdown_handle => {
410 break;
411 }
412 }
413 }
414
415 self.save_stored_peers();
416 info!("Discovery ended");
417 }
418
419 fn handle_message(&mut self, message: DiscoveryMessage) {
420 match message {
421 DiscoveryMessage::PeerAddressChange {
422 peer_id,
423 source,
424 addresses,
425 } => {
426 self.handle_peer_address_change(peer_id, source, addresses);
427 }
428 DiscoveryMessage::ReceivedNodeInfo { peer_info } => {
429 let changed = update_known_peers_versioned(
430 self.state.clone(),
431 self.metrics.clone(),
432 vec![*peer_info],
433 self.configured_peers.clone(),
434 &self.chain_peers,
435 &self.endpoint_manager,
436 );
437 if changed {
438 self.save_stored_peers();
439 }
440 }
441 DiscoveryMessage::TrustedPeersUpdated => {
442 self.save_stored_peers();
443 }
444 DiscoveryMessage::PeerFailureReport { peer_id } => {
445 self.handle_peer_failure_report(peer_id);
446 }
447 }
448 }
449
450 fn handle_peer_failure_report(&mut self, peer_id: PeerId) {
451 if self.is_trusted_peer(&peer_id) {
452 info!(?peer_id, "ignoring failure report for trusted peer");
453 return;
454 }
455 let min_peers = self.discovery_config.min_peers_for_disconnect();
456 let connected_count = self.state.read().unwrap().connected_peers.len();
457 if connected_count < min_peers {
458 info!(
459 ?peer_id,
460 connected_count, min_peers, "skipping disconnect, too few connected peers"
461 );
462 return;
463 }
464 info!(
465 ?peer_id,
466 "peer failure reported, disconnecting and adding cooldown"
467 );
468 let _ = self.network.disconnect(peer_id);
469 self.peer_cooldowns.insert(peer_id, Instant::now());
470 }
471
472 fn construct_our_info(&mut self) {
473 if self.state.read().unwrap().our_info.is_some() {
474 return;
475 }
476
477 let peer_id = self.network.peer_id();
478 let timestamp_ms = now_unix();
479 let access_type = self.discovery_config.access_type();
480
481 let addresses: Vec<Multiaddr> = self
482 .config
483 .external_address
484 .clone()
485 .and_then(|addr| addr.to_anemo_address().ok().map(|_| addr))
486 .into_iter()
487 .collect();
488
489 let our_info = NodeInfo {
490 peer_id,
491 addresses: addresses.clone(),
492 timestamp_ms,
493 access_type,
494 }
495 .sign(&self.keypair);
496
497 let mut addresses_map = BTreeMap::new();
498 addresses_map.insert(EndpointId::P2p(peer_id), addresses);
499 if let Some(consensus_addr) = &self.consensus_external_address {
500 let network_pubkey =
504 NetworkPublicKey::from_bytes(&peer_id.0).expect("PeerId is a valid public key");
505 addresses_map.insert(
506 EndpointId::Consensus(network_pubkey),
507 vec![consensus_addr.clone()],
508 );
509 }
510 let our_info_v2 = VersionedNodeInfo::V2(NodeInfoV2 {
511 addresses: addresses_map,
512 timestamp_ms,
513 access_type,
514 })
515 .sign(&self.keypair);
516
517 let mut state = self.state.write().unwrap();
518 state.our_info = Some(our_info);
519 state.our_info_v2 = Some(our_info_v2);
520 }
521
522 fn configure_preferred_peers(&mut self) {
523 let peers: Vec<_> = self.configured_peers.values().cloned().collect();
524 for peer_info in peers {
525 debug!(?peer_info, "Add configured preferred peer");
526 match peer_info.affinity {
527 PeerAffinity::High => {
528 self.handle_peer_address_change(
529 peer_info.peer_id,
530 AddressSource::Seed,
531 peer_info.address,
532 );
533 }
534 _ => {
535 self.network.known_peers().insert(peer_info);
538 }
539 }
540 }
541 }
542
543 fn update_our_info_timestamp(&mut self, now_unix: u64) {
544 let state = &mut self.state.write().unwrap();
545
546 if let Some(our_info) = &state.our_info {
547 let mut data = our_info.data().clone();
548 data.timestamp_ms = now_unix;
549 state.our_info = Some(data.sign(&self.keypair));
550 }
551
552 if let Some(our_info_v2) = &state.our_info_v2 {
553 let mut data = our_info_v2.data().clone();
554 match &mut data {
555 VersionedNodeInfo::V1(info) => info.timestamp_ms = now_unix,
556 VersionedNodeInfo::V2(info) => info.timestamp_ms = now_unix,
557 }
558 state.our_info_v2 = Some(data.sign(&self.keypair));
559 }
560 }
561
562 fn handle_peer_address_change(
563 &mut self,
564 peer_id: PeerId,
565 source: AddressSource,
566 addresses: Vec<anemo::types::Address>,
567 ) {
568 debug!(
569 ?peer_id,
570 ?source,
571 ?addresses,
572 "Received peer address change"
573 );
574
575 if source == AddressSource::Chain {
577 if addresses.is_empty() {
578 self.chain_peers.write().unwrap().remove(&peer_id);
579 } else {
580 self.chain_peers.write().unwrap().insert(peer_id);
581 }
582 }
583
584 {
586 let mut state = self.state.write().unwrap();
587 let source_map = state.peer_addresses.entry(peer_id).or_default();
588
589 if addresses.is_empty() {
590 source_map.remove(&source);
591 if source_map.is_empty() {
592 state.peer_addresses.remove(&peer_id);
593 }
594 } else {
595 source_map.insert(source, addresses);
596 }
597
598 if source == AddressSource::Chain
604 && !state
605 .peer_addresses
606 .get(&peer_id)
607 .is_some_and(|s| s.contains_key(&AddressSource::Discovery))
608 && let Some(addrs) = state
609 .known_peers_v2
610 .get(&peer_id)
611 .and_then(|info| match info.data() {
612 VersionedNodeInfo::V2(v2) => Some(v2.p2p_addresses()),
613 _ => None,
614 })
615 {
616 let anemo_addrs: Vec<_> = addrs
617 .iter()
618 .filter_map(|a| a.to_anemo_address().ok())
619 .collect();
620 if !anemo_addrs.is_empty() {
621 state
622 .peer_addresses
623 .entry(peer_id)
624 .or_default()
625 .insert(AddressSource::Discovery, anemo_addrs);
626 }
627 }
628 }
629
630 self.reconfigure_peer_addresses(peer_id);
632 }
633
634 fn reconfigure_peer_addresses(&mut self, peer_id: PeerId) {
637 let priority_addresses = self
638 .state
639 .read()
640 .unwrap()
641 .peer_addresses
642 .get(&peer_id)
643 .and_then(|sources| sources.first_key_value().map(|(_, addrs)| addrs.clone()))
644 .unwrap_or_default();
645 let current_addresses = self
646 .network
647 .known_peers()
648 .get(&peer_id)
649 .map(|info| info.address.clone())
650 .unwrap_or_default();
651 if priority_addresses != current_addresses {
652 let new_peer_info = PeerInfo {
653 peer_id,
654 affinity: PeerAffinity::High,
655 address: priority_addresses.clone(),
656 };
657
658 self.network.known_peers().insert(new_peer_info);
659
660 if !current_addresses.is_empty() {
662 let _ = self.network.disconnect(peer_id);
663
664 if let Some(address) = priority_addresses.first().cloned() {
665 let network = self.network.clone();
666 self.tasks.spawn(async move {
667 let _ = network.connect_with_peer_id(address, peer_id).await;
669 });
670 }
671 }
672 }
673 }
674
675 fn load_stored_peers_on_startup(&mut self) {
676 let Some(path) = &self.store_path else {
677 return;
678 };
679 let entries = load_stored_peers(path);
680 if entries.is_empty() {
681 return;
682 }
683 info!(
684 count = entries.len(),
685 "Loaded stored peer addresses from {}",
686 path.display()
687 );
688
689 update_known_peers_versioned(
690 self.state.clone(),
691 self.metrics.clone(),
692 entries,
693 self.configured_peers.clone(),
694 &self.chain_peers,
695 &self.endpoint_manager,
696 );
697
698 while let Ok(msg) = self.mailbox.try_recv() {
701 self.handle_message(msg);
702 }
703 }
704
705 fn save_stored_peers(&self) {
706 let Some(path) = &self.store_path else {
707 return;
708 };
709 let state = self.state.read().unwrap();
710 let peers_to_save: Vec<SignedVersionedNodeInfo> = state
711 .known_peers_v2
712 .iter()
713 .filter(|(pid, _)| self.is_trusted_peer(pid))
714 .map(|(_, verified)| verified.inner().clone())
715 .collect();
716 drop(state);
717 save_stored_peers(path, &peers_to_save);
718 }
719
720 fn handle_peer_event(&mut self, peer_event: Result<PeerEvent, RecvError>) {
721 match peer_event {
722 Ok(PeerEvent::NewPeer(peer_id)) => {
723 if let Some(peer) = self.network.peer(peer_id) {
724 self.state
725 .write()
726 .unwrap()
727 .connected_peers
728 .insert(peer_id, ());
729
730 self.tasks.spawn(query_peer_for_their_known_peers(
732 peer,
733 self.discovery_config.clone(),
734 self.state.clone(),
735 self.metrics.clone(),
736 self.configured_peers.clone(),
737 self.chain_peers.clone(),
738 self.endpoint_manager.clone(),
739 self.mailbox_sender(),
740 ));
741 }
742 }
743 Ok(PeerEvent::LostPeer(peer_id, _)) => {
744 self.state.write().unwrap().connected_peers.remove(&peer_id);
745 }
746
747 Err(RecvError::Closed) => {
748 panic!("PeerEvent channel shouldn't be able to be closed");
749 }
750
751 Err(RecvError::Lagged(_)) => {
752 trace!("State-Sync fell behind processing PeerEvents");
753 }
754 }
755 }
756
757 fn handle_tick(&mut self, _now: std::time::Instant, now_unix: u64) {
758 self.update_our_info_timestamp(now_unix);
759
760 self.tasks
761 .spawn(query_connected_peers_for_their_known_peers(
762 self.network.clone(),
763 self.discovery_config.clone(),
764 self.state.clone(),
765 self.metrics.clone(),
766 self.configured_peers.clone(),
767 self.chain_peers.clone(),
768 self.endpoint_manager.clone(),
769 self.mailbox_sender(),
770 ));
771
772 let mut culled_trusted_peers = Vec::new();
774 {
775 let mut state = self.state.write().unwrap();
776 state
777 .known_peers
778 .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms) < ONE_DAY_MILLISECONDS);
779 state.known_peers_v2.retain(|k, v| {
780 let keep = now_unix.saturating_sub(v.timestamp_ms()) < ONE_DAY_MILLISECONDS;
781 if !keep && self.is_trusted_peer(k) {
782 culled_trusted_peers.push(*k);
783 }
784 keep
785 });
786 }
787
788 for peer_id in culled_trusted_peers {
791 self.endpoint_manager
792 .clear_source(peer_id, AddressSource::Discovery);
793 }
794
795 self.pending_dials.retain(|_k, v| !v.is_finished());
797 if let Some(abort_handle) = &self.dial_seed_peers_task
798 && abort_handle.is_finished()
799 {
800 self.dial_seed_peers_task = None;
801 }
802
803 let cooldown = self.discovery_config.peer_failure_cooldown();
804 self.peer_cooldowns
805 .retain(|_, since| since.elapsed() < cooldown);
806
807 let state = self.state.read().unwrap();
809 let our_peer_id = self.network.peer_id();
810
811 let mut preferred: HashMap<PeerId, NodeInfo> = HashMap::new();
815 let mut cooldown_peers: HashMap<PeerId, NodeInfo> = HashMap::new();
816
817 for (peer_id, info) in state.known_peers.iter() {
818 if *peer_id != our_peer_id
819 && !info.addresses.is_empty()
820 && !state.connected_peers.contains_key(peer_id)
821 && !self.pending_dials.contains_key(peer_id)
822 && !state.peer_addresses.contains_key(peer_id)
823 {
824 if self.peer_cooldowns.contains_key(peer_id) {
825 cooldown_peers.insert(*peer_id, info.data().clone());
826 } else {
827 preferred.insert(*peer_id, info.data().clone());
828 }
829 }
830 }
831 for (peer_id, info) in state.known_peers_v2.iter() {
832 let p2p_addresses = info.p2p_addresses();
833 if *peer_id != our_peer_id
834 && !p2p_addresses.is_empty()
835 && !state.connected_peers.contains_key(peer_id)
836 && !self.pending_dials.contains_key(peer_id)
837 && !state.peer_addresses.contains_key(peer_id)
838 {
839 let node_info = NodeInfo {
840 peer_id: *peer_id,
841 addresses: p2p_addresses.to_vec(),
842 timestamp_ms: info.timestamp_ms(),
843 access_type: info.access_type(),
844 };
845 if self.peer_cooldowns.contains_key(peer_id) {
846 if cooldown_peers
847 .get(peer_id)
848 .is_none_or(|existing| info.timestamp_ms() > existing.timestamp_ms)
849 {
850 cooldown_peers.insert(*peer_id, node_info);
851 }
852 } else if preferred
853 .get(peer_id)
854 .is_none_or(|existing| info.timestamp_ms() > existing.timestamp_ms)
855 {
856 preferred.insert(*peer_id, node_info);
857 }
858 }
859 }
860
861 let number_of_connections = state.connected_peers.len();
862 let number_to_dial = self
863 .discovery_config
864 .target_concurrent_connections()
865 .saturating_sub(number_of_connections);
866
867 use rand::seq::IteratorRandom;
868 let mut rng = rand::thread_rng();
869
870 let mut to_dial: Vec<_> = preferred
871 .into_iter()
872 .choose_multiple(&mut rng, number_to_dial);
873
874 let remaining = number_to_dial.saturating_sub(to_dial.len());
875 to_dial.extend(
876 cooldown_peers
877 .into_iter()
878 .choose_multiple(&mut rng, remaining),
879 );
880
881 for (peer_id, info) in to_dial {
882 let abort_handle = self
883 .tasks
884 .spawn(try_to_connect_to_peer(self.network.clone(), info));
885 self.pending_dials.insert(peer_id, abort_handle);
886 }
887
888 let has_peers_to_dial = || {
891 self.configured_peers
892 .values()
893 .any(|p| p.affinity == PeerAffinity::High)
894 || !self.unidentified_seed_peers.is_empty()
895 };
896 if self.dial_seed_peers_task.is_none()
897 && state.connected_peers.is_empty()
898 && self.pending_dials.is_empty()
899 && has_peers_to_dial()
900 {
901 let abort_handle = self.tasks.spawn(try_to_connect_to_seed_peers(
902 self.network.clone(),
903 self.discovery_config.clone(),
904 self.configured_peers.clone(),
905 self.unidentified_seed_peers.clone(),
906 ));
907
908 self.dial_seed_peers_task = Some(abort_handle);
909 }
910 }
911
912 fn is_trusted_peer(&self, peer_id: &PeerId) -> bool {
913 is_trusted_peer(peer_id, &self.configured_peers, &self.chain_peers)
914 }
915
916 fn mailbox_sender(&self) -> mpsc::Sender<DiscoveryMessage> {
917 self.mailbox_tx.clone()
918 }
919}
920
921async fn try_to_connect_to_peer(network: Network, info: NodeInfo) {
922 debug!("Connecting to peer {info:?}");
923 for multiaddr in &info.addresses {
924 if let Ok(address) = multiaddr.to_anemo_address() {
925 if network
927 .connect_with_peer_id(address, info.peer_id)
928 .await
929 .tap_err(|e| {
930 debug!(
931 "error dialing {} at address '{}': {e}",
932 info.peer_id.short_display(4),
933 multiaddr
934 )
935 })
936 .is_ok()
937 {
938 return;
939 }
940 }
941 }
942}
943
944async fn try_to_connect_to_seed_peers(
945 network: Network,
946 config: Arc<DiscoveryConfig>,
947 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
948 unidentified_seed_peers: Vec<anemo::types::Address>,
949) {
950 let high_affinity_peers: Vec<_> = configured_peers
951 .values()
952 .filter(|p| p.affinity == PeerAffinity::High)
953 .cloned()
954 .collect();
955 debug!(
956 ?high_affinity_peers,
957 ?unidentified_seed_peers,
958 "Connecting to seed peers"
959 );
960 let network = &network;
961
962 let with_peer_id = high_affinity_peers.into_iter().flat_map(|peer_info| {
964 peer_info
965 .address
966 .into_iter()
967 .map(move |addr| (Some(peer_info.peer_id), addr))
968 });
969 let without_peer_id = unidentified_seed_peers.into_iter().map(|addr| (None, addr));
970 futures::stream::iter(with_peer_id.chain(without_peer_id))
971 .for_each_concurrent(
972 config.target_concurrent_connections(),
973 |(peer_id, address)| async move {
974 let _ = if let Some(peer_id) = peer_id {
976 network
977 .connect_with_peer_id(address.clone(), peer_id)
978 .await
979 .tap_err(|e| {
980 debug!(
981 "error dialing peer {} at '{}': {e}",
982 peer_id.short_display(4),
983 address
984 )
985 })
986 } else {
987 network
988 .connect(address.clone())
989 .await
990 .tap_err(|e| debug!("error dialing address '{}': {e}", address))
991 };
992 },
993 )
994 .await;
995}
996
997async fn query_peer_for_known_peers_v2(peer: Peer) -> Option<Vec<SignedNodeInfo>> {
998 let mut client = DiscoveryClient::new(peer);
999 let request = Request::new(()).with_timeout(TIMEOUT);
1000 client
1001 .get_known_peers_v2(request)
1002 .await
1003 .ok()
1004 .map(Response::into_inner)
1005 .map(
1006 |GetKnownPeersResponseV2 {
1007 own_info,
1008 mut known_peers,
1009 }| {
1010 if !own_info.addresses.is_empty() {
1011 known_peers.push(own_info)
1012 }
1013 known_peers
1014 },
1015 )
1016}
1017
1018async fn query_peer_for_their_known_peers(
1019 peer: Peer,
1020 discovery_config: Arc<DiscoveryConfig>,
1021 state: Arc<RwLock<State>>,
1022 metrics: Metrics,
1023 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1024 chain_peers: Arc<RwLock<HashSet<PeerId>>>,
1025 endpoint_manager: EndpointManager,
1026 mailbox_tx: mpsc::Sender<DiscoveryMessage>,
1027) {
1028 if discovery_config.use_get_known_peers_v3() {
1030 let our_info_v2 = state.read().unwrap().our_info_v2.clone();
1031 if let Some(own_info) = our_info_v2 {
1032 let peer_for_v3 = peer.clone();
1033 let v3_query = async move {
1034 let mut client = DiscoveryClient::new(peer_for_v3);
1035 let request =
1036 Request::new(GetKnownPeersRequestV3 { own_info }).with_timeout(TIMEOUT);
1037 client
1038 .get_known_peers_v3(request)
1039 .await
1040 .ok()
1041 .map(Response::into_inner)
1042 .map(
1043 |GetKnownPeersResponseV3 {
1044 own_info,
1045 mut known_peers,
1046 }| {
1047 if !own_info.p2p_addresses().is_empty() {
1048 known_peers.push(own_info)
1049 }
1050 known_peers
1051 },
1052 )
1053 };
1054
1055 let (found_peers_v2, found_peers_v3) =
1056 tokio::join!(query_peer_for_known_peers_v2(peer), v3_query);
1057
1058 if let Some(found_peers) = found_peers_v2 {
1059 update_known_peers(
1060 state.clone(),
1061 metrics.clone(),
1062 found_peers,
1063 configured_peers.clone(),
1064 &chain_peers,
1065 );
1066 }
1067 if let Some(found_peers) = found_peers_v3 {
1068 let changed = update_known_peers_versioned(
1069 state,
1070 metrics,
1071 found_peers,
1072 configured_peers,
1073 &chain_peers,
1074 &endpoint_manager,
1075 );
1076 if changed {
1077 let _ = mailbox_tx.try_send(DiscoveryMessage::TrustedPeersUpdated);
1078 }
1079 }
1080 return;
1081 }
1082 }
1083
1084 if let Some(found_peers) = query_peer_for_known_peers_v2(peer).await {
1086 update_known_peers(state, metrics, found_peers, configured_peers, &chain_peers);
1087 }
1088}
1089
1090async fn query_connected_peers_for_their_known_peers(
1091 network: Network,
1092 config: Arc<DiscoveryConfig>,
1093 state: Arc<RwLock<State>>,
1094 metrics: Metrics,
1095 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1096 chain_peers: Arc<RwLock<HashSet<PeerId>>>,
1097 endpoint_manager: EndpointManager,
1098 mailbox_tx: mpsc::Sender<DiscoveryMessage>,
1099) {
1100 use rand::seq::IteratorRandom;
1101
1102 let peers_to_query: Vec<_> = network
1103 .peers()
1104 .into_iter()
1105 .flat_map(|id| network.peer(id))
1106 .choose_multiple(&mut rand::thread_rng(), config.peers_to_query());
1107
1108 let v2_query = {
1110 let peers = peers_to_query.clone();
1111 let peers_to_query_count = config.peers_to_query();
1112 async move {
1113 peers
1114 .into_iter()
1115 .map(DiscoveryClient::new)
1116 .map(|mut client| async move {
1117 let request = Request::new(()).with_timeout(TIMEOUT);
1118 client
1119 .get_known_peers_v2(request)
1120 .await
1121 .ok()
1122 .map(Response::into_inner)
1123 .map(
1124 |GetKnownPeersResponseV2 {
1125 own_info,
1126 mut known_peers,
1127 }| {
1128 if !own_info.addresses.is_empty() {
1129 known_peers.push(own_info)
1130 }
1131 known_peers
1132 },
1133 )
1134 })
1135 .pipe(futures::stream::iter)
1136 .buffer_unordered(peers_to_query_count)
1137 .filter_map(std::future::ready)
1138 .flat_map(futures::stream::iter)
1139 .collect::<Vec<_>>()
1140 .await
1141 }
1142 };
1143
1144 if config.use_get_known_peers_v3() {
1146 let our_info_v2 = state.read().unwrap().our_info_v2.clone();
1147 if let Some(own_info) = our_info_v2 {
1148 let v3_query = {
1149 let peers_to_query_count = config.peers_to_query();
1150 async move {
1151 peers_to_query
1152 .into_iter()
1153 .map(DiscoveryClient::new)
1154 .map(|mut client| {
1155 let own_info = own_info.clone();
1156 async move {
1157 let request = Request::new(GetKnownPeersRequestV3 { own_info })
1158 .with_timeout(TIMEOUT);
1159 client
1160 .get_known_peers_v3(request)
1161 .await
1162 .ok()
1163 .map(Response::into_inner)
1164 .map(
1165 |GetKnownPeersResponseV3 {
1166 own_info,
1167 mut known_peers,
1168 }| {
1169 if !own_info.p2p_addresses().is_empty() {
1170 known_peers.push(own_info)
1171 }
1172 known_peers
1173 },
1174 )
1175 }
1176 })
1177 .pipe(futures::stream::iter)
1178 .buffer_unordered(peers_to_query_count)
1179 .filter_map(std::future::ready)
1180 .flat_map(futures::stream::iter)
1181 .collect::<Vec<_>>()
1182 .await
1183 }
1184 };
1185
1186 let (found_peers_v2, found_peers_v3) = tokio::join!(v2_query, v3_query);
1187
1188 update_known_peers(
1189 state.clone(),
1190 metrics.clone(),
1191 found_peers_v2,
1192 configured_peers.clone(),
1193 &chain_peers,
1194 );
1195 let changed = update_known_peers_versioned(
1196 state,
1197 metrics,
1198 found_peers_v3,
1199 configured_peers,
1200 &chain_peers,
1201 &endpoint_manager,
1202 );
1203 if changed {
1204 let _ = mailbox_tx.try_send(DiscoveryMessage::TrustedPeersUpdated);
1205 }
1206 return;
1207 }
1208 }
1209
1210 let found_peers_v2 = v2_query.await;
1212 update_known_peers(
1213 state,
1214 metrics,
1215 found_peers_v2,
1216 configured_peers,
1217 &chain_peers,
1218 );
1219}
1220
1221fn update_known_peers(
1222 state: Arc<RwLock<State>>,
1223 metrics: Metrics,
1224 found_peers: Vec<SignedNodeInfo>,
1225 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1226 chain_peers: &Arc<RwLock<HashSet<PeerId>>>,
1227) {
1228 use std::collections::hash_map::Entry;
1229
1230 let now_unix = now_unix();
1231 let our_peer_id = state.read().unwrap().our_info.clone().unwrap().peer_id;
1232 let known_peers = &mut state.write().unwrap().known_peers;
1233 for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
1235 if peer_info.timestamp_ms > now_unix.saturating_add(30 * 1_000) || now_unix.saturating_sub(peer_info.timestamp_ms) > ONE_DAY_MILLISECONDS
1240 {
1241 continue;
1242 }
1243
1244 if peer_info.peer_id == our_peer_id {
1245 continue;
1246 }
1247
1248 let is_restricted = match peer_info.access_type {
1249 AccessType::Public => false,
1250 AccessType::Private | AccessType::Trusted => true,
1251 };
1252 if is_restricted && !is_trusted_peer(&peer_info.peer_id, &configured_peers, chain_peers) {
1253 continue;
1254 }
1255
1256 if peer_info.addresses.len() > MAX_ADDRESSES_PER_PEER {
1258 continue;
1259 }
1260
1261 if !peer_info
1263 .addresses
1264 .iter()
1265 .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
1266 {
1267 continue;
1268 }
1269 let Ok(public_key) = Ed25519PublicKey::from_bytes(&peer_info.peer_id.0) else {
1270 debug_fatal!(
1271 "Failed to convert anemo PeerId {:?} to Ed25519PublicKey",
1273 peer_info.peer_id
1274 );
1275 continue;
1276 };
1277 let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
1278 if let Err(e) = public_key.verify(&msg, peer_info.auth_sig()) {
1279 info!(
1280 "Discovery failed to verify signature for NodeInfo for peer {:?}: {e:?}",
1281 peer_info.peer_id
1282 );
1283 continue;
1285 }
1286 let peer = VerifiedSignedNodeInfo::new_from_verified(peer_info);
1287
1288 match known_peers.entry(peer.peer_id) {
1289 Entry::Occupied(mut o) => {
1290 if peer.timestamp_ms > o.get().timestamp_ms {
1291 if o.get().addresses.is_empty() && !peer.addresses.is_empty() {
1292 metrics.inc_num_peers_with_external_address();
1293 }
1294 if !o.get().addresses.is_empty() && peer.addresses.is_empty() {
1295 metrics.dec_num_peers_with_external_address();
1296 }
1297 o.insert(peer);
1298 }
1299 }
1300 Entry::Vacant(v) => {
1301 if !peer.addresses.is_empty() {
1302 metrics.inc_num_peers_with_external_address();
1303 }
1304 v.insert(peer);
1305 }
1306 }
1307 }
1308}
1309
1310fn update_known_peers_versioned(
1312 state: Arc<RwLock<State>>,
1313 metrics: Metrics,
1314 found_peers: Vec<SignedVersionedNodeInfo>,
1315 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
1316 chain_peers: &Arc<RwLock<HashSet<PeerId>>>,
1317 endpoint_manager: &EndpointManager,
1318) -> bool {
1319 use std::collections::hash_map::Entry;
1320
1321 let now_unix = now_unix();
1322 let our_peer_id = state
1323 .read()
1324 .unwrap()
1325 .our_info_v2
1326 .as_ref()
1327 .and_then(|info| info.peer_id());
1328 let known_peers_v2 = &mut state.write().unwrap().known_peers_v2;
1329 let mut trusted_peer_changed = false;
1330
1331 for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
1332 let timestamp_ms = peer_info.timestamp_ms();
1333
1334 if timestamp_ms > now_unix.saturating_add(30 * 1_000)
1335 || now_unix.saturating_sub(timestamp_ms) > ONE_DAY_MILLISECONDS
1336 {
1337 continue;
1338 }
1339
1340 let Some(peer_id) = peer_info.peer_id() else {
1341 continue;
1342 };
1343
1344 if Some(peer_id) == our_peer_id {
1345 continue;
1346 }
1347
1348 let is_restricted = match peer_info.access_type() {
1349 AccessType::Public => false,
1350 AccessType::Private | AccessType::Trusted => true,
1351 };
1352 let is_trusted = is_trusted_peer(&peer_id, &configured_peers, chain_peers);
1353 if is_restricted && !is_trusted {
1354 continue;
1355 }
1356
1357 let peer = match verify_versioned_node_info(&peer_info) {
1358 Ok(verified) => verified,
1359 Err(reason) => {
1360 info!("Discovery rejecting VersionedNodeInfo for peer {peer_id:?}: {reason}");
1361 continue;
1362 }
1363 };
1364
1365 if is_trusted && let VersionedNodeInfo::V2(info_v2) = peer_info.data() {
1367 for (endpoint_id, addrs) in &info_v2.addresses {
1368 if !addrs.is_empty() {
1369 let _ = endpoint_manager.update_endpoint(
1370 endpoint_id.clone(),
1371 AddressSource::Discovery,
1372 addrs.clone(),
1373 );
1374 }
1375 }
1376 }
1377
1378 let peer_p2p_addresses = peer.p2p_addresses();
1379
1380 match known_peers_v2.entry(peer_id) {
1381 Entry::Occupied(mut o) => {
1382 if peer.timestamp_ms() > o.get().timestamp_ms() {
1383 let old_addresses = o.get().p2p_addresses();
1384 if old_addresses.is_empty() && !peer_p2p_addresses.is_empty() {
1385 metrics.inc_num_peers_with_external_address();
1386 }
1387 if !old_addresses.is_empty() && peer_p2p_addresses.is_empty() {
1388 metrics.dec_num_peers_with_external_address();
1389 }
1390 o.insert(peer);
1391 if is_trusted {
1392 trusted_peer_changed = true;
1393 }
1394 }
1395 }
1396 Entry::Vacant(v) => {
1397 if !peer_p2p_addresses.is_empty() {
1398 metrics.inc_num_peers_with_external_address();
1399 }
1400 v.insert(peer);
1401 if is_trusted {
1402 trusted_peer_changed = true;
1403 }
1404 }
1405 }
1406 }
1407
1408 trusted_peer_changed
1409}
1410
1411pub(super) fn is_trusted_peer(
1414 peer_id: &PeerId,
1415 configured_peers: &HashMap<PeerId, PeerInfo>,
1416 chain_peers: &Arc<RwLock<HashSet<PeerId>>>,
1417) -> bool {
1418 configured_peers.contains_key(peer_id) || chain_peers.read().unwrap().contains(peer_id)
1419}
1420
1421pub(super) fn now_unix() -> u64 {
1422 use std::time::{SystemTime, UNIX_EPOCH};
1423
1424 SystemTime::now()
1425 .duration_since(UNIX_EPOCH)
1426 .unwrap()
1427 .as_millis() as u64
1428}