sui_network/discovery/
server.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::{
5    Discovery, DiscoveryMessage, MAX_PEERS_TO_SEND, SignedNodeInfo, SignedVersionedNodeInfo, State,
6    VerifiedSignedVersionedNodeInfo, is_trusted_peer,
7};
8use anemo::{PeerId, Request, Response, types::PeerInfo};
9use rand::seq::IteratorRandom;
10use serde::{Deserialize, Serialize};
11use std::{
12    collections::{HashMap, HashSet},
13    sync::{Arc, OnceLock, RwLock},
14};
15use sui_config::p2p::AccessType;
16use tokio::sync::mpsc;
17
18#[derive(Clone, Debug, Serialize, Deserialize)]
19pub struct GetKnownPeersResponseV2 {
20    pub own_info: SignedNodeInfo,
21    pub known_peers: Vec<SignedNodeInfo>,
22}
23
24#[derive(Clone, Debug, Serialize, Deserialize)]
25pub struct GetKnownPeersRequestV3 {
26    pub own_info: SignedVersionedNodeInfo,
27}
28
29#[derive(Clone, Debug, Serialize, Deserialize)]
30pub struct GetKnownPeersResponseV3 {
31    pub own_info: SignedVersionedNodeInfo,
32    pub known_peers: Vec<SignedVersionedNodeInfo>,
33}
34
35pub(super) struct Server {
36    pub(super) state: Arc<RwLock<State>>,
37    pub(super) configured_peers: Arc<OnceLock<HashMap<PeerId, PeerInfo>>>,
38    pub(super) chain_peers: Arc<RwLock<HashSet<PeerId>>>,
39    pub(super) mailbox_sender: mpsc::Sender<DiscoveryMessage>,
40}
41
42impl Server {
43    #[allow(clippy::result_large_err)]
44    fn is_requester_trusted(&self, peer_id: Option<&PeerId>) -> Result<bool, anemo::rpc::Status> {
45        let Some(peer_id) = peer_id else {
46            return Ok(false);
47        };
48        let configured_peers = self.configured_peers.get().ok_or_else(|| {
49            anemo::rpc::Status::internal("configured_peers has not been initialized yet")
50        })?;
51        Ok(is_trusted_peer(
52            peer_id,
53            configured_peers,
54            &self.chain_peers,
55        ))
56    }
57}
58
59#[anemo::async_trait]
60impl Discovery for Server {
61    async fn get_known_peers_v2(
62        &self,
63        request: Request<()>,
64    ) -> Result<Response<GetKnownPeersResponseV2>, anemo::rpc::Status> {
65        let state = self.state.read().unwrap();
66        let own_info = state
67            .our_info
68            .clone()
69            .ok_or_else(|| anemo::rpc::Status::internal("own_info has not been initialized yet"))?;
70
71        let requester_is_trusted = self.is_requester_trusted(request.peer_id())?;
72        let should_share = |info: &super::VerifiedSignedNodeInfo| match info.access_type {
73            AccessType::Public => true,
74            AccessType::Private => false,
75            AccessType::Trusted => requester_is_trusted,
76        };
77
78        let known_peers = if state.known_peers.len() < MAX_PEERS_TO_SEND {
79            state
80                .known_peers
81                .values()
82                .filter(|e| should_share(e))
83                .map(|e| e.inner())
84                .cloned()
85                .collect()
86        } else {
87            let mut rng = rand::thread_rng();
88            // prefer returning peers that we are connected to as they are known-good
89            let mut known_peers = state
90                .connected_peers
91                .keys()
92                .filter_map(|peer_id| state.known_peers.get(peer_id))
93                .filter(|info| should_share(info))
94                .map(|info| (info.peer_id, info))
95                .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
96                .into_iter()
97                .collect::<HashMap<_, _>>();
98
99            if known_peers.len() <= MAX_PEERS_TO_SEND {
100                // Fill the remaining space with other peers, randomly sampling at most MAX_PEERS_TO_SEND
101                for info in state
102                    .known_peers
103                    .values()
104                    .filter(|info| should_share(info))
105                    // This randomly samples the iterator stream but the order of elements after
106                    // sampling may not be random, this is ok though since we're just trying to do
107                    // best-effort on sharing info of peers we haven't connected with ourselves.
108                    .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
109                {
110                    if known_peers.len() >= MAX_PEERS_TO_SEND {
111                        break;
112                    }
113
114                    known_peers.insert(info.peer_id, info);
115                }
116            }
117
118            known_peers
119                .into_values()
120                .map(|e| e.inner())
121                .cloned()
122                .collect()
123        };
124
125        Ok(Response::new(GetKnownPeersResponseV2 {
126            own_info,
127            known_peers,
128        }))
129    }
130
131    async fn get_known_peers_v3(
132        &self,
133        request: Request<GetKnownPeersRequestV3>,
134    ) -> Result<Response<GetKnownPeersResponseV3>, anemo::rpc::Status> {
135        let requester_peer_id = request.peer_id().copied();
136
137        // Send pushed peer info to the event loop for processing
138        let pushed_info = request.into_body().own_info;
139        let _ = self
140            .mailbox_sender
141            .try_send(DiscoveryMessage::ReceivedNodeInfo {
142                peer_info: Box::new(pushed_info),
143            });
144
145        let state = self.state.read().unwrap();
146        let own_info = state.our_info_v2.clone().ok_or_else(|| {
147            anemo::rpc::Status::internal("own_info_v2 has not been initialized yet")
148        })?;
149
150        let requester_is_trusted = self.is_requester_trusted(requester_peer_id.as_ref())?;
151        let should_share = |info: &VerifiedSignedVersionedNodeInfo| match info.access_type() {
152            AccessType::Public => true,
153            AccessType::Private => false,
154            AccessType::Trusted => requester_is_trusted,
155        };
156
157        let known_peers = if state.known_peers_v2.len() < MAX_PEERS_TO_SEND {
158            state
159                .known_peers_v2
160                .values()
161                .filter(|e| should_share(e))
162                .map(|e| e.inner())
163                .cloned()
164                .collect()
165        } else {
166            let mut rng = rand::thread_rng();
167            // prefer returning peers that we are connected to as they are known-good
168            let mut known_peers: HashMap<PeerId, &VerifiedSignedVersionedNodeInfo> = state
169                .connected_peers
170                .keys()
171                .filter_map(|peer_id| {
172                    state
173                        .known_peers_v2
174                        .get(peer_id)
175                        .map(|info| (*peer_id, info))
176                })
177                .filter(|(_, info)| should_share(info))
178                .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
179                .into_iter()
180                .collect();
181
182            if known_peers.len() < MAX_PEERS_TO_SEND {
183                // Fill the remaining space with other peers, randomly sampling at most MAX_PEERS_TO_SEND
184                for (peer_id, info) in state
185                    .known_peers_v2
186                    .iter()
187                    .filter(|(_, info)| should_share(info))
188                    // Filter to only peers with a P2P peer_id before sampling
189                    .filter_map(|(_, info)| info.peer_id().map(|pid| (pid, info)))
190                    // This randomly samples the iterator stream but the order of elements after
191                    // sampling may not be random, this is ok though since we're just trying to do
192                    // best-effort on sharing info of peers we haven't connected with ourselves.
193                    .choose_multiple(&mut rng, MAX_PEERS_TO_SEND)
194                {
195                    if known_peers.len() >= MAX_PEERS_TO_SEND {
196                        break;
197                    }
198                    known_peers.insert(peer_id, info);
199                }
200            }
201
202            known_peers
203                .into_values()
204                .map(|e| e.inner())
205                .cloned()
206                .collect()
207        };
208
209        Ok(Response::new(GetKnownPeersResponseV3 {
210            own_info,
211            known_peers,
212        }))
213    }
214}