sui_network/discovery/
builder.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::{
5    Discovery, DiscoveryEventLoop, DiscoveryMessage, DiscoveryServer, Handle, State,
6    metrics::Metrics, server::Server,
7};
8use crate::endpoint_manager::EndpointManager;
9use anemo::codegen::InboundRequestLayer;
10use anemo::types::PeerAffinity;
11use anemo::{PeerId, types::PeerInfo};
12use anemo_tower::rate_limit;
13use fastcrypto::traits::KeyPair;
14use std::{
15    collections::{HashMap, HashSet},
16    path::PathBuf,
17    sync::{Arc, OnceLock, RwLock},
18};
19use sui_config::p2p::P2pConfig;
20use sui_types::crypto::NetworkKeyPair;
21use sui_types::multiaddr::Multiaddr;
22use tap::{Pipe, TapFallible};
23use tokio::{
24    sync::{mpsc, oneshot},
25    task::JoinSet,
26};
27use tracing::warn;
28
29/// Discovery Service Builder.
30pub struct Builder {
31    config: Option<P2pConfig>,
32    metrics: Option<Metrics>,
33    consensus_external_address: Option<Multiaddr>,
34}
35
36impl Builder {
37    #[allow(clippy::new_without_default)]
38    pub fn new() -> Self {
39        Self {
40            config: None,
41            metrics: None,
42            consensus_external_address: None,
43        }
44    }
45
46    pub fn config(mut self, config: P2pConfig) -> Self {
47        self.config = Some(config);
48        self
49    }
50
51    pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
52        self.metrics = Some(Metrics::enabled(registry));
53        self
54    }
55
56    pub fn consensus_external_address(mut self, addr: Multiaddr) -> Self {
57        self.consensus_external_address = Some(addr);
58        self
59    }
60
61    pub fn build(
62        self,
63    ) -> (
64        UnstartedDiscovery,
65        DiscoveryServer<impl Discovery>,
66        EndpointManager,
67    ) {
68        let discovery_config = self
69            .config
70            .clone()
71            .and_then(|config| config.discovery)
72            .unwrap_or_default();
73        let (builder, server, endpoint_manager) = self.build_internal();
74        let mut discovery_server = DiscoveryServer::new(server);
75
76        // Apply rate limits from configuration as needed.
77        if let Some(limit) = discovery_config.get_known_peers_rate_limit {
78            discovery_server = discovery_server.add_layer_for_get_known_peers_v2(
79                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
80                    governor::Quota::per_second(limit),
81                    rate_limit::WaitMode::Block,
82                )),
83            );
84            discovery_server = discovery_server.add_layer_for_get_known_peers_v3(
85                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
86                    governor::Quota::per_second(limit),
87                    rate_limit::WaitMode::Block,
88                )),
89            );
90        }
91        (builder, discovery_server, endpoint_manager)
92    }
93
94    pub(super) fn build_internal(self) -> (UnstartedDiscovery, Server, EndpointManager) {
95        let Builder {
96            config,
97            metrics,
98            consensus_external_address,
99        } = self;
100        let config = config.unwrap();
101        let discovery_config = config.discovery.clone().unwrap_or_default();
102        let store_path = discovery_config.peer_addr_store_path.clone();
103        let metrics = metrics.unwrap_or_else(Metrics::disabled);
104        let (shutdown_tx, shutdown_rx) = oneshot::channel();
105        let (mailbox_tx, mailbox_rx) = mpsc::channel(discovery_config.mailbox_capacity());
106
107        let handle = Handle {
108            _shutdown_handle: Arc::new(shutdown_tx),
109            sender: super::Sender {
110                sender: mailbox_tx.clone(),
111            },
112        };
113
114        let endpoint_manager = EndpointManager::new(handle.sender());
115
116        let state = State {
117            our_info: None,
118            our_info_v2: None,
119            connected_peers: HashMap::default(),
120            known_peers: HashMap::default(),
121            known_peers_v2: HashMap::default(),
122            peer_addresses: HashMap::default(),
123        }
124        .pipe(RwLock::new)
125        .pipe(Arc::new);
126
127        let configured_peers = Arc::new(OnceLock::new());
128        let chain_peers = Arc::new(RwLock::new(HashSet::new()));
129
130        let server = Server {
131            state: state.clone(),
132            configured_peers: configured_peers.clone(),
133            chain_peers: chain_peers.clone(),
134            mailbox_sender: mailbox_tx.clone(),
135        };
136
137        (
138            UnstartedDiscovery {
139                handle,
140                config,
141                shutdown_handle: shutdown_rx,
142                state,
143                mailbox: mailbox_rx,
144                mailbox_tx: mailbox_tx.clone(),
145                metrics,
146                configured_peers,
147                chain_peers,
148                consensus_external_address,
149                endpoint_manager: endpoint_manager.clone(),
150                store_path,
151            },
152            server,
153            endpoint_manager,
154        )
155    }
156}
157
158/// Handle to an unstarted discovery system
159pub struct UnstartedDiscovery {
160    pub(super) handle: Handle,
161    pub(super) config: P2pConfig,
162    pub(super) shutdown_handle: oneshot::Receiver<()>,
163    pub(super) state: Arc<RwLock<State>>,
164    pub(super) mailbox: mpsc::Receiver<DiscoveryMessage>,
165    pub(super) mailbox_tx: mpsc::Sender<DiscoveryMessage>,
166    pub(super) metrics: Metrics,
167    pub(super) configured_peers: Arc<OnceLock<HashMap<PeerId, PeerInfo>>>,
168    pub(super) chain_peers: Arc<RwLock<HashSet<PeerId>>>,
169    pub(super) consensus_external_address: Option<Multiaddr>,
170    pub(super) endpoint_manager: EndpointManager,
171    pub(super) store_path: Option<PathBuf>,
172}
173
174impl UnstartedDiscovery {
175    pub fn sender(&self) -> super::Sender {
176        self.handle.sender()
177    }
178
179    pub(super) fn build(
180        self,
181        network: anemo::Network,
182        keypair: NetworkKeyPair,
183    ) -> (DiscoveryEventLoop, Handle) {
184        let Self {
185            handle,
186            config,
187            shutdown_handle,
188            state,
189            mailbox,
190            mailbox_tx,
191            metrics,
192            configured_peers,
193            chain_peers,
194            consensus_external_address,
195            endpoint_manager,
196            store_path,
197        } = self;
198
199        let discovery_config = config.discovery.clone().unwrap_or_default();
200        let (built_configured_peers, unidentified_seed_peers) =
201            build_peer_config(&config, &discovery_config);
202
203        // Populate the shared configured_peers for the Server.
204        configured_peers
205            .set(built_configured_peers.clone())
206            .expect("configured_peers should only be set once");
207
208        (
209            DiscoveryEventLoop {
210                config,
211                discovery_config: Arc::new(discovery_config),
212                configured_peers: Arc::new(built_configured_peers),
213                chain_peers,
214                unidentified_seed_peers,
215                network,
216                keypair,
217                tasks: JoinSet::new(),
218                pending_dials: Default::default(),
219                dial_seed_peers_task: None,
220                shutdown_handle,
221                state,
222                mailbox,
223                mailbox_tx,
224                metrics,
225                consensus_external_address,
226                endpoint_manager,
227                store_path,
228                peer_cooldowns: HashMap::new(),
229            },
230            handle,
231        )
232    }
233
234    pub fn start(self, network: anemo::Network, keypair: NetworkKeyPair) -> Handle {
235        assert_eq!(network.peer_id().0, *keypair.public().0.as_bytes());
236        let (event_loop, handle) = self.build(network, keypair);
237        tokio::spawn(event_loop.start());
238
239        handle
240    }
241}
242
243/// Returns (configured_peers, unidentified_seed_peers).
244fn build_peer_config(
245    config: &P2pConfig,
246    discovery_config: &sui_config::p2p::DiscoveryConfig,
247) -> (HashMap<PeerId, PeerInfo>, Vec<anemo::types::Address>) {
248    let mut configured_peers = HashMap::new();
249    let mut unidentified_seed_peers = Vec::new();
250
251    for seed in &config.seed_peers {
252        let anemo_addr = seed
253            .address
254            .to_anemo_address()
255            .tap_err(|_| warn!(p2p_address=?seed.address, "Skipping seed peer address: can't convert to anemo address"))
256            .ok();
257        match (seed.peer_id, anemo_addr) {
258            (Some(peer_id), addr) => {
259                configured_peers.insert(
260                    peer_id,
261                    PeerInfo {
262                        peer_id,
263                        affinity: PeerAffinity::High,
264                        address: addr.into_iter().collect(),
265                    },
266                );
267            }
268            (None, Some(addr)) => {
269                unidentified_seed_peers.push(addr);
270            }
271            (None, None) => {}
272        }
273    }
274
275    for ap in &discovery_config.allowlisted_peers {
276        let addresses = ap
277            .address
278            .iter()
279            .filter_map(|addr| {
280                addr.to_anemo_address()
281                    .tap_err(|_| warn!(p2p_address=?addr, "Skipping allowlisted peer address: can't convert to anemo address"))
282                    .ok()
283            })
284            .collect();
285        configured_peers.insert(
286            ap.peer_id,
287            PeerInfo {
288                peer_id: ap.peer_id,
289                affinity: PeerAffinity::Allowed,
290                address: addresses,
291            },
292        );
293    }
294
295    (configured_peers, unidentified_seed_peers)
296}