1use anemo::types::PeerInfo;
5use anemo::{Network, Peer, PeerId, Request, Response, types::PeerEvent};
6use fastcrypto::ed25519::{Ed25519PublicKey, Ed25519Signature};
7use futures::StreamExt;
8use mysten_common::debug_fatal;
9use serde::{Deserialize, Serialize};
10use shared_crypto::intent::IntentScope;
11use std::{
12 collections::HashMap,
13 sync::{Arc, RwLock},
14 time::Duration,
15};
16use sui_config::p2p::{AccessType, DiscoveryConfig, P2pConfig, SeedPeer};
17use sui_types::crypto::{NetworkKeyPair, Signer, ToFromBytes, VerifyingKey};
18use sui_types::digests::Digest;
19use sui_types::message_envelope::{Envelope, Message, VerifiedEnvelope};
20use sui_types::multiaddr::Multiaddr;
21use tap::{Pipe, TapFallible};
22use tokio::sync::broadcast::error::RecvError;
23use tokio::sync::watch;
24use tokio::{
25 sync::oneshot,
26 task::{AbortHandle, JoinSet},
27};
28use tracing::{debug, info, trace};
29
/// Timeout applied to each `get_known_peers_v2` request sent to a peer.
const TIMEOUT: Duration = Duration::from_secs(1);
/// Number of milliseconds in one day; peer infos whose timestamp is at least this old are
/// dropped from the known-peer set and rejected when received.
const ONE_DAY_MILLISECONDS: u64 = 24 * 60 * 60 * 1_000;
/// Exclusive upper bound on the length of a single advertised address; a received peer info
/// containing a longer address is ignored.
const MAX_ADDRESS_LENGTH: usize = 300;
/// Bound on how many peer infos are processed per update (`MAX_PEERS_TO_SEND + 1` entries are
/// taken). NOTE(review): the name suggests it also caps what the server sends — confirm in
/// the `server` module.
const MAX_PEERS_TO_SEND: usize = 200;
/// Maximum number of addresses a received peer info may carry before it is ignored.
const MAX_ADDRESSES_PER_PEER: usize = 2;
35
/// Build-time generated code, included verbatim from `$OUT_DIR/sui.Discovery.rs`.
/// The client and server items re-exported by this module come from here.
mod generated {
    include!(concat!(env!("OUT_DIR"), "/sui.Discovery.rs"));
}
39mod builder;
40mod metrics;
41mod server;
42#[cfg(test)]
43mod tests;
44
45pub use builder::{Builder, Handle, UnstartedDiscovery};
46pub use generated::{
47 discovery_client::DiscoveryClient,
48 discovery_server::{Discovery, DiscoveryServer},
49};
50pub use server::GetKnownPeersResponseV2;
51
52use self::metrics::Metrics;
53
/// Discovery state shared (behind an `Arc<RwLock<_>>`) between the event loop and the
/// query tasks it spawns.
struct State {
    /// This node's own signed info; `None` until the event loop constructs it at start-up.
    our_info: Option<SignedNodeInfo>,
    /// Peers currently connected, maintained from `PeerEvent::NewPeer` / `LostPeer`.
    connected_peers: HashMap<PeerId, ()>,
    /// Signature-verified infos learned from other peers, keyed by peer id and pruned
    /// once their timestamp is a day old.
    known_peers: HashMap<PeerId, VerifiedSignedNodeInfo>,
}
60
/// Information a node advertises about itself to other peers.
///
/// The value is BCS-serialized for signing and verification, so the field order below is
/// part of the wire/signature format.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct NodeInfo {
    /// Identity of the node this info describes.
    pub peer_id: PeerId,
    /// Addresses at which the node can be dialed; may be empty.
    pub addresses: Vec<Multiaddr>,

    /// Creation time of this info in milliseconds since the Unix epoch; refreshed on every
    /// discovery tick for our own info.
    pub timestamp_ms: u64,

    /// Access type of the node; infos marked `AccessType::Private` are only accepted from
    /// allowlisted peers.
    pub access_type: AccessType,
}
78
79impl NodeInfo {
80 fn sign(self, keypair: &NetworkKeyPair) -> SignedNodeInfo {
81 let msg = bcs::to_bytes(&self).expect("BCS serialization should not fail");
82 let sig = keypair.sign(&msg);
83 SignedNodeInfo::new_from_data_and_sig(self, sig)
84 }
85}
86
/// A [`NodeInfo`] together with an Ed25519 signature that has not (yet) been checked.
pub type SignedNodeInfo = Envelope<NodeInfo, Ed25519Signature>;

/// A [`NodeInfo`] whose Ed25519 signature has been verified; only values of this type are
/// stored in the known-peer set.
pub type VerifiedSignedNodeInfo = VerifiedEnvelope<NodeInfo, Ed25519Signature>;
90
/// Newtype around [`Digest`] used as the `Message::DigestType` of [`NodeInfo`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct NodeInfoDigest(Digest);
93
94impl NodeInfoDigest {
95 pub const fn new(digest: [u8; 32]) -> Self {
96 Self(Digest::new(digest))
97 }
98}
99
/// Lets [`NodeInfo`] be carried inside `Envelope` / `VerifiedEnvelope`.
impl Message for NodeInfo {
    type DigestType = NodeInfoDigest;
    const SCOPE: IntentScope = IntentScope::DiscoveryPeers;

    /// Not implemented: calling this panics via `unreachable!`, since the digest of a
    /// `NodeInfo` is never computed by this module.
    fn digest(&self) -> Self::DigestType {
        unreachable!("NodeInfoDigest is not used today")
    }
}
108
/// Event delivered over a `watch` channel to tell discovery about peers that should be
/// inserted into the network's known-peer table.
#[derive(Clone, Debug, Default)]
pub struct TrustedPeerChangeEvent {
    /// Peers to insert; each entry is passed as-is to `Network::known_peers().insert`.
    pub new_peers: Vec<PeerInfo>,
}
113
/// Owns everything the discovery background task needs; driven by
/// `DiscoveryEventLoop::start`.
struct DiscoveryEventLoop {
    /// P2P configuration; supplies the external address and the seed peers.
    config: P2pConfig,
    /// Discovery-specific settings (tick interval, connection targets, allowlist, ...).
    discovery_config: Arc<DiscoveryConfig>,
    /// Peers from which `AccessType::Private` node infos are accepted.
    allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
    /// Handle to the anemo network used for dialing and querying peers.
    network: Network,
    /// Key used to sign our own `NodeInfo`.
    keypair: NetworkKeyPair,
    /// All background tasks spawned by the loop (queries and dials).
    tasks: JoinSet<()>,
    /// Dials currently in flight, keyed by the peer being dialed.
    pending_dials: HashMap<PeerId, AbortHandle>,
    /// Handle of the in-flight seed-peer dialing task, if one is running.
    dial_seed_peers_task: Option<AbortHandle>,
    /// Resolves when the loop should stop.
    shutdown_handle: oneshot::Receiver<()>,
    /// State shared with the spawned query tasks.
    state: Arc<RwLock<State>>,
    /// Receives updates about trusted peers to add to the network's known peers.
    trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
    /// Discovery metrics.
    metrics: Metrics,
}
128
129impl DiscoveryEventLoop {
130 pub async fn start(mut self) {
131 info!("Discovery started");
132
133 self.construct_our_info();
134 self.configure_preferred_peers();
135
136 let mut interval = tokio::time::interval(self.discovery_config.interval_period());
137 let mut peer_events = {
138 let (subscriber, _peers) = self.network.subscribe().unwrap();
139 subscriber
140 };
141
142 loop {
143 tokio::select! {
144 now = interval.tick() => {
145 let now_unix = now_unix();
146 self.handle_tick(now.into_std(), now_unix);
147 }
148 peer_event = peer_events.recv() => {
149 self.handle_peer_event(peer_event);
150 },
151 Ok(()) = self.trusted_peer_change_rx.changed() => {
152 let event: TrustedPeerChangeEvent = self.trusted_peer_change_rx.borrow_and_update().clone();
153 self.handle_trusted_peer_change_event(event);
154 }
155 Some(task_result) = self.tasks.join_next() => {
156 match task_result {
157 Ok(()) => {},
158 Err(e) => {
159 if e.is_cancelled() {
160 } else if e.is_panic() {
162 std::panic::resume_unwind(e.into_panic());
164 } else {
165 panic!("task failed: {e}");
166 }
167 },
168 };
169 },
170 _ = &mut self.shutdown_handle => {
172 break;
173 }
174 }
175 }
176
177 info!("Discovery ended");
178 }
179
180 fn construct_our_info(&mut self) {
181 if self.state.read().unwrap().our_info.is_some() {
182 return;
183 }
184
185 let address = self
186 .config
187 .external_address
188 .clone()
189 .and_then(|addr| addr.to_anemo_address().ok().map(|_| addr))
190 .into_iter()
191 .collect();
192 let our_info = NodeInfo {
193 peer_id: self.network.peer_id(),
194 addresses: address,
195 timestamp_ms: now_unix(),
196 access_type: self.discovery_config.access_type(),
197 }
198 .sign(&self.keypair);
199
200 self.state.write().unwrap().our_info = Some(our_info);
201 }
202
203 fn configure_preferred_peers(&mut self) {
204 for (peer_id, address) in self
205 .discovery_config
206 .allowlisted_peers
207 .iter()
208 .map(|sp| (sp.peer_id, sp.address.clone()))
209 .chain(self.config.seed_peers.iter().filter_map(|ap| {
210 ap.peer_id
211 .map(|peer_id| (peer_id, Some(ap.address.clone())))
212 }))
213 {
214 let anemo_address = if let Some(address) = address {
215 let Ok(address) = address.to_anemo_address() else {
216 debug!(p2p_address=?address, "Can't convert p2p address to anemo address");
217 continue;
218 };
219 Some(address)
220 } else {
221 None
222 };
223
224 let peer_info = anemo::types::PeerInfo {
227 peer_id,
228 affinity: anemo::types::PeerAffinity::High,
229 address: anemo_address.into_iter().collect(),
230 };
231 debug!(?peer_info, "Add configured preferred peer");
232 self.network.known_peers().insert(peer_info);
233 }
234 }
235
236 fn update_our_info_timestamp(&mut self, now_unix: u64) {
237 let state = &mut self.state.write().unwrap();
238 if let Some(our_info) = &state.our_info {
239 let mut data = our_info.data().clone();
240 data.timestamp_ms = now_unix;
241 state.our_info = Some(data.sign(&self.keypair));
242 }
243 }
244
245 fn handle_trusted_peer_change_event(
248 &mut self,
249 trusted_peer_change_event: TrustedPeerChangeEvent,
250 ) {
251 for peer_info in trusted_peer_change_event.new_peers {
252 debug!(?peer_info, "Add committee member as preferred peer.");
253 self.network.known_peers().insert(peer_info);
254 }
255 }
256
257 fn handle_peer_event(&mut self, peer_event: Result<PeerEvent, RecvError>) {
258 match peer_event {
259 Ok(PeerEvent::NewPeer(peer_id)) => {
260 if let Some(peer) = self.network.peer(peer_id) {
261 self.state
262 .write()
263 .unwrap()
264 .connected_peers
265 .insert(peer_id, ());
266
267 self.tasks.spawn(query_peer_for_their_known_peers(
269 peer,
270 self.state.clone(),
271 self.metrics.clone(),
272 self.allowlisted_peers.clone(),
273 ));
274 }
275 }
276 Ok(PeerEvent::LostPeer(peer_id, _)) => {
277 self.state.write().unwrap().connected_peers.remove(&peer_id);
278 }
279
280 Err(RecvError::Closed) => {
281 panic!("PeerEvent channel shouldn't be able to be closed");
282 }
283
284 Err(RecvError::Lagged(_)) => {
285 trace!("State-Sync fell behind processing PeerEvents");
286 }
287 }
288 }
289
290 fn handle_tick(&mut self, _now: std::time::Instant, now_unix: u64) {
291 self.update_our_info_timestamp(now_unix);
292
293 self.tasks
294 .spawn(query_connected_peers_for_their_known_peers(
295 self.network.clone(),
296 self.discovery_config.clone(),
297 self.state.clone(),
298 self.metrics.clone(),
299 self.allowlisted_peers.clone(),
300 ));
301
302 self.state
304 .write()
305 .unwrap()
306 .known_peers
307 .retain(|_k, v| now_unix.saturating_sub(v.timestamp_ms) < ONE_DAY_MILLISECONDS);
308
309 self.pending_dials.retain(|_k, v| !v.is_finished());
311 if let Some(abort_handle) = &self.dial_seed_peers_task
312 && abort_handle.is_finished()
313 {
314 self.dial_seed_peers_task = None;
315 }
316
317 let state = self.state.read().unwrap();
319 let eligible = state
320 .known_peers
321 .clone()
322 .into_iter()
323 .filter(|(peer_id, info)| {
324 peer_id != &self.network.peer_id() &&
325 !info.addresses.is_empty() && !state.connected_peers.contains_key(peer_id) && !self.pending_dials.contains_key(peer_id) })
329 .collect::<Vec<_>>();
330
331 let number_of_connections = state.connected_peers.len();
333 let number_to_dial = std::cmp::min(
334 eligible.len(),
335 self.discovery_config
336 .target_concurrent_connections()
337 .saturating_sub(number_of_connections),
338 );
339
340 for (peer_id, info) in rand::seq::SliceRandom::choose_multiple(
342 eligible.as_slice(),
343 &mut rand::thread_rng(),
344 number_to_dial,
345 ) {
346 let abort_handle = self.tasks.spawn(try_to_connect_to_peer(
347 self.network.clone(),
348 info.data().to_owned(),
349 ));
350 self.pending_dials.insert(*peer_id, abort_handle);
351 }
352
353 if self.dial_seed_peers_task.is_none()
356 && state.connected_peers.is_empty()
357 && self.pending_dials.is_empty()
358 && !self.config.seed_peers.is_empty()
359 {
360 let abort_handle = self.tasks.spawn(try_to_connect_to_seed_peers(
361 self.network.clone(),
362 self.discovery_config.clone(),
363 self.config.seed_peers.clone(),
364 ));
365
366 self.dial_seed_peers_task = Some(abort_handle);
367 }
368 }
369}
370
371async fn try_to_connect_to_peer(network: Network, info: NodeInfo) {
372 debug!("Connecting to peer {info:?}");
373 for multiaddr in &info.addresses {
374 if let Ok(address) = multiaddr.to_anemo_address() {
375 if network
377 .connect_with_peer_id(address, info.peer_id)
378 .await
379 .tap_err(|e| {
380 debug!(
381 "error dialing {} at address '{}': {e}",
382 info.peer_id.short_display(4),
383 multiaddr
384 )
385 })
386 .is_ok()
387 {
388 return;
389 }
390 }
391 }
392}
393
394async fn try_to_connect_to_seed_peers(
395 network: Network,
396 config: Arc<DiscoveryConfig>,
397 seed_peers: Vec<SeedPeer>,
398) {
399 debug!(?seed_peers, "Connecting to seed peers");
400 let network = &network;
401
402 futures::stream::iter(seed_peers.into_iter().filter_map(|seed| {
403 seed.address
404 .to_anemo_address()
405 .ok()
406 .map(|address| (seed, address))
407 }))
408 .for_each_concurrent(
409 config.target_concurrent_connections(),
410 |(seed, address)| async move {
411 let _ = if let Some(peer_id) = seed.peer_id {
413 network.connect_with_peer_id(address, peer_id).await
414 } else {
415 network.connect(address).await
416 }
417 .tap_err(|e| debug!("error dialing multiaddr '{}': {e}", seed.address));
418 },
419 )
420 .await;
421}
422
423async fn query_peer_for_their_known_peers(
424 peer: Peer,
425 state: Arc<RwLock<State>>,
426 metrics: Metrics,
427 allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
428) {
429 let mut client = DiscoveryClient::new(peer);
430
431 let request = Request::new(()).with_timeout(TIMEOUT);
432 let found_peers = client
433 .get_known_peers_v2(request)
434 .await
435 .ok()
436 .map(Response::into_inner)
437 .map(
438 |GetKnownPeersResponseV2 {
439 own_info,
440 mut known_peers,
441 }| {
442 if !own_info.addresses.is_empty() {
443 known_peers.push(own_info)
444 }
445 known_peers
446 },
447 );
448 if let Some(found_peers) = found_peers {
449 update_known_peers(state, metrics, found_peers, allowlisted_peers);
450 }
451}
452
453async fn query_connected_peers_for_their_known_peers(
454 network: Network,
455 config: Arc<DiscoveryConfig>,
456 state: Arc<RwLock<State>>,
457 metrics: Metrics,
458 allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
459) {
460 use rand::seq::IteratorRandom;
461
462 let peers_to_query = network
463 .peers()
464 .into_iter()
465 .flat_map(|id| network.peer(id))
466 .choose_multiple(&mut rand::thread_rng(), config.peers_to_query());
467
468 let found_peers = peers_to_query
469 .into_iter()
470 .map(DiscoveryClient::new)
471 .map(|mut client| async move {
472 let request = Request::new(()).with_timeout(TIMEOUT);
473 client
474 .get_known_peers_v2(request)
475 .await
476 .ok()
477 .map(Response::into_inner)
478 .map(
479 |GetKnownPeersResponseV2 {
480 own_info,
481 mut known_peers,
482 }| {
483 if !own_info.addresses.is_empty() {
484 known_peers.push(own_info)
485 }
486 known_peers
487 },
488 )
489 })
490 .pipe(futures::stream::iter)
491 .buffer_unordered(config.peers_to_query())
492 .filter_map(std::future::ready)
493 .flat_map(futures::stream::iter)
494 .collect::<Vec<_>>()
495 .await;
496
497 update_known_peers(state, metrics, found_peers, allowlisted_peers);
498}
499
500fn update_known_peers(
501 state: Arc<RwLock<State>>,
502 metrics: Metrics,
503 found_peers: Vec<SignedNodeInfo>,
504 allowlisted_peers: Arc<HashMap<PeerId, Option<Multiaddr>>>,
505) {
506 use std::collections::hash_map::Entry;
507
508 let now_unix = now_unix();
509 let our_peer_id = state.read().unwrap().our_info.clone().unwrap().peer_id;
510 let known_peers = &mut state.write().unwrap().known_peers;
511 for peer_info in found_peers.into_iter().take(MAX_PEERS_TO_SEND + 1) {
513 if peer_info.timestamp_ms > now_unix.saturating_add(30 * 1_000) || now_unix.saturating_sub(peer_info.timestamp_ms) > ONE_DAY_MILLISECONDS
518 {
519 continue;
520 }
521
522 if peer_info.peer_id == our_peer_id {
523 continue;
524 }
525
526 if peer_info.access_type == AccessType::Private
528 && !allowlisted_peers.contains_key(&peer_info.peer_id)
529 {
530 continue;
531 }
532
533 if peer_info.addresses.len() > MAX_ADDRESSES_PER_PEER {
535 continue;
536 }
537
538 if !peer_info
540 .addresses
541 .iter()
542 .all(|addr| addr.len() < MAX_ADDRESS_LENGTH && addr.to_anemo_address().is_ok())
543 {
544 continue;
545 }
546 let Ok(public_key) = Ed25519PublicKey::from_bytes(&peer_info.peer_id.0) else {
547 debug_fatal!(
548 "Failed to convert anemo PeerId {:?} to Ed25519PublicKey",
550 peer_info.peer_id
551 );
552 continue;
553 };
554 let msg = bcs::to_bytes(peer_info.data()).expect("BCS serialization should not fail");
555 if let Err(e) = public_key.verify(&msg, peer_info.auth_sig()) {
556 info!(
557 "Discovery failed to verify signature for NodeInfo for peer {:?}: {e:?}",
558 peer_info.peer_id
559 );
560 continue;
562 }
563 let peer = VerifiedSignedNodeInfo::new_from_verified(peer_info);
564
565 match known_peers.entry(peer.peer_id) {
566 Entry::Occupied(mut o) => {
567 if peer.timestamp_ms > o.get().timestamp_ms {
568 if o.get().addresses.is_empty() && !peer.addresses.is_empty() {
569 metrics.inc_num_peers_with_external_address();
570 }
571 if !o.get().addresses.is_empty() && peer.addresses.is_empty() {
572 metrics.dec_num_peers_with_external_address();
573 }
574 o.insert(peer);
575 }
576 }
577 Entry::Vacant(v) => {
578 if !peer.addresses.is_empty() {
579 metrics.inc_num_peers_with_external_address();
580 }
581 v.insert(peer);
582 }
583 }
584 }
585}
586
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
fn now_unix() -> u64 {
    use std::time::UNIX_EPOCH;

    let since_epoch = UNIX_EPOCH.elapsed().unwrap();
    since_epoch.as_millis() as u64
}