1use crate::authority_client::{
6 AuthorityAPI, NetworkAuthorityClient, make_authority_clients_with_timeout_config,
7 make_network_authority_clients_with_network_config,
8};
9use crate::safe_client::{SafeClient, SafeClientMetrics, SafeClientMetricsBase};
10#[cfg(test)]
11use crate::test_authority_clients::MockAuthorityApi;
12use sui_authority_aggregation::ReduceOutput;
13use sui_authority_aggregation::quorum_map_then_reduce_with_timeout;
14use sui_config::genesis::Genesis;
15use sui_network::{
16 DEFAULT_CONNECT_TIMEOUT_SEC, DEFAULT_REQUEST_TIMEOUT_SEC, default_mysten_network_config,
17};
18use sui_swarm_config::network_config::NetworkConfig;
19use sui_types::crypto::AuthorityPublicKeyBytes;
20use sui_types::error::UserInputError;
21use sui_types::object::Object;
22use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
23use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
24use sui_types::{
25 base_types::*,
26 committee::Committee,
27 error::{SuiError, SuiResult},
28};
29use tracing::debug;
30
31use crate::epoch::committee_store::CommitteeStore;
32use prometheus::Registry;
33use std::collections::{BTreeMap, HashMap};
34use std::sync::Arc;
35use std::time::Duration;
36use sui_types::committee::{CommitteeWithNetworkMetadata, StakeUnit};
37use sui_types::messages_grpc::{LayoutGenerationOption, ObjectInfoRequest};
38use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
39
/// Default retry count exported by this module.
// NOTE(review): nothing visible in this file reads this constant; presumably it is
// consumed by callers elsewhere in the crate — confirm before changing the value.
pub const DEFAULT_RETRIES: usize = 4;
41
/// Timeouts carried by an `AuthorityAggregator`.
///
/// `Debug` is derived so the configuration can be logged or embedded in other
/// `Debug` types.
#[derive(Clone, Debug)]
pub struct TimeoutConfig {
    /// Timeout handed to the quorum map/reduce helper by the aggregator's
    /// `*_for_testing` queries in this file.
    pub pre_quorum_timeout: Duration,
    /// Not read by any code in this file.
    // NOTE(review): presumably bounds the wait for stragglers after a quorum has
    // been reached — confirm against the callers that consume it.
    pub post_quorum_timeout: Duration,
}

impl Default for TimeoutConfig {
    /// Returns the default configuration: 60 seconds before quorum, 7 seconds after.
    fn default() -> Self {
        Self {
            pre_quorum_timeout: Duration::from_secs(60),
            post_quorum_timeout: Duration::from_secs(7),
        }
    }
}
56
/// Holds one `SafeClient` per authority together with the committee they belong
/// to, and is the entry point for fanning requests out to all of them.
#[derive(Clone)]
pub struct AuthorityAggregator<A: Clone> {
    /// Committee whose quorum threshold is used when aggregating responses.
    pub committee: Arc<Committee>,
    /// Human-readable name per authority; consulted by `get_display_name`, which
    /// falls back to the concise authority name when an entry is missing.
    pub validator_display_names: Arc<HashMap<AuthorityName, String>>,
    /// Reference gas price supplied at construction time.
    pub reference_gas_price: u64,
    /// One `SafeClient` wrapper per authority, keyed by authority name.
    pub authority_clients: Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>>,
    /// Metrics base from which each per-authority `SafeClientMetrics` is derived;
    /// kept so a recreated aggregator can reuse it.
    pub safe_client_metrics_base: SafeClientMetricsBase,
    /// Timeouts applied to quorum operations.
    pub timeouts: TimeoutConfig,
    /// Committee store shared with every `SafeClient`.
    pub committee_store: Arc<CommitteeStore>,
}
75
76impl<A: Clone> AuthorityAggregator<A> {
77 pub fn new(
78 committee: Committee,
79 validator_display_names: Arc<HashMap<AuthorityName, String>>,
80 reference_gas_price: u64,
81 committee_store: Arc<CommitteeStore>,
82 authority_clients: BTreeMap<AuthorityName, A>,
83 safe_client_metrics_base: SafeClientMetricsBase,
84 timeouts: TimeoutConfig,
85 ) -> Self {
86 Self {
87 committee: Arc::new(committee),
88 validator_display_names,
89 reference_gas_price,
90 authority_clients: create_safe_clients(
91 authority_clients,
92 &committee_store,
93 &safe_client_metrics_base,
94 ),
95 safe_client_metrics_base,
96 timeouts,
97 committee_store,
98 }
99 }
100
101 pub fn get_client(&self, name: &AuthorityName) -> Option<&Arc<SafeClient<A>>> {
102 self.authority_clients.get(name)
103 }
104
105 pub fn clone_client_test_only(&self, name: &AuthorityName) -> Arc<SafeClient<A>>
106 where
107 A: Clone,
108 {
109 self.authority_clients[name].clone()
110 }
111
112 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
113 self.committee_store.clone()
114 }
115
116 pub fn clone_inner_committee_test_only(&self) -> Committee {
117 (*self.committee).clone()
118 }
119
120 pub fn clone_inner_clients_test_only(&self) -> BTreeMap<AuthorityName, SafeClient<A>> {
121 (*self.authority_clients)
122 .clone()
123 .into_iter()
124 .map(|(k, v)| (k, (*v).clone()))
125 .collect()
126 }
127
128 pub fn get_display_name(&self, name: &AuthorityName) -> String {
129 self.validator_display_names
130 .get(name)
131 .cloned()
132 .unwrap_or_else(|| name.concise().to_string())
133 }
134}
135
136fn create_safe_clients<A: Clone>(
137 authority_clients: BTreeMap<AuthorityName, A>,
138 committee_store: &Arc<CommitteeStore>,
139 safe_client_metrics_base: &SafeClientMetricsBase,
140) -> Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>> {
141 Arc::new(
142 authority_clients
143 .into_iter()
144 .map(|(name, api)| {
145 (
146 name,
147 Arc::new(SafeClient::new(
148 api,
149 committee_store.clone(),
150 name,
151 SafeClientMetrics::new(safe_client_metrics_base, name),
152 )),
153 )
154 })
155 .collect(),
156 )
157}
158
159impl AuthorityAggregator<NetworkAuthorityClient> {
160 pub fn new_from_epoch_start_state(
163 epoch_start_state: &EpochStartSystemState,
164 committee_store: &Arc<CommitteeStore>,
165 safe_client_metrics_base: SafeClientMetricsBase,
166 ) -> Self {
167 let committee = epoch_start_state.get_sui_committee_with_network_metadata();
168 let validator_display_names = epoch_start_state.get_authority_names_to_hostnames();
169 Self::new_from_committee(
170 committee,
171 Arc::new(validator_display_names),
172 epoch_start_state.reference_gas_price(),
173 committee_store,
174 safe_client_metrics_base,
175 )
176 }
177
178 pub fn recreate_with_new_epoch_start_state(
182 &self,
183 epoch_start_state: &EpochStartSystemState,
184 ) -> Self {
185 Self::new_from_epoch_start_state(
186 epoch_start_state,
187 &self.committee_store,
188 self.safe_client_metrics_base.clone(),
189 )
190 }
191
192 pub fn new_from_committee(
193 committee: CommitteeWithNetworkMetadata,
194 validator_display_names: Arc<HashMap<AuthorityName, String>>,
195 reference_gas_price: u64,
196 committee_store: &Arc<CommitteeStore>,
197 safe_client_metrics_base: SafeClientMetricsBase,
198 ) -> Self {
199 let net_config = default_mysten_network_config();
200 let authority_clients =
201 make_network_authority_clients_with_network_config(&committee, &net_config);
202 Self::new(
203 committee.committee().clone(),
204 validator_display_names,
205 reference_gas_price,
206 committee_store.clone(),
207 authority_clients,
208 safe_client_metrics_base,
209 Default::default(),
210 )
211 }
212}
213
214impl<A> AuthorityAggregator<A>
215where
216 A: AuthorityAPI + Send + Sync + 'static + Clone,
217{
218 pub async fn get_latest_object_version_for_testing(
224 &self,
225 object_id: ObjectID,
226 ) -> SuiResult<Object> {
227 #[derive(Debug, Default)]
228 struct State {
229 latest_object_version: Option<Object>,
230 total_weight: StakeUnit,
231 }
232 let initial_state = State::default();
233 let result = quorum_map_then_reduce_with_timeout(
234 self.committee.clone(),
235 self.authority_clients.clone(),
236 initial_state,
237 |_name, client| {
238 Box::pin(async move {
239 let request =
240 ObjectInfoRequest::latest_object_info_request(object_id, LayoutGenerationOption::None);
241 let mut retry_count = 0;
242 loop {
243 match client.handle_object_info_request(request.clone()).await {
244 Ok(object_info) => return Ok(object_info),
245 Err(err) => {
246 retry_count += 1;
247 if retry_count > 3 {
248 return Err(err);
249 }
250 tokio::time::sleep(Duration::from_secs(1)).await;
251 }
252 }
253 }
254 })
255 },
256 |mut state, name, weight, result| {
257 Box::pin(async move {
258 state.total_weight += weight;
259 match result {
260 Ok(object_info) => {
261 debug!("Received object info response from validator {:?} with version: {:?}", name.concise(), object_info.object.version());
262 if state.latest_object_version.as_ref().is_none_or(|latest| {
263 object_info.object.version() > latest.version()
264 }) {
265 state.latest_object_version = Some(object_info.object);
266 }
267 }
268 Err(err) => {
269 debug!("Received error from validator {:?}: {:?}", name.concise(), err);
270 }
271 };
272 if state.total_weight >= self.committee.quorum_threshold() {
273 if let Some(object) = state.latest_object_version {
274 return ReduceOutput::Success(object);
275 } else {
276 return ReduceOutput::Failed(state);
277 }
278 }
279 ReduceOutput::Continue(state)
280 })
281 },
282 self.timeouts.pre_quorum_timeout,
284 )
285 .await.map_err(|_state| SuiError::from(UserInputError::ObjectNotFound {
286 object_id,
287 version: None,
288 }))?;
289 Ok(result.0)
290 }
291
292 pub async fn get_latest_system_state_object_for_testing(
296 &self,
297 ) -> anyhow::Result<SuiSystemState> {
298 #[derive(Debug, Default)]
299 struct State {
300 latest_system_state: Option<SuiSystemState>,
301 total_weight: StakeUnit,
302 }
303 let initial_state = State::default();
304 let result = quorum_map_then_reduce_with_timeout(
305 self.committee.clone(),
306 self.authority_clients.clone(),
307 initial_state,
308 |_name, client| Box::pin(async move { client.handle_system_state_object().await }),
309 |mut state, name, weight, result| {
310 Box::pin(async move {
311 state.total_weight += weight;
312 match result {
313 Ok(system_state) => {
314 debug!(
315 "Received system state object from validator {:?} with epoch: {:?}",
316 name.concise(),
317 system_state.epoch()
318 );
319 if state
320 .latest_system_state
321 .as_ref()
322 .is_none_or(|latest| system_state.epoch() > latest.epoch())
323 {
324 state.latest_system_state = Some(system_state);
325 }
326 }
327 Err(err) => {
328 debug!(
329 "Received error from validator {:?}: {:?}",
330 name.concise(),
331 err
332 );
333 }
334 };
335 if state.total_weight >= self.committee.quorum_threshold() {
336 if let Some(system_state) = state.latest_system_state {
337 return ReduceOutput::Success(system_state);
338 } else {
339 return ReduceOutput::Failed(state);
340 }
341 }
342 ReduceOutput::Continue(state)
343 })
344 },
345 self.timeouts.pre_quorum_timeout,
347 )
348 .await
349 .map_err(|_| anyhow::anyhow!("Failed to get latest system state from the authorities"))?;
350 Ok(result.0)
351 }
352}
353
/// Builder for `AuthorityAggregator`.
///
/// Start from one of the `from_*` constructors, optionally override pieces with
/// the `with_*` methods, then finish with one of the `build_*` methods.
#[derive(Default)]
pub struct AuthorityAggregatorBuilder<'a> {
    /// Network config whose genesis is used; takes precedence over `genesis`.
    network_config: Option<&'a NetworkConfig>,
    /// Genesis used when no network config was supplied.
    genesis: Option<&'a Genesis>,
    /// Explicit committee; when absent the committee is derived from the genesis.
    committee: Option<Committee>,
    /// Reference gas price override; when absent the genesis value is used, or
    /// 1000 if there is no genesis either. No builder method in this file sets it.
    reference_gas_price: Option<u64>,
    /// Committee store to share with the clients; when absent a testing store is
    /// created from the genesis committee passed to `build_custom_clients`.
    committee_store: Option<Arc<CommitteeStore>>,
    /// Prometheus registry used to create the safe-client metrics base when none
    /// was supplied explicitly.
    registry: Option<&'a Registry>,
    /// Explicit safe-client metrics base; takes precedence over `registry`.
    safe_client_metrics_base: Option<SafeClientMetricsBase>,
    /// Timeout configuration; the default `TimeoutConfig` is used when absent.
    timeouts_config: Option<TimeoutConfig>,
}
365
366impl<'a> AuthorityAggregatorBuilder<'a> {
367 pub fn from_network_config(config: &'a NetworkConfig) -> Self {
368 Self {
369 network_config: Some(config),
370 ..Default::default()
371 }
372 }
373
374 pub fn from_genesis(genesis: &'a Genesis) -> Self {
375 Self {
376 genesis: Some(genesis),
377 ..Default::default()
378 }
379 }
380
381 pub fn from_committee(committee: Committee) -> Self {
382 Self {
383 committee: Some(committee),
384 ..Default::default()
385 }
386 }
387
388 #[cfg(test)]
389 pub fn from_committee_size(committee_size: usize) -> Self {
390 let (committee, _keypairs) = Committee::new_simple_test_committee_of_size(committee_size);
391 Self::from_committee(committee)
392 }
393
394 pub fn with_committee_store(mut self, committee_store: Arc<CommitteeStore>) -> Self {
395 self.committee_store = Some(committee_store);
396 self
397 }
398
399 pub fn with_registry(mut self, registry: &'a Registry) -> Self {
400 self.registry = Some(registry);
401 self
402 }
403
404 pub fn with_timeouts_config(mut self, timeouts_config: TimeoutConfig) -> Self {
405 self.timeouts_config = Some(timeouts_config);
406 self
407 }
408
409 pub fn with_safe_client_metrics_base(
410 mut self,
411 safe_client_metrics_base: SafeClientMetricsBase,
412 ) -> Self {
413 self.safe_client_metrics_base = Some(safe_client_metrics_base);
414 self
415 }
416
417 fn get_network_committee(&self) -> CommitteeWithNetworkMetadata {
418 self.get_genesis()
419 .unwrap_or_else(|| panic!("need either NetworkConfig or Genesis."))
420 .committee_with_network()
421 }
422
423 fn get_committee_authority_names_to_hostnames(&self) -> HashMap<AuthorityName, String> {
424 if let Some(genesis) = self.get_genesis() {
425 let state = genesis
426 .sui_system_object()
427 .into_genesis_version_for_tooling();
428 state
429 .validators
430 .active_validators
431 .iter()
432 .map(|v| {
433 let metadata = v.verified_metadata();
434 let name = metadata.sui_pubkey_bytes();
435
436 (name, metadata.name.clone())
437 })
438 .collect()
439 } else {
440 HashMap::new()
441 }
442 }
443
444 fn get_reference_gas_price(&self) -> u64 {
445 self.reference_gas_price.unwrap_or_else(|| {
446 self.get_genesis()
447 .map(|g| g.reference_gas_price())
448 .unwrap_or(1000)
449 })
450 }
451
452 fn get_genesis(&self) -> Option<&Genesis> {
453 if let Some(network_config) = self.network_config {
454 Some(&network_config.genesis)
455 } else if let Some(genesis) = self.genesis {
456 Some(genesis)
457 } else {
458 None
459 }
460 }
461
462 fn get_committee(&self) -> Committee {
463 self.committee
464 .clone()
465 .unwrap_or_else(|| self.get_network_committee().committee().clone())
466 }
467
468 pub fn build_network_clients(
469 self,
470 ) -> (
471 AuthorityAggregator<NetworkAuthorityClient>,
472 BTreeMap<AuthorityPublicKeyBytes, NetworkAuthorityClient>,
473 ) {
474 let genesis_committee = self.get_genesis().unwrap().committee();
475 let network_committee = self.get_network_committee();
476 let auth_clients = make_authority_clients_with_timeout_config(
477 &network_committee,
478 DEFAULT_CONNECT_TIMEOUT_SEC,
479 DEFAULT_REQUEST_TIMEOUT_SEC,
480 );
481 let auth_agg = self.build_custom_clients(&genesis_committee, auth_clients.clone());
482 (auth_agg, auth_clients)
483 }
484
485 pub fn build_custom_clients<C: Clone>(
486 self,
487 genesis_committee: &Committee,
488 authority_clients: BTreeMap<AuthorityName, C>,
489 ) -> AuthorityAggregator<C> {
490 let committee = self.get_committee();
491 let validator_display_names = self.get_committee_authority_names_to_hostnames();
492 let reference_gas_price = self.get_reference_gas_price();
493 let registry = Registry::new();
494 let registry = self.registry.unwrap_or(®istry);
495 let safe_client_metrics_base = self
496 .safe_client_metrics_base
497 .unwrap_or_else(|| SafeClientMetricsBase::new(registry));
498
499 let committee_store = self
500 .committee_store
501 .unwrap_or_else(|| Arc::new(CommitteeStore::new_for_testing(genesis_committee)));
502
503 let timeouts_config = self.timeouts_config.unwrap_or_default();
504
505 AuthorityAggregator::new(
506 committee,
507 Arc::new(validator_display_names),
508 reference_gas_price,
509 committee_store,
510 authority_clients,
511 safe_client_metrics_base,
512 timeouts_config,
513 )
514 }
515
516 #[cfg(test)]
517 pub fn build_mock_authority_aggregator(self) -> AuthorityAggregator<MockAuthorityApi> {
518 let committee = self.get_committee();
519 let clients = committee
520 .names()
521 .map(|name| {
522 (
523 *name,
524 MockAuthorityApi::new(
525 Duration::from_millis(100),
526 Arc::new(std::sync::Mutex::new(30)),
527 ),
528 )
529 })
530 .collect();
531 self.build_custom_clients(&committee, clients)
532 }
533}