1use crate::authority_client::{
6 AuthorityAPI, NetworkAuthorityClient, make_authority_clients_with_timeout_config,
7 make_network_authority_clients_with_network_config,
8};
9use crate::safe_client::{SafeClient, SafeClientMetrics, SafeClientMetricsBase};
10#[cfg(test)]
11use crate::test_authority_clients::MockAuthorityApi;
12use sui_authority_aggregation::ReduceOutput;
13use sui_authority_aggregation::quorum_map_then_reduce_with_timeout;
14use sui_config::genesis::Genesis;
15use sui_network::{
16 DEFAULT_CONNECT_TIMEOUT_SEC, DEFAULT_REQUEST_TIMEOUT_SEC, default_mysten_network_config,
17};
18use sui_swarm_config::network_config::NetworkConfig;
19use sui_types::crypto::AuthorityPublicKeyBytes;
20use sui_types::error::UserInputError;
21use sui_types::object::Object;
22use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
23use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
24use sui_types::{
25 base_types::*,
26 committee::Committee,
27 error::{SuiError, SuiResult},
28};
29use tracing::debug;
30
31use crate::epoch::committee_store::CommitteeStore;
32use prometheus::Registry;
33use std::collections::{BTreeMap, HashMap};
34use std::sync::Arc;
35use std::time::Duration;
36use sui_types::committee::{CommitteeWithNetworkMetadata, StakeUnit};
37use sui_types::messages_grpc::{LayoutGenerationOption, ObjectInfoRequest};
38use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
39
/// Default retry count exported by this module.
///
/// NOTE(review): nothing in this file reads the constant; presumably it is
/// consumed by other modules — confirm before changing the value.
pub const DEFAULT_RETRIES: usize = 4;
41
/// Timeouts applied when aggregating responses from the authorities.
///
/// `Debug` is derived so the configuration can be logged or included in
/// diagnostics of types that embed it.
#[derive(Clone, Debug)]
pub struct TimeoutConfig {
    /// Timeout handed to the quorum helper by the queries in this file.
    pub pre_quorum_timeout: Duration,
    /// NOTE(review): not read anywhere in this file; presumably bounds the
    /// wait after a quorum has been reached — confirm against the callers.
    pub post_quorum_timeout: Duration,
}

impl Default for TimeoutConfig {
    /// Returns the stock configuration: 60 seconds before quorum and
    /// 7 seconds after it.
    fn default() -> Self {
        Self {
            pre_quorum_timeout: Duration::from_secs(60),
            post_quorum_timeout: Duration::from_secs(7),
        }
    }
}
56
57#[derive(Clone)]
58pub struct AuthorityAggregator<A: Clone> {
59 pub committee: Arc<Committee>,
61 pub validator_display_names: Arc<HashMap<AuthorityName, String>>,
65 pub reference_gas_price: u64,
67 pub authority_clients: Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>>,
69 pub safe_client_metrics_base: SafeClientMetricsBase,
71 pub timeouts: TimeoutConfig,
72 pub committee_store: Arc<CommitteeStore>,
74}
75
76impl<A: Clone> AuthorityAggregator<A> {
77 pub fn new(
78 committee: Committee,
79 validator_display_names: Arc<HashMap<AuthorityName, String>>,
80 reference_gas_price: u64,
81 committee_store: Arc<CommitteeStore>,
82 authority_clients: BTreeMap<AuthorityName, A>,
83 safe_client_metrics_base: SafeClientMetricsBase,
84 timeouts: TimeoutConfig,
85 ) -> Self {
86 Self {
87 committee: Arc::new(committee),
88 validator_display_names,
89 reference_gas_price,
90 authority_clients: create_safe_clients(
91 authority_clients,
92 &committee_store,
93 &safe_client_metrics_base,
94 ),
95 safe_client_metrics_base,
96 timeouts,
97 committee_store,
98 }
99 }
100
101 pub fn get_client(&self, name: &AuthorityName) -> Option<&Arc<SafeClient<A>>> {
102 self.authority_clients.get(name)
103 }
104
105 pub fn clone_client_test_only(&self, name: &AuthorityName) -> Arc<SafeClient<A>>
106 where
107 A: Clone,
108 {
109 self.authority_clients[name].clone()
110 }
111
112 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
113 self.committee_store.clone()
114 }
115
116 pub fn clone_inner_committee_test_only(&self) -> Committee {
117 (*self.committee).clone()
118 }
119
120 pub fn clone_inner_clients_test_only(&self) -> BTreeMap<AuthorityName, SafeClient<A>> {
121 (*self.authority_clients)
122 .clone()
123 .into_iter()
124 .map(|(k, v)| (k, (*v).clone()))
125 .collect()
126 }
127
128 pub fn get_display_name(&self, name: &AuthorityName) -> String {
129 self.validator_display_names
130 .get(name)
131 .cloned()
132 .unwrap_or_else(|| name.concise().to_string())
133 }
134}
135
136fn create_safe_clients<A: Clone>(
137 authority_clients: BTreeMap<AuthorityName, A>,
138 committee_store: &Arc<CommitteeStore>,
139 safe_client_metrics_base: &SafeClientMetricsBase,
140) -> Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>> {
141 Arc::new(
142 authority_clients
143 .into_iter()
144 .map(|(name, api)| {
145 (
146 name,
147 Arc::new(SafeClient::new(
148 api,
149 committee_store.clone(),
150 name,
151 SafeClientMetrics::new(safe_client_metrics_base, name),
152 )),
153 )
154 })
155 .collect(),
156 )
157}
158
159impl AuthorityAggregator<NetworkAuthorityClient> {
160 pub fn new_from_epoch_start_state(
163 epoch_start_state: &EpochStartSystemState,
164 committee_store: &Arc<CommitteeStore>,
165 safe_client_metrics_base: SafeClientMetricsBase,
166 ) -> Self {
167 let committee = epoch_start_state.get_sui_committee_with_network_metadata();
168 let validator_display_names = epoch_start_state.get_authority_names_to_hostnames();
169 Self::new_from_committee(
170 committee,
171 Arc::new(validator_display_names),
172 epoch_start_state.reference_gas_price(),
173 committee_store,
174 safe_client_metrics_base,
175 )
176 }
177
178 pub fn recreate_with_new_epoch_start_state(
182 &self,
183 epoch_start_state: &EpochStartSystemState,
184 ) -> Self {
185 Self::new_from_epoch_start_state(
186 epoch_start_state,
187 &self.committee_store,
188 self.safe_client_metrics_base.clone(),
189 )
190 }
191
192 pub fn new_from_committee(
193 committee: CommitteeWithNetworkMetadata,
194 validator_display_names: Arc<HashMap<AuthorityName, String>>,
195 reference_gas_price: u64,
196 committee_store: &Arc<CommitteeStore>,
197 safe_client_metrics_base: SafeClientMetricsBase,
198 ) -> Self {
199 let net_config = default_mysten_network_config();
200 let authority_clients =
201 make_network_authority_clients_with_network_config(&committee, &net_config);
202 Self::new(
203 committee.committee().clone(),
204 validator_display_names,
205 reference_gas_price,
206 committee_store.clone(),
207 authority_clients,
208 safe_client_metrics_base,
209 Default::default(),
210 )
211 }
212}
213
214impl<A> AuthorityAggregator<A>
215where
216 A: AuthorityAPI + Send + Sync + 'static + Clone,
217{
218 pub async fn get_latest_object_version_for_testing(
224 &self,
225 object_id: ObjectID,
226 ) -> SuiResult<Object> {
227 #[derive(Debug, Default)]
228 struct State {
229 latest_object_version: Option<Object>,
230 total_weight: StakeUnit,
231 }
232 let initial_state = State::default();
233 let result = quorum_map_then_reduce_with_timeout(
234 self.committee.clone(),
235 self.authority_clients.clone(),
236 initial_state,
237 |_name, client| {
238 Box::pin(async move {
239 let request =
240 ObjectInfoRequest::latest_object_info_request(object_id, LayoutGenerationOption::None);
241 let mut retry_count = 0;
242 loop {
243 match client.handle_object_info_request(request.clone()).await {
244 Ok(object_info) => return Ok(object_info),
245 Err(err) => {
246 retry_count += 1;
247 if retry_count > 3 {
248 return Err(err);
249 }
250 tokio::time::sleep(Duration::from_secs(1)).await;
251 }
252 }
253 }
254 })
255 },
256 |mut state, name, weight, result| {
257 Box::pin(async move {
258 state.total_weight += weight;
259 match result {
260 Ok(object_info) => {
261 debug!("Received object info response from validator {:?} with version: {:?}", name.concise(), object_info.object.version());
262 if state.latest_object_version.as_ref().is_none_or(|latest| {
263 object_info.object.version() > latest.version()
264 }) {
265 state.latest_object_version = Some(object_info.object);
266 }
267 }
268 Err(err) => {
269 debug!("Received error from validator {:?}: {:?}", name.concise(), err);
270 }
271 };
272 if state.total_weight >= self.committee.quorum_threshold() {
273 if let Some(object) = state.latest_object_version {
274 return ReduceOutput::Success(object);
275 } else {
276 return ReduceOutput::Failed(state);
277 }
278 }
279 ReduceOutput::Continue(state)
280 })
281 },
282 self.timeouts.pre_quorum_timeout,
284 )
285 .await.map_err(|_state| SuiError::from(UserInputError::ObjectNotFound {
286 object_id,
287 version: None,
288 }))?;
289 Ok(result.0)
290 }
291
292 pub async fn get_latest_system_state_object_for_testing(
296 &self,
297 ) -> anyhow::Result<SuiSystemState> {
298 #[derive(Debug, Default)]
299 struct State {
300 latest_system_state: Option<SuiSystemState>,
301 total_weight: StakeUnit,
302 }
303 let initial_state = State::default();
304 let result = quorum_map_then_reduce_with_timeout(
305 self.committee.clone(),
306 self.authority_clients.clone(),
307 initial_state,
308 |_name, client| Box::pin(async move { client.handle_system_state_object().await }),
309 |mut state, name, weight, result| {
310 Box::pin(async move {
311 state.total_weight += weight;
312 match result {
313 Ok(system_state) => {
314 debug!(
315 "Received system state object from validator {:?} with epoch: {:?}",
316 name.concise(),
317 system_state.epoch()
318 );
319 if state
320 .latest_system_state
321 .as_ref()
322 .is_none_or(|latest| system_state.epoch() > latest.epoch())
323 {
324 state.latest_system_state = Some(system_state);
325 }
326 }
327 Err(err) => {
328 debug!(
329 "Received error from validator {:?}: {:?}",
330 name.concise(),
331 err
332 );
333 }
334 };
335 if state.total_weight >= self.committee.quorum_threshold() {
336 if let Some(system_state) = state.latest_system_state {
337 return ReduceOutput::Success(system_state);
338 } else {
339 return ReduceOutput::Failed(state);
340 }
341 }
342 ReduceOutput::Continue(state)
343 })
344 },
345 self.timeouts.pre_quorum_timeout,
347 )
348 .await
349 .map_err(|_| anyhow::anyhow!("Failed to get latest system state from the authorities"))?;
350 Ok(result.0)
351 }
352}
353
354#[derive(Default)]
355pub struct AuthorityAggregatorBuilder<'a> {
356 network_config: Option<&'a NetworkConfig>,
357 genesis: Option<&'a Genesis>,
358 committee: Option<Committee>,
359 reference_gas_price: Option<u64>,
360 committee_store: Option<Arc<CommitteeStore>>,
361 registry: Option<&'a Registry>,
362 timeouts_config: Option<TimeoutConfig>,
363}
364
365impl<'a> AuthorityAggregatorBuilder<'a> {
366 pub fn from_network_config(config: &'a NetworkConfig) -> Self {
367 Self {
368 network_config: Some(config),
369 ..Default::default()
370 }
371 }
372
373 pub fn from_genesis(genesis: &'a Genesis) -> Self {
374 Self {
375 genesis: Some(genesis),
376 ..Default::default()
377 }
378 }
379
380 pub fn from_committee(committee: Committee) -> Self {
381 Self {
382 committee: Some(committee),
383 ..Default::default()
384 }
385 }
386
387 #[cfg(test)]
388 pub fn from_committee_size(committee_size: usize) -> Self {
389 let (committee, _keypairs) = Committee::new_simple_test_committee_of_size(committee_size);
390 Self::from_committee(committee)
391 }
392
393 pub fn with_committee_store(mut self, committee_store: Arc<CommitteeStore>) -> Self {
394 self.committee_store = Some(committee_store);
395 self
396 }
397
398 pub fn with_registry(mut self, registry: &'a Registry) -> Self {
399 self.registry = Some(registry);
400 self
401 }
402
403 pub fn with_timeouts_config(mut self, timeouts_config: TimeoutConfig) -> Self {
404 self.timeouts_config = Some(timeouts_config);
405 self
406 }
407
408 fn get_network_committee(&self) -> CommitteeWithNetworkMetadata {
409 self.get_genesis()
410 .unwrap_or_else(|| panic!("need either NetworkConfig or Genesis."))
411 .committee_with_network()
412 }
413
414 fn get_committee_authority_names_to_hostnames(&self) -> HashMap<AuthorityName, String> {
415 if let Some(genesis) = self.get_genesis() {
416 let state = genesis
417 .sui_system_object()
418 .into_genesis_version_for_tooling();
419 state
420 .validators
421 .active_validators
422 .iter()
423 .map(|v| {
424 let metadata = v.verified_metadata();
425 let name = metadata.sui_pubkey_bytes();
426
427 (name, metadata.name.clone())
428 })
429 .collect()
430 } else {
431 HashMap::new()
432 }
433 }
434
435 fn get_reference_gas_price(&self) -> u64 {
436 self.reference_gas_price.unwrap_or_else(|| {
437 self.get_genesis()
438 .map(|g| g.reference_gas_price())
439 .unwrap_or(1000)
440 })
441 }
442
443 fn get_genesis(&self) -> Option<&Genesis> {
444 if let Some(network_config) = self.network_config {
445 Some(&network_config.genesis)
446 } else if let Some(genesis) = self.genesis {
447 Some(genesis)
448 } else {
449 None
450 }
451 }
452
453 fn get_committee(&self) -> Committee {
454 self.committee
455 .clone()
456 .unwrap_or_else(|| self.get_network_committee().committee().clone())
457 }
458
459 pub fn build_network_clients(
460 self,
461 ) -> (
462 AuthorityAggregator<NetworkAuthorityClient>,
463 BTreeMap<AuthorityPublicKeyBytes, NetworkAuthorityClient>,
464 ) {
465 let network_committee = self.get_network_committee();
466 let auth_clients = make_authority_clients_with_timeout_config(
467 &network_committee,
468 DEFAULT_CONNECT_TIMEOUT_SEC,
469 DEFAULT_REQUEST_TIMEOUT_SEC,
470 );
471 let auth_agg = self.build_custom_clients(auth_clients.clone());
472 (auth_agg, auth_clients)
473 }
474
475 pub fn build_custom_clients<C: Clone>(
476 self,
477 authority_clients: BTreeMap<AuthorityName, C>,
478 ) -> AuthorityAggregator<C> {
479 let committee = self.get_committee();
480 let validator_display_names = self.get_committee_authority_names_to_hostnames();
481 let reference_gas_price = self.get_reference_gas_price();
482 let registry = Registry::new();
483 let registry = self.registry.unwrap_or(®istry);
484 let safe_client_metrics_base = SafeClientMetricsBase::new(registry);
485
486 let committee_store = self
487 .committee_store
488 .unwrap_or_else(|| Arc::new(CommitteeStore::new_for_testing(&committee)));
489
490 let timeouts_config = self.timeouts_config.unwrap_or_default();
491
492 AuthorityAggregator::new(
493 committee,
494 Arc::new(validator_display_names),
495 reference_gas_price,
496 committee_store,
497 authority_clients,
498 safe_client_metrics_base,
499 timeouts_config,
500 )
501 }
502
503 #[cfg(test)]
504 pub fn build_mock_authority_aggregator(self) -> AuthorityAggregator<MockAuthorityApi> {
505 let committee = self.get_committee();
506 let clients = committee
507 .names()
508 .map(|name| {
509 (
510 *name,
511 MockAuthorityApi::new(
512 Duration::from_millis(100),
513 Arc::new(std::sync::Mutex::new(30)),
514 ),
515 )
516 })
517 .collect();
518 self.build_custom_clients(clients)
519 }
520}