1use anemo::types::PeerAffinity;
5use anemo::types::PeerInfo;
6use anemo::{Network, Peer, PeerId, Request, Response, types::PeerEvent};
7use fastcrypto::ed25519::{Ed25519PublicKey, Ed25519Signature};
8use futures::StreamExt;
9use mysten_common::debug_fatal;
10use serde::{Deserialize, Serialize};
11use shared_crypto::intent::IntentScope;
12use std::{
13 collections::HashMap,
14 sync::{Arc, RwLock},
15 time::Duration,
16};
17use sui_config::p2p::{AccessType, DiscoveryConfig, P2pConfig};
18use sui_types::crypto::{NetworkKeyPair, Signer, ToFromBytes, VerifyingKey};
19use sui_types::digests::Digest;
20use sui_types::message_envelope::{Envelope, Message, VerifiedEnvelope};
21use sui_types::multiaddr::Multiaddr;
22use tap::{Pipe, TapFallible};
23use tokio::sync::broadcast::error::RecvError;
24use tokio::sync::mpsc;
25use tokio::{
26 sync::oneshot,
27 task::{AbortHandle, JoinSet},
28};
29use tracing::{debug, info, trace};
30
/// Timeout applied to each `get_known_peers_v2` request sent to a peer.
const TIMEOUT: Duration = Duration::from_secs(1);
/// One day in milliseconds: the maximum age (by `timestamp_ms`) of a peer's `NodeInfo`
/// before it is rejected on receipt or culled from `known_peers`.
const ONE_DAY_MILLISECONDS: u64 = 24 * 60 * 60 * 1_000;
/// Received addresses whose `len()` is at or above this limit are rejected.
const MAX_ADDRESS_LENGTH: usize = 300;
/// `update_known_peers` processes at most `MAX_PEERS_TO_SEND + 1` entries per call
/// (presumably the responder's known peers plus its own info).
/// NOTE(review): likely also bounds what the `server` module sends — confirm there.
const MAX_PEERS_TO_SEND: usize = 200;
/// A received `NodeInfo` advertising more than this many addresses is rejected.
const MAX_ADDRESSES_PER_PEER: usize = 2;
36
// Code generated at build time (written to `OUT_DIR`, presumably by the crate's build
// script from the `sui.Discovery` service definition). The generated client and server
// types are re-exported from this module via `pub use generated::...`.
mod generated {
 include!(concat!(env!("OUT_DIR"), "/sui.Discovery.rs"));
}
40mod builder;
41mod metrics;
42mod server;
43#[cfg(test)]
44mod tests;
45
46pub use builder::{Builder, UnstartedDiscovery};
47pub use generated::{
48 discovery_client::DiscoveryClient,
49 discovery_server::{Discovery, DiscoveryServer},
50};
51pub use server::GetKnownPeersResponseV2;
52
/// Messages delivered to the discovery event loop through its mailbox.
#[derive(Debug)]
pub enum DiscoveryMessage {
    /// A peer's `PeerInfo` (e.g. its addresses) changed; sent by `Handle::peer_address_change`.
    PeerAddressChange(PeerInfo),
}
59
/// Cloneable handle used to send messages to a running discovery event loop.
///
/// `_shutdown_handle` is never read; it exists so its lifetime is tied to the handles.
/// NOTE(review): presumably the event loop's `shutdown_handle` receiver is paired with this
/// sender (set up in the builder), so dropping the last `Handle` stops discovery — confirm.
#[derive(Clone, Debug)]
pub struct Handle {
    pub(super) _shutdown_handle: Arc<oneshot::Sender<()>>,
    // Mailbox sender; the event loop consumes these messages in `handle_message`.
    sender: mpsc::Sender<DiscoveryMessage>,
}
67
68impl Handle {
69 pub fn peer_address_change(&self, peer_info: PeerInfo) {
74 self.sender
75 .try_send(DiscoveryMessage::PeerAddressChange(peer_info))
76 .expect("Discovery mailbox should not overflow or be closed")
77 }
78}
79
80use self::metrics::Metrics;
81
/// Discovery state shared (behind `Arc<RwLock<_>>`) between the event loop and its tasks.
struct State {
    /// Our own signed `NodeInfo`; set by `construct_our_info` and re-signed with a fresh
    /// timestamp on every tick.
    our_info: Option<SignedNodeInfo>,
    /// Peers currently connected, maintained from `PeerEvent`s (used as a set; value is `()`).
    connected_peers: HashMap<PeerId, ()>,
    /// Signature-verified node info learned from other peers; entries older than a day are culled.
    known_peers: HashMap<PeerId, VerifiedSignedNodeInfo>,
}
88
/// Information a node advertises about itself to other discovery participants.
///
/// It is signed over its BCS encoding (see `NodeInfo::sign`), so field order and types are
/// part of the signed wire format.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct NodeInfo {
    /// The node's anemo peer id; receivers also interpret its bytes as the Ed25519 public key
    /// that must have produced the signature.
    pub peer_id: PeerId,
    /// Addresses the node can be dialed at. Receivers reject entries with more than
    /// `MAX_ADDRESSES_PER_PEER` addresses or with any address that is too long or not
    /// convertible to an anemo address.
    pub addresses: Vec<Multiaddr>,

    /// Unix time in milliseconds when this info was produced (refreshed on each tick for our own).
    pub timestamp_ms: u64,

    /// `Private` entries are only accepted by receivers that have the peer configured.
    pub access_type: AccessType,
}
106
107impl NodeInfo {
108 fn sign(self, keypair: &NetworkKeyPair) -> SignedNodeInfo {
109 let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
110 let sig = keypair.sign(&msg);
111 SignedNodeInfo::new_from_data_and_sig(self, sig)
112 }
113}
114
/// A `NodeInfo` paired with an Ed25519 signature over its BCS encoding (see `NodeInfo::sign`).
pub type SignedNodeInfo = Envelope<NodeInfo, Ed25519Signature>;

/// A `SignedNodeInfo` marked as verified. In this file one is only constructed (via
/// `new_from_verified`) after its signature has been checked against the key in its `peer_id`.
pub type VerifiedSignedNodeInfo = VerifiedEnvelope<NodeInfo, Ed25519Signature>;
118
/// Digest type required by the `Message` impl for `NodeInfo`.
///
/// NOTE(review): `NodeInfo::digest` is `unreachable!`, so this type is never computed from a
/// `NodeInfo` in this file.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct NodeInfoDigest(Digest);

impl NodeInfoDigest {
    /// Wraps a raw 32-byte digest.
    pub const fn new(digest: [u8; 32]) -> Self {
        Self(Digest::new(digest))
    }
}
127
// Presumably required so `NodeInfo` can be carried in `Envelope`/`VerifiedEnvelope`
// (`SignedNodeInfo` / `VerifiedSignedNodeInfo`).
impl Message for NodeInfo {
    type DigestType = NodeInfoDigest;
    // Intent scope associated with discovery peer information.
    const SCOPE: IntentScope = IntentScope::DiscoveryPeers;

    // Expected never to be called: signing (`NodeInfo::sign`) and verification
    // (`update_known_peers`) in this file operate on the BCS encoding directly.
    fn digest(&self) -> Self::DigestType {
        unreachable!("NodeInfoDigest is not used today")
    }
}
136
/// State owned by the discovery background task; driven by `DiscoveryEventLoop::start`.
struct DiscoveryEventLoop {
    /// P2P configuration; its `external_address` is advertised in our `NodeInfo`.
    config: P2pConfig,
    /// Discovery settings (tick interval, connection target, peers to query, access type),
    /// shared with spawned tasks.
    discovery_config: Arc<DiscoveryConfig>,
    /// Peers from configuration; those with `PeerAffinity::High` are dialed as seed peers.
    configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
    /// Seed addresses whose peer id is not known in advance.
    unidentified_seed_peers: Vec<anemo::types::Address>,
    network: Network,
    /// Key used to sign our own `NodeInfo`.
    keypair: NetworkKeyPair,
    /// Background tasks (peer queries and dials) spawned by the loop.
    tasks: JoinSet<()>,
    /// In-flight dial tasks, keyed by the peer being dialed.
    pending_dials: HashMap<PeerId, AbortHandle>,
    /// In-flight seed-peer dialing task, if any.
    dial_seed_peers_task: Option<AbortHandle>,
    /// The event loop exits when this resolves.
    shutdown_handle: oneshot::Receiver<()>,
    state: Arc<RwLock<State>>,
    /// Receives messages sent through `Handle`.
    mailbox: mpsc::Receiver<DiscoveryMessage>,
    metrics: Metrics,
}
152
153impl DiscoveryEventLoop {
154 pub async fn start(mut self) {
155 info!("Discovery started");
156
157 self.construct_our_info();
158 self.configure_preferred_peers();
159
160 let mut interval = tokio::time::interval(self.discovery_config.interval_period());
161 let mut peer_events = {
162 let (subscriber, _peers) = self.network.subscribe().unwrap();
163 subscriber
164 };
165
166 loop {
167 tokio::select! {
168 now = interval.tick() => {
169 let now_unix = now_unix();
170 self.handle_tick(now.into_std(), now_unix);
171 }
172 peer_event = peer_events.recv() => {
173 self.handle_peer_event(peer_event);
174 },
175 Some(message) = self.mailbox.recv() => {
176 self.handle_message(message);
177 }
178 Some(task_result) = self.tasks.join_next() => {
179 match task_result {
180 Ok(()) => {},
181 Err(e) => {
182 if e.is_cancelled() {
183 } else if e.is_panic() {
185 std::panic::resume_unwind(e.into_panic());
187 } else {
188 panic!("task failed: {e}");
189 }
190 },
191 };
192 },
193 _ = &mut self.shutdown_handle => {
195 break;
196 }
197 }
198 }
199
200 info!("Discovery ended");
201 }
202
203 fn handle_message(&mut self, message: DiscoveryMessage) {
204 match message {
205 DiscoveryMessage::PeerAddressChange(peer_info) => {
206 self.handle_peer_address_change(peer_info);
207 }
208 }
209 }
210
211 fn construct_our_info(&mut self) {
212 if self.state.read().unwrap().our_info.is_some() {
213 return;
214 }
215
216 let address = self
217 .config
218 .external_address
219 .clone()
220 .and_then(|addr| addr.to_anemo_address().ok().map(|_| addr))
221 .into_iter()
222 .collect();
223 let our_info = NodeInfo {
224 peer_id: self.network.peer_id(),
225 addresses: address,
226 timestamp_ms: now_unix(),
227 access_type: self.discovery_config.access_type(),
228 }
229 .sign(&self.keypair);
230
231 self.state.write().unwrap().our_info = Some(our_info);
232 }
233
234 fn configure_preferred_peers(&mut self) {
235 for peer_info in self.configured_peers.values() {
236 debug!(?peer_info, "Add configured preferred peer");
237 self.network.known_peers().insert(peer_info.clone());
238 }
239 }
240
241 fn update_our_info_timestamp(&mut self, now_unix: u64) {
242 let state = &mut self.state.write().unwrap();
243 if let Some(our_info) = &state.our_info {
244 let mut data = our_info.data().clone();
245 data.timestamp_ms = now_unix;
246 state.our_info = Some(data.sign(&self.keypair));
247 }
248 }
249
250 fn handle_peer_address_change(&mut self, peer_info: PeerInfo) {
251 debug!(?peer_info, "Add committee member as preferred peer.");
252 if let Some(old_peer_info) = self.network.known_peers().insert(peer_info.clone())
253 && old_peer_info.address != peer_info.address
254 {
255 let _ = self.network.disconnect(peer_info.peer_id);
258 if let Some(address) = peer_info.address.first().cloned() {
259 let network = self.network.clone();
260 self.tasks.spawn(async move {
261 let _ = network
263 .connect_with_peer_id(address, peer_info.peer_id)
264 .await;
265 });
266 }
267 }
268 }
269
270 fn handle_peer_event(&mut self, peer_event: Result<PeerEvent, RecvError>) {
271 match peer_event {
272 Ok(PeerEvent::NewPeer(peer_id)) => {
273 if let Some(peer) = self.network.peer(peer_id) {
274 self.state
275 .write()
276 .unwrap()
277 .connected_peers
278 .insert(peer_id, ());
279
280 self.tasks.spawn(query_peer_for_their_known_peers(
282 peer,
283 self.state.clone(),
284 self.metrics.clone(),
285 self.configured_peers.clone(),
286 ));
287 }
288 }
289 Ok(PeerEvent::LostPeer(peer_id, _)) => {
290 self.state.write().unwrap().connected_peers.remove(&peer_id);
291 }
292
293 Err(RecvError::Closed) => {
294 panic!("PeerEvent channel shouldn't be able to be closed");
295 }
296
297 Err(RecvError::Lagged(_)) => {
298 trace!("State-Sync fell behind processing PeerEvents");
299 }
300 }
301 }
302
303 fn handle_tick(&mut self, _now: std::time::Instant, now_unix: u64) {
304 self.update_our_info_timestamp(now_unix);
305
306 self.tasks
307 .spawn(query_connected_peers_for_their_known_peers(
308 self.network.clone(),
309 self.discovery_config.clone(),
310 self.state.clone(),
311 self.metrics.clone(),
312 self.configured_peers.clone(),
313 ));
314
315 self.state
317 .write()
318 .unwrap()
319 .known_peers
320 .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms) < ONE_DAY_MILLISECONDS);
321
322 self.pending_dials.retain(|_k, v| !v.is_finished());
324 if let Some(abort_handle) = &self.dial_seed_peers_task
325 && abort_handle.is_finished()
326 {
327 self.dial_seed_peers_task = None;
328 }
329
330 let state = self.state.read().unwrap();
332 let eligible = state
333 .known_peers
334 .clone()
335 .into_iter()
336 .filter(|(peer_id, info)| {
337 peer_id != &self.network.peer_id() &&
338 !info.addresses.is_empty() && !state.connected_peers.contains_key(peer_id) && !self.pending_dials.contains_key(peer_id) })
342 .collect::<Vec<_>>();
343
344 let number_of_connections = state.connected_peers.len();
346 let number_to_dial = std::cmp::min(
347 eligible.len(),
348 self.discovery_config
349 .target_concurrent_connections()
350 .saturating_sub(number_of_connections),
351 );
352
353 for (peer_id, info) in rand::seq::SliceRandom::choose_multiple(
355 eligible.as_slice(),
356 &mut rand::thread_rng(),
357 number_to_dial,
358 ) {
359 let abort_handle = self.tasks.spawn(try_to_connect_to_peer(
360 self.network.clone(),
361 info.data().to_owned(),
362 ));
363 self.pending_dials.insert(*peer_id, abort_handle);
364 }
365
366 let has_peers_to_dial = || {
369 self.configured_peers
370 .values()
371 .any(|p| p.affinity == PeerAffinity::High)
372 || !self.unidentified_seed_peers.is_empty()
373 };
374 if self.dial_seed_peers_task.is_none()
375 && state.connected_peers.is_empty()
376 && self.pending_dials.is_empty()
377 && has_peers_to_dial()
378 {
379 let abort_handle = self.tasks.spawn(try_to_connect_to_seed_peers(
380 self.network.clone(),
381 self.discovery_config.clone(),
382 self.configured_peers.clone(),
383 self.unidentified_seed_peers.clone(),
384 ));
385
386 self.dial_seed_peers_task = Some(abort_handle);
387 }
388 }
389}
390
391async fn try_to_connect_to_peer(network: Network, info: NodeInfo) {
392 debug!("Connecting to peer {info:?}");
393 for multiaddr in &info.addresses {
394 if let Ok(address) = multiaddr.to_anemo_address() {
395 if network
397 .connect_with_peer_id(address, info.peer_id)
398 .await
399 .tap_err(|e| {
400 debug!(
401 "error dialing {} at address '{}': {e}",
402 info.peer_id.short_display(4),
403 multiaddr
404 )
405 })
406 .is_ok()
407 {
408 return;
409 }
410 }
411 }
412}
413
414async fn try_to_connect_to_seed_peers(
415 network: Network,
416 config: Arc<DiscoveryConfig>,
417 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
418 unidentified_seed_peers: Vec<anemo::types::Address>,
419) {
420 let high_affinity_peers: Vec<_> = configured_peers
421 .values()
422 .filter(|p| p.affinity == PeerAffinity::High)
423 .cloned()
424 .collect();
425 debug!(
426 ?high_affinity_peers,
427 ?unidentified_seed_peers,
428 "Connecting to seed peers"
429 );
430 let network = &network;
431
432 let with_peer_id = high_affinity_peers.into_iter().flat_map(|peer_info| {
434 peer_info
435 .address
436 .into_iter()
437 .map(move |addr| (Some(peer_info.peer_id), addr))
438 });
439 let without_peer_id = unidentified_seed_peers.into_iter().map(|addr| (None, addr));
440 futures::stream::iter(with_peer_id.chain(without_peer_id))
441 .for_each_concurrent(
442 config.target_concurrent_connections(),
443 |(peer_id, address)| async move {
444 let _ = if let Some(peer_id) = peer_id {
446 network
447 .connect_with_peer_id(address.clone(), peer_id)
448 .await
449 .tap_err(|e| {
450 debug!(
451 "error dialing peer {} at '{}': {e}",
452 peer_id.short_display(4),
453 address
454 )
455 })
456 } else {
457 network
458 .connect(address.clone())
459 .await
460 .tap_err(|e| debug!("error dialing address '{}': {e}", address))
461 };
462 },
463 )
464 .await;
465}
466
467async fn query_peer_for_their_known_peers(
468 peer: Peer,
469 state: Arc<RwLock<State>>,
470 metrics: Metrics,
471 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
472) {
473 let mut client = DiscoveryClient::new(peer);
474
475 let request = Request::new(()).with_timeout(TIMEOUT);
476 let found_peers = client
477 .get_known_peers_v2(request)
478 .await
479 .ok()
480 .map(Response::into_inner)
481 .map(
482 |GetKnownPeersResponseV2 {
483 own_info,
484 mut known_peers,
485 }| {
486 if !own_info.addresses.is_empty() {
487 known_peers.push(own_info)
488 }
489 known_peers
490 },
491 );
492 if let Some(found_peers) = found_peers {
493 update_known_peers(state, metrics, found_peers, configured_peers);
494 }
495}
496
497async fn query_connected_peers_for_their_known_peers(
498 network: Network,
499 config: Arc<DiscoveryConfig>,
500 state: Arc<RwLock<State>>,
501 metrics: Metrics,
502 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
503) {
504 use rand::seq::IteratorRandom;
505
506 let peers_to_query = network
507 .peers()
508 .into_iter()
509 .flat_map(|id| network.peer(id))
510 .choose_multiple(&mut rand::thread_rng(), config.peers_to_query());
511
512 let found_peers = peers_to_query
513 .into_iter()
514 .map(DiscoveryClient::new)
515 .map(|mut client| async move {
516 let request = Request::new(()).with_timeout(TIMEOUT);
517 client
518 .get_known_peers_v2(request)
519 .await
520 .ok()
521 .map(Response::into_inner)
522 .map(
523 |GetKnownPeersResponseV2 {
524 own_info,
525 mut known_peers,
526 }| {
527 if !own_info.addresses.is_empty() {
528 known_peers.push(own_info)
529 }
530 known_peers
531 },
532 )
533 })
534 .pipe(futures::stream::iter)
535 .buffer_unordered(config.peers_to_query())
536 .filter_map(std::future::ready)
537 .flat_map(futures::stream::iter)
538 .collect::<Vec<_>>()
539 .await;
540
541 update_known_peers(state, metrics, found_peers, configured_peers);
542}
543
544fn update_known_peers(
545 state: Arc<RwLock<State>>,
546 metrics: Metrics,
547 found_peers: Vec<SignedNodeInfo>,
548 configured_peers: Arc<HashMap<PeerId, PeerInfo>>,
549) {
550 use std::collections::hash_map::Entry;
551
552 let now_unix = now_unix();
553 let our_peer_id = state.read().unwrap().our_info.clone().unwrap().peer_id;
554 let known_peers = &mut state.write().unwrap().known_peers;
555 for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
557 if peer_info.timestamp_ms > now_unix.saturating_add(30 * 1_000) || now_unix.saturating_sub(peer_info.timestamp_ms) > ONE_DAY_MILLISECONDS
562 {
563 continue;
564 }
565
566 if peer_info.peer_id == our_peer_id {
567 continue;
568 }
569
570 if peer_info.access_type == AccessType::Private
572 && !configured_peers.contains_key(&peer_info.peer_id)
573 {
574 continue;
575 }
576
577 if peer_info.addresses.len() > MAX_ADDRESSES_PER_PEER {
579 continue;
580 }
581
582 if !peer_info
584 .addresses
585 .iter()
586 .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
587 {
588 continue;
589 }
590 let Ok(public_key) = Ed25519PublicKey::from_bytes(&peer_info.peer_id.0) else {
591 debug_fatal!(
592 "Failed to convert anemo PeerId {:?} to Ed25519PublicKey",
594 peer_info.peer_id
595 );
596 continue;
597 };
598 let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
599 if let Err(e) = public_key.verify(&msg, peer_info.auth_sig()) {
600 info!(
601 "Discovery failed to verify signature for NodeInfo for peer {:?}: {e:?}",
602 peer_info.peer_id
603 );
604 continue;
606 }
607 let peer = VerifiedSignedNodeInfo::new_from_verified(peer_info);
608
609 match known_peers.entry(peer.peer_id) {
610 Entry::Occupied(mut o) => {
611 if peer.timestamp_ms > o.get().timestamp_ms {
612 if o.get().addresses.is_empty() && !peer.addresses.is_empty() {
613 metrics.inc_num_peers_with_external_address();
614 }
615 if !o.get().addresses.is_empty() && peer.addresses.is_empty() {
616 metrics.dec_num_peers_with_external_address();
617 }
618 o.insert(peer);
619 }
620 }
621 Entry::Vacant(v) => {
622 if !peer.addresses.is_empty() {
623 metrics.inc_num_peers_with_external_address();
624 }
625 v.insert(peer);
626 }
627 }
628 }
629}
630
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reports a time earlier than the Unix epoch.
fn now_unix() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    let millis = since_epoch.as_millis();
    millis as u64
}