sui_network/discovery/
server.rs1use super::{
5 Discovery, DiscoveryMessage, MAX_PEERS_TO_SEND, SignedNodeInfo, SignedVersionedNodeInfo, State,
6 VerifiedSignedVersionedNodeInfo, is_trusted_peer,
7};
8use anemo::{PeerId, Request, Response, types::PeerInfo};
9use rand::seq::IteratorRandom;
10use serde::{Deserialize, Serialize};
11use std::{
12 collections::{HashMap, HashSet},
13 sync::{Arc, OnceLock, RwLock},
14};
15use sui_config::p2p::AccessType;
16use tokio::sync::mpsc;
17
18#[derive(Clone, Debug, Serialize, Deserialize)]
19pub struct GetKnownPeersResponseV2 {
20 pub own_info: SignedNodeInfo,
21 pub known_peers: Vec<SignedNodeInfo>,
22}
23
24#[derive(Clone, Debug, Serialize, Deserialize)]
25pub struct GetKnownPeersRequestV3 {
26 pub own_info: SignedVersionedNodeInfo,
27}
28
29#[derive(Clone, Debug, Serialize, Deserialize)]
30pub struct GetKnownPeersResponseV3 {
31 pub own_info: SignedVersionedNodeInfo,
32 pub known_peers: Vec<SignedVersionedNodeInfo>,
33}
34
35pub(super) struct Server {
36 pub(super) state: Arc<RwLock<State>>,
37 pub(super) configured_peers: Arc<OnceLock<HashMap<PeerId, PeerInfo>>>,
38 pub(super) chain_peers: Arc<RwLock<HashSet<PeerId>>>,
39 pub(super) mailbox_sender: mpsc::Sender<DiscoveryMessage>,
40}
41
42impl Server {
43 #[allow(clippy::result_large_err)]
44 fn is_requester_trusted(&self, peer_id: Option<&PeerId>) -> Result<bool, anemo::rpc::Status> {
45 let Some(peer_id) = peer_id else {
46 return Ok(false);
47 };
48 let configured_peers = self.configured_peers.get().ok_or_else(|| {
49 anemo::rpc::Status::internal("configured_peers has not been initialized yet")
50 })?;
51 Ok(is_trusted_peer(
52 peer_id,
53 configured_peers,
54 &self.chain_peers,
55 ))
56 }
57}
58
59#[anemo::async_trait]
60impl Discovery for Server {
61 async fn get_known_peers_v2(
62 &self,
63 request: Request<()>,
64 ) -> Result<Response<GetKnownPeersResponseV2>, anemo::rpc::Status> {
65 let state = self.state.read().unwrap();
66 let own_info = state
67 .our_info
68 .clone()
69 .ok_or_else(|| anemo::rpc::Status::internal("own_info has not been initialized yet"))?;
70
71 let requester_is_trusted = self.is_requester_trusted(request.peer_id())?;
72 let should_share = |info: &super::VerifiedSignedNodeInfo| match info.access_type {
73 AccessType::Public => true,
74 AccessType::Private => false,
75 AccessType::Trusted => requester_is_trusted,
76 };
77
78 let known_peers = if state.known_peers.len() < MAX_PEERS_TO_SEND {
79 state
80 .known_peers
81 .values()
82 .filter(|e| should_share(e))
83 .map(|e| e.inner())
84 .cloned()
85 .collect()
86 } else {
87 let mut rng = rand::thread_rng();
88 let mut known_peers = state
90 .connected_peers
91 .keys()
92 .filter_map(|peer_id| state.known_peers.get(peer_id))
93 .filter(|info| should_share(info))
94 .map(|info| (info.peer_id, info))
95 .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
96 .into_iter()
97 .collect::<HashMap<_, _>>();
98
99 if known_peers.len() <= MAX_PEERS_TO_SEND {
100 for info in state
102 .known_peers
103 .values()
104 .filter(|info| should_share(info))
105 .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
109 {
110 if known_peers.len() >= MAX_PEERS_TO_SEND {
111 break;
112 }
113
114 known_peers.insert(info.peer_id, info);
115 }
116 }
117
118 known_peers
119 .into_values()
120 .map(|e| e.inner())
121 .cloned()
122 .collect()
123 };
124
125 Ok(Response::new(GetKnownPeersResponseV2 {
126 own_info,
127 known_peers,
128 }))
129 }
130
131 async fn get_known_peers_v3(
132 &self,
133 request: Request<GetKnownPeersRequestV3>,
134 ) -> Result<Response<GetKnownPeersResponseV3>, anemo::rpc::Status> {
135 let requester_peer_id = request.peer_id().copied();
136
137 let pushed_info = request.into_body().own_info;
139 let _ = self
140 .mailbox_sender
141 .try_send(DiscoveryMessage::ReceivedNodeInfo {
142 peer_info: Box::new(pushed_info),
143 });
144
145 let state = self.state.read().unwrap();
146 let own_info = state.our_info_v2.clone().ok_or_else(|| {
147 anemo::rpc::Status::internal("own_info_v2 has not been initialized yet")
148 })?;
149
150 let requester_is_trusted = self.is_requester_trusted(requester_peer_id.as_ref())?;
151 let should_share = |info: &VerifiedSignedVersionedNodeInfo| match info.access_type() {
152 AccessType::Public => true,
153 AccessType::Private => false,
154 AccessType::Trusted => requester_is_trusted,
155 };
156
157 let known_peers = if state.known_peers_v2.len() < MAX_PEERS_TO_SEND {
158 state
159 .known_peers_v2
160 .values()
161 .filter(|e| should_share(e))
162 .map(|e| e.inner())
163 .cloned()
164 .collect()
165 } else {
166 let mut rng = rand::thread_rng();
167 let mut known_peers: HashMap<PeerId, &VerifiedSignedVersionedNodeInfo> = state
169 .connected_peers
170 .keys()
171 .filter_map(|peer_id| {
172 state
173 .known_peers_v2
174 .get(peer_id)
175 .map(|info| (*peer_id, info))
176 })
177 .filter(|(_, info)| should_share(info))
178 .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
179 .into_iter()
180 .collect();
181
182 if known_peers.len() < MAX_PEERS_TO_SEND {
183 for (peer_id, info) in state
185 .known_peers_v2
186 .iter()
187 .filter(|(_, info)| should_share(info))
188 .filter_map(|(_, info)| info.peer_id().map(|pid| (pid, info)))
190 .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
194 {
195 if known_peers.len() >= MAX_PEERS_TO_SEND {
196 break;
197 }
198 known_peers.insert(peer_id, info);
199 }
200 }
201
202 known_peers
203 .into_values()
204 .map(|e| e.inner())
205 .cloned()
206 .collect()
207 };
208
209 Ok(Response::new(GetKnownPeersResponseV3 {
210 own_info,
211 known_peers,
212 }))
213 }
214}