1use super::{
5 Discovery, DiscoveryEventLoop, DiscoveryMessage, DiscoveryServer, Handle, State,
6 metrics::Metrics, server::Server,
7};
8use crate::endpoint_manager::EndpointManager;
9use anemo::codegen::InboundRequestLayer;
10use anemo::types::PeerAffinity;
11use anemo::{PeerId, types::PeerInfo};
12use anemo_tower::rate_limit;
13use fastcrypto::traits::KeyPair;
14use std::{
15 collections::{HashMap, HashSet},
16 path::PathBuf,
17 sync::{Arc, OnceLock, RwLock},
18};
19use sui_config::p2p::P2pConfig;
20use sui_types::crypto::NetworkKeyPair;
21use sui_types::multiaddr::Multiaddr;
22use tap::{Pipe, TapFallible};
23use tokio::{
24 sync::{mpsc, oneshot},
25 task::JoinSet,
26};
27use tracing::warn;
28
29pub struct Builder {
31 config: Option<P2pConfig>,
32 metrics: Option<Metrics>,
33 consensus_external_address: Option<Multiaddr>,
34}
35
36impl Builder {
37 #[allow(clippy::new_without_default)]
38 pub fn new() -> Self {
39 Self {
40 config: None,
41 metrics: None,
42 consensus_external_address: None,
43 }
44 }
45
46 pub fn config(mut self, config: P2pConfig) -> Self {
47 self.config = Some(config);
48 self
49 }
50
51 pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
52 self.metrics = Some(Metrics::enabled(registry));
53 self
54 }
55
56 pub fn consensus_external_address(mut self, addr: Multiaddr) -> Self {
57 self.consensus_external_address = Some(addr);
58 self
59 }
60
61 pub fn build(
62 self,
63 ) -> (
64 UnstartedDiscovery,
65 DiscoveryServer<impl Discovery>,
66 EndpointManager,
67 ) {
68 let discovery_config = self
69 .config
70 .clone()
71 .and_then(|config| config.discovery)
72 .unwrap_or_default();
73 let (builder, server, endpoint_manager) = self.build_internal();
74 let mut discovery_server = DiscoveryServer::new(server);
75
76 if let Some(limit) = discovery_config.get_known_peers_rate_limit {
78 discovery_server = discovery_server.add_layer_for_get_known_peers_v2(
79 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
80 governor::Quota::per_second(limit),
81 rate_limit::WaitMode::Block,
82 )),
83 );
84 discovery_server = discovery_server.add_layer_for_get_known_peers_v3(
85 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
86 governor::Quota::per_second(limit),
87 rate_limit::WaitMode::Block,
88 )),
89 );
90 }
91 (builder, discovery_server, endpoint_manager)
92 }
93
94 pub(super) fn build_internal(self) -> (UnstartedDiscovery, Server, EndpointManager) {
95 let Builder {
96 config,
97 metrics,
98 consensus_external_address,
99 } = self;
100 let config = config.unwrap();
101 let discovery_config = config.discovery.clone().unwrap_or_default();
102 let store_path = discovery_config.peer_addr_store_path.clone();
103 let metrics = metrics.unwrap_or_else(Metrics::disabled);
104 let (shutdown_tx, shutdown_rx) = oneshot::channel();
105 let (mailbox_tx, mailbox_rx) = mpsc::channel(discovery_config.mailbox_capacity());
106
107 let handle = Handle {
108 _shutdown_handle: Arc::new(shutdown_tx),
109 sender: super::Sender {
110 sender: mailbox_tx.clone(),
111 },
112 };
113
114 let endpoint_manager = EndpointManager::new(handle.sender());
115
116 let state = State {
117 our_info: None,
118 our_info_v2: None,
119 connected_peers: HashMap::default(),
120 known_peers: HashMap::default(),
121 known_peers_v2: HashMap::default(),
122 peer_addresses: HashMap::default(),
123 }
124 .pipe(RwLock::new)
125 .pipe(Arc::new);
126
127 let configured_peers = Arc::new(OnceLock::new());
128 let chain_peers = Arc::new(RwLock::new(HashSet::new()));
129
130 let server = Server {
131 state: state.clone(),
132 configured_peers: configured_peers.clone(),
133 chain_peers: chain_peers.clone(),
134 mailbox_sender: mailbox_tx.clone(),
135 };
136
137 (
138 UnstartedDiscovery {
139 handle,
140 config,
141 shutdown_handle: shutdown_rx,
142 state,
143 mailbox: mailbox_rx,
144 mailbox_tx: mailbox_tx.clone(),
145 metrics,
146 configured_peers,
147 chain_peers,
148 consensus_external_address,
149 endpoint_manager: endpoint_manager.clone(),
150 store_path,
151 },
152 server,
153 endpoint_manager,
154 )
155 }
156}
157
158pub struct UnstartedDiscovery {
160 pub(super) handle: Handle,
161 pub(super) config: P2pConfig,
162 pub(super) shutdown_handle: oneshot::Receiver<()>,
163 pub(super) state: Arc<RwLock<State>>,
164 pub(super) mailbox: mpsc::Receiver<DiscoveryMessage>,
165 pub(super) mailbox_tx: mpsc::Sender<DiscoveryMessage>,
166 pub(super) metrics: Metrics,
167 pub(super) configured_peers: Arc<OnceLock<HashMap<PeerId, PeerInfo>>>,
168 pub(super) chain_peers: Arc<RwLock<HashSet<PeerId>>>,
169 pub(super) consensus_external_address: Option<Multiaddr>,
170 pub(super) endpoint_manager: EndpointManager,
171 pub(super) store_path: Option<PathBuf>,
172}
173
174impl UnstartedDiscovery {
175 pub fn sender(&self) -> super::Sender {
176 self.handle.sender()
177 }
178
179 pub(super) fn build(
180 self,
181 network: anemo::Network,
182 keypair: NetworkKeyPair,
183 ) -> (DiscoveryEventLoop, Handle) {
184 let Self {
185 handle,
186 config,
187 shutdown_handle,
188 state,
189 mailbox,
190 mailbox_tx,
191 metrics,
192 configured_peers,
193 chain_peers,
194 consensus_external_address,
195 endpoint_manager,
196 store_path,
197 } = self;
198
199 let discovery_config = config.discovery.clone().unwrap_or_default();
200 let (built_configured_peers, unidentified_seed_peers) =
201 build_peer_config(&config, &discovery_config);
202
203 configured_peers
205 .set(built_configured_peers.clone())
206 .expect("configured_peers should only be set once");
207
208 (
209 DiscoveryEventLoop {
210 config,
211 discovery_config: Arc::new(discovery_config),
212 configured_peers: Arc::new(built_configured_peers),
213 chain_peers,
214 unidentified_seed_peers,
215 network,
216 keypair,
217 tasks: JoinSet::new(),
218 pending_dials: Default::default(),
219 dial_seed_peers_task: None,
220 shutdown_handle,
221 state,
222 mailbox,
223 mailbox_tx,
224 metrics,
225 consensus_external_address,
226 endpoint_manager,
227 store_path,
228 peer_cooldowns: HashMap::new(),
229 },
230 handle,
231 )
232 }
233
234 pub fn start(self, network: anemo::Network, keypair: NetworkKeyPair) -> Handle {
235 assert_eq!(network.peer_id().0, *keypair.public().0.as_bytes());
236 let (event_loop, handle) = self.build(network, keypair);
237 tokio::spawn(event_loop.start());
238
239 handle
240 }
241}
242
243fn build_peer_config(
245 config: &P2pConfig,
246 discovery_config: &sui_config::p2p::DiscoveryConfig,
247) -> (HashMap<PeerId, PeerInfo>, Vec<anemo::types::Address>) {
248 let mut configured_peers = HashMap::new();
249 let mut unidentified_seed_peers = Vec::new();
250
251 for seed in &config.seed_peers {
252 let anemo_addr = seed
253 .address
254 .to_anemo_address()
255 .tap_err(|_| warn!(p2p_address=?seed.address, "Skipping seed peer address: can't convert to anemo address"))
256 .ok();
257 match (seed.peer_id, anemo_addr) {
258 (Some(peer_id), addr) => {
259 configured_peers.insert(
260 peer_id,
261 PeerInfo {
262 peer_id,
263 affinity: PeerAffinity::High,
264 address: addr.into_iter().collect(),
265 },
266 );
267 }
268 (None, Some(addr)) => {
269 unidentified_seed_peers.push(addr);
270 }
271 (None, None) => {}
272 }
273 }
274
275 for ap in &discovery_config.allowlisted_peers {
276 let addresses = ap
277 .address
278 .iter()
279 .filter_map(|addr| {
280 addr.to_anemo_address()
281 .tap_err(|_| warn!(p2p_address=?addr, "Skipping allowlisted peer address: can't convert to anemo address"))
282 .ok()
283 })
284 .collect();
285 configured_peers.insert(
286 ap.peer_id,
287 PeerInfo {
288 peer_id: ap.peer_id,
289 affinity: PeerAffinity::Allowed,
290 address: addresses,
291 },
292 );
293 }
294
295 (configured_peers, unidentified_seed_peers)
296}