sui_network/
endpoint_manager.rs1use std::sync::Arc;
5
6use arc_swap::ArcSwapOption;
7use mysten_network::Multiaddr;
8use serde::{Deserialize, Serialize};
9use sui_types::crypto::NetworkPublicKey;
10use sui_types::error::{SuiErrorKind, SuiResult};
11use tap::TapFallible;
12use tracing::warn;
13
14use crate::discovery;
15
16#[derive(Clone)]
19pub struct EndpointManager {
20 inner: Arc<Inner>,
21}
22
23struct Inner {
24 discovery_sender: discovery::Sender,
25 consensus_address_updater: ArcSwapOption<Arc<dyn ConsensusAddressUpdater>>,
26}
27
28pub trait ConsensusAddressUpdater: Send + Sync + 'static {
29 fn update_address(
30 &self,
31 network_pubkey: NetworkPublicKey,
32 source: AddressSource,
33 addresses: Vec<Multiaddr>,
34 ) -> SuiResult<()>;
35}
36
37impl EndpointManager {
38 pub fn new(discovery_sender: discovery::Sender) -> Self {
39 Self {
40 inner: Arc::new(Inner {
41 discovery_sender,
42 consensus_address_updater: ArcSwapOption::empty(),
43 }),
44 }
45 }
46
47 pub fn set_consensus_address_updater(
48 &self,
49 consensus_address_updater: Arc<dyn ConsensusAddressUpdater>,
50 ) {
51 self.inner
52 .consensus_address_updater
53 .store(Some(Arc::new(consensus_address_updater)));
54 }
55
56 pub fn update_endpoint(
61 &self,
62 endpoint: EndpointId,
63 source: AddressSource,
64 addresses: Vec<Multiaddr>,
65 ) -> SuiResult<()> {
66 match endpoint {
67 EndpointId::P2p(peer_id) => {
68 let anemo_addresses: Vec<_> = addresses
69 .into_iter()
70 .filter_map(|addr| {
71 addr.to_anemo_address()
72 .tap_err(|_| {
73 warn!(
74 ?addr,
75 "Skipping peer address: can't convert to anemo address"
76 )
77 })
78 .ok()
79 })
80 .collect();
81
82 self.inner
83 .discovery_sender
84 .peer_address_change(peer_id, source, anemo_addresses);
85 }
86 EndpointId::Consensus(network_pubkey) => {
87 if let Some(updater) = self.inner.consensus_address_updater.load_full() {
88 updater
89 .update_address(network_pubkey.clone(), source, addresses)
90 .map_err(|e| {
91 warn!(?network_pubkey, "Error updating consensus address: {e:?}");
92 e
93 })?;
94 } else {
95 return Err(SuiErrorKind::GenericAuthorityError {
96 error: "Consensus address updater not configured".to_string(),
97 }
98 .into());
99 }
100 }
101 }
102
103 Ok(())
104 }
105}
106
107#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
108pub enum EndpointId {
109 P2p(anemo::PeerId),
110 Consensus(NetworkPublicKey),
111}
112
/// Origin of a set of endpoint addresses.
///
/// The value is passed through unchanged to the discovery sender or the
/// consensus address updater. Because `Ord` is derived, variants compare in
/// declaration order: `Admin < Config < Discovery < Committee`.
// NOTE(review): the per-variant meanings are not visible in this file and are
// presumably what the names suggest -- confirm before relying on them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AddressSource {
    Admin,
    Config,
    Discovery,
    Committee,
}
121
#[cfg(test)]
mod tests {
    use super::*;
    use fastcrypto::traits::KeyPair;
    use std::sync::{Arc, Mutex};
    use sui_types::crypto::{NetworkKeyPair, get_key_pair};

    /// One recorded `update_address` call: target key, reported source, addresses.
    type UpdateEntry = (NetworkPublicKey, AddressSource, Vec<Multiaddr>);

    /// Test double that records every update it receives and always succeeds.
    struct MockConsensusAddressUpdater {
        /// Call log, shared with the test body so it can inspect what arrived.
        updates: Arc<Mutex<Vec<UpdateEntry>>>,
    }

    impl MockConsensusAddressUpdater {
        /// Returns the mock together with a handle to its call log.
        fn new() -> (Self, Arc<Mutex<Vec<UpdateEntry>>>) {
            let updates = Arc::new(Mutex::new(Vec::new()));
            let updater = Self {
                updates: Arc::clone(&updates),
            };
            (updater, updates)
        }
    }

    impl ConsensusAddressUpdater for MockConsensusAddressUpdater {
        fn update_address(
            &self,
            network_pubkey: NetworkPublicKey,
            source: AddressSource,
            addresses: Vec<Multiaddr>,
        ) -> SuiResult<()> {
            // All arguments are owned, so they are moved into the log as-is.
            self.updates
                .lock()
                .unwrap()
                .push((network_pubkey, source, addresses));
            Ok(())
        }
    }

    /// Builds an `EndpointManager` from a default-configured discovery builder.
    fn create_mock_endpoint_manager() -> EndpointManager {
        use sui_config::p2p::P2pConfig;

        let config = P2pConfig::default();
        let (_unstarted, _server, endpoint_manager) =
            discovery::Builder::new().config(config).build();
        endpoint_manager
    }

    /// A consensus update must reach the registered updater with the key,
    /// source and addresses unchanged.
    #[tokio::test]
    async fn test_update_consensus_endpoint() {
        let endpoint_manager = create_mock_endpoint_manager();

        let (mock_updater, updates) = MockConsensusAddressUpdater::new();
        endpoint_manager.set_consensus_address_updater(Arc::new(mock_updater));

        let (_, network_key): (_, NetworkKeyPair) = get_key_pair();
        let network_pubkey = network_key.public();

        let addresses = vec![
            "/ip4/127.0.0.1/udp/9000".parse().unwrap(),
            "/ip4/127.0.0.1/udp/9001".parse().unwrap(),
        ];

        endpoint_manager
            .update_endpoint(
                EndpointId::Consensus(network_pubkey.clone()),
                AddressSource::Admin,
                addresses.clone(),
            )
            .expect("update should succeed once an updater is registered");

        let recorded_updates = updates.lock().unwrap();
        assert_eq!(recorded_updates.len(), 1);
        assert_eq!(recorded_updates[0].0, network_pubkey.clone());
        assert_eq!(recorded_updates[0].1, AddressSource::Admin);
        assert_eq!(recorded_updates[0].2, addresses);
    }

    /// Without a registered updater, a consensus update must fail with the
    /// "not configured" error.
    #[tokio::test]
    async fn test_update_consensus_endpoint_without_updater() {
        let endpoint_manager = create_mock_endpoint_manager();

        let (_, network_key): (_, NetworkKeyPair) = get_key_pair();
        let network_pubkey = network_key.public();

        let addresses = vec!["/ip4/127.0.0.1/udp/9000".parse().unwrap()];

        let err = endpoint_manager
            .update_endpoint(
                EndpointId::Consensus(network_pubkey.clone()),
                AddressSource::Admin,
                addresses,
            )
            .expect_err("update must fail when no updater is registered");

        assert!(
            err.to_string()
                .contains("Consensus address updater not configured")
        );
    }
}