sui_network/discovery/
builder.rs1use super::{
5 Discovery, DiscoveryEventLoop, DiscoveryServer, State, metrics::Metrics, server::Server,
6};
7use crate::discovery::TrustedPeerChangeEvent;
8use anemo::codegen::InboundRequestLayer;
9use anemo_tower::rate_limit;
10use fastcrypto::traits::KeyPair;
11use std::{
12 collections::HashMap,
13 sync::{Arc, RwLock},
14};
15use sui_config::p2p::P2pConfig;
16use sui_types::crypto::NetworkKeyPair;
17use tap::Pipe;
18use tokio::{
19 sync::{oneshot, watch},
20 task::JoinSet,
21};
22
23pub struct Builder {
25 config: Option<P2pConfig>,
26 metrics: Option<Metrics>,
27 trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
28}
29
30impl Builder {
31 #[allow(clippy::new_without_default)]
32 pub fn new(trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>) -> Self {
33 Self {
34 config: None,
35 metrics: None,
36 trusted_peer_change_rx,
37 }
38 }
39
40 pub fn config(mut self, config: P2pConfig) -> Self {
41 self.config = Some(config);
42 self
43 }
44
45 pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
46 self.metrics = Some(Metrics::enabled(registry));
47 self
48 }
49
50 pub fn build(self) -> (UnstartedDiscovery, DiscoveryServer<impl Discovery>) {
51 let discovery_config = self
52 .config
53 .clone()
54 .and_then(|config| config.discovery)
55 .unwrap_or_default();
56 let (builder, server) = self.build_internal();
57 let mut discovery_server = DiscoveryServer::new(server);
58
59 if let Some(limit) = discovery_config.get_known_peers_rate_limit {
61 discovery_server = discovery_server.add_layer_for_get_known_peers_v2(
62 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
63 governor::Quota::per_second(limit),
64 rate_limit::WaitMode::Block,
65 )),
66 );
67 }
68 (builder, discovery_server)
69 }
70
71 pub(super) fn build_internal(self) -> (UnstartedDiscovery, Server) {
72 let Builder {
73 config,
74 metrics,
75 trusted_peer_change_rx,
76 } = self;
77 let config = config.unwrap();
78 let metrics = metrics.unwrap_or_else(Metrics::disabled);
79 let (sender, receiver) = oneshot::channel();
80
81 let handle = Handle {
82 _shutdown_handle: Arc::new(sender),
83 };
84
85 let state = State {
86 our_info: None,
87 connected_peers: HashMap::default(),
88 known_peers: HashMap::default(),
89 }
90 .pipe(RwLock::new)
91 .pipe(Arc::new);
92
93 let server = Server {
94 state: state.clone(),
95 };
96
97 (
98 UnstartedDiscovery {
99 handle,
100 config,
101 shutdown_handle: receiver,
102 state,
103 trusted_peer_change_rx,
104 metrics,
105 },
106 server,
107 )
108 }
109}
110
111pub struct UnstartedDiscovery {
113 pub(super) handle: Handle,
114 pub(super) config: P2pConfig,
115 pub(super) shutdown_handle: oneshot::Receiver<()>,
116 pub(super) state: Arc<RwLock<State>>,
117 pub(super) trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
118 pub(super) metrics: Metrics,
119}
120
121impl UnstartedDiscovery {
122 pub(super) fn build(
123 self,
124 network: anemo::Network,
125 keypair: NetworkKeyPair,
126 ) -> (DiscoveryEventLoop, Handle) {
127 let Self {
128 handle,
129 config,
130 shutdown_handle,
131 state,
132 trusted_peer_change_rx,
133 metrics,
134 } = self;
135
136 let discovery_config = config.discovery.clone().unwrap_or_default();
137 let allowlisted_peers = Arc::new(
138 discovery_config
139 .allowlisted_peers
140 .clone()
141 .into_iter()
142 .map(|ap| (ap.peer_id, ap.address))
143 .chain(config.seed_peers.iter().filter_map(|peer| {
144 peer.peer_id
145 .map(|peer_id| (peer_id, Some(peer.address.clone())))
146 }))
147 .collect::<HashMap<_, _>>(),
148 );
149 (
150 DiscoveryEventLoop {
151 config,
152 discovery_config: Arc::new(discovery_config),
153 allowlisted_peers,
154 network,
155 keypair,
156 tasks: JoinSet::new(),
157 pending_dials: Default::default(),
158 dial_seed_peers_task: None,
159 shutdown_handle,
160 state,
161 trusted_peer_change_rx,
162 metrics,
163 },
164 handle,
165 )
166 }
167
168 pub fn start(self, network: anemo::Network, keypair: NetworkKeyPair) -> Handle {
169 assert_eq!(network.peer_id().0, *keypair.public().0.as_bytes());
170 let (event_loop, handle) = self.build(network, keypair);
171 tokio::spawn(event_loop.start());
172
173 handle
174 }
175}
176
177pub struct Handle {
180 _shutdown_handle: Arc<oneshot::Sender<()>>,
181}