sui_network/discovery/
server.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::{
5    Discovery, DiscoveryMessage, MAX_PEERS_TO_SEND, SignedNodeInfo, SignedVersionedNodeInfo, State,
6    VerifiedSignedVersionedNodeInfo,
7};
8use anemo::{PeerId, Request, Response, types::PeerInfo};
9use rand::seq::IteratorRandom;
10use serde::{Deserialize, Serialize};
11use std::{
12    collections::HashMap,
13    sync::{Arc, OnceLock, RwLock},
14};
15use sui_config::p2p::AccessType;
16use tokio::sync::mpsc;
17
18#[derive(Clone, Debug, Serialize, Deserialize)]
19pub struct GetKnownPeersResponseV2 {
20    pub own_info: SignedNodeInfo,
21    pub known_peers: Vec<SignedNodeInfo>,
22}
23
24#[derive(Clone, Debug, Serialize, Deserialize)]
25pub struct GetKnownPeersRequestV3 {
26    pub own_info: SignedVersionedNodeInfo,
27}
28
29#[derive(Clone, Debug, Serialize, Deserialize)]
30pub struct GetKnownPeersResponseV3 {
31    pub own_info: SignedVersionedNodeInfo,
32    pub known_peers: Vec<SignedVersionedNodeInfo>,
33}
34
35pub(super) struct Server {
36    pub(super) state: Arc<RwLock<State>>,
37    pub(super) configured_peers: Arc<OnceLock<HashMap<PeerId, PeerInfo>>>,
38    pub(super) mailbox_sender: mpsc::Sender<DiscoveryMessage>,
39}
40
41#[anemo::async_trait]
42impl Discovery for Server {
43    async fn get_known_peers_v2(
44        &self,
45        request: Request<()>,
46    ) -> Result<Response<GetKnownPeersResponseV2>, anemo::rpc::Status> {
47        let state = self.state.read().unwrap();
48        let own_info = state
49            .our_info
50            .clone()
51            .ok_or_else(|| anemo::rpc::Status::internal("own_info has not been initialized yet"))?;
52
53        let should_share = |info: &super::VerifiedSignedNodeInfo| match info.access_type {
54            AccessType::Public => true,
55            AccessType::Private => false,
56            AccessType::Trusted => {
57                // Share Trusted peers only with other preconfigured peers.
58                self.configured_peers
59                    .get()
60                    .and_then(|configured_peers| {
61                        request
62                            .peer_id()
63                            .map(|id| configured_peers.contains_key(id))
64                    })
65                    .unwrap_or(false)
66            }
67        };
68
69        let known_peers = if state.known_peers.len() < MAX_PEERS_TO_SEND {
70            state
71                .known_peers
72                .values()
73                .filter(|e| should_share(e))
74                .map(|e| e.inner())
75                .cloned()
76                .collect()
77        } else {
78            let mut rng = rand::thread_rng();
79            // prefer returning peers that we are connected to as they are known-good
80            let mut known_peers = state
81                .connected_peers
82                .keys()
83                .filter_map(|peer_id| state.known_peers.get(peer_id))
84                .filter(|info| should_share(info))
85                .map(|info| (info.peer_id, info))
86                .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
87                .into_iter()
88                .collect::<HashMap<_, _>>();
89
90            if known_peers.len() <= MAX_PEERS_TO_SEND {
91                // Fill the remaining space with other peers, randomly sampling at most MAX_PEERS_TO_SEND
92                for info in state
93                    .known_peers
94                    .values()
95                    .filter(|info| should_share(info))
96                    // This randomly samples the iterator stream but the order of elements after
97                    // sampling may not be random, this is ok though since we're just trying to do
98                    // best-effort on sharing info of peers we haven't connected with ourselves.
99                    .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
100                {
101                    if known_peers.len() >= MAX_PEERS_TO_SEND {
102                        break;
103                    }
104
105                    known_peers.insert(info.peer_id, info);
106                }
107            }
108
109            known_peers
110                .into_values()
111                .map(|e| e.inner())
112                .cloned()
113                .collect()
114        };
115
116        Ok(Response::new(GetKnownPeersResponseV2 {
117            own_info,
118            known_peers,
119        }))
120    }
121
122    async fn get_known_peers_v3(
123        &self,
124        request: Request<GetKnownPeersRequestV3>,
125    ) -> Result<Response<GetKnownPeersResponseV3>, anemo::rpc::Status> {
126        let requester_peer_id = request.peer_id().copied();
127
128        // Send pushed peer info to the event loop for processing
129        let pushed_info = request.into_body().own_info;
130        let _ = self
131            .mailbox_sender
132            .try_send(DiscoveryMessage::ReceivedNodeInfo {
133                peer_info: Box::new(pushed_info),
134            });
135
136        let state = self.state.read().unwrap();
137        let own_info = state.our_info_v2.clone().ok_or_else(|| {
138            anemo::rpc::Status::internal("own_info_v2 has not been initialized yet")
139        })?;
140
141        let should_share = |info: &VerifiedSignedVersionedNodeInfo| match info.access_type() {
142            AccessType::Public => true,
143            AccessType::Private => false,
144            AccessType::Trusted => self
145                .configured_peers
146                .get()
147                .and_then(|configured_peers| {
148                    requester_peer_id.map(|id| configured_peers.contains_key(&id))
149                })
150                .unwrap_or(false),
151        };
152
153        let known_peers = if state.known_peers_v2.len() < MAX_PEERS_TO_SEND {
154            state
155                .known_peers_v2
156                .values()
157                .filter(|e| should_share(e))
158                .map(|e| e.inner())
159                .cloned()
160                .collect()
161        } else {
162            let mut rng = rand::thread_rng();
163            // prefer returning peers that we are connected to as they are known-good
164            let mut known_peers: HashMap<PeerId, &VerifiedSignedVersionedNodeInfo> = state
165                .connected_peers
166                .keys()
167                .filter_map(|peer_id| {
168                    state
169                        .known_peers_v2
170                        .get(peer_id)
171                        .map(|info| (*peer_id, info))
172                })
173                .filter(|(_, info)| should_share(info))
174                .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
175                .into_iter()
176                .collect();
177
178            if known_peers.len() < MAX_PEERS_TO_SEND {
179                // Fill the remaining space with other peers, randomly sampling at most MAX_PEERS_TO_SEND
180                for (peer_id, info) in state
181                    .known_peers_v2
182                    .iter()
183                    .filter(|(_, info)| should_share(info))
184                    // Filter to only peers with a P2P peer_id before sampling
185                    .filter_map(|(_, info)| info.peer_id().map(|pid| (pid, info)))
186                    // This randomly samples the iterator stream but the order of elements after
187                    // sampling may not be random, this is ok though since we're just trying to do
188                    // best-effort on sharing info of peers we haven't connected with ourselves.
189                    .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
190                {
191                    if known_peers.len() >= MAX_PEERS_TO_SEND {
192                        break;
193                    }
194                    known_peers.insert(peer_id, info);
195                }
196            }
197
198            known_peers
199                .into_values()
200                .map(|e| e.inner())
201                .cloned()
202                .collect()
203        };
204
205        Ok(Response::new(GetKnownPeersResponseV3 {
206            own_info,
207            known_peers,
208        }))
209    }
210}