1use super::{
5 Discovery, DiscoveryEventLoop, DiscoveryMessage, DiscoveryServer, Handle, State,
6 metrics::Metrics, server::Server,
7};
8use crate::endpoint_manager::EndpointManager;
9use anemo::codegen::InboundRequestLayer;
10use anemo::types::PeerAffinity;
11use anemo::{PeerId, types::PeerInfo};
12use anemo_tower::rate_limit;
13use fastcrypto::traits::KeyPair;
14use std::{
15 collections::HashMap,
16 sync::{Arc, OnceLock, RwLock},
17};
18use sui_config::p2p::P2pConfig;
19use sui_types::crypto::NetworkKeyPair;
20use sui_types::multiaddr::Multiaddr;
21use tap::{Pipe, TapFallible};
22use tokio::{
23 sync::{mpsc, oneshot},
24 task::JoinSet,
25};
26use tracing::warn;
27
28pub struct Builder {
30 config: Option<P2pConfig>,
31 metrics: Option<Metrics>,
32 consensus_external_address: Option<Multiaddr>,
33}
34
35impl Builder {
36 #[allow(clippy::new_without_default)]
37 pub fn new() -> Self {
38 Self {
39 config: None,
40 metrics: None,
41 consensus_external_address: None,
42 }
43 }
44
45 pub fn config(mut self, config: P2pConfig) -> Self {
46 self.config = Some(config);
47 self
48 }
49
50 pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
51 self.metrics = Some(Metrics::enabled(registry));
52 self
53 }
54
55 pub fn consensus_external_address(mut self, addr: Multiaddr) -> Self {
56 self.consensus_external_address = Some(addr);
57 self
58 }
59
60 pub fn build(
61 self,
62 ) -> (
63 UnstartedDiscovery,
64 DiscoveryServer<impl Discovery>,
65 EndpointManager,
66 ) {
67 let discovery_config = self
68 .config
69 .clone()
70 .and_then(|config| config.discovery)
71 .unwrap_or_default();
72 let (builder, server, endpoint_manager) = self.build_internal();
73 let mut discovery_server = DiscoveryServer::new(server);
74
75 if let Some(limit) = discovery_config.get_known_peers_rate_limit {
77 discovery_server = discovery_server.add_layer_for_get_known_peers_v2(
78 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
79 governor::Quota::per_second(limit),
80 rate_limit::WaitMode::Block,
81 )),
82 );
83 discovery_server = discovery_server.add_layer_for_get_known_peers_v3(
84 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
85 governor::Quota::per_second(limit),
86 rate_limit::WaitMode::Block,
87 )),
88 );
89 }
90 (builder, discovery_server, endpoint_manager)
91 }
92
93 pub(super) fn build_internal(self) -> (UnstartedDiscovery, Server, EndpointManager) {
94 let Builder {
95 config,
96 metrics,
97 consensus_external_address,
98 } = self;
99 let config = config.unwrap();
100 let discovery_config = config.discovery.clone().unwrap_or_default();
101 let metrics = metrics.unwrap_or_else(Metrics::disabled);
102 let (shutdown_tx, shutdown_rx) = oneshot::channel();
103 let (mailbox_tx, mailbox_rx) = mpsc::channel(discovery_config.mailbox_capacity());
104
105 let handle = Handle {
106 _shutdown_handle: Arc::new(shutdown_tx),
107 sender: super::Sender {
108 sender: mailbox_tx.clone(),
109 },
110 };
111
112 let endpoint_manager = EndpointManager::new(handle.sender());
113
114 let state = State {
115 our_info: None,
116 our_info_v2: None,
117 connected_peers: HashMap::default(),
118 known_peers: HashMap::default(),
119 known_peers_v2: HashMap::default(),
120 peer_address_overrides: HashMap::default(),
121 }
122 .pipe(RwLock::new)
123 .pipe(Arc::new);
124
125 let configured_peers = Arc::new(OnceLock::new());
126
127 let server = Server {
128 state: state.clone(),
129 configured_peers: configured_peers.clone(),
130 mailbox_sender: mailbox_tx.clone(),
131 };
132
133 (
134 UnstartedDiscovery {
135 handle,
136 config,
137 shutdown_handle: shutdown_rx,
138 state,
139 mailbox: mailbox_rx,
140 metrics,
141 configured_peers,
142 consensus_external_address,
143 endpoint_manager: endpoint_manager.clone(),
144 },
145 server,
146 endpoint_manager,
147 )
148 }
149}
150
151pub struct UnstartedDiscovery {
153 pub(super) handle: Handle,
154 pub(super) config: P2pConfig,
155 pub(super) shutdown_handle: oneshot::Receiver<()>,
156 pub(super) state: Arc<RwLock<State>>,
157 pub(super) mailbox: mpsc::Receiver<DiscoveryMessage>,
158 pub(super) metrics: Metrics,
159 pub(super) configured_peers: Arc<OnceLock<HashMap<PeerId, PeerInfo>>>,
160 pub(super) consensus_external_address: Option<Multiaddr>,
161 pub(super) endpoint_manager: EndpointManager,
162}
163
164impl UnstartedDiscovery {
165 pub(super) fn build(
166 self,
167 network: anemo::Network,
168 keypair: NetworkKeyPair,
169 ) -> (DiscoveryEventLoop, Handle) {
170 let Self {
171 handle,
172 config,
173 shutdown_handle,
174 state,
175 mailbox,
176 metrics,
177 configured_peers,
178 consensus_external_address,
179 endpoint_manager,
180 } = self;
181
182 let discovery_config = config.discovery.clone().unwrap_or_default();
183 let (built_configured_peers, unidentified_seed_peers) =
184 build_peer_config(&config, &discovery_config);
185
186 configured_peers
188 .set(built_configured_peers.clone())
189 .expect("configured_peers should only be set once");
190
191 (
192 DiscoveryEventLoop {
193 config,
194 discovery_config: Arc::new(discovery_config),
195 configured_peers: Arc::new(built_configured_peers),
196 unidentified_seed_peers,
197 network,
198 keypair,
199 tasks: JoinSet::new(),
200 pending_dials: Default::default(),
201 dial_seed_peers_task: None,
202 shutdown_handle,
203 state,
204 mailbox,
205 metrics,
206 consensus_external_address,
207 endpoint_manager,
208 },
209 handle,
210 )
211 }
212
213 pub fn start(self, network: anemo::Network, keypair: NetworkKeyPair) -> Handle {
214 assert_eq!(network.peer_id().0, *keypair.public().0.as_bytes());
215 let (event_loop, handle) = self.build(network, keypair);
216 tokio::spawn(event_loop.start());
217
218 handle
219 }
220}
221
222fn build_peer_config(
224 config: &P2pConfig,
225 discovery_config: &sui_config::p2p::DiscoveryConfig,
226) -> (HashMap<PeerId, PeerInfo>, Vec<anemo::types::Address>) {
227 let mut configured_peers = HashMap::new();
228 let mut unidentified_seed_peers = Vec::new();
229
230 for seed in &config.seed_peers {
231 let anemo_addr = seed
232 .address
233 .to_anemo_address()
234 .tap_err(|_| warn!(p2p_address=?seed.address, "Skipping seed peer address: can't convert to anemo address"))
235 .ok();
236 match (seed.peer_id, anemo_addr) {
237 (Some(peer_id), addr) => {
238 configured_peers.insert(
239 peer_id,
240 PeerInfo {
241 peer_id,
242 affinity: PeerAffinity::High,
243 address: addr.into_iter().collect(),
244 },
245 );
246 }
247 (None, Some(addr)) => {
248 unidentified_seed_peers.push(addr);
249 }
250 (None, None) => {}
251 }
252 }
253
254 for ap in &discovery_config.allowlisted_peers {
255 let addresses = ap
256 .address
257 .iter()
258 .filter_map(|addr| {
259 addr.to_anemo_address()
260 .tap_err(|_| warn!(p2p_address=?addr, "Skipping allowlisted peer address: can't convert to anemo address"))
261 .ok()
262 })
263 .collect();
264 configured_peers.insert(
265 ap.peer_id,
266 PeerInfo {
267 peer_id: ap.peer_id,
268 affinity: PeerAffinity::Allowed,
269 address: addresses,
270 },
271 );
272 }
273
274 (configured_peers, unidentified_seed_peers)
275}