sui_network/discovery/
builder.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::{
5    Discovery, DiscoveryEventLoop, DiscoveryMessage, DiscoveryServer, Handle, State,
6    metrics::Metrics, server::Server,
7};
8use crate::endpoint_manager::EndpointManager;
9use anemo::codegen::InboundRequestLayer;
10use anemo::types::PeerAffinity;
11use anemo::{PeerId, types::PeerInfo};
12use anemo_tower::rate_limit;
13use fastcrypto::traits::KeyPair;
14use std::{
15    collections::HashMap,
16    path::PathBuf,
17    sync::{Arc, OnceLock, RwLock},
18};
19use sui_config::p2p::P2pConfig;
20use sui_types::crypto::NetworkKeyPair;
21use sui_types::multiaddr::Multiaddr;
22use tap::{Pipe, TapFallible};
23use tokio::{
24    sync::{mpsc, oneshot},
25    task::JoinSet,
26};
27use tracing::warn;
28
29/// Discovery Service Builder.
30pub struct Builder {
31    config: Option<P2pConfig>,
32    metrics: Option<Metrics>,
33    consensus_external_address: Option<Multiaddr>,
34}
35
36impl Builder {
37    #[allow(clippy::new_without_default)]
38    pub fn new() -> Self {
39        Self {
40            config: None,
41            metrics: None,
42            consensus_external_address: None,
43        }
44    }
45
46    pub fn config(mut self, config: P2pConfig) -> Self {
47        self.config = Some(config);
48        self
49    }
50
51    pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
52        self.metrics = Some(Metrics::enabled(registry));
53        self
54    }
55
56    pub fn consensus_external_address(mut self, addr: Multiaddr) -> Self {
57        self.consensus_external_address = Some(addr);
58        self
59    }
60
61    pub fn build(
62        self,
63    ) -> (
64        UnstartedDiscovery,
65        DiscoveryServer<impl Discovery>,
66        EndpointManager,
67    ) {
68        let discovery_config = self
69            .config
70            .clone()
71            .and_then(|config| config.discovery)
72            .unwrap_or_default();
73        let (builder, server, endpoint_manager) = self.build_internal();
74        let mut discovery_server = DiscoveryServer::new(server);
75
76        // Apply rate limits from configuration as needed.
77        if let Some(limit) = discovery_config.get_known_peers_rate_limit {
78            discovery_server = discovery_server.add_layer_for_get_known_peers_v2(
79                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
80                    governor::Quota::per_second(limit),
81                    rate_limit::WaitMode::Block,
82                )),
83            );
84            discovery_server = discovery_server.add_layer_for_get_known_peers_v3(
85                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
86                    governor::Quota::per_second(limit),
87                    rate_limit::WaitMode::Block,
88                )),
89            );
90        }
91        (builder, discovery_server, endpoint_manager)
92    }
93
94    pub(super) fn build_internal(self) -> (UnstartedDiscovery, Server, EndpointManager) {
95        let Builder {
96            config,
97            metrics,
98            consensus_external_address,
99        } = self;
100        let config = config.unwrap();
101        let discovery_config = config.discovery.clone().unwrap_or_default();
102        let store_path = discovery_config.peer_addr_store_path.clone();
103        let metrics = metrics.unwrap_or_else(Metrics::disabled);
104        let (shutdown_tx, shutdown_rx) = oneshot::channel();
105        let (mailbox_tx, mailbox_rx) = mpsc::channel(discovery_config.mailbox_capacity());
106
107        let handle = Handle {
108            _shutdown_handle: Arc::new(shutdown_tx),
109            sender: super::Sender {
110                sender: mailbox_tx.clone(),
111            },
112        };
113
114        let endpoint_manager = EndpointManager::new(handle.sender());
115
116        let state = State {
117            our_info: None,
118            our_info_v2: None,
119            connected_peers: HashMap::default(),
120            known_peers: HashMap::default(),
121            known_peers_v2: HashMap::default(),
122            peer_addresses: HashMap::default(),
123        }
124        .pipe(RwLock::new)
125        .pipe(Arc::new);
126
127        let configured_peers = Arc::new(OnceLock::new());
128
129        let server = Server {
130            state: state.clone(),
131            configured_peers: configured_peers.clone(),
132            mailbox_sender: mailbox_tx.clone(),
133        };
134
135        (
136            UnstartedDiscovery {
137                handle,
138                config,
139                shutdown_handle: shutdown_rx,
140                state,
141                mailbox: mailbox_rx,
142                mailbox_tx: mailbox_tx.clone(),
143                metrics,
144                configured_peers,
145                consensus_external_address,
146                endpoint_manager: endpoint_manager.clone(),
147                store_path,
148            },
149            server,
150            endpoint_manager,
151        )
152    }
153}
154
155/// Handle to an unstarted discovery system
156pub struct UnstartedDiscovery {
157    pub(super) handle: Handle,
158    pub(super) config: P2pConfig,
159    pub(super) shutdown_handle: oneshot::Receiver<()>,
160    pub(super) state: Arc<RwLock<State>>,
161    pub(super) mailbox: mpsc::Receiver<DiscoveryMessage>,
162    pub(super) mailbox_tx: mpsc::Sender<DiscoveryMessage>,
163    pub(super) metrics: Metrics,
164    pub(super) configured_peers: Arc<OnceLock<HashMap<PeerId, PeerInfo>>>,
165    pub(super) consensus_external_address: Option<Multiaddr>,
166    pub(super) endpoint_manager: EndpointManager,
167    pub(super) store_path: Option<PathBuf>,
168}
169
170impl UnstartedDiscovery {
171    pub fn sender(&self) -> super::Sender {
172        self.handle.sender()
173    }
174
175    pub(super) fn build(
176        self,
177        network: anemo::Network,
178        keypair: NetworkKeyPair,
179    ) -> (DiscoveryEventLoop, Handle) {
180        let Self {
181            handle,
182            config,
183            shutdown_handle,
184            state,
185            mailbox,
186            mailbox_tx,
187            metrics,
188            configured_peers,
189            consensus_external_address,
190            endpoint_manager,
191            store_path,
192        } = self;
193
194        let discovery_config = config.discovery.clone().unwrap_or_default();
195        let (built_configured_peers, unidentified_seed_peers) =
196            build_peer_config(&config, &discovery_config);
197
198        // Populate the shared configured_peers for the Server.
199        configured_peers
200            .set(built_configured_peers.clone())
201            .expect("configured_peers should only be set once");
202
203        (
204            DiscoveryEventLoop {
205                config,
206                discovery_config: Arc::new(discovery_config),
207                configured_peers: Arc::new(built_configured_peers),
208                unidentified_seed_peers,
209                network,
210                keypair,
211                tasks: JoinSet::new(),
212                pending_dials: Default::default(),
213                dial_seed_peers_task: None,
214                shutdown_handle,
215                state,
216                mailbox,
217                mailbox_tx,
218                metrics,
219                consensus_external_address,
220                endpoint_manager,
221                store_path,
222                peer_cooldowns: HashMap::new(),
223            },
224            handle,
225        )
226    }
227
228    pub fn start(self, network: anemo::Network, keypair: NetworkKeyPair) -> Handle {
229        assert_eq!(network.peer_id().0, *keypair.public().0.as_bytes());
230        let (event_loop, handle) = self.build(network, keypair);
231        tokio::spawn(event_loop.start());
232
233        handle
234    }
235}
236
237/// Returns (configured_peers, unidentified_seed_peers).
238fn build_peer_config(
239    config: &P2pConfig,
240    discovery_config: &sui_config::p2p::DiscoveryConfig,
241) -> (HashMap<PeerId, PeerInfo>, Vec<anemo::types::Address>) {
242    let mut configured_peers = HashMap::new();
243    let mut unidentified_seed_peers = Vec::new();
244
245    for seed in &config.seed_peers {
246        let anemo_addr = seed
247            .address
248            .to_anemo_address()
249            .tap_err(|_| warn!(p2p_address=?seed.address, "Skipping seed peer address: can't convert to anemo address"))
250            .ok();
251        match (seed.peer_id, anemo_addr) {
252            (Some(peer_id), addr) => {
253                configured_peers.insert(
254                    peer_id,
255                    PeerInfo {
256                        peer_id,
257                        affinity: PeerAffinity::High,
258                        address: addr.into_iter().collect(),
259                    },
260                );
261            }
262            (None, Some(addr)) => {
263                unidentified_seed_peers.push(addr);
264            }
265            (None, None) => {}
266        }
267    }
268
269    for ap in &discovery_config.allowlisted_peers {
270        let addresses = ap
271            .address
272            .iter()
273            .filter_map(|addr| {
274                addr.to_anemo_address()
275                    .tap_err(|_| warn!(p2p_address=?addr, "Skipping allowlisted peer address: can't convert to anemo address"))
276                    .ok()
277            })
278            .collect();
279        configured_peers.insert(
280            ap.peer_id,
281            PeerInfo {
282                peer_id: ap.peer_id,
283                affinity: PeerAffinity::Allowed,
284                address: addresses,
285            },
286        );
287    }
288
289    (configured_peers, unidentified_seed_peers)
290}