sui_network/discovery/
builder.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::{
5    Discovery, DiscoveryEventLoop, DiscoveryMessage, DiscoveryServer, Handle, State,
6    metrics::Metrics, server::Server,
7};
8use crate::endpoint_manager::EndpointManager;
9use anemo::codegen::InboundRequestLayer;
10use anemo::types::PeerAffinity;
11use anemo::{PeerId, types::PeerInfo};
12use anemo_tower::rate_limit;
13use fastcrypto::traits::KeyPair;
14use std::{
15    collections::HashMap,
16    sync::{Arc, OnceLock, RwLock},
17};
18use sui_config::p2p::P2pConfig;
19use sui_types::crypto::NetworkKeyPair;
20use sui_types::multiaddr::Multiaddr;
21use tap::{Pipe, TapFallible};
22use tokio::{
23    sync::{mpsc, oneshot},
24    task::JoinSet,
25};
26use tracing::warn;
27
28/// Discovery Service Builder.
29pub struct Builder {
30    config: Option<P2pConfig>,
31    metrics: Option<Metrics>,
32    consensus_external_address: Option<Multiaddr>,
33}
34
35impl Builder {
36    #[allow(clippy::new_without_default)]
37    pub fn new() -> Self {
38        Self {
39            config: None,
40            metrics: None,
41            consensus_external_address: None,
42        }
43    }
44
45    pub fn config(mut self, config: P2pConfig) -> Self {
46        self.config = Some(config);
47        self
48    }
49
50    pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
51        self.metrics = Some(Metrics::enabled(registry));
52        self
53    }
54
55    pub fn consensus_external_address(mut self, addr: Multiaddr) -> Self {
56        self.consensus_external_address = Some(addr);
57        self
58    }
59
60    pub fn build(
61        self,
62    ) -> (
63        UnstartedDiscovery,
64        DiscoveryServer<impl Discovery>,
65        EndpointManager,
66    ) {
67        let discovery_config = self
68            .config
69            .clone()
70            .and_then(|config| config.discovery)
71            .unwrap_or_default();
72        let (builder, server, endpoint_manager) = self.build_internal();
73        let mut discovery_server = DiscoveryServer::new(server);
74
75        // Apply rate limits from configuration as needed.
76        if let Some(limit) = discovery_config.get_known_peers_rate_limit {
77            discovery_server = discovery_server.add_layer_for_get_known_peers_v2(
78                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
79                    governor::Quota::per_second(limit),
80                    rate_limit::WaitMode::Block,
81                )),
82            );
83            discovery_server = discovery_server.add_layer_for_get_known_peers_v3(
84                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
85                    governor::Quota::per_second(limit),
86                    rate_limit::WaitMode::Block,
87                )),
88            );
89        }
90        (builder, discovery_server, endpoint_manager)
91    }
92
93    pub(super) fn build_internal(self) -> (UnstartedDiscovery, Server, EndpointManager) {
94        let Builder {
95            config,
96            metrics,
97            consensus_external_address,
98        } = self;
99        let config = config.unwrap();
100        let discovery_config = config.discovery.clone().unwrap_or_default();
101        let metrics = metrics.unwrap_or_else(Metrics::disabled);
102        let (shutdown_tx, shutdown_rx) = oneshot::channel();
103        let (mailbox_tx, mailbox_rx) = mpsc::channel(discovery_config.mailbox_capacity());
104
105        let handle = Handle {
106            _shutdown_handle: Arc::new(shutdown_tx),
107            sender: super::Sender {
108                sender: mailbox_tx.clone(),
109            },
110        };
111
112        let endpoint_manager = EndpointManager::new(handle.sender());
113
114        let state = State {
115            our_info: None,
116            our_info_v2: None,
117            connected_peers: HashMap::default(),
118            known_peers: HashMap::default(),
119            known_peers_v2: HashMap::default(),
120            peer_address_overrides: HashMap::default(),
121        }
122        .pipe(RwLock::new)
123        .pipe(Arc::new);
124
125        let configured_peers = Arc::new(OnceLock::new());
126
127        let server = Server {
128            state: state.clone(),
129            configured_peers: configured_peers.clone(),
130            mailbox_sender: mailbox_tx.clone(),
131        };
132
133        (
134            UnstartedDiscovery {
135                handle,
136                config,
137                shutdown_handle: shutdown_rx,
138                state,
139                mailbox: mailbox_rx,
140                metrics,
141                configured_peers,
142                consensus_external_address,
143                endpoint_manager: endpoint_manager.clone(),
144            },
145            server,
146            endpoint_manager,
147        )
148    }
149}
150
151/// Handle to an unstarted discovery system
152pub struct UnstartedDiscovery {
153    pub(super) handle: Handle,
154    pub(super) config: P2pConfig,
155    pub(super) shutdown_handle: oneshot::Receiver<()>,
156    pub(super) state: Arc<RwLock<State>>,
157    pub(super) mailbox: mpsc::Receiver<DiscoveryMessage>,
158    pub(super) metrics: Metrics,
159    pub(super) configured_peers: Arc<OnceLock<HashMap<PeerId, PeerInfo>>>,
160    pub(super) consensus_external_address: Option<Multiaddr>,
161    pub(super) endpoint_manager: EndpointManager,
162}
163
164impl UnstartedDiscovery {
165    pub(super) fn build(
166        self,
167        network: anemo::Network,
168        keypair: NetworkKeyPair,
169    ) -> (DiscoveryEventLoop, Handle) {
170        let Self {
171            handle,
172            config,
173            shutdown_handle,
174            state,
175            mailbox,
176            metrics,
177            configured_peers,
178            consensus_external_address,
179            endpoint_manager,
180        } = self;
181
182        let discovery_config = config.discovery.clone().unwrap_or_default();
183        let (built_configured_peers, unidentified_seed_peers) =
184            build_peer_config(&config, &discovery_config);
185
186        // Populate the shared configured_peers for the Server.
187        configured_peers
188            .set(built_configured_peers.clone())
189            .expect("configured_peers should only be set once");
190
191        (
192            DiscoveryEventLoop {
193                config,
194                discovery_config: Arc::new(discovery_config),
195                configured_peers: Arc::new(built_configured_peers),
196                unidentified_seed_peers,
197                network,
198                keypair,
199                tasks: JoinSet::new(),
200                pending_dials: Default::default(),
201                dial_seed_peers_task: None,
202                shutdown_handle,
203                state,
204                mailbox,
205                metrics,
206                consensus_external_address,
207                endpoint_manager,
208            },
209            handle,
210        )
211    }
212
213    pub fn start(self, network: anemo::Network, keypair: NetworkKeyPair) -> Handle {
214        assert_eq!(network.peer_id().0, *keypair.public().0.as_bytes());
215        let (event_loop, handle) = self.build(network, keypair);
216        tokio::spawn(event_loop.start());
217
218        handle
219    }
220}
221
222/// Returns (configured_peers, unidentified_seed_peers).
223fn build_peer_config(
224    config: &P2pConfig,
225    discovery_config: &sui_config::p2p::DiscoveryConfig,
226) -> (HashMap<PeerId, PeerInfo>, Vec<anemo::types::Address>) {
227    let mut configured_peers = HashMap::new();
228    let mut unidentified_seed_peers = Vec::new();
229
230    for seed in &config.seed_peers {
231        let anemo_addr = seed
232            .address
233            .to_anemo_address()
234            .tap_err(|_| warn!(p2p_address=?seed.address, "Skipping seed peer address: can't convert to anemo address"))
235            .ok();
236        match (seed.peer_id, anemo_addr) {
237            (Some(peer_id), addr) => {
238                configured_peers.insert(
239                    peer_id,
240                    PeerInfo {
241                        peer_id,
242                        affinity: PeerAffinity::High,
243                        address: addr.into_iter().collect(),
244                    },
245                );
246            }
247            (None, Some(addr)) => {
248                unidentified_seed_peers.push(addr);
249            }
250            (None, None) => {}
251        }
252    }
253
254    for ap in &discovery_config.allowlisted_peers {
255        let addresses = ap
256            .address
257            .iter()
258            .filter_map(|addr| {
259                addr.to_anemo_address()
260                    .tap_err(|_| warn!(p2p_address=?addr, "Skipping allowlisted peer address: can't convert to anemo address"))
261                    .ok()
262            })
263            .collect();
264        configured_peers.insert(
265            ap.peer_id,
266            PeerInfo {
267                peer_id: ap.peer_id,
268                affinity: PeerAffinity::Allowed,
269                address: addresses,
270            },
271        );
272    }
273
274    (configured_peers, unidentified_seed_peers)
275}