sui_network/discovery/
server.rs1use super::{
5 Discovery, DiscoveryMessage, MAX_PEERS_TO_SEND, SignedNodeInfo, SignedVersionedNodeInfo, State,
6 VerifiedSignedVersionedNodeInfo,
7};
8use anemo::{PeerId, Request, Response, types::PeerInfo};
9use rand::seq::IteratorRandom;
10use serde::{Deserialize, Serialize};
11use std::{
12 collections::HashMap,
13 sync::{Arc, OnceLock, RwLock},
14};
15use sui_config::p2p::AccessType;
16use tokio::sync::mpsc;
17
18#[derive(Clone, Debug, Serialize, Deserialize)]
19pub struct GetKnownPeersResponseV2 {
20 pub own_info: SignedNodeInfo,
21 pub known_peers: Vec<SignedNodeInfo>,
22}
23
24#[derive(Clone, Debug, Serialize, Deserialize)]
25pub struct GetKnownPeersRequestV3 {
26 pub own_info: SignedVersionedNodeInfo,
27}
28
29#[derive(Clone, Debug, Serialize, Deserialize)]
30pub struct GetKnownPeersResponseV3 {
31 pub own_info: SignedVersionedNodeInfo,
32 pub known_peers: Vec<SignedVersionedNodeInfo>,
33}
34
35pub(super) struct Server {
36 pub(super) state: Arc<RwLock<State>>,
37 pub(super) configured_peers: Arc<OnceLock<HashMap<PeerId, PeerInfo>>>,
38 pub(super) mailbox_sender: mpsc::Sender<DiscoveryMessage>,
39}
40
41#[anemo::async_trait]
42impl Discovery for Server {
43 async fn get_known_peers_v2(
44 &self,
45 request: Request<()>,
46 ) -> Result<Response<GetKnownPeersResponseV2>, anemo::rpc::Status> {
47 let state = self.state.read().unwrap();
48 let own_info = state
49 .our_info
50 .clone()
51 .ok_or_else(|| anemo::rpc::Status::internal("own_info has not been initialized yet"))?;
52
53 let should_share = |info: &super::VerifiedSignedNodeInfo| match info.access_type {
54 AccessType::Public => true,
55 AccessType::Private => false,
56 AccessType::Trusted => {
57 self.configured_peers
59 .get()
60 .and_then(|configured_peers| {
61 request
62 .peer_id()
63 .map(|id| configured_peers.contains_key(id))
64 })
65 .unwrap_or(false)
66 }
67 };
68
69 let known_peers = if state.known_peers.len() < MAX_PEERS_TO_SEND {
70 state
71 .known_peers
72 .values()
73 .filter(|e| should_share(e))
74 .map(|e| e.inner())
75 .cloned()
76 .collect()
77 } else {
78 let mut rng = rand::thread_rng();
79 let mut known_peers = state
81 .connected_peers
82 .keys()
83 .filter_map(|peer_id| state.known_peers.get(peer_id))
84 .filter(|info| should_share(info))
85 .map(|info| (info.peer_id, info))
86 .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
87 .into_iter()
88 .collect::<HashMap<_, _>>();
89
90 if known_peers.len() <= MAX_PEERS_TO_SEND {
91 for info in state
93 .known_peers
94 .values()
95 .filter(|info| should_share(info))
96 .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
100 {
101 if known_peers.len() >= MAX_PEERS_TO_SEND {
102 break;
103 }
104
105 known_peers.insert(info.peer_id, info);
106 }
107 }
108
109 known_peers
110 .into_values()
111 .map(|e| e.inner())
112 .cloned()
113 .collect()
114 };
115
116 Ok(Response::new(GetKnownPeersResponseV2 {
117 own_info,
118 known_peers,
119 }))
120 }
121
122 async fn get_known_peers_v3(
123 &self,
124 request: Request<GetKnownPeersRequestV3>,
125 ) -> Result<Response<GetKnownPeersResponseV3>, anemo::rpc::Status> {
126 let requester_peer_id = request.peer_id().copied();
127
128 let pushed_info = request.into_body().own_info;
130 let _ = self
131 .mailbox_sender
132 .try_send(DiscoveryMessage::ReceivedNodeInfo {
133 peer_info: Box::new(pushed_info),
134 });
135
136 let state = self.state.read().unwrap();
137 let own_info = state.our_info_v2.clone().ok_or_else(|| {
138 anemo::rpc::Status::internal("own_info_v2 has not been initialized yet")
139 })?;
140
141 let should_share = |info: &VerifiedSignedVersionedNodeInfo| match info.access_type() {
142 AccessType::Public => true,
143 AccessType::Private => false,
144 AccessType::Trusted => self
145 .configured_peers
146 .get()
147 .and_then(|configured_peers| {
148 requester_peer_id.map(|id| configured_peers.contains_key(&id))
149 })
150 .unwrap_or(false),
151 };
152
153 let known_peers = if state.known_peers_v2.len() < MAX_PEERS_TO_SEND {
154 state
155 .known_peers_v2
156 .values()
157 .filter(|e| should_share(e))
158 .map(|e| e.inner())
159 .cloned()
160 .collect()
161 } else {
162 let mut rng = rand::thread_rng();
163 let mut known_peers: HashMap<PeerId, &VerifiedSignedVersionedNodeInfo> = state
165 .connected_peers
166 .keys()
167 .filter_map(|peer_id| {
168 state
169 .known_peers_v2
170 .get(peer_id)
171 .map(|info| (*peer_id, info))
172 })
173 .filter(|(_, info)| should_share(info))
174 .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
175 .into_iter()
176 .collect();
177
178 if known_peers.len() < MAX_PEERS_TO_SEND {
179 for (peer_id, info) in state
181 .known_peers_v2
182 .iter()
183 .filter(|(_, info)| should_share(info))
184 .filter_map(|(_, info)| info.peer_id().map(|pid| (pid, info)))
186 .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
190 {
191 if known_peers.len() >= MAX_PEERS_TO_SEND {
192 break;
193 }
194 known_peers.insert(peer_id, info);
195 }
196 }
197
198 known_peers
199 .into_values()
200 .map(|e| e.inner())
201 .cloned()
202 .collect()
203 };
204
205 Ok(Response::new(GetKnownPeersResponseV3 {
206 own_info,
207 known_peers,
208 }))
209 }
210}