1use super::{
5 Discovery, DiscoveryEventLoop, DiscoveryMessage, DiscoveryServer, Handle, State,
6 metrics::Metrics, server::Server,
7};
8use crate::endpoint_manager::EndpointManager;
9use anemo::codegen::InboundRequestLayer;
10use anemo::types::PeerAffinity;
11use anemo::{PeerId, types::PeerInfo};
12use anemo_tower::rate_limit;
13use fastcrypto::traits::KeyPair;
14use std::{
15 collections::HashMap,
16 path::PathBuf,
17 sync::{Arc, OnceLock, RwLock},
18};
19use sui_config::p2p::P2pConfig;
20use sui_types::crypto::NetworkKeyPair;
21use sui_types::multiaddr::Multiaddr;
22use tap::{Pipe, TapFallible};
23use tokio::{
24 sync::{mpsc, oneshot},
25 task::JoinSet,
26};
27use tracing::warn;
28
29pub struct Builder {
31 config: Option<P2pConfig>,
32 metrics: Option<Metrics>,
33 consensus_external_address: Option<Multiaddr>,
34}
35
36impl Builder {
37 #[allow(clippy::new_without_default)]
38 pub fn new() -> Self {
39 Self {
40 config: None,
41 metrics: None,
42 consensus_external_address: None,
43 }
44 }
45
46 pub fn config(mut self, config: P2pConfig) -> Self {
47 self.config = Some(config);
48 self
49 }
50
51 pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
52 self.metrics = Some(Metrics::enabled(registry));
53 self
54 }
55
56 pub fn consensus_external_address(mut self, addr: Multiaddr) -> Self {
57 self.consensus_external_address = Some(addr);
58 self
59 }
60
61 pub fn build(
62 self,
63 ) -> (
64 UnstartedDiscovery,
65 DiscoveryServer<impl Discovery>,
66 EndpointManager,
67 ) {
68 let discovery_config = self
69 .config
70 .clone()
71 .and_then(|config| config.discovery)
72 .unwrap_or_default();
73 let (builder, server, endpoint_manager) = self.build_internal();
74 let mut discovery_server = DiscoveryServer::new(server);
75
76 if let Some(limit) = discovery_config.get_known_peers_rate_limit {
78 discovery_server = discovery_server.add_layer_for_get_known_peers_v2(
79 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
80 governor::Quota::per_second(limit),
81 rate_limit::WaitMode::Block,
82 )),
83 );
84 discovery_server = discovery_server.add_layer_for_get_known_peers_v3(
85 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
86 governor::Quota::per_second(limit),
87 rate_limit::WaitMode::Block,
88 )),
89 );
90 }
91 (builder, discovery_server, endpoint_manager)
92 }
93
94 pub(super) fn build_internal(self) -> (UnstartedDiscovery, Server, EndpointManager) {
95 let Builder {
96 config,
97 metrics,
98 consensus_external_address,
99 } = self;
100 let config = config.unwrap();
101 let discovery_config = config.discovery.clone().unwrap_or_default();
102 let store_path = discovery_config.peer_addr_store_path.clone();
103 let metrics = metrics.unwrap_or_else(Metrics::disabled);
104 let (shutdown_tx, shutdown_rx) = oneshot::channel();
105 let (mailbox_tx, mailbox_rx) = mpsc::channel(discovery_config.mailbox_capacity());
106
107 let handle = Handle {
108 _shutdown_handle: Arc::new(shutdown_tx),
109 sender: super::Sender {
110 sender: mailbox_tx.clone(),
111 },
112 };
113
114 let endpoint_manager = EndpointManager::new(handle.sender());
115
116 let state = State {
117 our_info: None,
118 our_info_v2: None,
119 connected_peers: HashMap::default(),
120 known_peers: HashMap::default(),
121 known_peers_v2: HashMap::default(),
122 peer_addresses: HashMap::default(),
123 }
124 .pipe(RwLock::new)
125 .pipe(Arc::new);
126
127 let configured_peers = Arc::new(OnceLock::new());
128
129 let server = Server {
130 state: state.clone(),
131 configured_peers: configured_peers.clone(),
132 mailbox_sender: mailbox_tx.clone(),
133 };
134
135 (
136 UnstartedDiscovery {
137 handle,
138 config,
139 shutdown_handle: shutdown_rx,
140 state,
141 mailbox: mailbox_rx,
142 mailbox_tx: mailbox_tx.clone(),
143 metrics,
144 configured_peers,
145 consensus_external_address,
146 endpoint_manager: endpoint_manager.clone(),
147 store_path,
148 },
149 server,
150 endpoint_manager,
151 )
152 }
153}
154
155pub struct UnstartedDiscovery {
157 pub(super) handle: Handle,
158 pub(super) config: P2pConfig,
159 pub(super) shutdown_handle: oneshot::Receiver<()>,
160 pub(super) state: Arc<RwLock<State>>,
161 pub(super) mailbox: mpsc::Receiver<DiscoveryMessage>,
162 pub(super) mailbox_tx: mpsc::Sender<DiscoveryMessage>,
163 pub(super) metrics: Metrics,
164 pub(super) configured_peers: Arc<OnceLock<HashMap<PeerId, PeerInfo>>>,
165 pub(super) consensus_external_address: Option<Multiaddr>,
166 pub(super) endpoint_manager: EndpointManager,
167 pub(super) store_path: Option<PathBuf>,
168}
169
170impl UnstartedDiscovery {
171 pub fn sender(&self) -> super::Sender {
172 self.handle.sender()
173 }
174
175 pub(super) fn build(
176 self,
177 network: anemo::Network,
178 keypair: NetworkKeyPair,
179 ) -> (DiscoveryEventLoop, Handle) {
180 let Self {
181 handle,
182 config,
183 shutdown_handle,
184 state,
185 mailbox,
186 mailbox_tx,
187 metrics,
188 configured_peers,
189 consensus_external_address,
190 endpoint_manager,
191 store_path,
192 } = self;
193
194 let discovery_config = config.discovery.clone().unwrap_or_default();
195 let (built_configured_peers, unidentified_seed_peers) =
196 build_peer_config(&config, &discovery_config);
197
198 configured_peers
200 .set(built_configured_peers.clone())
201 .expect("configured_peers should only be set once");
202
203 (
204 DiscoveryEventLoop {
205 config,
206 discovery_config: Arc::new(discovery_config),
207 configured_peers: Arc::new(built_configured_peers),
208 unidentified_seed_peers,
209 network,
210 keypair,
211 tasks: JoinSet::new(),
212 pending_dials: Default::default(),
213 dial_seed_peers_task: None,
214 shutdown_handle,
215 state,
216 mailbox,
217 mailbox_tx,
218 metrics,
219 consensus_external_address,
220 endpoint_manager,
221 store_path,
222 peer_cooldowns: HashMap::new(),
223 },
224 handle,
225 )
226 }
227
228 pub fn start(self, network: anemo::Network, keypair: NetworkKeyPair) -> Handle {
229 assert_eq!(network.peer_id().0, *keypair.public().0.as_bytes());
230 let (event_loop, handle) = self.build(network, keypair);
231 tokio::spawn(event_loop.start());
232
233 handle
234 }
235}
236
237fn build_peer_config(
239 config: &P2pConfig,
240 discovery_config: &sui_config::p2p::DiscoveryConfig,
241) -> (HashMap<PeerId, PeerInfo>, Vec<anemo::types::Address>) {
242 let mut configured_peers = HashMap::new();
243 let mut unidentified_seed_peers = Vec::new();
244
245 for seed in &config.seed_peers {
246 let anemo_addr = seed
247 .address
248 .to_anemo_address()
249 .tap_err(|_| warn!(p2p_address=?seed.address, "Skipping seed peer address: can't convert to anemo address"))
250 .ok();
251 match (seed.peer_id, anemo_addr) {
252 (Some(peer_id), addr) => {
253 configured_peers.insert(
254 peer_id,
255 PeerInfo {
256 peer_id,
257 affinity: PeerAffinity::High,
258 address: addr.into_iter().collect(),
259 },
260 );
261 }
262 (None, Some(addr)) => {
263 unidentified_seed_peers.push(addr);
264 }
265 (None, None) => {}
266 }
267 }
268
269 for ap in &discovery_config.allowlisted_peers {
270 let addresses = ap
271 .address
272 .iter()
273 .filter_map(|addr| {
274 addr.to_anemo_address()
275 .tap_err(|_| warn!(p2p_address=?addr, "Skipping allowlisted peer address: can't convert to anemo address"))
276 .ok()
277 })
278 .collect();
279 configured_peers.insert(
280 ap.peer_id,
281 PeerInfo {
282 peer_id: ap.peer_id,
283 affinity: PeerAffinity::Allowed,
284 address: addresses,
285 },
286 );
287 }
288
289 (configured_peers, unidentified_seed_peers)
290}