sui_network/discovery/
builder.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::{
5    Discovery, DiscoveryEventLoop, DiscoveryMessage, DiscoveryServer, Handle, State,
6    metrics::Metrics, server::Server,
7};
8use anemo::codegen::InboundRequestLayer;
9use anemo::types::PeerAffinity;
10use anemo::{PeerId, types::PeerInfo};
11use anemo_tower::rate_limit;
12use fastcrypto::traits::KeyPair;
13use std::{
14    collections::HashMap,
15    sync::{Arc, RwLock},
16};
17use sui_config::p2p::P2pConfig;
18use sui_types::crypto::NetworkKeyPair;
19use tap::{Pipe, TapFallible};
20use tokio::{
21    sync::{mpsc, oneshot},
22    task::JoinSet,
23};
24use tracing::warn;
25
26/// Discovery Service Builder.
27pub struct Builder {
28    config: Option<P2pConfig>,
29    metrics: Option<Metrics>,
30}
31
32impl Builder {
33    #[allow(clippy::new_without_default)]
34    pub fn new() -> Self {
35        Self {
36            config: None,
37            metrics: None,
38        }
39    }
40
41    pub fn config(mut self, config: P2pConfig) -> Self {
42        self.config = Some(config);
43        self
44    }
45
46    pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
47        self.metrics = Some(Metrics::enabled(registry));
48        self
49    }
50
51    pub fn build(self) -> (UnstartedDiscovery, DiscoveryServer<impl Discovery>) {
52        let discovery_config = self
53            .config
54            .clone()
55            .and_then(|config| config.discovery)
56            .unwrap_or_default();
57        let (builder, server) = self.build_internal();
58        let mut discovery_server = DiscoveryServer::new(server);
59
60        // Apply rate limits from configuration as needed.
61        if let Some(limit) = discovery_config.get_known_peers_rate_limit {
62            discovery_server = discovery_server.add_layer_for_get_known_peers_v2(
63                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
64                    governor::Quota::per_second(limit),
65                    rate_limit::WaitMode::Block,
66                )),
67            );
68        }
69        (builder, discovery_server)
70    }
71
72    pub(super) fn build_internal(self) -> (UnstartedDiscovery, Server) {
73        let Builder { config, metrics } = self;
74        let config = config.unwrap();
75        let discovery_config = config.discovery.clone().unwrap_or_default();
76        let metrics = metrics.unwrap_or_else(Metrics::disabled);
77        let (shutdown_tx, shutdown_rx) = oneshot::channel();
78        let (mailbox_tx, mailbox_rx) = mpsc::channel(discovery_config.mailbox_capacity());
79
80        let handle = Handle {
81            _shutdown_handle: Arc::new(shutdown_tx),
82            sender: mailbox_tx,
83        };
84
85        let state = State {
86            our_info: None,
87            connected_peers: HashMap::default(),
88            known_peers: HashMap::default(),
89        }
90        .pipe(RwLock::new)
91        .pipe(Arc::new);
92
93        let server = Server {
94            state: state.clone(),
95        };
96
97        (
98            UnstartedDiscovery {
99                handle,
100                config,
101                shutdown_handle: shutdown_rx,
102                state,
103                mailbox: mailbox_rx,
104                metrics,
105            },
106            server,
107        )
108    }
109}
110
111/// Handle to an unstarted discovery system
112pub struct UnstartedDiscovery {
113    pub(super) handle: Handle,
114    pub(super) config: P2pConfig,
115    pub(super) shutdown_handle: oneshot::Receiver<()>,
116    pub(super) state: Arc<RwLock<State>>,
117    pub(super) mailbox: mpsc::Receiver<DiscoveryMessage>,
118    pub(super) metrics: Metrics,
119}
120
121impl UnstartedDiscovery {
122    pub(super) fn build(
123        self,
124        network: anemo::Network,
125        keypair: NetworkKeyPair,
126    ) -> (DiscoveryEventLoop, Handle) {
127        let Self {
128            handle,
129            config,
130            shutdown_handle,
131            state,
132            mailbox,
133            metrics,
134        } = self;
135
136        let discovery_config = config.discovery.clone().unwrap_or_default();
137        let (configured_peers, unidentified_seed_peers) =
138            build_peer_config(&config, &discovery_config);
139        (
140            DiscoveryEventLoop {
141                config,
142                discovery_config: Arc::new(discovery_config),
143                configured_peers: Arc::new(configured_peers),
144                unidentified_seed_peers,
145                network,
146                keypair,
147                tasks: JoinSet::new(),
148                pending_dials: Default::default(),
149                dial_seed_peers_task: None,
150                shutdown_handle,
151                state,
152                mailbox,
153                metrics,
154            },
155            handle,
156        )
157    }
158
159    pub fn start(self, network: anemo::Network, keypair: NetworkKeyPair) -> Handle {
160        assert_eq!(network.peer_id().0, *keypair.public().0.as_bytes());
161        let (event_loop, handle) = self.build(network, keypair);
162        tokio::spawn(event_loop.start());
163
164        handle
165    }
166}
167
168/// Returns (configured_peers, unidentified_seed_peers).
169fn build_peer_config(
170    config: &P2pConfig,
171    discovery_config: &sui_config::p2p::DiscoveryConfig,
172) -> (HashMap<PeerId, PeerInfo>, Vec<anemo::types::Address>) {
173    let mut configured_peers = HashMap::new();
174    let mut unidentified_seed_peers = Vec::new();
175
176    for seed in &config.seed_peers {
177        let anemo_addr = seed
178            .address
179            .to_anemo_address()
180            .tap_err(|_| warn!(p2p_address=?seed.address, "Skipping seed peer address: can't convert to anemo address"))
181            .ok();
182        match (seed.peer_id, anemo_addr) {
183            (Some(peer_id), addr) => {
184                configured_peers.insert(
185                    peer_id,
186                    PeerInfo {
187                        peer_id,
188                        affinity: PeerAffinity::High,
189                        address: addr.into_iter().collect(),
190                    },
191                );
192            }
193            (None, Some(addr)) => {
194                unidentified_seed_peers.push(addr);
195            }
196            (None, None) => {}
197        }
198    }
199
200    for ap in &discovery_config.allowlisted_peers {
201        let addresses = ap
202            .address
203            .iter()
204            .filter_map(|addr| {
205                addr.to_anemo_address()
206                    .tap_err(|_| warn!(p2p_address=?addr, "Skipping allowlisted peer address: can't convert to anemo address"))
207                    .ok()
208            })
209            .collect();
210        configured_peers.insert(
211            ap.peer_id,
212            PeerInfo {
213                peer_id: ap.peer_id,
214                affinity: PeerAffinity::Allowed,
215                address: addresses,
216            },
217        );
218    }
219
220    (configured_peers, unidentified_seed_peers)
221}