sui_core/
authority_aggregator.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::authority_client::{
6    AuthorityAPI, NetworkAuthorityClient, make_authority_clients_with_timeout_config,
7    make_network_authority_clients_with_network_config,
8};
9use crate::safe_client::{SafeClient, SafeClientMetrics, SafeClientMetricsBase};
10#[cfg(test)]
11use crate::test_authority_clients::MockAuthorityApi;
12use sui_authority_aggregation::ReduceOutput;
13use sui_authority_aggregation::quorum_map_then_reduce_with_timeout;
14use sui_config::genesis::Genesis;
15use sui_network::{
16    DEFAULT_CONNECT_TIMEOUT_SEC, DEFAULT_REQUEST_TIMEOUT_SEC, default_mysten_network_config,
17};
18use sui_swarm_config::network_config::NetworkConfig;
19use sui_types::crypto::AuthorityPublicKeyBytes;
20use sui_types::error::UserInputError;
21use sui_types::object::Object;
22use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
23use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
24use sui_types::{
25    base_types::*,
26    committee::Committee,
27    error::{SuiError, SuiResult},
28};
29use tracing::debug;
30
31use crate::epoch::committee_store::CommitteeStore;
32use prometheus::Registry;
33use std::collections::{BTreeMap, HashMap};
34use std::sync::Arc;
35use std::time::Duration;
36use sui_types::committee::{CommitteeWithNetworkMetadata, StakeUnit};
37use sui_types::messages_grpc::{LayoutGenerationOption, ObjectInfoRequest};
38use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
39
/// Default retry count exported by this module.
///
/// NOTE(review): nothing in the code visible alongside this constant reads it; the retry loop in
/// `get_latest_object_version_for_testing` carries its own hard-coded attempt limit.
pub const DEFAULT_RETRIES: usize = 4;

/// Timeouts used when collecting responses from the committee.
#[derive(Clone)]
pub struct TimeoutConfig {
    /// Timeout handed to quorum queries, i.e. how long to wait to hear back from a quorum.
    /// Defaults to 60 seconds.
    pub pre_quorum_timeout: Duration,
    /// Timeout intended for the phase after a quorum has answered. Defaults to 7 seconds.
    /// NOTE(review): not read by the code visible alongside this type.
    pub post_quorum_timeout: Duration,
}

impl Default for TimeoutConfig {
    fn default() -> Self {
        const PRE_QUORUM_SECS: u64 = 60;
        const POST_QUORUM_SECS: u64 = 7;

        TimeoutConfig {
            pre_quorum_timeout: Duration::from_secs(PRE_QUORUM_SECS),
            post_quorum_timeout: Duration::from_secs(POST_QUORUM_SECS),
        }
    }
}
56
57#[derive(Clone)]
58pub struct AuthorityAggregator<A: Clone> {
59    /// Our Sui committee.
60    pub committee: Arc<Committee>,
61    /// For more human readable metrics reporting.
62    /// It's OK for this map to be empty or missing validators, it then defaults
63    /// to use concise validator public keys.
64    pub validator_display_names: Arc<HashMap<AuthorityName, String>>,
65    /// Reference gas price for the current epoch.
66    pub reference_gas_price: u64,
67    /// How to talk to this committee.
68    pub authority_clients: Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>>,
69    /// Metric base for the purpose of creating new safe clients during reconfiguration.
70    pub safe_client_metrics_base: SafeClientMetricsBase,
71    pub timeouts: TimeoutConfig,
72    /// Store here for clone during re-config.
73    pub committee_store: Arc<CommitteeStore>,
74}
75
76impl<A: Clone> AuthorityAggregator<A> {
77    pub fn new(
78        committee: Committee,
79        validator_display_names: Arc<HashMap<AuthorityName, String>>,
80        reference_gas_price: u64,
81        committee_store: Arc<CommitteeStore>,
82        authority_clients: BTreeMap<AuthorityName, A>,
83        safe_client_metrics_base: SafeClientMetricsBase,
84        timeouts: TimeoutConfig,
85    ) -> Self {
86        Self {
87            committee: Arc::new(committee),
88            validator_display_names,
89            reference_gas_price,
90            authority_clients: create_safe_clients(
91                authority_clients,
92                &committee_store,
93                &safe_client_metrics_base,
94            ),
95            safe_client_metrics_base,
96            timeouts,
97            committee_store,
98        }
99    }
100
101    pub fn get_client(&self, name: &AuthorityName) -> Option<&Arc<SafeClient<A>>> {
102        self.authority_clients.get(name)
103    }
104
105    pub fn clone_client_test_only(&self, name: &AuthorityName) -> Arc<SafeClient<A>>
106    where
107        A: Clone,
108    {
109        self.authority_clients[name].clone()
110    }
111
112    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
113        self.committee_store.clone()
114    }
115
116    pub fn clone_inner_committee_test_only(&self) -> Committee {
117        (*self.committee).clone()
118    }
119
120    pub fn clone_inner_clients_test_only(&self) -> BTreeMap<AuthorityName, SafeClient<A>> {
121        (*self.authority_clients)
122            .clone()
123            .into_iter()
124            .map(|(k, v)| (k, (*v).clone()))
125            .collect()
126    }
127
128    pub fn get_display_name(&self, name: &AuthorityName) -> String {
129        self.validator_display_names
130            .get(name)
131            .cloned()
132            .unwrap_or_else(|| name.concise().to_string())
133    }
134}
135
136fn create_safe_clients<A: Clone>(
137    authority_clients: BTreeMap<AuthorityName, A>,
138    committee_store: &Arc<CommitteeStore>,
139    safe_client_metrics_base: &SafeClientMetricsBase,
140) -> Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>> {
141    Arc::new(
142        authority_clients
143            .into_iter()
144            .map(|(name, api)| {
145                (
146                    name,
147                    Arc::new(SafeClient::new(
148                        api,
149                        committee_store.clone(),
150                        name,
151                        SafeClientMetrics::new(safe_client_metrics_base, name),
152                    )),
153                )
154            })
155            .collect(),
156    )
157}
158
159impl AuthorityAggregator<NetworkAuthorityClient> {
160    /// Create a new network authority aggregator by reading the committee and network addresses
161    /// information from the given epoch start system state.
162    pub fn new_from_epoch_start_state(
163        epoch_start_state: &EpochStartSystemState,
164        committee_store: &Arc<CommitteeStore>,
165        safe_client_metrics_base: SafeClientMetricsBase,
166    ) -> Self {
167        let committee = epoch_start_state.get_sui_committee_with_network_metadata();
168        let validator_display_names = epoch_start_state.get_authority_names_to_hostnames();
169        Self::new_from_committee(
170            committee,
171            Arc::new(validator_display_names),
172            epoch_start_state.reference_gas_price(),
173            committee_store,
174            safe_client_metrics_base,
175        )
176    }
177
178    /// Create a new AuthorityAggregator using information from the given epoch start system state.
179    /// This is typically used during reconfiguration to create a new AuthorityAggregator with the
180    /// new committee and network addresses.
181    pub fn recreate_with_new_epoch_start_state(
182        &self,
183        epoch_start_state: &EpochStartSystemState,
184    ) -> Self {
185        Self::new_from_epoch_start_state(
186            epoch_start_state,
187            &self.committee_store,
188            self.safe_client_metrics_base.clone(),
189        )
190    }
191
192    pub fn new_from_committee(
193        committee: CommitteeWithNetworkMetadata,
194        validator_display_names: Arc<HashMap<AuthorityName, String>>,
195        reference_gas_price: u64,
196        committee_store: &Arc<CommitteeStore>,
197        safe_client_metrics_base: SafeClientMetricsBase,
198    ) -> Self {
199        let net_config = default_mysten_network_config();
200        let authority_clients =
201            make_network_authority_clients_with_network_config(&committee, &net_config);
202        Self::new(
203            committee.committee().clone(),
204            validator_display_names,
205            reference_gas_price,
206            committee_store.clone(),
207            authority_clients,
208            safe_client_metrics_base,
209            Default::default(),
210        )
211    }
212}
213
214impl<A> AuthorityAggregator<A>
215where
216    A: AuthorityAPI + Send + Sync + 'static + Clone,
217{
218    /// Query the object with highest version number from the authorities.
219    /// We stop after receiving responses from 2f+1 validators.
220    /// This function is untrusted because we simply assume each response is valid and there are no
221    /// byzantine validators.
222    /// Because of this, this function should only be used for testing or benchmarking.
223    pub async fn get_latest_object_version_for_testing(
224        &self,
225        object_id: ObjectID,
226    ) -> SuiResult<Object> {
227        #[derive(Debug, Default)]
228        struct State {
229            latest_object_version: Option<Object>,
230            total_weight: StakeUnit,
231        }
232        let initial_state = State::default();
233        let result = quorum_map_then_reduce_with_timeout(
234                self.committee.clone(),
235                self.authority_clients.clone(),
236                initial_state,
237                |_name, client| {
238                    Box::pin(async move {
239                        let request =
240                            ObjectInfoRequest::latest_object_info_request(object_id, /* generate_layout */ LayoutGenerationOption::None);
241                        let mut retry_count = 0;
242                        loop {
243                            match client.handle_object_info_request(request.clone()).await {
244                                Ok(object_info) => return Ok(object_info),
245                                Err(err) => {
246                                    retry_count += 1;
247                                    if retry_count > 3 {
248                                        return Err(err);
249                                    }
250                                    tokio::time::sleep(Duration::from_secs(1)).await;
251                                }
252                            }
253                        }
254                    })
255                },
256                |mut state, name, weight, result| {
257                    Box::pin(async move {
258                        state.total_weight += weight;
259                        match result {
260                            Ok(object_info) => {
261                                debug!("Received object info response from validator {:?} with version: {:?}", name.concise(), object_info.object.version());
262                                if state.latest_object_version.as_ref().is_none_or(|latest| {
263                                    object_info.object.version() > latest.version()
264                                }) {
265                                    state.latest_object_version = Some(object_info.object);
266                                }
267                            }
268                            Err(err) => {
269                                debug!("Received error from validator {:?}: {:?}", name.concise(), err);
270                            }
271                        };
272                        if state.total_weight >= self.committee.quorum_threshold() {
273                            if let Some(object) = state.latest_object_version {
274                                return ReduceOutput::Success(object);
275                            } else {
276                                return ReduceOutput::Failed(state);
277                            }
278                        }
279                        ReduceOutput::Continue(state)
280                    })
281                },
282                // A long timeout before we hear back from a quorum
283                self.timeouts.pre_quorum_timeout,
284            )
285            .await.map_err(|_state| SuiError::from(UserInputError::ObjectNotFound {
286                object_id,
287                version: None,
288            }))?;
289        Ok(result.0)
290    }
291
292    /// Get the latest system state object from the authorities.
293    /// This function assumes all validators are honest.
294    /// It should only be used for testing or benchmarking.
295    pub async fn get_latest_system_state_object_for_testing(
296        &self,
297    ) -> anyhow::Result<SuiSystemState> {
298        #[derive(Debug, Default)]
299        struct State {
300            latest_system_state: Option<SuiSystemState>,
301            total_weight: StakeUnit,
302        }
303        let initial_state = State::default();
304        let result = quorum_map_then_reduce_with_timeout(
305            self.committee.clone(),
306            self.authority_clients.clone(),
307            initial_state,
308            |_name, client| Box::pin(async move { client.handle_system_state_object().await }),
309            |mut state, name, weight, result| {
310                Box::pin(async move {
311                    state.total_weight += weight;
312                    match result {
313                        Ok(system_state) => {
314                            debug!(
315                                "Received system state object from validator {:?} with epoch: {:?}",
316                                name.concise(),
317                                system_state.epoch()
318                            );
319                            if state
320                                .latest_system_state
321                                .as_ref()
322                                .is_none_or(|latest| system_state.epoch() > latest.epoch())
323                            {
324                                state.latest_system_state = Some(system_state);
325                            }
326                        }
327                        Err(err) => {
328                            debug!(
329                                "Received error from validator {:?}: {:?}",
330                                name.concise(),
331                                err
332                            );
333                        }
334                    };
335                    if state.total_weight >= self.committee.quorum_threshold() {
336                        if let Some(system_state) = state.latest_system_state {
337                            return ReduceOutput::Success(system_state);
338                        } else {
339                            return ReduceOutput::Failed(state);
340                        }
341                    }
342                    ReduceOutput::Continue(state)
343                })
344            },
345            // A long timeout before we hear back from a quorum
346            self.timeouts.pre_quorum_timeout,
347        )
348        .await
349        .map_err(|_| anyhow::anyhow!("Failed to get latest system state from the authorities"))?;
350        Ok(result.0)
351    }
352}
353
354#[derive(Default)]
355pub struct AuthorityAggregatorBuilder<'a> {
356    network_config: Option<&'a NetworkConfig>,
357    genesis: Option<&'a Genesis>,
358    committee: Option<Committee>,
359    reference_gas_price: Option<u64>,
360    committee_store: Option<Arc<CommitteeStore>>,
361    registry: Option<&'a Registry>,
362    timeouts_config: Option<TimeoutConfig>,
363}
364
365impl<'a> AuthorityAggregatorBuilder<'a> {
366    pub fn from_network_config(config: &'a NetworkConfig) -> Self {
367        Self {
368            network_config: Some(config),
369            ..Default::default()
370        }
371    }
372
373    pub fn from_genesis(genesis: &'a Genesis) -> Self {
374        Self {
375            genesis: Some(genesis),
376            ..Default::default()
377        }
378    }
379
380    pub fn from_committee(committee: Committee) -> Self {
381        Self {
382            committee: Some(committee),
383            ..Default::default()
384        }
385    }
386
387    #[cfg(test)]
388    pub fn from_committee_size(committee_size: usize) -> Self {
389        let (committee, _keypairs) = Committee::new_simple_test_committee_of_size(committee_size);
390        Self::from_committee(committee)
391    }
392
393    pub fn with_committee_store(mut self, committee_store: Arc<CommitteeStore>) -> Self {
394        self.committee_store = Some(committee_store);
395        self
396    }
397
398    pub fn with_registry(mut self, registry: &'a Registry) -> Self {
399        self.registry = Some(registry);
400        self
401    }
402
403    pub fn with_timeouts_config(mut self, timeouts_config: TimeoutConfig) -> Self {
404        self.timeouts_config = Some(timeouts_config);
405        self
406    }
407
408    fn get_network_committee(&self) -> CommitteeWithNetworkMetadata {
409        self.get_genesis()
410            .unwrap_or_else(|| panic!("need either NetworkConfig or Genesis."))
411            .committee_with_network()
412    }
413
414    fn get_committee_authority_names_to_hostnames(&self) -> HashMap<AuthorityName, String> {
415        if let Some(genesis) = self.get_genesis() {
416            let state = genesis
417                .sui_system_object()
418                .into_genesis_version_for_tooling();
419            state
420                .validators
421                .active_validators
422                .iter()
423                .map(|v| {
424                    let metadata = v.verified_metadata();
425                    let name = metadata.sui_pubkey_bytes();
426
427                    (name, metadata.name.clone())
428                })
429                .collect()
430        } else {
431            HashMap::new()
432        }
433    }
434
435    fn get_reference_gas_price(&self) -> u64 {
436        self.reference_gas_price.unwrap_or_else(|| {
437            self.get_genesis()
438                .map(|g| g.reference_gas_price())
439                .unwrap_or(1000)
440        })
441    }
442
443    fn get_genesis(&self) -> Option<&Genesis> {
444        if let Some(network_config) = self.network_config {
445            Some(&network_config.genesis)
446        } else if let Some(genesis) = self.genesis {
447            Some(genesis)
448        } else {
449            None
450        }
451    }
452
453    fn get_committee(&self) -> Committee {
454        self.committee
455            .clone()
456            .unwrap_or_else(|| self.get_network_committee().committee().clone())
457    }
458
459    pub fn build_network_clients(
460        self,
461    ) -> (
462        AuthorityAggregator<NetworkAuthorityClient>,
463        BTreeMap<AuthorityPublicKeyBytes, NetworkAuthorityClient>,
464    ) {
465        let network_committee = self.get_network_committee();
466        let auth_clients = make_authority_clients_with_timeout_config(
467            &network_committee,
468            DEFAULT_CONNECT_TIMEOUT_SEC,
469            DEFAULT_REQUEST_TIMEOUT_SEC,
470        );
471        let auth_agg = self.build_custom_clients(auth_clients.clone());
472        (auth_agg, auth_clients)
473    }
474
475    pub fn build_custom_clients<C: Clone>(
476        self,
477        authority_clients: BTreeMap<AuthorityName, C>,
478    ) -> AuthorityAggregator<C> {
479        let committee = self.get_committee();
480        let validator_display_names = self.get_committee_authority_names_to_hostnames();
481        let reference_gas_price = self.get_reference_gas_price();
482        let registry = Registry::new();
483        let registry = self.registry.unwrap_or(&registry);
484        let safe_client_metrics_base = SafeClientMetricsBase::new(registry);
485
486        let committee_store = self
487            .committee_store
488            .unwrap_or_else(|| Arc::new(CommitteeStore::new_for_testing(&committee)));
489
490        let timeouts_config = self.timeouts_config.unwrap_or_default();
491
492        AuthorityAggregator::new(
493            committee,
494            Arc::new(validator_display_names),
495            reference_gas_price,
496            committee_store,
497            authority_clients,
498            safe_client_metrics_base,
499            timeouts_config,
500        )
501    }
502
503    #[cfg(test)]
504    pub fn build_mock_authority_aggregator(self) -> AuthorityAggregator<MockAuthorityApi> {
505        let committee = self.get_committee();
506        let clients = committee
507            .names()
508            .map(|name| {
509                (
510                    *name,
511                    MockAuthorityApi::new(
512                        Duration::from_millis(100),
513                        Arc::new(std::sync::Mutex::new(30)),
514                    ),
515                )
516            })
517            .collect();
518        self.build_custom_clients(clients)
519    }
520}