// sui_network/endpoint_manager.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use arc_swap::ArcSwapOption;
7use mysten_network::Multiaddr;
8use serde::{Deserialize, Serialize};
9use sui_types::crypto::NetworkPublicKey;
10use sui_types::error::{SuiErrorKind, SuiResult};
11use tap::TapFallible;
12use tracing::warn;
13
14use crate::discovery;
15
16/// EndpointManager can be used to dynamically update the addresses of
17/// other nodes in the network.
18#[derive(Clone)]
19pub struct EndpointManager {
20    inner: Arc<Inner>,
21}
22
23struct Inner {
24    discovery_sender: discovery::Sender,
25    consensus_address_updater: ArcSwapOption<Arc<dyn ConsensusAddressUpdater>>,
26}
27
28pub trait ConsensusAddressUpdater: Send + Sync + 'static {
29    fn update_address(
30        &self,
31        network_pubkey: NetworkPublicKey,
32        source: AddressSource,
33        addresses: Vec<Multiaddr>,
34    ) -> SuiResult<()>;
35}
36
37impl EndpointManager {
38    pub fn new(discovery_sender: discovery::Sender) -> Self {
39        Self {
40            inner: Arc::new(Inner {
41                discovery_sender,
42                consensus_address_updater: ArcSwapOption::empty(),
43            }),
44        }
45    }
46
47    pub fn set_consensus_address_updater(
48        &self,
49        consensus_address_updater: Arc<dyn ConsensusAddressUpdater>,
50    ) {
51        self.inner
52            .consensus_address_updater
53            .store(Some(Arc::new(consensus_address_updater)));
54    }
55
56    /// Updates the address(es) for the given endpoint from the specified source.
57    ///
58    /// Multiple sources can provide addresses for the same peer. The highest-priority
59    /// source's addresses are used. Empty `addresses` clears a source.
60    pub fn update_endpoint(
61        &self,
62        endpoint: EndpointId,
63        source: AddressSource,
64        addresses: Vec<Multiaddr>,
65    ) -> SuiResult<()> {
66        match endpoint {
67            EndpointId::P2p(peer_id) => {
68                let anemo_addresses: Vec<_> = addresses
69                    .into_iter()
70                    .filter_map(|addr| {
71                        addr.to_anemo_address()
72                            .tap_err(|_| {
73                                warn!(
74                                    ?addr,
75                                    "Skipping peer address: can't convert to anemo address"
76                                )
77                            })
78                            .ok()
79                    })
80                    .collect();
81
82                self.inner
83                    .discovery_sender
84                    .peer_address_change(peer_id, source, anemo_addresses);
85            }
86            EndpointId::Consensus(network_pubkey) => {
87                if let Some(updater) = self.inner.consensus_address_updater.load_full() {
88                    updater
89                        .update_address(network_pubkey.clone(), source, addresses)
90                        .map_err(|e| {
91                            warn!(?network_pubkey, "Error updating consensus address: {e:?}");
92                            e
93                        })?;
94                } else {
95                    return Err(SuiErrorKind::GenericAuthorityError {
96                        error: "Consensus address updater not configured".to_string(),
97                    }
98                    .into());
99                }
100            }
101        }
102
103        Ok(())
104    }
105}
106
107#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
108pub enum EndpointId {
109    P2p(anemo::PeerId),
110    Consensus(NetworkPublicKey),
111}
112
/// Where an address for an endpoint came from.
///
/// NOTE: AddressSources are prioritized in order of the enum variants below,
/// and the derived `Ord` follows that same declaration order. Do not reorder
/// the variants without reviewing every consumer of the ordering.
// NOTE(review): presumably the first variant (`Admin`) is the highest
// priority; confirm against the code that consumes this ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AddressSource {
    Admin,
    Config,
    Discovery,
    Committee,
}
121
#[cfg(test)]
mod tests {
    use super::*;
    use fastcrypto::traits::KeyPair;
    use std::sync::{Arc, Mutex};
    use sui_types::crypto::{NetworkKeyPair, get_key_pair};

    /// One recorded `update_address` call: (network key, source, addresses).
    type UpdateEntry = (NetworkPublicKey, AddressSource, Vec<Multiaddr>);

    /// Mock consensus address updater that records every call it receives,
    /// including the source, so tests can verify that all arguments are
    /// forwarded unchanged.
    struct MockConsensusAddressUpdater {
        updates: Arc<Mutex<Vec<UpdateEntry>>>,
    }

    impl MockConsensusAddressUpdater {
        /// Returns the mock together with a shared handle to its call log.
        fn new() -> (Self, Arc<Mutex<Vec<UpdateEntry>>>) {
            let updates = Arc::new(Mutex::new(Vec::new()));
            let updater = Self {
                updates: Arc::clone(&updates),
            };
            (updater, updates)
        }
    }

    impl ConsensusAddressUpdater for MockConsensusAddressUpdater {
        fn update_address(
            &self,
            network_pubkey: NetworkPublicKey,
            source: AddressSource,
            addresses: Vec<Multiaddr>,
        ) -> SuiResult<()> {
            // All arguments are owned, so they are moved into the log as-is.
            self.updates
                .lock()
                .unwrap()
                .push((network_pubkey, source, addresses));
            Ok(())
        }
    }

    /// Builds an `EndpointManager` backed by a default-configured discovery
    /// builder. The other values returned by the builder are dropped.
    fn create_mock_endpoint_manager() -> EndpointManager {
        use sui_config::p2p::P2pConfig;

        let config = P2pConfig::default();
        let (_unstarted, _server, endpoint_manager) =
            discovery::Builder::new().config(config).build();
        endpoint_manager
    }

    /// A consensus update must reach the installed updater exactly once, with
    /// the key, source and addresses all forwarded unchanged.
    #[tokio::test]
    async fn test_update_consensus_endpoint() {
        let endpoint_manager = create_mock_endpoint_manager();

        let (mock_updater, updates) = MockConsensusAddressUpdater::new();
        endpoint_manager.set_consensus_address_updater(Arc::new(mock_updater));

        let (_, network_key): (_, NetworkKeyPair) = get_key_pair();
        let network_pubkey = network_key.public();

        let addresses = vec![
            "/ip4/127.0.0.1/udp/9000".parse().unwrap(),
            "/ip4/127.0.0.1/udp/9001".parse().unwrap(),
        ];

        let result = endpoint_manager.update_endpoint(
            EndpointId::Consensus(network_pubkey.clone()),
            AddressSource::Admin,
            addresses.clone(),
        );

        assert!(result.is_ok());

        let recorded_updates = updates.lock().unwrap();
        assert_eq!(recorded_updates.len(), 1);
        assert_eq!(recorded_updates[0].0, network_pubkey.clone());
        assert_eq!(recorded_updates[0].1, AddressSource::Admin);
        assert_eq!(recorded_updates[0].2, addresses);
    }

    /// Without an installed updater, a consensus update must fail with the
    /// "not configured" error.
    #[tokio::test]
    async fn test_update_consensus_endpoint_without_updater() {
        let endpoint_manager = create_mock_endpoint_manager();

        let (_, network_key): (_, NetworkKeyPair) = get_key_pair();
        let network_pubkey = network_key.public();

        let addresses = vec!["/ip4/127.0.0.1/udp/9000".parse().unwrap()];

        let result = endpoint_manager.update_endpoint(
            EndpointId::Consensus(network_pubkey.clone()),
            AddressSource::Admin,
            addresses,
        );

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string()
                .contains("Consensus address updater not configured")
        );
    }
}