1use super::{
5 Discovery, DiscoveryEventLoop, DiscoveryMessage, DiscoveryServer, Handle, State,
6 metrics::Metrics, server::Server,
7};
8use anemo::codegen::InboundRequestLayer;
9use anemo::types::PeerAffinity;
10use anemo::{PeerId, types::PeerInfo};
11use anemo_tower::rate_limit;
12use fastcrypto::traits::KeyPair;
13use std::{
14 collections::HashMap,
15 sync::{Arc, RwLock},
16};
17use sui_config::p2p::P2pConfig;
18use sui_types::crypto::NetworkKeyPair;
19use tap::{Pipe, TapFallible};
20use tokio::{
21 sync::{mpsc, oneshot},
22 task::JoinSet,
23};
24use tracing::warn;
25
26pub struct Builder {
28 config: Option<P2pConfig>,
29 metrics: Option<Metrics>,
30}
31
32impl Builder {
33 #[allow(clippy::new_without_default)]
34 pub fn new() -> Self {
35 Self {
36 config: None,
37 metrics: None,
38 }
39 }
40
41 pub fn config(mut self, config: P2pConfig) -> Self {
42 self.config = Some(config);
43 self
44 }
45
46 pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
47 self.metrics = Some(Metrics::enabled(registry));
48 self
49 }
50
51 pub fn build(self) -> (UnstartedDiscovery, DiscoveryServer<impl Discovery>) {
52 let discovery_config = self
53 .config
54 .clone()
55 .and_then(|config| config.discovery)
56 .unwrap_or_default();
57 let (builder, server) = self.build_internal();
58 let mut discovery_server = DiscoveryServer::new(server);
59
60 if let Some(limit) = discovery_config.get_known_peers_rate_limit {
62 discovery_server = discovery_server.add_layer_for_get_known_peers_v2(
63 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
64 governor::Quota::per_second(limit),
65 rate_limit::WaitMode::Block,
66 )),
67 );
68 }
69 (builder, discovery_server)
70 }
71
72 pub(super) fn build_internal(self) -> (UnstartedDiscovery, Server) {
73 let Builder { config, metrics } = self;
74 let config = config.unwrap();
75 let discovery_config = config.discovery.clone().unwrap_or_default();
76 let metrics = metrics.unwrap_or_else(Metrics::disabled);
77 let (shutdown_tx, shutdown_rx) = oneshot::channel();
78 let (mailbox_tx, mailbox_rx) = mpsc::channel(discovery_config.mailbox_capacity());
79
80 let handle = Handle {
81 _shutdown_handle: Arc::new(shutdown_tx),
82 sender: mailbox_tx,
83 };
84
85 let state = State {
86 our_info: None,
87 connected_peers: HashMap::default(),
88 known_peers: HashMap::default(),
89 }
90 .pipe(RwLock::new)
91 .pipe(Arc::new);
92
93 let server = Server {
94 state: state.clone(),
95 };
96
97 (
98 UnstartedDiscovery {
99 handle,
100 config,
101 shutdown_handle: shutdown_rx,
102 state,
103 mailbox: mailbox_rx,
104 metrics,
105 },
106 server,
107 )
108 }
109}
110
111pub struct UnstartedDiscovery {
113 pub(super) handle: Handle,
114 pub(super) config: P2pConfig,
115 pub(super) shutdown_handle: oneshot::Receiver<()>,
116 pub(super) state: Arc<RwLock<State>>,
117 pub(super) mailbox: mpsc::Receiver<DiscoveryMessage>,
118 pub(super) metrics: Metrics,
119}
120
121impl UnstartedDiscovery {
122 pub(super) fn build(
123 self,
124 network: anemo::Network,
125 keypair: NetworkKeyPair,
126 ) -> (DiscoveryEventLoop, Handle) {
127 let Self {
128 handle,
129 config,
130 shutdown_handle,
131 state,
132 mailbox,
133 metrics,
134 } = self;
135
136 let discovery_config = config.discovery.clone().unwrap_or_default();
137 let (configured_peers, unidentified_seed_peers) =
138 build_peer_config(&config, &discovery_config);
139 (
140 DiscoveryEventLoop {
141 config,
142 discovery_config: Arc::new(discovery_config),
143 configured_peers: Arc::new(configured_peers),
144 unidentified_seed_peers,
145 network,
146 keypair,
147 tasks: JoinSet::new(),
148 pending_dials: Default::default(),
149 dial_seed_peers_task: None,
150 shutdown_handle,
151 state,
152 mailbox,
153 metrics,
154 },
155 handle,
156 )
157 }
158
159 pub fn start(self, network: anemo::Network, keypair: NetworkKeyPair) -> Handle {
160 assert_eq!(network.peer_id().0, *keypair.public().0.as_bytes());
161 let (event_loop, handle) = self.build(network, keypair);
162 tokio::spawn(event_loop.start());
163
164 handle
165 }
166}
167
168fn build_peer_config(
170 config: &P2pConfig,
171 discovery_config: &sui_config::p2p::DiscoveryConfig,
172) -> (HashMap<PeerId, PeerInfo>, Vec<anemo::types::Address>) {
173 let mut configured_peers = HashMap::new();
174 let mut unidentified_seed_peers = Vec::new();
175
176 for seed in &config.seed_peers {
177 let anemo_addr = seed
178 .address
179 .to_anemo_address()
180 .tap_err(|_| warn!(p2p_address=?seed.address, "Skipping seed peer address: can't convert to anemo address"))
181 .ok();
182 match (seed.peer_id, anemo_addr) {
183 (Some(peer_id), addr) => {
184 configured_peers.insert(
185 peer_id,
186 PeerInfo {
187 peer_id,
188 affinity: PeerAffinity::High,
189 address: addr.into_iter().collect(),
190 },
191 );
192 }
193 (None, Some(addr)) => {
194 unidentified_seed_peers.push(addr);
195 }
196 (None, None) => {}
197 }
198 }
199
200 for ap in &discovery_config.allowlisted_peers {
201 let addresses = ap
202 .address
203 .iter()
204 .filter_map(|addr| {
205 addr.to_anemo_address()
206 .tap_err(|_| warn!(p2p_address=?addr, "Skipping allowlisted peer address: can't convert to anemo address"))
207 .ok()
208 })
209 .collect();
210 configured_peers.insert(
211 ap.peer_id,
212 PeerInfo {
213 peer_id: ap.peer_id,
214 affinity: PeerAffinity::Allowed,
215 address: addresses,
216 },
217 );
218 }
219
220 (configured_peers, unidentified_seed_peers)
221}