sui_core/
authority_aggregator.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::authority_client::{
6    AuthorityAPI, NetworkAuthorityClient, make_authority_clients_with_timeout_config,
7    make_network_authority_clients_with_network_config,
8};
9use crate::safe_client::{SafeClient, SafeClientMetrics, SafeClientMetricsBase};
10#[cfg(test)]
11use crate::test_authority_clients::MockAuthorityApi;
12use futures::StreamExt;
13use mysten_metrics::{GaugeGuard, MonitorCancellation, spawn_monitored_task};
14use std::convert::AsRef;
15use std::net::SocketAddr;
16use sui_authority_aggregation::ReduceOutput;
17use sui_authority_aggregation::quorum_map_then_reduce_with_timeout;
18use sui_config::genesis::Genesis;
19use sui_network::{
20    DEFAULT_CONNECT_TIMEOUT_SEC, DEFAULT_REQUEST_TIMEOUT_SEC, default_mysten_network_config,
21};
22use sui_swarm_config::network_config::NetworkConfig;
23use sui_types::crypto::{AuthorityPublicKeyBytes, AuthoritySignInfo};
24use sui_types::error::{SuiErrorKind, UserInputError};
25use sui_types::message_envelope::Message;
26use sui_types::object::Object;
27use sui_types::quorum_driver_types::{GroupedErrors, QuorumDriverResponse};
28use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
29use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
30use sui_types::{
31    base_types::*,
32    committee::Committee,
33    error::{SuiError, SuiResult},
34    transaction::*,
35};
36use thiserror::Error;
37use tracing::{Instrument, debug, error, instrument, trace, trace_span, warn};
38
39use crate::epoch::committee_store::CommitteeStore;
40use crate::stake_aggregator::{InsertResult, MultiStakeAggregator, StakeAggregator};
41use prometheus::{
42    Histogram, IntCounter, IntCounterVec, IntGauge, Registry, register_histogram_with_registry,
43    register_int_counter_vec_with_registry, register_int_counter_with_registry,
44    register_int_gauge_with_registry,
45};
46use std::collections::{BTreeMap, HashMap, HashSet};
47use std::string::ToString;
48use std::sync::Arc;
49use std::time::Duration;
50use sui_types::committee::{CommitteeWithNetworkMetadata, StakeUnit};
51use sui_types::effects::{
52    CertifiedTransactionEffects, SignedTransactionEffects, TransactionEffects, TransactionEvents,
53    VerifiedCertifiedTransactionEffects,
54};
55use sui_types::messages_grpc::{
56    HandleCertificateRequestV3, HandleCertificateResponseV3, LayoutGenerationOption,
57    ObjectInfoRequest,
58};
59use sui_types::messages_safe_client::PlainTransactionInfoResponse;
60use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
61use tokio::time::sleep;
62
/// Default retry count exported by this module for its callers.
pub const DEFAULT_RETRIES: usize = 4;

// Unit tests live in a separate file and are compiled only for test builds.
#[cfg(test)]
#[path = "unit_tests/authority_aggregator_tests.rs"]
pub mod authority_aggregator_tests;
68
/// Timeouts applied while waiting for validator responses around reaching a quorum.
///
/// `Default` yields 60 seconds before a quorum is reached and 7 seconds after it.
#[derive(Clone, Debug)]
pub struct TimeoutConfig {
    /// How long to wait for responses before a quorum of validators has answered.
    pub pre_quorum_timeout: Duration,
    /// How long to keep waiting once a quorum has been reached (e.g. while certificate
    /// broadcasting continues to the remaining validators).
    pub post_quorum_timeout: Duration,
}

impl Default for TimeoutConfig {
    /// A generous pre-quorum timeout and a short post-quorum timeout.
    fn default() -> Self {
        Self {
            pre_quorum_timeout: Duration::from_secs(60),
            post_quorum_timeout: Duration::from_secs(7),
        }
    }
}
83
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
#[derive(Clone)]
pub struct AuthAggMetrics {
    /// Total number of certificates made in the authority aggregator.
    pub total_tx_certificates_created: IntCounter,
    /// Errors returned by validators while processing a transaction, labeled by
    /// validator name and error type.
    pub process_tx_errors: IntCounterVec,
    /// Errors returned by validators while processing a certificate, labeled by
    /// validator name and error type.
    pub process_cert_errors: IntCounterVec,
    /// Total number of detected client double-spend attempts.
    pub total_client_double_spend_attempts_detected: IntCounter,
    /// Errors returned by validators, labeled by error type and whether the
    /// transaction is recoverable.
    pub total_aggregated_err: IntCounterVec,
    /// RPC errors returned by validators, labeled by validator short name and RPC
    /// error message.
    pub total_rpc_err: IntCounterVec,
    /// Transactions currently gathering signatures.
    pub inflight_transactions: IntGauge,
    /// Certificates currently gathering effects.
    pub inflight_certificates: IntGauge,
    /// In-flight `handle_transaction` requests.
    pub inflight_transaction_requests: IntGauge,
    /// In-flight `handle_certificate` requests.
    pub inflight_certificate_requests: IntGauge,

    /// Number of post-quorum timeouts hit during certificate processing.
    pub cert_broadcasting_post_quorum_timeout: IntCounter,
    /// Number of tasks still outstanding when a certificate quorum is reached.
    pub remaining_tasks_when_reaching_cert_quorum: Histogram,
    /// Number of tasks still outstanding when post-quorum certificate broadcasting
    /// times out.
    pub remaining_tasks_when_cert_broadcasting_post_quorum_timeout: Histogram,
    /// Number of times a quorum was reached without at least one validator returning
    /// the requested objects.
    pub quorum_reached_without_requested_objects: IntCounter,
}
100    pub remaining_tasks_when_cert_broadcasting_post_quorum_timeout: Histogram,
101    pub quorum_reached_without_requested_objects: IntCounter,
102}
103
104impl AuthAggMetrics {
105    pub fn new(registry: &prometheus::Registry) -> Self {
106        Self {
107            total_tx_certificates_created: register_int_counter_with_registry!(
108                "total_tx_certificates_created",
109                "Total number of certificates made in the authority_aggregator",
110                registry,
111            )
112            .unwrap(),
113            process_tx_errors: register_int_counter_vec_with_registry!(
114                "process_tx_errors",
115                "Number of errors returned from validators when processing transaction, group by validator name and error type",
116                &["name","error"],
117                registry,
118            )
119            .unwrap(),
120            process_cert_errors: register_int_counter_vec_with_registry!(
121                "process_cert_errors",
122                "Number of errors returned from validators when processing certificate, group by validator name and error type",
123                &["name", "error"],
124                registry,
125            )
126            .unwrap(),
127            total_client_double_spend_attempts_detected: register_int_counter_with_registry!(
128                "total_client_double_spend_attempts_detected",
129                "Total number of client double spend attempts that are detected",
130                registry,
131            )
132            .unwrap(),
133            total_aggregated_err: register_int_counter_vec_with_registry!(
134                "total_aggregated_err",
135                "Total number of errors returned from validators, grouped by error type",
136                &["error", "tx_recoverable"],
137                registry,
138            )
139            .unwrap(),
140            total_rpc_err: register_int_counter_vec_with_registry!(
141                "total_rpc_err",
142                "Total number of rpc errors returned from validators, grouped by validator short name and RPC error message",
143                &["name", "error_message"],
144                registry,
145            )
146            .unwrap(),
147            inflight_transactions: register_int_gauge_with_registry!(
148                "auth_agg_inflight_transactions",
149                "Inflight transaction gathering signatures",
150                registry,
151            )
152            .unwrap(),
153            inflight_certificates: register_int_gauge_with_registry!(
154                "auth_agg_inflight_certificates",
155                "Inflight certificates gathering effects",
156                registry,
157            )
158            .unwrap(),
159            inflight_transaction_requests: register_int_gauge_with_registry!(
160                "auth_agg_inflight_transaction_requests",
161                "Inflight handle_transaction requests",
162                registry,
163            )
164            .unwrap(),
165            inflight_certificate_requests: register_int_gauge_with_registry!(
166                "auth_agg_inflight_certificate_requests",
167                "Inflight handle_certificate requests",
168                registry,
169            )
170            .unwrap(),
171            cert_broadcasting_post_quorum_timeout: register_int_counter_with_registry!(
172                "auth_agg_cert_broadcasting_post_quorum_timeout",
173                "Total number of timeout in cert processing post quorum",
174                registry,
175            )
176            .unwrap(),
177            remaining_tasks_when_reaching_cert_quorum: register_histogram_with_registry!(
178                "auth_agg_remaining_tasks_when_reaching_cert_quorum",
179                "Number of remaining tasks when reaching certificate quorum",
180                registry,
181            ).unwrap(),
182            remaining_tasks_when_cert_broadcasting_post_quorum_timeout: register_histogram_with_registry!(
183                "auth_agg_remaining_tasks_when_cert_broadcasting_post_quorum_timeout",
184                "Number of remaining tasks when post quorum certificate broadcasting times out",
185                registry,
186            ).unwrap(),
187            quorum_reached_without_requested_objects: register_int_counter_with_registry!(
188                "auth_agg_quorum_reached_without_requested_objects",
189                "Number of times quorum was reached without getting the requested objects back from at least 1 validator",
190                registry,
191            )
192            .unwrap(),
193        }
194    }
195
196    pub fn new_for_tests() -> Self {
197        let registry = prometheus::Registry::new();
198        Self::new(&registry)
199    }
200}
201
202#[derive(Error, Debug, Eq, PartialEq)]
203pub enum AggregatorProcessTransactionError {
204    #[error(
205        "Failed to execute transaction on a quorum of validators due to non-retryable errors. Validator errors: {:?}",
206        errors
207    )]
208    FatalTransaction { errors: GroupedErrors },
209
210    #[error(
211        "Failed to execute transaction on a quorum of validators but state is still retryable. Validator errors: {:?}",
212        errors
213    )]
214    RetryableTransaction { errors: GroupedErrors },
215
216    #[error(
217        "Failed to execute transaction on a quorum of validators due to conflicting transactions. Locked objects: {:?}. Validator errors: {:?}",
218        conflicting_tx_digests,
219        errors
220    )]
221    FatalConflictingTransaction {
222        errors: GroupedErrors,
223        conflicting_tx_digests:
224            BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
225    },
226
227    #[error(
228        "{} of the validators by stake are overloaded with transactions pending execution. Validator errors: {:?}",
229        overloaded_stake,
230        errors
231    )]
232    SystemOverload {
233        overloaded_stake: StakeUnit,
234        errors: GroupedErrors,
235    },
236
237    #[error("Transaction is already finalized but with different user signatures")]
238    TxAlreadyFinalizedWithDifferentUserSignatures,
239
240    #[error(
241        "{} of the validators by stake are overloaded and requested the client to retry after {} seconds. Validator errors: {:?}",
242        overload_stake,
243        retry_after_secs,
244        errors
245    )]
246    SystemOverloadRetryAfter {
247        overload_stake: StakeUnit,
248        errors: GroupedErrors,
249        retry_after_secs: u64,
250    },
251}
252
253#[derive(Error, Debug)]
254pub enum AggregatorProcessCertificateError {
255    #[error(
256        "Failed to execute certificate on a quorum of validators. Non-retryable errors: {:?}",
257        non_retryable_errors
258    )]
259    FatalExecuteCertificate { non_retryable_errors: GroupedErrors },
260
261    #[error(
262        "Failed to execute certificate on a quorum of validators but state is still retryable. Retryable errors: {:?}",
263        retryable_errors
264    )]
265    RetryableExecuteCertificate { retryable_errors: GroupedErrors },
266}
267
268pub fn group_errors(errors: Vec<(SuiError, Vec<AuthorityName>, StakeUnit)>) -> GroupedErrors {
269    #[allow(clippy::mutable_key_type)]
270    let mut grouped_errors = HashMap::new();
271    for (error, names, stake) in errors {
272        let entry = grouped_errors.entry(error).or_insert((0, vec![]));
273        entry.0 += stake;
274        entry.1.extend(
275            names
276                .into_iter()
277                .map(|n| n.concise_owned())
278                .collect::<Vec<_>>(),
279        );
280    }
281    grouped_errors
282        .into_iter()
283        .map(|(e, (s, n))| (e, s, n))
284        .collect()
285}
286
287#[derive(Debug, Default)]
288pub struct RetryableOverloadInfo {
289    // Total stake of validators that are overloaded and request client to retry.
290    pub total_stake: StakeUnit,
291
292    // Records requested retry duration by stakes.
293    pub stake_requested_retry_after: BTreeMap<Duration, StakeUnit>,
294}
295
296impl RetryableOverloadInfo {
297    pub fn add_stake_retryable_overload(&mut self, stake: StakeUnit, retry_after: Duration) {
298        self.total_stake += stake;
299        self.stake_requested_retry_after
300            .entry(retry_after)
301            .and_modify(|s| *s += stake)
302            .or_insert(stake);
303    }
304
305    // Gets the duration of retry requested by a quorum of validators with smallest retry durations.
306    pub fn get_quorum_retry_after(
307        &self,
308        good_stake: StakeUnit,
309        quorum_threshold: StakeUnit,
310    ) -> Duration {
311        if self.stake_requested_retry_after.is_empty() {
312            return Duration::from_secs(0);
313        }
314
315        let mut quorum_stake = good_stake;
316        for (retry_after, stake) in self.stake_requested_retry_after.iter() {
317            quorum_stake += *stake;
318            if quorum_stake >= quorum_threshold {
319                return *retry_after;
320            }
321        }
322        *self.stake_requested_retry_after.last_key_value().unwrap().0
323    }
324}
325
326#[derive(Debug)]
327struct ProcessTransactionState {
328    // The list of signatures gathered at any point
329    tx_signatures: StakeAggregator<AuthoritySignInfo, true>,
330    effects_map: MultiStakeAggregator<TransactionEffectsDigest, TransactionEffects, true>,
331    // The list of errors gathered at any point
332    errors: Vec<(SuiError, Vec<AuthorityName>, StakeUnit)>,
333    // This is exclusively non-retryable stake.
334    non_retryable_stake: StakeUnit,
335    // This includes both object and package not found sui errors.
336    object_or_package_not_found_stake: StakeUnit,
337    // Validators that are overloaded with txns pending execution.
338    overloaded_stake: StakeUnit,
339    // Validators that are overloaded and request client to retry.
340    retryable_overload_info: RetryableOverloadInfo,
341    // If there are conflicting transactions, we note them down to report to user.
342    conflicting_tx_digests:
343        BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
344    // As long as none of the exit criteria are met we consider the state retryable
345    // 1) >= 2f+1 signatures
346    // 2) >= f+1 non-retryable errors
347    // 3) >= 2f+1 object not found errors
348    retryable: bool,
349    tx_finalized_with_different_user_sig: bool,
350}
351
352impl ProcessTransactionState {
353    pub fn record_conflicting_transaction_if_any(
354        &mut self,
355        validator_name: AuthorityName,
356        weight: StakeUnit,
357        err: &SuiError,
358    ) {
359        if let SuiErrorKind::ObjectLockConflict {
360            obj_ref,
361            pending_transaction: transaction,
362        } = err.as_inner()
363        {
364            let (lock_records, total_stake) = self
365                .conflicting_tx_digests
366                .entry(*transaction)
367                .or_insert((Vec::new(), 0));
368            lock_records.push((validator_name, *obj_ref));
369            *total_stake += weight;
370        }
371    }
372
373    pub fn check_if_error_indicates_tx_finalized_with_different_user_sig(
374        &self,
375        validity_threshold: StakeUnit,
376    ) -> bool {
377        // In some edge cases, the client may send the same transaction multiple times but with different user signatures.
378        // When this happens, the "minority" tx will fail in safe_client because the certificate verification would fail
379        // and return Sui::FailedToVerifyTxCertWithExecutedEffects.
380        // Here, we check if there are f+1 validators return this error. If so, the transaction is already finalized
381        // with a different set of user signatures. It's not trivial to return the results of that successful transaction
382        // because we don't want fullnode to store the transaction with non-canonical user signatures. Given that this is
383        // very rare, we simply return an error here.
384        let invalid_sig_stake: StakeUnit = self
385            .errors
386            .iter()
387            .filter_map(|(e, _, stake)| {
388                if matches!(
389                    e.as_inner(),
390                    SuiErrorKind::FailedToVerifyTxCertWithExecutedEffects { .. }
391                ) {
392                    Some(stake)
393                } else {
394                    None
395                }
396            })
397            .sum();
398        invalid_sig_stake >= validity_threshold
399    }
400}
401
402struct ProcessCertificateState {
403    // Different authorities could return different effects.  We want at least one effect to come
404    // from 2f+1 authorities, which meets quorum and can be considered the approved effect.
405    // The map here allows us to count the stake for each unique effect.
406    effects_map:
407        MultiStakeAggregator<(EpochId, TransactionEffectsDigest), TransactionEffects, true>,
408    non_retryable_stake: StakeUnit,
409    non_retryable_errors: Vec<(SuiError, Vec<AuthorityName>, StakeUnit)>,
410    retryable_errors: Vec<(SuiError, Vec<AuthorityName>, StakeUnit)>,
411    // As long as none of the exit criteria are met we consider the state retryable
412    // 1) >= 2f+1 signatures
413    // 2) >= f+1 non-retryable errors
414    retryable: bool,
415
416    // collection of extended data returned from the validators.
417    // Not all validators will be asked to return this data so we need to hold onto it when one
418    // validator has provided it
419    events: Option<TransactionEvents>,
420    input_objects: Option<Vec<Object>>,
421    output_objects: Option<Vec<Object>>,
422    auxiliary_data: Option<Vec<u8>>,
423    request: HandleCertificateRequestV3,
424}
425
426#[derive(Debug)]
427pub enum ProcessTransactionResult {
428    Certified {
429        certificate: CertifiedTransaction,
430        /// Whether this certificate is newly created by aggregating 2f+1 signatures.
431        /// If a validator returned a cert directly, this will be false.
432        /// This is used to inform the quorum driver, which could make better decisions on telemetry
433        /// such as settlement latency.
434        newly_formed: bool,
435    },
436    Executed(VerifiedCertifiedTransactionEffects, TransactionEvents),
437}
438
439impl ProcessTransactionResult {
440    pub fn into_cert_for_testing(self) -> CertifiedTransaction {
441        match self {
442            Self::Certified { certificate, .. } => certificate,
443            Self::Executed(..) => panic!("Wrong type"),
444        }
445    }
446
447    pub fn into_effects_for_testing(self) -> VerifiedCertifiedTransactionEffects {
448        match self {
449            Self::Certified { .. } => panic!("Wrong type"),
450            Self::Executed(effects, ..) => effects,
451        }
452    }
453}
454
455#[derive(Clone)]
456pub struct AuthorityAggregator<A: Clone> {
457    /// Our Sui committee.
458    pub committee: Arc<Committee>,
459    /// For more human readable metrics reporting.
460    /// It's OK for this map to be empty or missing validators, it then defaults
461    /// to use concise validator public keys.
462    pub validator_display_names: Arc<HashMap<AuthorityName, String>>,
463    /// Reference gas price for the current epoch.
464    pub reference_gas_price: u64,
465    /// How to talk to this committee.
466    pub authority_clients: Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>>,
467    /// Metrics
468    pub metrics: Arc<AuthAggMetrics>,
469    /// Metric base for the purpose of creating new safe clients during reconfiguration.
470    pub safe_client_metrics_base: SafeClientMetricsBase,
471    pub timeouts: TimeoutConfig,
472    /// Store here for clone during re-config.
473    pub committee_store: Arc<CommitteeStore>,
474}
475
476impl<A: Clone> AuthorityAggregator<A> {
477    pub fn new(
478        committee: Committee,
479        validator_display_names: Arc<HashMap<AuthorityName, String>>,
480        reference_gas_price: u64,
481        committee_store: Arc<CommitteeStore>,
482        authority_clients: BTreeMap<AuthorityName, A>,
483        safe_client_metrics_base: SafeClientMetricsBase,
484        auth_agg_metrics: Arc<AuthAggMetrics>,
485        timeouts: TimeoutConfig,
486    ) -> Self {
487        Self {
488            committee: Arc::new(committee),
489            validator_display_names,
490            reference_gas_price,
491            authority_clients: create_safe_clients(
492                authority_clients,
493                &committee_store,
494                &safe_client_metrics_base,
495            ),
496            metrics: auth_agg_metrics,
497            safe_client_metrics_base,
498            timeouts,
499            committee_store,
500        }
501    }
502
503    pub fn get_client(&self, name: &AuthorityName) -> Option<&Arc<SafeClient<A>>> {
504        self.authority_clients.get(name)
505    }
506
507    pub fn clone_client_test_only(&self, name: &AuthorityName) -> Arc<SafeClient<A>>
508    where
509        A: Clone,
510    {
511        self.authority_clients[name].clone()
512    }
513
514    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
515        self.committee_store.clone()
516    }
517
518    pub fn clone_inner_committee_test_only(&self) -> Committee {
519        (*self.committee).clone()
520    }
521
522    pub fn clone_inner_clients_test_only(&self) -> BTreeMap<AuthorityName, SafeClient<A>> {
523        (*self.authority_clients)
524            .clone()
525            .into_iter()
526            .map(|(k, v)| (k, (*v).clone()))
527            .collect()
528    }
529
530    pub fn get_display_name(&self, name: &AuthorityName) -> String {
531        self.validator_display_names
532            .get(name)
533            .cloned()
534            .unwrap_or_else(|| name.concise().to_string())
535    }
536}
537
538fn create_safe_clients<A: Clone>(
539    authority_clients: BTreeMap<AuthorityName, A>,
540    committee_store: &Arc<CommitteeStore>,
541    safe_client_metrics_base: &SafeClientMetricsBase,
542) -> Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>> {
543    Arc::new(
544        authority_clients
545            .into_iter()
546            .map(|(name, api)| {
547                (
548                    name,
549                    Arc::new(SafeClient::new(
550                        api,
551                        committee_store.clone(),
552                        name,
553                        SafeClientMetrics::new(safe_client_metrics_base, name),
554                    )),
555                )
556            })
557            .collect(),
558    )
559}
560
561impl AuthorityAggregator<NetworkAuthorityClient> {
562    /// Create a new network authority aggregator by reading the committee and network addresses
563    /// information from the given epoch start system state.
564    pub fn new_from_epoch_start_state(
565        epoch_start_state: &EpochStartSystemState,
566        committee_store: &Arc<CommitteeStore>,
567        safe_client_metrics_base: SafeClientMetricsBase,
568        auth_agg_metrics: Arc<AuthAggMetrics>,
569    ) -> Self {
570        let committee = epoch_start_state.get_sui_committee_with_network_metadata();
571        let validator_display_names = epoch_start_state.get_authority_names_to_hostnames();
572        Self::new_from_committee(
573            committee,
574            Arc::new(validator_display_names),
575            epoch_start_state.reference_gas_price(),
576            committee_store,
577            safe_client_metrics_base,
578            auth_agg_metrics,
579        )
580    }
581
582    /// Create a new AuthorityAggregator using information from the given epoch start system state.
583    /// This is typically used during reconfiguration to create a new AuthorityAggregator with the
584    /// new committee and network addresses.
585    pub fn recreate_with_new_epoch_start_state(
586        &self,
587        epoch_start_state: &EpochStartSystemState,
588    ) -> Self {
589        Self::new_from_epoch_start_state(
590            epoch_start_state,
591            &self.committee_store,
592            self.safe_client_metrics_base.clone(),
593            self.metrics.clone(),
594        )
595    }
596
597    pub fn new_from_committee(
598        committee: CommitteeWithNetworkMetadata,
599        validator_display_names: Arc<HashMap<AuthorityName, String>>,
600        reference_gas_price: u64,
601        committee_store: &Arc<CommitteeStore>,
602        safe_client_metrics_base: SafeClientMetricsBase,
603        auth_agg_metrics: Arc<AuthAggMetrics>,
604    ) -> Self {
605        let net_config = default_mysten_network_config();
606        let authority_clients =
607            make_network_authority_clients_with_network_config(&committee, &net_config);
608        Self::new(
609            committee.committee().clone(),
610            validator_display_names,
611            reference_gas_price,
612            committee_store.clone(),
613            authority_clients,
614            safe_client_metrics_base,
615            auth_agg_metrics,
616            Default::default(),
617        )
618    }
619}
620
621impl<A> AuthorityAggregator<A>
622where
623    A: AuthorityAPI + Send + Sync + 'static + Clone,
624{
625    /// Query the object with highest version number from the authorities.
626    /// We stop after receiving responses from 2f+1 validators.
627    /// This function is untrusted because we simply assume each response is valid and there are no
628    /// byzantine validators.
629    /// Because of this, this function should only be used for testing or benchmarking.
630    pub async fn get_latest_object_version_for_testing(
631        &self,
632        object_id: ObjectID,
633    ) -> SuiResult<Object> {
634        #[derive(Debug, Default)]
635        struct State {
636            latest_object_version: Option<Object>,
637            total_weight: StakeUnit,
638        }
639        let initial_state = State::default();
640        let result = quorum_map_then_reduce_with_timeout(
641                self.committee.clone(),
642                self.authority_clients.clone(),
643                initial_state,
644                |_name, client| {
645                    Box::pin(async move {
646                        let request =
647                            ObjectInfoRequest::latest_object_info_request(object_id, /* generate_layout */ LayoutGenerationOption::None);
648                        let mut retry_count = 0;
649                        loop {
650                            match client.handle_object_info_request(request.clone()).await {
651                                Ok(object_info) => return Ok(object_info),
652                                Err(err) => {
653                                    retry_count += 1;
654                                    if retry_count > 3 {
655                                        return Err(err);
656                                    }
657                                    tokio::time::sleep(Duration::from_secs(1)).await;
658                                }
659                            }
660                        }
661                    })
662                },
663                |mut state, name, weight, result| {
664                    Box::pin(async move {
665                        state.total_weight += weight;
666                        match result {
667                            Ok(object_info) => {
668                                debug!("Received object info response from validator {:?} with version: {:?}", name.concise(), object_info.object.version());
669                                if state.latest_object_version.as_ref().is_none_or(|latest| {
670                                    object_info.object.version() > latest.version()
671                                }) {
672                                    state.latest_object_version = Some(object_info.object);
673                                }
674                            }
675                            Err(err) => {
676                                debug!("Received error from validator {:?}: {:?}", name.concise(), err);
677                            }
678                        };
679                        if state.total_weight >= self.committee.quorum_threshold() {
680                            if let Some(object) = state.latest_object_version {
681                                return ReduceOutput::Success(object);
682                            } else {
683                                return ReduceOutput::Failed(state);
684                            }
685                        }
686                        ReduceOutput::Continue(state)
687                    })
688                },
689                // A long timeout before we hear back from a quorum
690                self.timeouts.pre_quorum_timeout,
691            )
692            .await.map_err(|_state| SuiError::from(UserInputError::ObjectNotFound {
693                object_id,
694                version: None,
695            }))?;
696        Ok(result.0)
697    }
698
699    /// Get the latest system state object from the authorities.
700    /// This function assumes all validators are honest.
701    /// It should only be used for testing or benchmarking.
702    pub async fn get_latest_system_state_object_for_testing(
703        &self,
704    ) -> anyhow::Result<SuiSystemState> {
705        #[derive(Debug, Default)]
706        struct State {
707            latest_system_state: Option<SuiSystemState>,
708            total_weight: StakeUnit,
709        }
710        let initial_state = State::default();
711        let result = quorum_map_then_reduce_with_timeout(
712            self.committee.clone(),
713            self.authority_clients.clone(),
714            initial_state,
715            |_name, client| Box::pin(async move { client.handle_system_state_object().await }),
716            |mut state, name, weight, result| {
717                Box::pin(async move {
718                    state.total_weight += weight;
719                    match result {
720                        Ok(system_state) => {
721                            debug!(
722                                "Received system state object from validator {:?} with epoch: {:?}",
723                                name.concise(),
724                                system_state.epoch()
725                            );
726                            if state
727                                .latest_system_state
728                                .as_ref()
729                                .is_none_or(|latest| system_state.epoch() > latest.epoch())
730                            {
731                                state.latest_system_state = Some(system_state);
732                            }
733                        }
734                        Err(err) => {
735                            debug!(
736                                "Received error from validator {:?}: {:?}",
737                                name.concise(),
738                                err
739                            );
740                        }
741                    };
742                    if state.total_weight >= self.committee.quorum_threshold() {
743                        if let Some(system_state) = state.latest_system_state {
744                            return ReduceOutput::Success(system_state);
745                        } else {
746                            return ReduceOutput::Failed(state);
747                        }
748                    }
749                    ReduceOutput::Continue(state)
750                })
751            },
752            // A long timeout before we hear back from a quorum
753            self.timeouts.pre_quorum_timeout,
754        )
755        .await
756        .map_err(|_| anyhow::anyhow!("Failed to get latest system state from the authorities"))?;
757        Ok(result.0)
758    }
759
760    /// Submits the transaction to a quorum of validators to make a certificate.
761    #[instrument(level = "trace", skip_all)]
762    pub async fn process_transaction(
763        &self,
764        transaction: Transaction,
765        client_addr: Option<SocketAddr>,
766    ) -> Result<ProcessTransactionResult, AggregatorProcessTransactionError> {
767        // Now broadcast the transaction to all authorities.
768        let tx_digest = transaction.digest();
769        debug!(
770            tx_digest = ?tx_digest,
771            "Broadcasting transaction request to authorities"
772        );
773        trace!(
774            "Transaction data: {:?}",
775            transaction.data().intent_message().value
776        );
777        let committee = self.committee.clone();
778        let state = ProcessTransactionState {
779            tx_signatures: StakeAggregator::new(committee.clone()),
780            effects_map: MultiStakeAggregator::new(committee.clone()),
781            errors: vec![],
782            object_or_package_not_found_stake: 0,
783            non_retryable_stake: 0,
784            overloaded_stake: 0,
785            retryable_overload_info: Default::default(),
786            retryable: true,
787            conflicting_tx_digests: Default::default(),
788            tx_finalized_with_different_user_sig: false,
789        };
790
791        let transaction_ref = &transaction;
792        let validity_threshold = committee.validity_threshold();
793        let quorum_threshold = committee.quorum_threshold();
794        let validator_display_names = self.validator_display_names.clone();
795        let result = quorum_map_then_reduce_with_timeout(
796                committee.clone(),
797                self.authority_clients.clone(),
798                state,
799                |name, client| {
800                    Box::pin(
801                        async move {
802                            let _guard = GaugeGuard::acquire(&self.metrics.inflight_transaction_requests);
803                            let concise_name = name.concise_owned();
804                            client.handle_transaction(transaction_ref.clone(), client_addr)
805                                .monitor_cancellation()
806                                .instrument(trace_span!("handle_transaction", cancelled = false, authority =? concise_name))
807                                .await
808                        },
809                    )
810                },
811                |mut state, name, weight, response| {
812                    let display_name = validator_display_names.get(&name).unwrap_or(&name.concise().to_string()).clone();
813                    Box::pin(async move {
814                        match self.handle_process_transaction_response(
815                            tx_digest, &mut state, response, name, weight,
816                        ) {
817                            Ok(Some(result)) => {
818                                self.record_process_transaction_metrics(tx_digest, &state);
819                                return ReduceOutput::Success(result);
820                            }
821                            Ok(None) => {},
822                            Err(err) => {
823                                let concise_name = name.concise();
824                                debug!(?tx_digest, name=?concise_name, weight, "Error processing transaction from validator: {:?}", err);
825                                self.metrics
826                                    .process_tx_errors
827                                    .with_label_values(&[&display_name, err.as_ref()])
828                                    .inc();
829                                Self::record_rpc_error_maybe(self.metrics.clone(), &display_name, &err);
830                                // Record conflicting transactions if any to report to user.
831                                state.record_conflicting_transaction_if_any(name, weight, &err);
832                                let (retryable, categorized) = err.is_retryable();
833                                if !categorized {
834                                    // TODO: Should minimize possible uncategorized errors here
835                                    // use ERROR for now to make them easier to spot.
836                                    error!(?tx_digest, "uncategorized tx error: {err}");
837                                }
838                                if err.is_object_or_package_not_found() {
839                                    // Special case for object not found because we can
840                                    // retry if we have < 2f+1 object not found errors.
841                                    // However once we reach >= 2f+1 object not found errors
842                                    // we cannot retry.
843                                    state.object_or_package_not_found_stake += weight;
844                                }
845                                else if err.is_overload() {
846                                    // Special case for validator overload too. Once we have >= 2f + 1
847                                    // overloaded validators we consider the system overloaded so we exit
848                                    // and notify the user.
849                                    // Note that currently, this overload account for
850                                    //   - per object queue overload
851                                    //   - consensus overload
852                                    state.overloaded_stake += weight;
853                                }
854                                else if err.is_retryable_overload() {
855                                    // Different from above overload error, retryable overload targets authority overload (entire
856                                    // authority server is overload). In this case, the retry behavior is different from
857                                    // above that we may perform continuous retry due to that objects may have been locked
858                                    // in the validator.
859                                    //
860                                    // TODO: currently retryable overload and above overload error look redundant. We want to have a unified
861                                    // code path to handle both overload scenarios.
862                                    state.retryable_overload_info.add_stake_retryable_overload(weight, Duration::from_secs(err.retry_after_secs()));
863                                }
864                                else if !retryable {
865                                    state.non_retryable_stake += weight;
866                                }
867                                state.errors.push((err, vec![name], weight));
868
869                            }
870                        };
871
872                        let retryable_stake = self.get_retryable_stake(&state);
873                        let good_stake = std::cmp::max(state.tx_signatures.total_votes(), state.effects_map.total_votes());
874                        if good_stake + retryable_stake < quorum_threshold {
875                            debug!(
876                                tx_digest = ?tx_digest,
877                                good_stake,
878                                retryable_stake,
879                                "No chance for any tx to get quorum, exiting. Conflicting_txes: {:?}",
880                                state.conflicting_tx_digests
881                            );
882                            // If there is no chance for any tx to get quorum, exit.
883                            state.retryable = false;
884                            return ReduceOutput::Failed(state);
885                        }
886
887                        // TODO: add more comments to explain each condition.
888                        let object_or_package_not_found_condition = state.object_or_package_not_found_stake >= quorum_threshold && std::env::var("NOT_RETRY_OBJECT_PACKAGE_NOT_FOUND").is_ok();
889                        if state.non_retryable_stake >= validity_threshold
890                            || object_or_package_not_found_condition // In normal case, object/package not found should be more than f+1
891                            || state.overloaded_stake >= quorum_threshold {
892                            // We have hit an exit condition, f+1 non-retryable err or 2f+1 object not found or overload,
893                            // so we no longer consider the transaction state as retryable.
894                            state.retryable = false;
895                            ReduceOutput::Failed(state)
896                        } else {
897                            ReduceOutput::Continue(state)
898                        }
899                    })
900                },
901                // A long timeout before we hear back from a quorum
902                self.timeouts.pre_quorum_timeout,
903            )
904            .await;
905
906        match result {
907            Ok((result, _)) => Ok(result),
908            Err(state) => {
909                self.record_process_transaction_metrics(tx_digest, &state);
910                let state = self.record_non_quorum_effects_maybe(tx_digest, state);
911                Err(self.handle_process_transaction_error(state))
912            }
913        }
914    }
915
916    fn record_rpc_error_maybe(metrics: Arc<AuthAggMetrics>, display_name: &str, error: &SuiError) {
917        if let SuiErrorKind::RpcError(_message, code) = error.as_inner() {
918            metrics
919                .total_rpc_err
920                .with_label_values(&[display_name, code.as_str()])
921                .inc();
922        }
923    }
924
925    fn handle_process_transaction_error(
926        &self,
927        state: ProcessTransactionState,
928    ) -> AggregatorProcessTransactionError {
929        let quorum_threshold = self.committee.quorum_threshold();
930
931        // Return system overload error if we see >= 2f + 1 overloaded stake.
932        if state.overloaded_stake >= quorum_threshold {
933            return AggregatorProcessTransactionError::SystemOverload {
934                overloaded_stake: state.overloaded_stake,
935                errors: group_errors(state.errors),
936            };
937        }
938
939        if !state.retryable {
940            if state.tx_finalized_with_different_user_sig
941                || state.check_if_error_indicates_tx_finalized_with_different_user_sig(
942                    self.committee.validity_threshold(),
943                )
944            {
945                return AggregatorProcessTransactionError::TxAlreadyFinalizedWithDifferentUserSignatures;
946            }
947
948            // Handle conflicts first as `FatalConflictingTransaction` which is
949            // more meaningful than `FatalTransaction`
950            if !state.conflicting_tx_digests.is_empty() {
951                let good_stake = state.tx_signatures.total_votes();
952                warn!(
953                    ?state.conflicting_tx_digests,
954                    original_tx_stake = good_stake,
955                    "Client double spend attempt detected!",
956                );
957                self.metrics
958                    .total_client_double_spend_attempts_detected
959                    .inc();
960                return AggregatorProcessTransactionError::FatalConflictingTransaction {
961                    errors: group_errors(state.errors),
962                    conflicting_tx_digests: state.conflicting_tx_digests,
963                };
964            }
965
966            return AggregatorProcessTransactionError::FatalTransaction {
967                errors: group_errors(state.errors),
968            };
969        }
970
971        // When state is in a retryable state and process transaction was not successful, it indicates that
972        // we have heard from *all* validators. Check if any SystemOverloadRetryAfter error caused the txn
973        // to fail. If so, return explicit SystemOverloadRetryAfter error for continuous retry (since objects
974        // are locked in validators). If not, retry regular RetryableTransaction error.
975        if state.tx_signatures.total_votes() + state.retryable_overload_info.total_stake
976            >= quorum_threshold
977        {
978            let retry_after_secs = state
979                .retryable_overload_info
980                .get_quorum_retry_after(state.tx_signatures.total_votes(), quorum_threshold)
981                .as_secs();
982            return AggregatorProcessTransactionError::SystemOverloadRetryAfter {
983                overload_stake: state.retryable_overload_info.total_stake,
984                errors: group_errors(state.errors),
985                retry_after_secs,
986            };
987        }
988
989        // The system is not overloaded and transaction state is still retryable.
990        AggregatorProcessTransactionError::RetryableTransaction {
991            errors: group_errors(state.errors),
992        }
993    }
994
995    fn record_process_transaction_metrics(
996        &self,
997        tx_digest: &TransactionDigest,
998        state: &ProcessTransactionState,
999    ) {
1000        let num_signatures = state.tx_signatures.validator_sig_count();
1001        let good_stake = state.tx_signatures.total_votes();
1002        debug!(
1003            ?tx_digest,
1004            num_errors = state.errors.iter().map(|e| e.1.len()).sum::<usize>(),
1005            num_unique_errors = state.errors.len(),
1006            ?good_stake,
1007            non_retryable_stake = state.non_retryable_stake,
1008            ?num_signatures,
1009            "Received signatures response from validators handle_transaction"
1010        );
1011        if !state.errors.is_empty() {
1012            debug!(?tx_digest, "Errors received: {:?}", state.errors);
1013        }
1014    }
1015
1016    fn handle_process_transaction_response(
1017        &self,
1018        tx_digest: &TransactionDigest,
1019        state: &mut ProcessTransactionState,
1020        response: SuiResult<PlainTransactionInfoResponse>,
1021        name: AuthorityName,
1022        weight: StakeUnit,
1023    ) -> SuiResult<Option<ProcessTransactionResult>> {
1024        match response {
1025            Ok(PlainTransactionInfoResponse::Signed(signed)) => {
1026                debug!(?tx_digest, name=?name.concise(), weight, "Received signed transaction from validator handle_transaction");
1027                self.handle_transaction_response_with_signed(state, signed)
1028            }
1029            Ok(PlainTransactionInfoResponse::ExecutedWithCert(cert, effects, events)) => {
1030                debug!(?tx_digest, name=?name.concise(), weight, "Received prev certificate and effects from validator handle_transaction");
1031                self.handle_transaction_response_with_executed(state, Some(cert), effects, events)
1032            }
1033            Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(_, effects, events)) => {
1034                debug!(?tx_digest, name=?name.concise(), weight, "Received prev effects from validator handle_transaction");
1035                self.handle_transaction_response_with_executed(state, None, effects, events)
1036            }
1037            Err(err) => Err(err),
1038        }
1039    }
1040
1041    fn handle_transaction_response_with_signed(
1042        &self,
1043        state: &mut ProcessTransactionState,
1044        plain_tx: SignedTransaction,
1045    ) -> SuiResult<Option<ProcessTransactionResult>> {
1046        match state.tx_signatures.insert(plain_tx.clone()) {
1047            InsertResult::NotEnoughVotes {
1048                bad_votes,
1049                bad_authorities,
1050            } => {
1051                state.non_retryable_stake += bad_votes;
1052                if bad_votes > 0 {
1053                    state.errors.push((
1054                        SuiErrorKind::InvalidSignature {
1055                            error: "Individual signature verification failed".to_string(),
1056                        }
1057                        .into(),
1058                        bad_authorities,
1059                        bad_votes,
1060                    ));
1061                }
1062                Ok(None)
1063            }
1064            InsertResult::Failed { error } => Err(error),
1065            InsertResult::QuorumReached(cert_sig) => {
1066                let certificate =
1067                    CertifiedTransaction::new_from_data_and_sig(plain_tx.into_data(), cert_sig);
1068                certificate.verify_committee_sigs_only(&self.committee)?;
1069                Ok(Some(ProcessTransactionResult::Certified {
1070                    certificate,
1071                    newly_formed: true,
1072                }))
1073            }
1074        }
1075    }
1076
1077    fn handle_transaction_response_with_executed(
1078        &self,
1079        state: &mut ProcessTransactionState,
1080        certificate: Option<CertifiedTransaction>,
1081        plain_tx_effects: SignedTransactionEffects,
1082        events: TransactionEvents,
1083    ) -> SuiResult<Option<ProcessTransactionResult>> {
1084        match certificate {
1085            Some(certificate) if certificate.epoch() == self.committee.epoch => {
1086                // If we get a certificate in the same epoch, then we use it.
1087                // A certificate in a past epoch does not guarantee finality
1088                // and validators may reject to process it.
1089                Ok(Some(ProcessTransactionResult::Certified {
1090                    certificate,
1091                    newly_formed: false,
1092                }))
1093            }
1094            _ => {
1095                // If we get 2f+1 effects, it's a proof that the transaction
1096                // has already been finalized. This works because validators would re-sign effects for transactions
1097                // that were finalized in previous epochs.
1098                let digest = plain_tx_effects.data().digest();
1099                match state.effects_map.insert(digest, plain_tx_effects.clone()) {
1100                    InsertResult::NotEnoughVotes {
1101                        bad_votes,
1102                        bad_authorities,
1103                    } => {
1104                        state.non_retryable_stake += bad_votes;
1105                        if bad_votes > 0 {
1106                            state.errors.push((
1107                                SuiErrorKind::InvalidSignature {
1108                                    error: "Individual signature verification failed".to_string(),
1109                                }
1110                                .into(),
1111                                bad_authorities,
1112                                bad_votes,
1113                            ));
1114                        }
1115                        Ok(None)
1116                    }
1117                    InsertResult::Failed { error } => Err(error),
1118                    InsertResult::QuorumReached(cert_sig) => {
1119                        let ct = CertifiedTransactionEffects::new_from_data_and_sig(
1120                            plain_tx_effects.into_data(),
1121                            cert_sig,
1122                        );
1123                        Ok(Some(ProcessTransactionResult::Executed(
1124                            ct.verify(&self.committee)?,
1125                            events,
1126                        )))
1127                    }
1128                }
1129            }
1130        }
1131    }
1132
1133    /// Check if we have some signed TransactionEffects but not a quorum
1134    fn record_non_quorum_effects_maybe(
1135        &self,
1136        tx_digest: &TransactionDigest,
1137        mut state: ProcessTransactionState,
1138    ) -> ProcessTransactionState {
1139        if state.effects_map.unique_key_count() > 0 {
1140            let non_quorum_effects = state.effects_map.get_all_unique_values();
1141            warn!(
1142                ?tx_digest,
1143                "Received signed Effects but not with a quorum {:?}", non_quorum_effects
1144            );
1145
1146            // Safe to unwrap because we know that there is at least one entry in the map
1147            // from the check above.
1148            let (_most_staked_effects_digest, (_, most_staked_effects_digest_stake)) =
1149                non_quorum_effects
1150                    .iter()
1151                    .max_by_key(|&(_, (_, stake))| stake)
1152                    .unwrap();
1153            // We check if we have enough retryable stake to get quorum for the most staked
1154            // effects digest, otherwise it indicates we have violated safety assumptions
1155            // or we have forked.
1156            if most_staked_effects_digest_stake + self.get_retryable_stake(&state)
1157                < self.committee.quorum_threshold()
1158            {
1159                state.retryable = false;
1160                if state.check_if_error_indicates_tx_finalized_with_different_user_sig(
1161                    self.committee.validity_threshold(),
1162                ) {
1163                    state.tx_finalized_with_different_user_sig = true;
1164                } else {
1165                    // TODO: Figure out a more reliable way to detect invariance violations.
1166                    error!(
1167                        "We have seen signed effects but unable to reach quorum threshold even including retriable stakes. This is very rare. Tx: {tx_digest:?}. Non-quorum effects: {non_quorum_effects:?}."
1168                    );
1169                }
1170            }
1171
1172            let mut involved_validators = Vec::new();
1173            let mut total_stake = 0;
1174            for (validators, stake) in non_quorum_effects.values() {
1175                involved_validators.extend_from_slice(validators);
1176                total_stake += stake;
1177            }
1178            // TODO: Instead of pushing a new error, we should add more information about the non-quorum effects
1179            // in the final error if state is no longer retryable
1180            state.errors.push((
1181                SuiErrorKind::QuorumFailedToGetEffectsQuorumWhenProcessingTransaction {
1182                    effects_map: non_quorum_effects,
1183                }
1184                .into(),
1185                involved_validators,
1186                total_stake,
1187            ));
1188        }
1189        state
1190    }
1191
1192    fn get_retryable_stake(&self, state: &ProcessTransactionState) -> StakeUnit {
1193        self.committee.total_votes()
1194            - state.non_retryable_stake
1195            - state.effects_map.total_votes()
1196            - state.tx_signatures.total_votes()
1197    }
1198
1199    #[instrument(level = "trace", skip_all)]
1200    pub async fn process_certificate(
1201        &self,
1202        request: HandleCertificateRequestV3,
1203        client_addr: Option<SocketAddr>,
1204    ) -> Result<QuorumDriverResponse, AggregatorProcessCertificateError> {
1205        let state = ProcessCertificateState {
1206            effects_map: MultiStakeAggregator::new(self.committee.clone()),
1207            non_retryable_stake: 0,
1208            non_retryable_errors: vec![],
1209            retryable_errors: vec![],
1210            retryable: true,
1211            events: None,
1212            input_objects: None,
1213            output_objects: None,
1214            auxiliary_data: None,
1215            request: request.clone(),
1216        };
1217
1218        // create a set of validators that we should sample to request input/output objects from
1219        let validators_to_sample =
1220            if request.include_input_objects || request.include_output_objects {
1221                // Number of validators to request input/output objects from
1222                const NUMBER_TO_SAMPLE: usize = 10;
1223
1224                self.committee
1225                    .choose_multiple_weighted_iter(NUMBER_TO_SAMPLE)
1226                    .cloned()
1227                    .collect()
1228            } else {
1229                HashSet::new()
1230            };
1231
1232        let tx_digest = *request.certificate.digest();
1233        let timeout_after_quorum = self.timeouts.post_quorum_timeout;
1234
1235        let request_ref = request;
1236        let threshold = self.committee.quorum_threshold();
1237        let validity = self.committee.validity_threshold();
1238
1239        debug!(
1240            ?tx_digest,
1241            quorum_threshold = threshold,
1242            validity_threshold = validity,
1243            ?timeout_after_quorum,
1244            "Broadcasting certificate to authorities"
1245        );
1246        let committee: Arc<Committee> = self.committee.clone();
1247        let authority_clients = self.authority_clients.clone();
1248        let metrics = self.metrics.clone();
1249        let metrics_clone = metrics.clone();
1250        let validator_display_names = self.validator_display_names.clone();
1251        let (result, mut remaining_tasks) = quorum_map_then_reduce_with_timeout(
1252            committee.clone(),
1253            authority_clients.clone(),
1254            state,
1255            move |name, client| {
1256                Box::pin(async move {
1257                    let _guard = GaugeGuard::acquire(&metrics_clone.inflight_certificate_requests);
1258                    let concise_name = name.concise_owned();
1259                    if request_ref.include_input_objects || request_ref.include_output_objects {
1260
1261                        // adjust the request to validators we aren't planning on sampling
1262                        let req = if validators_to_sample.contains(&name) {
1263                            request_ref
1264                        } else {
1265                            HandleCertificateRequestV3 {
1266                                include_input_objects: false,
1267                                include_output_objects: false,
1268                                include_auxiliary_data: false,
1269                                ..request_ref
1270                            }
1271                        };
1272
1273                        client
1274                            .handle_certificate_v3(req, client_addr)
1275                            .instrument(trace_span!("handle_certificate_v3", authority =? concise_name))
1276                            .await
1277                    } else {
1278                        client
1279                            .handle_certificate_v2(request_ref.certificate, client_addr)
1280                            .instrument(trace_span!("handle_certificate_v2", authority =? concise_name))
1281                            .await
1282                            .map(|response| HandleCertificateResponseV3 {
1283                                effects: response.signed_effects,
1284                                events: Some(response.events),
1285                                input_objects: None,
1286                                output_objects: None,
1287                                auxiliary_data: None,
1288                            })
1289                    }
1290                })
1291            },
1292            move |mut state, name, weight, response| {
1293                let committee_clone = committee.clone();
1294                let metrics = metrics.clone();
1295                let display_name = validator_display_names.get(&name).unwrap_or(&name.concise().to_string()).clone();
1296                Box::pin(async move {
1297                    // We aggregate the effects response, until we have more than 2f
1298                    // and return.
1299                    match AuthorityAggregator::<A>::handle_process_certificate_response(
1300                        committee_clone,
1301                        &metrics,
1302                        &tx_digest, &mut state, response, name)
1303                    {
1304                        Ok(Some(effects)) => ReduceOutput::Success(effects),
1305                        Ok(None) => {
1306                            // When the result is none, it is possible that the
1307                            // non_retryable_stake had been incremented due to
1308                            // failed individual signature verification.
1309                            if state.non_retryable_stake >= validity {
1310                                state.retryable = false;
1311                                ReduceOutput::Failed(state)
1312                            } else {
1313                                ReduceOutput::Continue(state)
1314                            }
1315                        },
1316                        Err(err) => {
1317                            let concise_name = name.concise();
1318                            debug!(?tx_digest, name=?concise_name, "Error processing certificate from validator: {:?}", err);
1319                            metrics
1320                                .process_cert_errors
1321                                .with_label_values(&[&display_name, err.as_ref()])
1322                                .inc();
1323                            Self::record_rpc_error_maybe(metrics, &display_name, &err);
1324                            let (retryable, categorized) = err.is_retryable();
1325                            if !categorized {
1326                                // TODO: Should minimize possible uncategorized errors here
1327                                // use ERROR for now to make them easier to spot.
1328                                error!(?tx_digest, "[WATCHOUT] uncategorized tx error: {err}");
1329                            }
1330                            if !retryable {
1331                                state.non_retryable_stake += weight;
1332                                state.non_retryable_errors.push((err, vec![name], weight));
1333                            } else {
1334                                state.retryable_errors.push((err, vec![name], weight));
1335                            }
1336                            if state.non_retryable_stake >= validity {
1337                                state.retryable = false;
1338                                ReduceOutput::Failed(state)
1339                            } else {
1340                                ReduceOutput::Continue(state)
1341                            }
1342                        }
1343                    }
1344                })
1345            },
1346            // A long timeout before we hear back from a quorum
1347            self.timeouts.pre_quorum_timeout,
1348        )
1349        .await
1350        .map_err(|state| {
1351            debug!(
1352                ?tx_digest,
1353                num_unique_effects = state.effects_map.unique_key_count(),
1354                non_retryable_stake = state.non_retryable_stake,
1355                "Received effects responses from validators"
1356            );
1357
1358            // record errors and tx retryable state
1359            for (sui_err, _, _) in state.retryable_errors.iter().chain(state.non_retryable_errors.iter()) {
1360                self
1361                    .metrics
1362                    .total_aggregated_err
1363                    .with_label_values(&[
1364                        sui_err.as_ref(),
1365                        if state.retryable {
1366                            "recoverable"
1367                        } else {
1368                            "non-recoverable"
1369                        },
1370                    ])
1371                    .inc();
1372            }
1373            if state.retryable {
1374                AggregatorProcessCertificateError::RetryableExecuteCertificate {
1375                    retryable_errors: group_errors(state.retryable_errors),
1376                }
1377            } else {
1378                AggregatorProcessCertificateError::FatalExecuteCertificate {
1379                    non_retryable_errors: group_errors(state.non_retryable_errors),
1380                }
1381            }
1382        })?;
1383
1384        let metrics = self.metrics.clone();
1385        metrics
1386            .remaining_tasks_when_reaching_cert_quorum
1387            .observe(remaining_tasks.len() as f64);
1388        if !remaining_tasks.is_empty() {
1389            // Use best efforts to send the cert to remaining validators.
1390            spawn_monitored_task!(async move {
1391                let mut timeout = Box::pin(sleep(timeout_after_quorum));
1392                loop {
1393                    tokio::select! {
1394                        _ = &mut timeout => {
1395                            debug!(?tx_digest, "Timed out in post quorum cert broadcasting: {:?}. Remaining tasks: {:?}", timeout_after_quorum, remaining_tasks.len());
1396                            metrics.cert_broadcasting_post_quorum_timeout.inc();
1397                            metrics.remaining_tasks_when_cert_broadcasting_post_quorum_timeout.observe(remaining_tasks.len() as f64);
1398                            break;
1399                        }
1400                        res = remaining_tasks.next() => {
1401                            if res.is_none() {
1402                                break;
1403                            }
1404                        }
1405                    }
1406                }
1407            });
1408        }
1409        Ok(result)
1410    }
1411
1412    fn handle_process_certificate_response(
1413        committee: Arc<Committee>,
1414        metrics: &AuthAggMetrics,
1415        tx_digest: &TransactionDigest,
1416        state: &mut ProcessCertificateState,
1417        response: SuiResult<HandleCertificateResponseV3>,
1418        name: AuthorityName,
1419    ) -> SuiResult<Option<QuorumDriverResponse>> {
1420        match response {
1421            Ok(HandleCertificateResponseV3 {
1422                effects: signed_effects,
1423                events,
1424                input_objects,
1425                output_objects,
1426                auxiliary_data,
1427            }) => {
1428                debug!(
1429                    ?tx_digest,
1430                    name = ?name.concise(),
1431                    "Validator handled certificate successfully",
1432                );
1433
1434                if events.is_some() && state.events.is_none() {
1435                    state.events = events;
1436                }
1437
1438                if input_objects.is_some() && state.input_objects.is_none() {
1439                    state.input_objects = input_objects;
1440                }
1441
1442                if output_objects.is_some() && state.output_objects.is_none() {
1443                    state.output_objects = output_objects;
1444                }
1445
1446                if auxiliary_data.is_some() && state.auxiliary_data.is_none() {
1447                    state.auxiliary_data = auxiliary_data;
1448                }
1449
1450                let effects_digest = *signed_effects.digest();
1451                // Note: here we aggregate votes by the hash of the effects structure
1452                match state.effects_map.insert(
1453                    (signed_effects.epoch(), effects_digest),
1454                    signed_effects.clone(),
1455                ) {
1456                    InsertResult::NotEnoughVotes {
1457                        bad_votes,
1458                        bad_authorities,
1459                    } => {
1460                        state.non_retryable_stake += bad_votes;
1461                        if bad_votes > 0 {
1462                            state.non_retryable_errors.push((
1463                                SuiErrorKind::InvalidSignature {
1464                                    error: "Individual signature verification failed".to_string(),
1465                                }
1466                                .into(),
1467                                bad_authorities,
1468                                bad_votes,
1469                            ));
1470                        }
1471                        Ok(None)
1472                    }
1473                    InsertResult::Failed { error } => Err(error),
1474                    InsertResult::QuorumReached(cert_sig) => {
1475                        let ct = CertifiedTransactionEffects::new_from_data_and_sig(
1476                            signed_effects.into_data(),
1477                            cert_sig,
1478                        );
1479
1480                        if (state.request.include_input_objects && state.input_objects.is_none())
1481                            || (state.request.include_output_objects
1482                                && state.output_objects.is_none())
1483                        {
1484                            metrics.quorum_reached_without_requested_objects.inc();
1485                            debug!(
1486                                ?tx_digest,
1487                                "Quorum Reached but requested input/output objects were not returned"
1488                            );
1489                        }
1490
1491                        ct.verify(&committee).map(|ct| {
1492                            debug!(?tx_digest, "Got quorum for validators handle_certificate.");
1493                            Some(QuorumDriverResponse {
1494                                effects_cert: ct,
1495                                events: state.events.take(),
1496                                input_objects: state.input_objects.take(),
1497                                output_objects: state.output_objects.take(),
1498                                auxiliary_data: state.auxiliary_data.take(),
1499                            })
1500                        })
1501                    }
1502                }
1503            }
1504            Err(err) => Err(err),
1505        }
1506    }
1507
1508    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1509    pub async fn execute_transaction_block(
1510        &self,
1511        transaction: &Transaction,
1512        client_addr: Option<SocketAddr>,
1513    ) -> Result<VerifiedCertifiedTransactionEffects, anyhow::Error> {
1514        let tx_guard = GaugeGuard::acquire(&self.metrics.inflight_transactions);
1515        let result = self
1516            .process_transaction(transaction.clone(), client_addr)
1517            .await?;
1518        let cert = match result {
1519            ProcessTransactionResult::Certified { certificate, .. } => certificate,
1520            ProcessTransactionResult::Executed(effects, _) => {
1521                return Ok(effects);
1522            }
1523        };
1524        self.metrics.total_tx_certificates_created.inc();
1525        drop(tx_guard);
1526
1527        let _cert_guard = GaugeGuard::acquire(&self.metrics.inflight_certificates);
1528        let response = self
1529            .process_certificate(
1530                HandleCertificateRequestV3 {
1531                    certificate: cert.clone(),
1532                    include_events: true,
1533                    include_input_objects: false,
1534                    include_output_objects: false,
1535                    include_auxiliary_data: false,
1536                },
1537                client_addr,
1538            )
1539            .await?;
1540
1541        Ok(response.effects_cert)
1542    }
1543}
1544
1545#[derive(Default)]
1546pub struct AuthorityAggregatorBuilder<'a> {
1547    network_config: Option<&'a NetworkConfig>,
1548    genesis: Option<&'a Genesis>,
1549    committee: Option<Committee>,
1550    reference_gas_price: Option<u64>,
1551    committee_store: Option<Arc<CommitteeStore>>,
1552    registry: Option<&'a Registry>,
1553    timeouts_config: Option<TimeoutConfig>,
1554}
1555
1556impl<'a> AuthorityAggregatorBuilder<'a> {
1557    pub fn from_network_config(config: &'a NetworkConfig) -> Self {
1558        Self {
1559            network_config: Some(config),
1560            ..Default::default()
1561        }
1562    }
1563
1564    pub fn from_genesis(genesis: &'a Genesis) -> Self {
1565        Self {
1566            genesis: Some(genesis),
1567            ..Default::default()
1568        }
1569    }
1570
1571    pub fn from_committee(committee: Committee) -> Self {
1572        Self {
1573            committee: Some(committee),
1574            ..Default::default()
1575        }
1576    }
1577
1578    #[cfg(test)]
1579    pub fn from_committee_size(committee_size: usize) -> Self {
1580        let (committee, _keypairs) = Committee::new_simple_test_committee_of_size(committee_size);
1581        Self::from_committee(committee)
1582    }
1583
1584    pub fn with_committee_store(mut self, committee_store: Arc<CommitteeStore>) -> Self {
1585        self.committee_store = Some(committee_store);
1586        self
1587    }
1588
1589    pub fn with_registry(mut self, registry: &'a Registry) -> Self {
1590        self.registry = Some(registry);
1591        self
1592    }
1593
1594    pub fn with_timeouts_config(mut self, timeouts_config: TimeoutConfig) -> Self {
1595        self.timeouts_config = Some(timeouts_config);
1596        self
1597    }
1598
1599    fn get_network_committee(&self) -> CommitteeWithNetworkMetadata {
1600        self.get_genesis()
1601            .unwrap_or_else(|| panic!("need either NetworkConfig or Genesis."))
1602            .committee_with_network()
1603    }
1604
1605    fn get_committee_authority_names_to_hostnames(&self) -> HashMap<AuthorityName, String> {
1606        if let Some(genesis) = self.get_genesis() {
1607            let state = genesis
1608                .sui_system_object()
1609                .into_genesis_version_for_tooling();
1610            state
1611                .validators
1612                .active_validators
1613                .iter()
1614                .map(|v| {
1615                    let metadata = v.verified_metadata();
1616                    let name = metadata.sui_pubkey_bytes();
1617
1618                    (name, metadata.name.clone())
1619                })
1620                .collect()
1621        } else {
1622            HashMap::new()
1623        }
1624    }
1625
1626    fn get_reference_gas_price(&self) -> u64 {
1627        self.reference_gas_price.unwrap_or_else(|| {
1628            self.get_genesis()
1629                .map(|g| g.reference_gas_price())
1630                .unwrap_or(1000)
1631        })
1632    }
1633
1634    fn get_genesis(&self) -> Option<&Genesis> {
1635        if let Some(network_config) = self.network_config {
1636            Some(&network_config.genesis)
1637        } else if let Some(genesis) = self.genesis {
1638            Some(genesis)
1639        } else {
1640            None
1641        }
1642    }
1643
1644    fn get_committee(&self) -> Committee {
1645        self.committee
1646            .clone()
1647            .unwrap_or_else(|| self.get_network_committee().committee().clone())
1648    }
1649
1650    pub fn build_network_clients(
1651        self,
1652    ) -> (
1653        AuthorityAggregator<NetworkAuthorityClient>,
1654        BTreeMap<AuthorityPublicKeyBytes, NetworkAuthorityClient>,
1655    ) {
1656        let network_committee = self.get_network_committee();
1657        let auth_clients = make_authority_clients_with_timeout_config(
1658            &network_committee,
1659            DEFAULT_CONNECT_TIMEOUT_SEC,
1660            DEFAULT_REQUEST_TIMEOUT_SEC,
1661        );
1662        let auth_agg = self.build_custom_clients(auth_clients.clone());
1663        (auth_agg, auth_clients)
1664    }
1665
1666    pub fn build_custom_clients<C: Clone>(
1667        self,
1668        authority_clients: BTreeMap<AuthorityName, C>,
1669    ) -> AuthorityAggregator<C> {
1670        let committee = self.get_committee();
1671        let validator_display_names = self.get_committee_authority_names_to_hostnames();
1672        let reference_gas_price = self.get_reference_gas_price();
1673        let registry = Registry::new();
1674        let registry = self.registry.unwrap_or(&registry);
1675        let safe_client_metrics_base = SafeClientMetricsBase::new(registry);
1676        let auth_agg_metrics = Arc::new(AuthAggMetrics::new(registry));
1677
1678        let committee_store = self
1679            .committee_store
1680            .unwrap_or_else(|| Arc::new(CommitteeStore::new_for_testing(&committee)));
1681
1682        let timeouts_config = self.timeouts_config.unwrap_or_default();
1683
1684        AuthorityAggregator::new(
1685            committee,
1686            Arc::new(validator_display_names),
1687            reference_gas_price,
1688            committee_store,
1689            authority_clients,
1690            safe_client_metrics_base,
1691            auth_agg_metrics,
1692            timeouts_config,
1693        )
1694    }
1695
1696    #[cfg(test)]
1697    pub fn build_mock_authority_aggregator(self) -> AuthorityAggregator<MockAuthorityApi> {
1698        let committee = self.get_committee();
1699        let clients = committee
1700            .names()
1701            .map(|name| {
1702                (
1703                    *name,
1704                    MockAuthorityApi::new(
1705                        Duration::from_millis(100),
1706                        Arc::new(std::sync::Mutex::new(30)),
1707                    ),
1708                )
1709            })
1710            .collect();
1711        self.build_custom_clients(clients)
1712    }
1713}