// sui_network/discovery/builder.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use super::{
5    Discovery, DiscoveryEventLoop, DiscoveryServer, State, metrics::Metrics, server::Server,
6};
7use crate::discovery::TrustedPeerChangeEvent;
8use anemo::codegen::InboundRequestLayer;
9use anemo_tower::rate_limit;
10use fastcrypto::traits::KeyPair;
11use std::{
12    collections::HashMap,
13    sync::{Arc, RwLock},
14};
15use sui_config::p2p::P2pConfig;
16use sui_types::crypto::NetworkKeyPair;
17use tap::Pipe;
18use tokio::{
19    sync::{oneshot, watch},
20    task::JoinSet,
21};
22
23/// Discovery Service Builder.
24pub struct Builder {
25    config: Option<P2pConfig>,
26    metrics: Option<Metrics>,
27    trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
28}
29
30impl Builder {
31    #[allow(clippy::new_without_default)]
32    pub fn new(trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>) -> Self {
33        Self {
34            config: None,
35            metrics: None,
36            trusted_peer_change_rx,
37        }
38    }
39
40    pub fn config(mut self, config: P2pConfig) -> Self {
41        self.config = Some(config);
42        self
43    }
44
45    pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
46        self.metrics = Some(Metrics::enabled(registry));
47        self
48    }
49
50    pub fn build(self) -> (UnstartedDiscovery, DiscoveryServer<impl Discovery>) {
51        let discovery_config = self
52            .config
53            .clone()
54            .and_then(|config| config.discovery)
55            .unwrap_or_default();
56        let (builder, server) = self.build_internal();
57        let mut discovery_server = DiscoveryServer::new(server);
58
59        // Apply rate limits from configuration as needed.
60        if let Some(limit) = discovery_config.get_known_peers_rate_limit {
61            discovery_server = discovery_server.add_layer_for_get_known_peers_v2(
62                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
63                    governor::Quota::per_second(limit),
64                    rate_limit::WaitMode::Block,
65                )),
66            );
67        }
68        (builder, discovery_server)
69    }
70
71    pub(super) fn build_internal(self) -> (UnstartedDiscovery, Server) {
72        let Builder {
73            config,
74            metrics,
75            trusted_peer_change_rx,
76        } = self;
77        let config = config.unwrap();
78        let metrics = metrics.unwrap_or_else(Metrics::disabled);
79        let (sender, receiver) = oneshot::channel();
80
81        let handle = Handle {
82            _shutdown_handle: Arc::new(sender),
83        };
84
85        let state = State {
86            our_info: None,
87            connected_peers: HashMap::default(),
88            known_peers: HashMap::default(),
89        }
90        .pipe(RwLock::new)
91        .pipe(Arc::new);
92
93        let server = Server {
94            state: state.clone(),
95        };
96
97        (
98            UnstartedDiscovery {
99                handle,
100                config,
101                shutdown_handle: receiver,
102                state,
103                trusted_peer_change_rx,
104                metrics,
105            },
106            server,
107        )
108    }
109}
110
111/// Handle to an unstarted discovery system
112pub struct UnstartedDiscovery {
113    pub(super) handle: Handle,
114    pub(super) config: P2pConfig,
115    pub(super) shutdown_handle: oneshot::Receiver<()>,
116    pub(super) state: Arc<RwLock<State>>,
117    pub(super) trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
118    pub(super) metrics: Metrics,
119}
120
121impl UnstartedDiscovery {
122    pub(super) fn build(
123        self,
124        network: anemo::Network,
125        keypair: NetworkKeyPair,
126    ) -> (DiscoveryEventLoop, Handle) {
127        let Self {
128            handle,
129            config,
130            shutdown_handle,
131            state,
132            trusted_peer_change_rx,
133            metrics,
134        } = self;
135
136        let discovery_config = config.discovery.clone().unwrap_or_default();
137        let allowlisted_peers = Arc::new(
138            discovery_config
139                .allowlisted_peers
140                .clone()
141                .into_iter()
142                .map(|ap| (ap.peer_id, ap.address))
143                .chain(config.seed_peers.iter().filter_map(|peer| {
144                    peer.peer_id
145                        .map(|peer_id| (peer_id, Some(peer.address.clone())))
146                }))
147                .collect::<HashMap<_, _>>(),
148        );
149        (
150            DiscoveryEventLoop {
151                config,
152                discovery_config: Arc::new(discovery_config),
153                allowlisted_peers,
154                network,
155                keypair,
156                tasks: JoinSet::new(),
157                pending_dials: Default::default(),
158                dial_seed_peers_task: None,
159                shutdown_handle,
160                state,
161                trusted_peer_change_rx,
162                metrics,
163            },
164            handle,
165        )
166    }
167
168    pub fn start(self, network: anemo::Network, keypair: NetworkKeyPair) -> Handle {
169        assert_eq!(network.peer_id().0, *keypair.public().0.as_bytes());
170        let (event_loop, handle) = self.build(network, keypair);
171        tokio::spawn(event_loop.start());
172
173        handle
174    }
175}
176
177/// A Handle to the Discovery subsystem. The Discovery system will be shutdown once its Handle has
178/// been dropped.
179pub struct Handle {
180    _shutdown_handle: Arc<oneshot::Sender<()>>,
181}