sui_core/
authority_aggregator.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::authority_client::{
6    AuthorityAPI, NetworkAuthorityClient, make_authority_clients_with_timeout_config,
7    make_network_authority_clients_with_network_config,
8};
9use crate::safe_client::{SafeClient, SafeClientMetrics, SafeClientMetricsBase};
10#[cfg(test)]
11use crate::test_authority_clients::MockAuthorityApi;
12use sui_authority_aggregation::ReduceOutput;
13use sui_authority_aggregation::quorum_map_then_reduce_with_timeout;
14use sui_config::genesis::Genesis;
15use sui_network::{
16    DEFAULT_CONNECT_TIMEOUT_SEC, DEFAULT_REQUEST_TIMEOUT_SEC, default_mysten_network_config,
17};
18use sui_swarm_config::network_config::NetworkConfig;
19use sui_types::crypto::AuthorityPublicKeyBytes;
20use sui_types::error::UserInputError;
21use sui_types::object::Object;
22use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
23use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
24use sui_types::{
25    base_types::*,
26    committee::Committee,
27    error::{SuiError, SuiResult},
28};
29use tracing::debug;
30
31use crate::epoch::committee_store::CommitteeStore;
32use prometheus::Registry;
33use std::collections::{BTreeMap, HashMap};
34use std::sync::Arc;
35use std::time::Duration;
36use sui_types::committee::{CommitteeWithNetworkMetadata, StakeUnit};
37use sui_types::messages_grpc::{LayoutGenerationOption, ObjectInfoRequest};
38use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
39
/// Default retry count exposed to users of this module.
///
/// NOTE(review): nothing in the visible part of this file reads this constant; the retry loop in
/// `get_latest_object_version_for_testing` uses its own hard-coded limit. Confirm which callers
/// depend on it before changing the value.
pub const DEFAULT_RETRIES: usize = 4;
41
/// Time budgets applied when collecting responses from the committee.
#[derive(Clone)]
pub struct TimeoutConfig {
    /// Budget handed to quorum-gathering requests while a quorum has not yet been heard from.
    pub pre_quorum_timeout: Duration,
    /// NOTE(review): not read anywhere in the visible part of this file; presumably the budget
    /// applied once a quorum has already responded. Confirm against callers.
    pub post_quorum_timeout: Duration,
}

impl Default for TimeoutConfig {
    /// Returns the stock configuration: 60 seconds before quorum, 7 seconds after.
    fn default() -> Self {
        let pre_quorum_timeout = Duration::from_secs(60);
        let post_quorum_timeout = Duration::from_secs(7);
        Self {
            pre_quorum_timeout,
            post_quorum_timeout,
        }
    }
}
56
/// A committee together with one [`SafeClient`] per authority, keyed by authority name, plus the
/// settings needed to query the committee and to rebuild the aggregator on reconfiguration.
#[derive(Clone)]
pub struct AuthorityAggregator<A: Clone> {
    /// Our Sui committee.
    pub committee: Arc<Committee>,
    /// For more human readable metrics reporting.
    /// It's OK for this map to be empty or missing validators, it then defaults
    /// to use concise validator public keys.
    pub validator_display_names: Arc<HashMap<AuthorityName, String>>,
    /// Reference gas price for the current epoch.
    pub reference_gas_price: u64,
    /// How to talk to this committee.
    pub authority_clients: Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>>,
    /// Metric base for the purpose of creating new safe clients during reconfiguration.
    pub safe_client_metrics_base: SafeClientMetricsBase,
    /// Timeouts applied when gathering responses from the committee.
    pub timeouts: TimeoutConfig,
    /// Store here for clone during re-config.
    pub committee_store: Arc<CommitteeStore>,
}
75
76impl<A: Clone> AuthorityAggregator<A> {
77    pub fn new(
78        committee: Committee,
79        validator_display_names: Arc<HashMap<AuthorityName, String>>,
80        reference_gas_price: u64,
81        committee_store: Arc<CommitteeStore>,
82        authority_clients: BTreeMap<AuthorityName, A>,
83        safe_client_metrics_base: SafeClientMetricsBase,
84        timeouts: TimeoutConfig,
85    ) -> Self {
86        Self {
87            committee: Arc::new(committee),
88            validator_display_names,
89            reference_gas_price,
90            authority_clients: create_safe_clients(
91                authority_clients,
92                &committee_store,
93                &safe_client_metrics_base,
94            ),
95            safe_client_metrics_base,
96            timeouts,
97            committee_store,
98        }
99    }
100
101    pub fn get_client(&self, name: &AuthorityName) -> Option<&Arc<SafeClient<A>>> {
102        self.authority_clients.get(name)
103    }
104
105    pub fn clone_client_test_only(&self, name: &AuthorityName) -> Arc<SafeClient<A>>
106    where
107        A: Clone,
108    {
109        self.authority_clients[name].clone()
110    }
111
112    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
113        self.committee_store.clone()
114    }
115
116    pub fn clone_inner_committee_test_only(&self) -> Committee {
117        (*self.committee).clone()
118    }
119
120    pub fn clone_inner_clients_test_only(&self) -> BTreeMap<AuthorityName, SafeClient<A>> {
121        (*self.authority_clients)
122            .clone()
123            .into_iter()
124            .map(|(k, v)| (k, (*v).clone()))
125            .collect()
126    }
127
128    pub fn get_display_name(&self, name: &AuthorityName) -> String {
129        self.validator_display_names
130            .get(name)
131            .cloned()
132            .unwrap_or_else(|| name.concise().to_string())
133    }
134}
135
136fn create_safe_clients<A: Clone>(
137    authority_clients: BTreeMap<AuthorityName, A>,
138    committee_store: &Arc<CommitteeStore>,
139    safe_client_metrics_base: &SafeClientMetricsBase,
140) -> Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>> {
141    Arc::new(
142        authority_clients
143            .into_iter()
144            .map(|(name, api)| {
145                (
146                    name,
147                    Arc::new(SafeClient::new(
148                        api,
149                        committee_store.clone(),
150                        name,
151                        SafeClientMetrics::new(safe_client_metrics_base, name),
152                    )),
153                )
154            })
155            .collect(),
156    )
157}
158
159impl AuthorityAggregator<NetworkAuthorityClient> {
160    /// Create a new network authority aggregator by reading the committee and network addresses
161    /// information from the given epoch start system state.
162    pub fn new_from_epoch_start_state(
163        epoch_start_state: &EpochStartSystemState,
164        committee_store: &Arc<CommitteeStore>,
165        safe_client_metrics_base: SafeClientMetricsBase,
166    ) -> Self {
167        let committee = epoch_start_state.get_sui_committee_with_network_metadata();
168        let validator_display_names = epoch_start_state.get_authority_names_to_hostnames();
169        Self::new_from_committee(
170            committee,
171            Arc::new(validator_display_names),
172            epoch_start_state.reference_gas_price(),
173            committee_store,
174            safe_client_metrics_base,
175        )
176    }
177
178    /// Create a new AuthorityAggregator using information from the given epoch start system state.
179    /// This is typically used during reconfiguration to create a new AuthorityAggregator with the
180    /// new committee and network addresses.
181    pub fn recreate_with_new_epoch_start_state(
182        &self,
183        epoch_start_state: &EpochStartSystemState,
184    ) -> Self {
185        Self::new_from_epoch_start_state(
186            epoch_start_state,
187            &self.committee_store,
188            self.safe_client_metrics_base.clone(),
189        )
190    }
191
192    pub fn new_from_committee(
193        committee: CommitteeWithNetworkMetadata,
194        validator_display_names: Arc<HashMap<AuthorityName, String>>,
195        reference_gas_price: u64,
196        committee_store: &Arc<CommitteeStore>,
197        safe_client_metrics_base: SafeClientMetricsBase,
198    ) -> Self {
199        let net_config = default_mysten_network_config();
200        let authority_clients =
201            make_network_authority_clients_with_network_config(&committee, &net_config);
202        Self::new(
203            committee.committee().clone(),
204            validator_display_names,
205            reference_gas_price,
206            committee_store.clone(),
207            authority_clients,
208            safe_client_metrics_base,
209            Default::default(),
210        )
211    }
212}
213
214impl<A> AuthorityAggregator<A>
215where
216    A: AuthorityAPI + Send + Sync + 'static + Clone,
217{
218    /// Query the object with highest version number from the authorities.
219    /// We stop after receiving responses from 2f+1 validators.
220    /// This function is untrusted because we simply assume each response is valid and there are no
221    /// byzantine validators.
222    /// Because of this, this function should only be used for testing or benchmarking.
223    pub async fn get_latest_object_version_for_testing(
224        &self,
225        object_id: ObjectID,
226    ) -> SuiResult<Object> {
227        #[derive(Debug, Default)]
228        struct State {
229            latest_object_version: Option<Object>,
230            total_weight: StakeUnit,
231        }
232        let initial_state = State::default();
233        let result = quorum_map_then_reduce_with_timeout(
234                self.committee.clone(),
235                self.authority_clients.clone(),
236                initial_state,
237                |_name, client| {
238                    Box::pin(async move {
239                        let request =
240                            ObjectInfoRequest::latest_object_info_request(object_id, /* generate_layout */ LayoutGenerationOption::None);
241                        let mut retry_count = 0;
242                        loop {
243                            match client.handle_object_info_request(request.clone()).await {
244                                Ok(object_info) => return Ok(object_info),
245                                Err(err) => {
246                                    retry_count += 1;
247                                    if retry_count > 3 {
248                                        return Err(err);
249                                    }
250                                    tokio::time::sleep(Duration::from_secs(1)).await;
251                                }
252                            }
253                        }
254                    })
255                },
256                |mut state, name, weight, result| {
257                    Box::pin(async move {
258                        state.total_weight += weight;
259                        match result {
260                            Ok(object_info) => {
261                                debug!("Received object info response from validator {:?} with version: {:?}", name.concise(), object_info.object.version());
262                                if state.latest_object_version.as_ref().is_none_or(|latest| {
263                                    object_info.object.version() > latest.version()
264                                }) {
265                                    state.latest_object_version = Some(object_info.object);
266                                }
267                            }
268                            Err(err) => {
269                                debug!("Received error from validator {:?}: {:?}", name.concise(), err);
270                            }
271                        };
272                        if state.total_weight >= self.committee.quorum_threshold() {
273                            if let Some(object) = state.latest_object_version {
274                                return ReduceOutput::Success(object);
275                            } else {
276                                return ReduceOutput::Failed(state);
277                            }
278                        }
279                        ReduceOutput::Continue(state)
280                    })
281                },
282                // A long timeout before we hear back from a quorum
283                self.timeouts.pre_quorum_timeout,
284            )
285            .await.map_err(|_state| SuiError::from(UserInputError::ObjectNotFound {
286                object_id,
287                version: None,
288            }))?;
289        Ok(result.0)
290    }
291
292    /// Get the latest system state object from the authorities.
293    /// This function assumes all validators are honest.
294    /// It should only be used for testing or benchmarking.
295    pub async fn get_latest_system_state_object_for_testing(
296        &self,
297    ) -> anyhow::Result<SuiSystemState> {
298        #[derive(Debug, Default)]
299        struct State {
300            latest_system_state: Option<SuiSystemState>,
301            total_weight: StakeUnit,
302        }
303        let initial_state = State::default();
304        let result = quorum_map_then_reduce_with_timeout(
305            self.committee.clone(),
306            self.authority_clients.clone(),
307            initial_state,
308            |_name, client| Box::pin(async move { client.handle_system_state_object().await }),
309            |mut state, name, weight, result| {
310                Box::pin(async move {
311                    state.total_weight += weight;
312                    match result {
313                        Ok(system_state) => {
314                            debug!(
315                                "Received system state object from validator {:?} with epoch: {:?}",
316                                name.concise(),
317                                system_state.epoch()
318                            );
319                            if state
320                                .latest_system_state
321                                .as_ref()
322                                .is_none_or(|latest| system_state.epoch() > latest.epoch())
323                            {
324                                state.latest_system_state = Some(system_state);
325                            }
326                        }
327                        Err(err) => {
328                            debug!(
329                                "Received error from validator {:?}: {:?}",
330                                name.concise(),
331                                err
332                            );
333                        }
334                    };
335                    if state.total_weight >= self.committee.quorum_threshold() {
336                        if let Some(system_state) = state.latest_system_state {
337                            return ReduceOutput::Success(system_state);
338                        } else {
339                            return ReduceOutput::Failed(state);
340                        }
341                    }
342                    ReduceOutput::Continue(state)
343                })
344            },
345            // A long timeout before we hear back from a quorum
346            self.timeouts.pre_quorum_timeout,
347        )
348        .await
349        .map_err(|_| anyhow::anyhow!("Failed to get latest system state from the authorities"))?;
350        Ok(result.0)
351    }
352}
353
/// Builder for [`AuthorityAggregator`]. Every field is optional; unset fields are replaced by
/// fallbacks when the aggregator is built.
#[derive(Default)]
pub struct AuthorityAggregatorBuilder<'a> {
    /// Network configuration whose genesis is used; takes precedence over `genesis`.
    network_config: Option<&'a NetworkConfig>,
    /// Genesis used when no network configuration is set.
    genesis: Option<&'a Genesis>,
    /// Explicit committee; when unset, the committee is taken from the genesis.
    committee: Option<Committee>,
    /// Explicit reference gas price; when unset, the genesis value is used, or 1000 if there is
    /// no genesis.
    reference_gas_price: Option<u64>,
    /// Committee store to share; when unset, a testing store is created at build time.
    committee_store: Option<Arc<CommitteeStore>>,
    /// Registry used to create the safe-client metrics base when none is supplied.
    registry: Option<&'a Registry>,
    /// Explicit safe-client metrics base; when set, `registry` is not consulted.
    safe_client_metrics_base: Option<SafeClientMetricsBase>,
    /// Explicit timeouts; when unset, `TimeoutConfig::default()` is used.
    timeouts_config: Option<TimeoutConfig>,
}
365
366impl<'a> AuthorityAggregatorBuilder<'a> {
367    pub fn from_network_config(config: &'a NetworkConfig) -> Self {
368        Self {
369            network_config: Some(config),
370            ..Default::default()
371        }
372    }
373
374    pub fn from_genesis(genesis: &'a Genesis) -> Self {
375        Self {
376            genesis: Some(genesis),
377            ..Default::default()
378        }
379    }
380
381    pub fn from_committee(committee: Committee) -> Self {
382        Self {
383            committee: Some(committee),
384            ..Default::default()
385        }
386    }
387
388    #[cfg(test)]
389    pub fn from_committee_size(committee_size: usize) -> Self {
390        let (committee, _keypairs) = Committee::new_simple_test_committee_of_size(committee_size);
391        Self::from_committee(committee)
392    }
393
394    pub fn with_committee_store(mut self, committee_store: Arc<CommitteeStore>) -> Self {
395        self.committee_store = Some(committee_store);
396        self
397    }
398
399    pub fn with_registry(mut self, registry: &'a Registry) -> Self {
400        self.registry = Some(registry);
401        self
402    }
403
404    pub fn with_timeouts_config(mut self, timeouts_config: TimeoutConfig) -> Self {
405        self.timeouts_config = Some(timeouts_config);
406        self
407    }
408
409    pub fn with_safe_client_metrics_base(
410        mut self,
411        safe_client_metrics_base: SafeClientMetricsBase,
412    ) -> Self {
413        self.safe_client_metrics_base = Some(safe_client_metrics_base);
414        self
415    }
416
417    fn get_network_committee(&self) -> CommitteeWithNetworkMetadata {
418        self.get_genesis()
419            .unwrap_or_else(|| panic!("need either NetworkConfig or Genesis."))
420            .committee_with_network()
421    }
422
423    fn get_committee_authority_names_to_hostnames(&self) -> HashMap<AuthorityName, String> {
424        if let Some(genesis) = self.get_genesis() {
425            let state = genesis
426                .sui_system_object()
427                .into_genesis_version_for_tooling();
428            state
429                .validators
430                .active_validators
431                .iter()
432                .map(|v| {
433                    let metadata = v.verified_metadata();
434                    let name = metadata.sui_pubkey_bytes();
435
436                    (name, metadata.name.clone())
437                })
438                .collect()
439        } else {
440            HashMap::new()
441        }
442    }
443
444    fn get_reference_gas_price(&self) -> u64 {
445        self.reference_gas_price.unwrap_or_else(|| {
446            self.get_genesis()
447                .map(|g| g.reference_gas_price())
448                .unwrap_or(1000)
449        })
450    }
451
452    fn get_genesis(&self) -> Option<&Genesis> {
453        if let Some(network_config) = self.network_config {
454            Some(&network_config.genesis)
455        } else if let Some(genesis) = self.genesis {
456            Some(genesis)
457        } else {
458            None
459        }
460    }
461
462    fn get_committee(&self) -> Committee {
463        self.committee
464            .clone()
465            .unwrap_or_else(|| self.get_network_committee().committee().clone())
466    }
467
468    pub fn build_network_clients(
469        self,
470    ) -> (
471        AuthorityAggregator<NetworkAuthorityClient>,
472        BTreeMap<AuthorityPublicKeyBytes, NetworkAuthorityClient>,
473    ) {
474        let genesis_committee = self.get_genesis().unwrap().committee();
475        let network_committee = self.get_network_committee();
476        let auth_clients = make_authority_clients_with_timeout_config(
477            &network_committee,
478            DEFAULT_CONNECT_TIMEOUT_SEC,
479            DEFAULT_REQUEST_TIMEOUT_SEC,
480        );
481        let auth_agg = self.build_custom_clients(&genesis_committee, auth_clients.clone());
482        (auth_agg, auth_clients)
483    }
484
485    pub fn build_custom_clients<C: Clone>(
486        self,
487        genesis_committee: &Committee,
488        authority_clients: BTreeMap<AuthorityName, C>,
489    ) -> AuthorityAggregator<C> {
490        let committee = self.get_committee();
491        let validator_display_names = self.get_committee_authority_names_to_hostnames();
492        let reference_gas_price = self.get_reference_gas_price();
493        let registry = Registry::new();
494        let registry = self.registry.unwrap_or(&registry);
495        let safe_client_metrics_base = self
496            .safe_client_metrics_base
497            .unwrap_or_else(|| SafeClientMetricsBase::new(registry));
498
499        let committee_store = self
500            .committee_store
501            .unwrap_or_else(|| Arc::new(CommitteeStore::new_for_testing(genesis_committee)));
502
503        let timeouts_config = self.timeouts_config.unwrap_or_default();
504
505        AuthorityAggregator::new(
506            committee,
507            Arc::new(validator_display_names),
508            reference_gas_price,
509            committee_store,
510            authority_clients,
511            safe_client_metrics_base,
512            timeouts_config,
513        )
514    }
515
516    #[cfg(test)]
517    pub fn build_mock_authority_aggregator(self) -> AuthorityAggregator<MockAuthorityApi> {
518        let committee = self.get_committee();
519        let clients = committee
520            .names()
521            .map(|name| {
522                (
523                    *name,
524                    MockAuthorityApi::new(
525                        Duration::from_millis(100),
526                        Arc::new(std::sync::Mutex::new(30)),
527                    ),
528                )
529            })
530            .collect();
531        self.build_custom_clients(&committee, clients)
532    }
533}