1use crate::authority_client::{
6 AuthorityAPI, NetworkAuthorityClient, make_authority_clients_with_timeout_config,
7 make_network_authority_clients_with_network_config,
8};
9use crate::safe_client::{SafeClient, SafeClientMetrics, SafeClientMetricsBase};
10#[cfg(test)]
11use crate::test_authority_clients::MockAuthorityApi;
12use futures::StreamExt;
13use mysten_metrics::{GaugeGuard, MonitorCancellation, spawn_monitored_task};
14use std::convert::AsRef;
15use std::net::SocketAddr;
16use sui_authority_aggregation::ReduceOutput;
17use sui_authority_aggregation::quorum_map_then_reduce_with_timeout;
18use sui_config::genesis::Genesis;
19use sui_network::{
20 DEFAULT_CONNECT_TIMEOUT_SEC, DEFAULT_REQUEST_TIMEOUT_SEC, default_mysten_network_config,
21};
22use sui_swarm_config::network_config::NetworkConfig;
23use sui_types::crypto::{AuthorityPublicKeyBytes, AuthoritySignInfo};
24use sui_types::error::{SuiErrorKind, UserInputError};
25use sui_types::message_envelope::Message;
26use sui_types::object::Object;
27use sui_types::quorum_driver_types::{GroupedErrors, QuorumDriverResponse};
28use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
29use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
30use sui_types::{
31 base_types::*,
32 committee::Committee,
33 error::{SuiError, SuiResult},
34 transaction::*,
35};
36use thiserror::Error;
37use tracing::{Instrument, debug, error, instrument, trace, trace_span, warn};
38
39use crate::epoch::committee_store::CommitteeStore;
40use crate::stake_aggregator::{InsertResult, MultiStakeAggregator, StakeAggregator};
41use prometheus::{
42 Histogram, IntCounter, IntCounterVec, IntGauge, Registry, register_histogram_with_registry,
43 register_int_counter_vec_with_registry, register_int_counter_with_registry,
44 register_int_gauge_with_registry,
45};
46use std::collections::{BTreeMap, HashMap, HashSet};
47use std::string::ToString;
48use std::sync::Arc;
49use std::time::Duration;
50use sui_types::committee::{CommitteeWithNetworkMetadata, StakeUnit};
51use sui_types::effects::{
52 CertifiedTransactionEffects, SignedTransactionEffects, TransactionEffects, TransactionEvents,
53 VerifiedCertifiedTransactionEffects,
54};
55use sui_types::messages_grpc::{
56 HandleCertificateRequestV3, HandleCertificateResponseV3, LayoutGenerationOption,
57 ObjectInfoRequest,
58};
59use sui_types::messages_safe_client::PlainTransactionInfoResponse;
60use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
61use tokio::time::sleep;
62
/// Default retry count exported by this module.
// NOTE(review): not referenced in the visible part of this file; confirm which callers rely on it.
pub const DEFAULT_RETRIES: usize = 4;

// Unit tests for the aggregator live in a separate file and are only compiled for tests.
#[cfg(test)]
#[path = "unit_tests/authority_aggregator_tests.rs"]
pub mod authority_aggregator_tests;
68
/// Timeouts applied while broadcasting requests to the committee.
///
/// `pre_quorum_timeout` is the timeout handed to `quorum_map_then_reduce_with_timeout` by the
/// request paths in this file. `post_quorum_timeout` is presumably applied after a quorum has
/// been reached (its users are not visible here — confirm against callers).
#[derive(Clone)]
pub struct TimeoutConfig {
    pub pre_quorum_timeout: Duration,
    pub post_quorum_timeout: Duration,
}

impl Default for TimeoutConfig {
    /// 60 seconds before quorum, 7 seconds after quorum.
    fn default() -> Self {
        let pre_quorum_timeout = Duration::from_secs(60);
        let post_quorum_timeout = Duration::from_secs(7);
        TimeoutConfig {
            pre_quorum_timeout,
            post_quorum_timeout,
        }
    }
}
83
/// Prometheus metrics recorded by the authority aggregator.
///
/// Every field is registered in `AuthAggMetrics::new`; the notes below restate the help text
/// and labels used at registration.
#[derive(Clone)]
pub struct AuthAggMetrics {
    /// Total number of certificates made in the authority_aggregator.
    pub total_tx_certificates_created: IntCounter,
    /// Errors returned by validators when processing a transaction, labeled by validator
    /// `name` and `error` type.
    pub process_tx_errors: IntCounterVec,
    /// Errors returned by validators when processing a certificate, labeled by validator
    /// `name` and `error` type.
    pub process_cert_errors: IntCounterVec,
    /// Detected client double-spend attempts.
    pub total_client_double_spend_attempts_detected: IntCounter,
    /// Errors returned from validators, labeled by `error` and `tx_recoverable`.
    pub total_aggregated_err: IntCounterVec,
    /// RPC errors returned from validators, labeled by validator `name` and `error_message`.
    pub total_rpc_err: IntCounterVec,
    /// Transactions currently gathering signatures.
    pub inflight_transactions: IntGauge,
    /// Certificates currently gathering effects.
    pub inflight_certificates: IntGauge,
    /// In-flight `handle_transaction` requests.
    pub inflight_transaction_requests: IntGauge,
    /// In-flight `handle_certificate` requests.
    pub inflight_certificate_requests: IntGauge,

    /// Timeouts in certificate processing after quorum was reached.
    pub cert_broadcasting_post_quorum_timeout: IntCounter,
    /// Number of remaining tasks when certificate quorum is reached.
    pub remaining_tasks_when_reaching_cert_quorum: Histogram,
    /// Number of remaining tasks when post-quorum certificate broadcasting times out.
    pub remaining_tasks_when_cert_broadcasting_post_quorum_timeout: Histogram,
    /// Times quorum was reached without the requested objects coming back from at least one
    /// validator.
    pub quorum_reached_without_requested_objects: IntCounter,
}
103
104impl AuthAggMetrics {
105 pub fn new(registry: &prometheus::Registry) -> Self {
106 Self {
107 total_tx_certificates_created: register_int_counter_with_registry!(
108 "total_tx_certificates_created",
109 "Total number of certificates made in the authority_aggregator",
110 registry,
111 )
112 .unwrap(),
113 process_tx_errors: register_int_counter_vec_with_registry!(
114 "process_tx_errors",
115 "Number of errors returned from validators when processing transaction, group by validator name and error type",
116 &["name","error"],
117 registry,
118 )
119 .unwrap(),
120 process_cert_errors: register_int_counter_vec_with_registry!(
121 "process_cert_errors",
122 "Number of errors returned from validators when processing certificate, group by validator name and error type",
123 &["name", "error"],
124 registry,
125 )
126 .unwrap(),
127 total_client_double_spend_attempts_detected: register_int_counter_with_registry!(
128 "total_client_double_spend_attempts_detected",
129 "Total number of client double spend attempts that are detected",
130 registry,
131 )
132 .unwrap(),
133 total_aggregated_err: register_int_counter_vec_with_registry!(
134 "total_aggregated_err",
135 "Total number of errors returned from validators, grouped by error type",
136 &["error", "tx_recoverable"],
137 registry,
138 )
139 .unwrap(),
140 total_rpc_err: register_int_counter_vec_with_registry!(
141 "total_rpc_err",
142 "Total number of rpc errors returned from validators, grouped by validator short name and RPC error message",
143 &["name", "error_message"],
144 registry,
145 )
146 .unwrap(),
147 inflight_transactions: register_int_gauge_with_registry!(
148 "auth_agg_inflight_transactions",
149 "Inflight transaction gathering signatures",
150 registry,
151 )
152 .unwrap(),
153 inflight_certificates: register_int_gauge_with_registry!(
154 "auth_agg_inflight_certificates",
155 "Inflight certificates gathering effects",
156 registry,
157 )
158 .unwrap(),
159 inflight_transaction_requests: register_int_gauge_with_registry!(
160 "auth_agg_inflight_transaction_requests",
161 "Inflight handle_transaction requests",
162 registry,
163 )
164 .unwrap(),
165 inflight_certificate_requests: register_int_gauge_with_registry!(
166 "auth_agg_inflight_certificate_requests",
167 "Inflight handle_certificate requests",
168 registry,
169 )
170 .unwrap(),
171 cert_broadcasting_post_quorum_timeout: register_int_counter_with_registry!(
172 "auth_agg_cert_broadcasting_post_quorum_timeout",
173 "Total number of timeout in cert processing post quorum",
174 registry,
175 )
176 .unwrap(),
177 remaining_tasks_when_reaching_cert_quorum: register_histogram_with_registry!(
178 "auth_agg_remaining_tasks_when_reaching_cert_quorum",
179 "Number of remaining tasks when reaching certificate quorum",
180 registry,
181 ).unwrap(),
182 remaining_tasks_when_cert_broadcasting_post_quorum_timeout: register_histogram_with_registry!(
183 "auth_agg_remaining_tasks_when_cert_broadcasting_post_quorum_timeout",
184 "Number of remaining tasks when post quorum certificate broadcasting times out",
185 registry,
186 ).unwrap(),
187 quorum_reached_without_requested_objects: register_int_counter_with_registry!(
188 "auth_agg_quorum_reached_without_requested_objects",
189 "Number of times quorum was reached without getting the requested objects back from at least 1 validator",
190 registry,
191 )
192 .unwrap(),
193 }
194 }
195
196 pub fn new_for_tests() -> Self {
197 let registry = prometheus::Registry::new();
198 Self::new(®istry)
199 }
200}
201
/// Reasons `AuthorityAggregator::process_transaction` can fail. The variant is chosen by
/// `handle_process_transaction_error` from the final aggregation state.
#[derive(Error, Debug, Eq, PartialEq)]
pub enum AggregatorProcessTransactionError {
    /// The broadcast ended in a non-retryable state and no conflicting pending transaction
    /// was reported.
    #[error(
        "Failed to execute transaction on a quorum of validators due to non-retryable errors. Validator errors: {:?}",
        errors
    )]
    FatalTransaction { errors: GroupedErrors },

    /// No quorum was reached, but the aggregation state is still considered retryable.
    #[error(
        "Failed to execute transaction on a quorum of validators but state is still retryable. Validator errors: {:?}",
        errors
    )]
    RetryableTransaction { errors: GroupedErrors },

    /// The broadcast ended in a non-retryable state and validators reported object locks
    /// held by other pending transactions. `conflicting_tx_digests` maps each such pending
    /// transaction to the (validator, locked object ref) pairs that reported it, plus their
    /// combined stake.
    #[error(
        "Failed to execute transaction on a quorum of validators due to conflicting transactions. Locked objects: {:?}. Validator errors: {:?}",
        conflicting_tx_digests,
        errors
    )]
    FatalConflictingTransaction {
        errors: GroupedErrors,
        conflicting_tx_digests:
            BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
    },

    /// At least a quorum of stake reported being overloaded.
    #[error(
        "{} of the validators by stake are overloaded with transactions pending execution. Validator errors: {:?}",
        overloaded_stake,
        errors
    )]
    SystemOverload {
        overloaded_stake: StakeUnit,
        errors: GroupedErrors,
    },

    /// Validator errors indicate the transaction was already finalized with a different set
    /// of user signatures.
    #[error("Transaction is already finalized but with different user signatures")]
    TxAlreadyFinalizedWithDifferentUserSignatures,

    /// Signing stake plus the stake of validators that asked to be retried later reaches
    /// quorum; `retry_after_secs` is the delay after which that quorum would be available.
    #[error(
        "{} of the validators by stake are overloaded and requested the client to retry after {} seconds. Validator errors: {:?}",
        overload_stake,
        retry_after_secs,
        errors
    )]
    SystemOverloadRetryAfter {
        overload_stake: StakeUnit,
        errors: GroupedErrors,
        retry_after_secs: u64,
    },
}
252
/// Reasons certificate execution on the committee can fail.
// NOTE(review): the certificate-processing code that produces these variants is not visible in
// this chunk; the variant notes below are inferred from the messages and should be confirmed.
#[derive(Error, Debug)]
pub enum AggregatorProcessCertificateError {
    /// Presumably: enough validators returned non-retryable errors that execution cannot succeed.
    #[error(
        "Failed to execute certificate on a quorum of validators. Non-retryable errors: {:?}",
        non_retryable_errors
    )]
    FatalExecuteCertificate { non_retryable_errors: GroupedErrors },

    /// Presumably: no quorum was reached, but the failure may succeed on retry.
    #[error(
        "Failed to execute certificate on a quorum of validators but state is still retryable. Retryable errors: {:?}",
        retryable_errors
    )]
    RetryableExecuteCertificate { retryable_errors: GroupedErrors },
}
267
268pub fn group_errors(errors: Vec<(SuiError, Vec<AuthorityName>, StakeUnit)>) -> GroupedErrors {
269 #[allow(clippy::mutable_key_type)]
270 let mut grouped_errors = HashMap::new();
271 for (error, names, stake) in errors {
272 let entry = grouped_errors.entry(error).or_insert((0, vec![]));
273 entry.0 += stake;
274 entry.1.extend(
275 names
276 .into_iter()
277 .map(|n| n.concise_owned())
278 .collect::<Vec<_>>(),
279 );
280 }
281 grouped_errors
282 .into_iter()
283 .map(|(e, (s, n))| (e, s, n))
284 .collect()
285}
286
287#[derive(Debug, Default)]
288pub struct RetryableOverloadInfo {
289 pub total_stake: StakeUnit,
291
292 pub stake_requested_retry_after: BTreeMap<Duration, StakeUnit>,
294}
295
296impl RetryableOverloadInfo {
297 pub fn add_stake_retryable_overload(&mut self, stake: StakeUnit, retry_after: Duration) {
298 self.total_stake += stake;
299 self.stake_requested_retry_after
300 .entry(retry_after)
301 .and_modify(|s| *s += stake)
302 .or_insert(stake);
303 }
304
305 pub fn get_quorum_retry_after(
307 &self,
308 good_stake: StakeUnit,
309 quorum_threshold: StakeUnit,
310 ) -> Duration {
311 if self.stake_requested_retry_after.is_empty() {
312 return Duration::from_secs(0);
313 }
314
315 let mut quorum_stake = good_stake;
316 for (retry_after, stake) in self.stake_requested_retry_after.iter() {
317 quorum_stake += *stake;
318 if quorum_stake >= quorum_threshold {
319 return *retry_after;
320 }
321 }
322 *self.stake_requested_retry_after.last_key_value().unwrap().0
323 }
324}
325
/// Accumulated outcome of broadcasting one transaction. It is threaded through the reduce step
/// of `process_transaction` and inspected once the broadcast ends.
#[derive(Debug)]
struct ProcessTransactionState {
    /// Validator signatures on the transaction; reaching quorum here forms a certificate.
    tx_signatures: StakeAggregator<AuthoritySignInfo, true>,
    /// Signed effects from validators that had already executed the transaction, keyed by
    /// effects digest.
    effects_map: MultiStakeAggregator<TransactionEffectsDigest, TransactionEffects, true>,
    /// Errors received so far, each paired with the reporting validators and their stake.
    errors: Vec<(SuiError, Vec<AuthorityName>, StakeUnit)>,
    /// Stake of validators that returned non-retryable errors, including stake whose
    /// signatures failed verification.
    non_retryable_stake: StakeUnit,
    /// Stake of validators that reported an object or package as not found.
    object_or_package_not_found_stake: StakeUnit,
    /// Stake of validators that reported being overloaded.
    overloaded_stake: StakeUnit,
    /// Stake of validators that asked to be retried later, grouped by requested delay.
    retryable_overload_info: RetryableOverloadInfo,
    /// Pending transactions reported through object lock conflicts. Each maps to the
    /// (validator, locked object ref) pairs that reported it and their combined stake.
    conflicting_tx_digests:
        BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
    /// Whether the failure may still succeed on retry; cleared once an exit condition is hit.
    retryable: bool,
    /// Set when the errors show the transaction was already finalized with different user
    /// signatures.
    tx_finalized_with_different_user_sig: bool,
}
351
352impl ProcessTransactionState {
353 pub fn record_conflicting_transaction_if_any(
354 &mut self,
355 validator_name: AuthorityName,
356 weight: StakeUnit,
357 err: &SuiError,
358 ) {
359 if let SuiErrorKind::ObjectLockConflict {
360 obj_ref,
361 pending_transaction: transaction,
362 } = err.as_inner()
363 {
364 let (lock_records, total_stake) = self
365 .conflicting_tx_digests
366 .entry(*transaction)
367 .or_insert((Vec::new(), 0));
368 lock_records.push((validator_name, *obj_ref));
369 *total_stake += weight;
370 }
371 }
372
373 pub fn check_if_error_indicates_tx_finalized_with_different_user_sig(
374 &self,
375 validity_threshold: StakeUnit,
376 ) -> bool {
377 let invalid_sig_stake: StakeUnit = self
385 .errors
386 .iter()
387 .filter_map(|(e, _, stake)| {
388 if matches!(
389 e.as_inner(),
390 SuiErrorKind::FailedToVerifyTxCertWithExecutedEffects { .. }
391 ) {
392 Some(stake)
393 } else {
394 None
395 }
396 })
397 .sum();
398 invalid_sig_stake >= validity_threshold
399 }
400}
401
/// Aggregation state for broadcasting a certificate for execution.
// NOTE(review): the code that builds and updates this state is outside the visible chunk; the
// field notes below are inferred from names and types and should be confirmed.
struct ProcessCertificateState {
    /// Signed effects keyed by (epoch, effects digest).
    effects_map:
        MultiStakeAggregator<(EpochId, TransactionEffectsDigest), TransactionEffects, true>,
    /// Presumably the stake of validators that returned non-retryable errors.
    non_retryable_stake: StakeUnit,
    /// Non-retryable errors with the reporting validators and their stake.
    non_retryable_errors: Vec<(SuiError, Vec<AuthorityName>, StakeUnit)>,
    /// Retryable errors with the reporting validators and their stake.
    retryable_errors: Vec<(SuiError, Vec<AuthorityName>, StakeUnit)>,
    /// Presumably whether the failure may still succeed on retry.
    retryable: bool,

    /// Transaction events, presumably captured from a validator response when available.
    events: Option<TransactionEvents>,
    /// Input objects, presumably captured when the request asked for them.
    input_objects: Option<Vec<Object>>,
    /// Output objects, presumably captured when the request asked for them.
    output_objects: Option<Vec<Object>>,
    /// Auxiliary data bytes, presumably captured when the request asked for them.
    auxiliary_data: Option<Vec<u8>>,
    /// The certificate request sent to validators (presumably reused for every validator).
    request: HandleCertificateRequestV3,
}
425
/// Successful outcome of `AuthorityAggregator::process_transaction`.
#[derive(Debug)]
pub enum ProcessTransactionResult {
    /// A certificate for the transaction. `newly_formed` is true when it was assembled from a
    /// quorum of validator signatures. It is false when a validator returned an existing
    /// certificate from the current epoch.
    Certified {
        certificate: CertifiedTransaction,
        newly_formed: bool,
    },
    /// Quorum-certified, verified effects plus the events from the response that completed
    /// the quorum.
    Executed(VerifiedCertifiedTransactionEffects, TransactionEvents),
}
438
439impl ProcessTransactionResult {
440 pub fn into_cert_for_testing(self) -> CertifiedTransaction {
441 match self {
442 Self::Certified { certificate, .. } => certificate,
443 Self::Executed(..) => panic!("Wrong type"),
444 }
445 }
446
447 pub fn into_effects_for_testing(self) -> VerifiedCertifiedTransactionEffects {
448 match self {
449 Self::Certified { .. } => panic!("Wrong type"),
450 Self::Executed(effects, ..) => effects,
451 }
452 }
453}
454
/// Fans requests out to every validator of a committee and aggregates their responses by stake.
#[derive(Clone)]
pub struct AuthorityAggregator<A: Clone> {
    /// The committee whose validators are contacted and whose stake thresholds are applied.
    pub committee: Arc<Committee>,
    /// Human-readable validator names used as metric labels. `get_display_name` falls back to
    /// the concise public key when a name is missing.
    pub validator_display_names: Arc<HashMap<AuthorityName, String>>,
    /// Reference gas price supplied at construction.
    pub reference_gas_price: u64,
    /// One `SafeClient` per validator, keyed by authority name.
    pub authority_clients: Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>>,
    /// Aggregator-level metrics.
    pub metrics: Arc<AuthAggMetrics>,
    /// Base from which the per-validator `SafeClientMetrics` are derived.
    pub safe_client_metrics_base: SafeClientMetricsBase,
    /// Timeouts applied while broadcasting.
    pub timeouts: TimeoutConfig,
    /// Committee store shared with every `SafeClient`.
    pub committee_store: Arc<CommitteeStore>,
}
475
476impl<A: Clone> AuthorityAggregator<A> {
477 pub fn new(
478 committee: Committee,
479 validator_display_names: Arc<HashMap<AuthorityName, String>>,
480 reference_gas_price: u64,
481 committee_store: Arc<CommitteeStore>,
482 authority_clients: BTreeMap<AuthorityName, A>,
483 safe_client_metrics_base: SafeClientMetricsBase,
484 auth_agg_metrics: Arc<AuthAggMetrics>,
485 timeouts: TimeoutConfig,
486 ) -> Self {
487 Self {
488 committee: Arc::new(committee),
489 validator_display_names,
490 reference_gas_price,
491 authority_clients: create_safe_clients(
492 authority_clients,
493 &committee_store,
494 &safe_client_metrics_base,
495 ),
496 metrics: auth_agg_metrics,
497 safe_client_metrics_base,
498 timeouts,
499 committee_store,
500 }
501 }
502
503 pub fn get_client(&self, name: &AuthorityName) -> Option<&Arc<SafeClient<A>>> {
504 self.authority_clients.get(name)
505 }
506
507 pub fn clone_client_test_only(&self, name: &AuthorityName) -> Arc<SafeClient<A>>
508 where
509 A: Clone,
510 {
511 self.authority_clients[name].clone()
512 }
513
514 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
515 self.committee_store.clone()
516 }
517
518 pub fn clone_inner_committee_test_only(&self) -> Committee {
519 (*self.committee).clone()
520 }
521
522 pub fn clone_inner_clients_test_only(&self) -> BTreeMap<AuthorityName, SafeClient<A>> {
523 (*self.authority_clients)
524 .clone()
525 .into_iter()
526 .map(|(k, v)| (k, (*v).clone()))
527 .collect()
528 }
529
530 pub fn get_display_name(&self, name: &AuthorityName) -> String {
531 self.validator_display_names
532 .get(name)
533 .cloned()
534 .unwrap_or_else(|| name.concise().to_string())
535 }
536}
537
538fn create_safe_clients<A: Clone>(
539 authority_clients: BTreeMap<AuthorityName, A>,
540 committee_store: &Arc<CommitteeStore>,
541 safe_client_metrics_base: &SafeClientMetricsBase,
542) -> Arc<BTreeMap<AuthorityName, Arc<SafeClient<A>>>> {
543 Arc::new(
544 authority_clients
545 .into_iter()
546 .map(|(name, api)| {
547 (
548 name,
549 Arc::new(SafeClient::new(
550 api,
551 committee_store.clone(),
552 name,
553 SafeClientMetrics::new(safe_client_metrics_base, name),
554 )),
555 )
556 })
557 .collect(),
558 )
559}
560
561impl AuthorityAggregator<NetworkAuthorityClient> {
562 pub fn new_from_epoch_start_state(
565 epoch_start_state: &EpochStartSystemState,
566 committee_store: &Arc<CommitteeStore>,
567 safe_client_metrics_base: SafeClientMetricsBase,
568 auth_agg_metrics: Arc<AuthAggMetrics>,
569 ) -> Self {
570 let committee = epoch_start_state.get_sui_committee_with_network_metadata();
571 let validator_display_names = epoch_start_state.get_authority_names_to_hostnames();
572 Self::new_from_committee(
573 committee,
574 Arc::new(validator_display_names),
575 epoch_start_state.reference_gas_price(),
576 committee_store,
577 safe_client_metrics_base,
578 auth_agg_metrics,
579 )
580 }
581
582 pub fn recreate_with_new_epoch_start_state(
586 &self,
587 epoch_start_state: &EpochStartSystemState,
588 ) -> Self {
589 Self::new_from_epoch_start_state(
590 epoch_start_state,
591 &self.committee_store,
592 self.safe_client_metrics_base.clone(),
593 self.metrics.clone(),
594 )
595 }
596
597 pub fn new_from_committee(
598 committee: CommitteeWithNetworkMetadata,
599 validator_display_names: Arc<HashMap<AuthorityName, String>>,
600 reference_gas_price: u64,
601 committee_store: &Arc<CommitteeStore>,
602 safe_client_metrics_base: SafeClientMetricsBase,
603 auth_agg_metrics: Arc<AuthAggMetrics>,
604 ) -> Self {
605 let net_config = default_mysten_network_config();
606 let authority_clients =
607 make_network_authority_clients_with_network_config(&committee, &net_config);
608 Self::new(
609 committee.committee().clone(),
610 validator_display_names,
611 reference_gas_price,
612 committee_store.clone(),
613 authority_clients,
614 safe_client_metrics_base,
615 auth_agg_metrics,
616 Default::default(),
617 )
618 }
619}
620
621impl<A> AuthorityAggregator<A>
622where
623 A: AuthorityAPI + Send + Sync + 'static + Clone,
624{
625 pub async fn get_latest_object_version_for_testing(
631 &self,
632 object_id: ObjectID,
633 ) -> SuiResult<Object> {
634 #[derive(Debug, Default)]
635 struct State {
636 latest_object_version: Option<Object>,
637 total_weight: StakeUnit,
638 }
639 let initial_state = State::default();
640 let result = quorum_map_then_reduce_with_timeout(
641 self.committee.clone(),
642 self.authority_clients.clone(),
643 initial_state,
644 |_name, client| {
645 Box::pin(async move {
646 let request =
647 ObjectInfoRequest::latest_object_info_request(object_id, LayoutGenerationOption::None);
648 let mut retry_count = 0;
649 loop {
650 match client.handle_object_info_request(request.clone()).await {
651 Ok(object_info) => return Ok(object_info),
652 Err(err) => {
653 retry_count += 1;
654 if retry_count > 3 {
655 return Err(err);
656 }
657 tokio::time::sleep(Duration::from_secs(1)).await;
658 }
659 }
660 }
661 })
662 },
663 |mut state, name, weight, result| {
664 Box::pin(async move {
665 state.total_weight += weight;
666 match result {
667 Ok(object_info) => {
668 debug!("Received object info response from validator {:?} with version: {:?}", name.concise(), object_info.object.version());
669 if state.latest_object_version.as_ref().is_none_or(|latest| {
670 object_info.object.version() > latest.version()
671 }) {
672 state.latest_object_version = Some(object_info.object);
673 }
674 }
675 Err(err) => {
676 debug!("Received error from validator {:?}: {:?}", name.concise(), err);
677 }
678 };
679 if state.total_weight >= self.committee.quorum_threshold() {
680 if let Some(object) = state.latest_object_version {
681 return ReduceOutput::Success(object);
682 } else {
683 return ReduceOutput::Failed(state);
684 }
685 }
686 ReduceOutput::Continue(state)
687 })
688 },
689 self.timeouts.pre_quorum_timeout,
691 )
692 .await.map_err(|_state| SuiError::from(UserInputError::ObjectNotFound {
693 object_id,
694 version: None,
695 }))?;
696 Ok(result.0)
697 }
698
699 pub async fn get_latest_system_state_object_for_testing(
703 &self,
704 ) -> anyhow::Result<SuiSystemState> {
705 #[derive(Debug, Default)]
706 struct State {
707 latest_system_state: Option<SuiSystemState>,
708 total_weight: StakeUnit,
709 }
710 let initial_state = State::default();
711 let result = quorum_map_then_reduce_with_timeout(
712 self.committee.clone(),
713 self.authority_clients.clone(),
714 initial_state,
715 |_name, client| Box::pin(async move { client.handle_system_state_object().await }),
716 |mut state, name, weight, result| {
717 Box::pin(async move {
718 state.total_weight += weight;
719 match result {
720 Ok(system_state) => {
721 debug!(
722 "Received system state object from validator {:?} with epoch: {:?}",
723 name.concise(),
724 system_state.epoch()
725 );
726 if state
727 .latest_system_state
728 .as_ref()
729 .is_none_or(|latest| system_state.epoch() > latest.epoch())
730 {
731 state.latest_system_state = Some(system_state);
732 }
733 }
734 Err(err) => {
735 debug!(
736 "Received error from validator {:?}: {:?}",
737 name.concise(),
738 err
739 );
740 }
741 };
742 if state.total_weight >= self.committee.quorum_threshold() {
743 if let Some(system_state) = state.latest_system_state {
744 return ReduceOutput::Success(system_state);
745 } else {
746 return ReduceOutput::Failed(state);
747 }
748 }
749 ReduceOutput::Continue(state)
750 })
751 },
752 self.timeouts.pre_quorum_timeout,
754 )
755 .await
756 .map_err(|_| anyhow::anyhow!("Failed to get latest system state from the authorities"))?;
757 Ok(result.0)
758 }
759
760 #[instrument(level = "trace", skip_all)]
762 pub async fn process_transaction(
763 &self,
764 transaction: Transaction,
765 client_addr: Option<SocketAddr>,
766 ) -> Result<ProcessTransactionResult, AggregatorProcessTransactionError> {
767 let tx_digest = transaction.digest();
769 debug!(
770 tx_digest = ?tx_digest,
771 "Broadcasting transaction request to authorities"
772 );
773 trace!(
774 "Transaction data: {:?}",
775 transaction.data().intent_message().value
776 );
777 let committee = self.committee.clone();
778 let state = ProcessTransactionState {
779 tx_signatures: StakeAggregator::new(committee.clone()),
780 effects_map: MultiStakeAggregator::new(committee.clone()),
781 errors: vec![],
782 object_or_package_not_found_stake: 0,
783 non_retryable_stake: 0,
784 overloaded_stake: 0,
785 retryable_overload_info: Default::default(),
786 retryable: true,
787 conflicting_tx_digests: Default::default(),
788 tx_finalized_with_different_user_sig: false,
789 };
790
791 let transaction_ref = &transaction;
792 let validity_threshold = committee.validity_threshold();
793 let quorum_threshold = committee.quorum_threshold();
794 let validator_display_names = self.validator_display_names.clone();
795 let result = quorum_map_then_reduce_with_timeout(
796 committee.clone(),
797 self.authority_clients.clone(),
798 state,
799 |name, client| {
800 Box::pin(
801 async move {
802 let _guard = GaugeGuard::acquire(&self.metrics.inflight_transaction_requests);
803 let concise_name = name.concise_owned();
804 client.handle_transaction(transaction_ref.clone(), client_addr)
805 .monitor_cancellation()
806 .instrument(trace_span!("handle_transaction", cancelled = false, authority =? concise_name))
807 .await
808 },
809 )
810 },
811 |mut state, name, weight, response| {
812 let display_name = validator_display_names.get(&name).unwrap_or(&name.concise().to_string()).clone();
813 Box::pin(async move {
814 match self.handle_process_transaction_response(
815 tx_digest, &mut state, response, name, weight,
816 ) {
817 Ok(Some(result)) => {
818 self.record_process_transaction_metrics(tx_digest, &state);
819 return ReduceOutput::Success(result);
820 }
821 Ok(None) => {},
822 Err(err) => {
823 let concise_name = name.concise();
824 debug!(?tx_digest, name=?concise_name, weight, "Error processing transaction from validator: {:?}", err);
825 self.metrics
826 .process_tx_errors
827 .with_label_values(&[&display_name, err.as_ref()])
828 .inc();
829 Self::record_rpc_error_maybe(self.metrics.clone(), &display_name, &err);
830 state.record_conflicting_transaction_if_any(name, weight, &err);
832 let (retryable, categorized) = err.is_retryable();
833 if !categorized {
834 error!(?tx_digest, "uncategorized tx error: {err}");
837 }
838 if err.is_object_or_package_not_found() {
839 state.object_or_package_not_found_stake += weight;
844 }
845 else if err.is_overload() {
846 state.overloaded_stake += weight;
853 }
854 else if err.is_retryable_overload() {
855 state.retryable_overload_info.add_stake_retryable_overload(weight, Duration::from_secs(err.retry_after_secs()));
863 }
864 else if !retryable {
865 state.non_retryable_stake += weight;
866 }
867 state.errors.push((err, vec![name], weight));
868
869 }
870 };
871
872 let retryable_stake = self.get_retryable_stake(&state);
873 let good_stake = std::cmp::max(state.tx_signatures.total_votes(), state.effects_map.total_votes());
874 if good_stake + retryable_stake < quorum_threshold {
875 debug!(
876 tx_digest = ?tx_digest,
877 good_stake,
878 retryable_stake,
879 "No chance for any tx to get quorum, exiting. Conflicting_txes: {:?}",
880 state.conflicting_tx_digests
881 );
882 state.retryable = false;
884 return ReduceOutput::Failed(state);
885 }
886
887 let object_or_package_not_found_condition = state.object_or_package_not_found_stake >= quorum_threshold && std::env::var("NOT_RETRY_OBJECT_PACKAGE_NOT_FOUND").is_ok();
889 if state.non_retryable_stake >= validity_threshold
890 || object_or_package_not_found_condition || state.overloaded_stake >= quorum_threshold {
892 state.retryable = false;
895 ReduceOutput::Failed(state)
896 } else {
897 ReduceOutput::Continue(state)
898 }
899 })
900 },
901 self.timeouts.pre_quorum_timeout,
903 )
904 .await;
905
906 match result {
907 Ok((result, _)) => Ok(result),
908 Err(state) => {
909 self.record_process_transaction_metrics(tx_digest, &state);
910 let state = self.record_non_quorum_effects_maybe(tx_digest, state);
911 Err(self.handle_process_transaction_error(state))
912 }
913 }
914 }
915
916 fn record_rpc_error_maybe(metrics: Arc<AuthAggMetrics>, display_name: &str, error: &SuiError) {
917 if let SuiErrorKind::RpcError(_message, code) = error.as_inner() {
918 metrics
919 .total_rpc_err
920 .with_label_values(&[display_name, code.as_str()])
921 .inc();
922 }
923 }
924
925 fn handle_process_transaction_error(
926 &self,
927 state: ProcessTransactionState,
928 ) -> AggregatorProcessTransactionError {
929 let quorum_threshold = self.committee.quorum_threshold();
930
931 if state.overloaded_stake >= quorum_threshold {
933 return AggregatorProcessTransactionError::SystemOverload {
934 overloaded_stake: state.overloaded_stake,
935 errors: group_errors(state.errors),
936 };
937 }
938
939 if !state.retryable {
940 if state.tx_finalized_with_different_user_sig
941 || state.check_if_error_indicates_tx_finalized_with_different_user_sig(
942 self.committee.validity_threshold(),
943 )
944 {
945 return AggregatorProcessTransactionError::TxAlreadyFinalizedWithDifferentUserSignatures;
946 }
947
948 if !state.conflicting_tx_digests.is_empty() {
951 let good_stake = state.tx_signatures.total_votes();
952 warn!(
953 ?state.conflicting_tx_digests,
954 original_tx_stake = good_stake,
955 "Client double spend attempt detected!",
956 );
957 self.metrics
958 .total_client_double_spend_attempts_detected
959 .inc();
960 return AggregatorProcessTransactionError::FatalConflictingTransaction {
961 errors: group_errors(state.errors),
962 conflicting_tx_digests: state.conflicting_tx_digests,
963 };
964 }
965
966 return AggregatorProcessTransactionError::FatalTransaction {
967 errors: group_errors(state.errors),
968 };
969 }
970
971 if state.tx_signatures.total_votes() + state.retryable_overload_info.total_stake
976 >= quorum_threshold
977 {
978 let retry_after_secs = state
979 .retryable_overload_info
980 .get_quorum_retry_after(state.tx_signatures.total_votes(), quorum_threshold)
981 .as_secs();
982 return AggregatorProcessTransactionError::SystemOverloadRetryAfter {
983 overload_stake: state.retryable_overload_info.total_stake,
984 errors: group_errors(state.errors),
985 retry_after_secs,
986 };
987 }
988
989 AggregatorProcessTransactionError::RetryableTransaction {
991 errors: group_errors(state.errors),
992 }
993 }
994
995 fn record_process_transaction_metrics(
996 &self,
997 tx_digest: &TransactionDigest,
998 state: &ProcessTransactionState,
999 ) {
1000 let num_signatures = state.tx_signatures.validator_sig_count();
1001 let good_stake = state.tx_signatures.total_votes();
1002 debug!(
1003 ?tx_digest,
1004 num_errors = state.errors.iter().map(|e| e.1.len()).sum::<usize>(),
1005 num_unique_errors = state.errors.len(),
1006 ?good_stake,
1007 non_retryable_stake = state.non_retryable_stake,
1008 ?num_signatures,
1009 "Received signatures response from validators handle_transaction"
1010 );
1011 if !state.errors.is_empty() {
1012 debug!(?tx_digest, "Errors received: {:?}", state.errors);
1013 }
1014 }
1015
1016 fn handle_process_transaction_response(
1017 &self,
1018 tx_digest: &TransactionDigest,
1019 state: &mut ProcessTransactionState,
1020 response: SuiResult<PlainTransactionInfoResponse>,
1021 name: AuthorityName,
1022 weight: StakeUnit,
1023 ) -> SuiResult<Option<ProcessTransactionResult>> {
1024 match response {
1025 Ok(PlainTransactionInfoResponse::Signed(signed)) => {
1026 debug!(?tx_digest, name=?name.concise(), weight, "Received signed transaction from validator handle_transaction");
1027 self.handle_transaction_response_with_signed(state, signed)
1028 }
1029 Ok(PlainTransactionInfoResponse::ExecutedWithCert(cert, effects, events)) => {
1030 debug!(?tx_digest, name=?name.concise(), weight, "Received prev certificate and effects from validator handle_transaction");
1031 self.handle_transaction_response_with_executed(state, Some(cert), effects, events)
1032 }
1033 Ok(PlainTransactionInfoResponse::ExecutedWithoutCert(_, effects, events)) => {
1034 debug!(?tx_digest, name=?name.concise(), weight, "Received prev effects from validator handle_transaction");
1035 self.handle_transaction_response_with_executed(state, None, effects, events)
1036 }
1037 Err(err) => Err(err),
1038 }
1039 }
1040
1041 fn handle_transaction_response_with_signed(
1042 &self,
1043 state: &mut ProcessTransactionState,
1044 plain_tx: SignedTransaction,
1045 ) -> SuiResult<Option<ProcessTransactionResult>> {
1046 match state.tx_signatures.insert(plain_tx.clone()) {
1047 InsertResult::NotEnoughVotes {
1048 bad_votes,
1049 bad_authorities,
1050 } => {
1051 state.non_retryable_stake += bad_votes;
1052 if bad_votes > 0 {
1053 state.errors.push((
1054 SuiErrorKind::InvalidSignature {
1055 error: "Individual signature verification failed".to_string(),
1056 }
1057 .into(),
1058 bad_authorities,
1059 bad_votes,
1060 ));
1061 }
1062 Ok(None)
1063 }
1064 InsertResult::Failed { error } => Err(error),
1065 InsertResult::QuorumReached(cert_sig) => {
1066 let certificate =
1067 CertifiedTransaction::new_from_data_and_sig(plain_tx.into_data(), cert_sig);
1068 certificate.verify_committee_sigs_only(&self.committee)?;
1069 Ok(Some(ProcessTransactionResult::Certified {
1070 certificate,
1071 newly_formed: true,
1072 }))
1073 }
1074 }
1075 }
1076
1077 fn handle_transaction_response_with_executed(
1078 &self,
1079 state: &mut ProcessTransactionState,
1080 certificate: Option<CertifiedTransaction>,
1081 plain_tx_effects: SignedTransactionEffects,
1082 events: TransactionEvents,
1083 ) -> SuiResult<Option<ProcessTransactionResult>> {
1084 match certificate {
1085 Some(certificate) if certificate.epoch() == self.committee.epoch => {
1086 Ok(Some(ProcessTransactionResult::Certified {
1090 certificate,
1091 newly_formed: false,
1092 }))
1093 }
1094 _ => {
1095 let digest = plain_tx_effects.data().digest();
1099 match state.effects_map.insert(digest, plain_tx_effects.clone()) {
1100 InsertResult::NotEnoughVotes {
1101 bad_votes,
1102 bad_authorities,
1103 } => {
1104 state.non_retryable_stake += bad_votes;
1105 if bad_votes > 0 {
1106 state.errors.push((
1107 SuiErrorKind::InvalidSignature {
1108 error: "Individual signature verification failed".to_string(),
1109 }
1110 .into(),
1111 bad_authorities,
1112 bad_votes,
1113 ));
1114 }
1115 Ok(None)
1116 }
1117 InsertResult::Failed { error } => Err(error),
1118 InsertResult::QuorumReached(cert_sig) => {
1119 let ct = CertifiedTransactionEffects::new_from_data_and_sig(
1120 plain_tx_effects.into_data(),
1121 cert_sig,
1122 );
1123 Ok(Some(ProcessTransactionResult::Executed(
1124 ct.verify(&self.committee)?,
1125 events,
1126 )))
1127 }
1128 }
1129 }
1130 }
1131 }
1132
1133 fn record_non_quorum_effects_maybe(
1135 &self,
1136 tx_digest: &TransactionDigest,
1137 mut state: ProcessTransactionState,
1138 ) -> ProcessTransactionState {
1139 if state.effects_map.unique_key_count() > 0 {
1140 let non_quorum_effects = state.effects_map.get_all_unique_values();
1141 warn!(
1142 ?tx_digest,
1143 "Received signed Effects but not with a quorum {:?}", non_quorum_effects
1144 );
1145
1146 let (_most_staked_effects_digest, (_, most_staked_effects_digest_stake)) =
1149 non_quorum_effects
1150 .iter()
1151 .max_by_key(|&(_, (_, stake))| stake)
1152 .unwrap();
1153 if most_staked_effects_digest_stake + self.get_retryable_stake(&state)
1157 < self.committee.quorum_threshold()
1158 {
1159 state.retryable = false;
1160 if state.check_if_error_indicates_tx_finalized_with_different_user_sig(
1161 self.committee.validity_threshold(),
1162 ) {
1163 state.tx_finalized_with_different_user_sig = true;
1164 } else {
1165 error!(
1167 "We have seen signed effects but unable to reach quorum threshold even including retriable stakes. This is very rare. Tx: {tx_digest:?}. Non-quorum effects: {non_quorum_effects:?}."
1168 );
1169 }
1170 }
1171
1172 let mut involved_validators = Vec::new();
1173 let mut total_stake = 0;
1174 for (validators, stake) in non_quorum_effects.values() {
1175 involved_validators.extend_from_slice(validators);
1176 total_stake += stake;
1177 }
1178 state.errors.push((
1181 SuiErrorKind::QuorumFailedToGetEffectsQuorumWhenProcessingTransaction {
1182 effects_map: non_quorum_effects,
1183 }
1184 .into(),
1185 involved_validators,
1186 total_stake,
1187 ));
1188 }
1189 state
1190 }
1191
1192 fn get_retryable_stake(&self, state: &ProcessTransactionState) -> StakeUnit {
1193 self.committee.total_votes()
1194 - state.non_retryable_stake
1195 - state.effects_map.total_votes()
1196 - state.tx_signatures.total_votes()
1197 }
1198
1199 #[instrument(level = "trace", skip_all)]
1200 pub async fn process_certificate(
1201 &self,
1202 request: HandleCertificateRequestV3,
1203 client_addr: Option<SocketAddr>,
1204 ) -> Result<QuorumDriverResponse, AggregatorProcessCertificateError> {
1205 let state = ProcessCertificateState {
1206 effects_map: MultiStakeAggregator::new(self.committee.clone()),
1207 non_retryable_stake: 0,
1208 non_retryable_errors: vec![],
1209 retryable_errors: vec![],
1210 retryable: true,
1211 events: None,
1212 input_objects: None,
1213 output_objects: None,
1214 auxiliary_data: None,
1215 request: request.clone(),
1216 };
1217
1218 let validators_to_sample =
1220 if request.include_input_objects || request.include_output_objects {
1221 const NUMBER_TO_SAMPLE: usize = 10;
1223
1224 self.committee
1225 .choose_multiple_weighted_iter(NUMBER_TO_SAMPLE)
1226 .cloned()
1227 .collect()
1228 } else {
1229 HashSet::new()
1230 };
1231
1232 let tx_digest = *request.certificate.digest();
1233 let timeout_after_quorum = self.timeouts.post_quorum_timeout;
1234
1235 let request_ref = request;
1236 let threshold = self.committee.quorum_threshold();
1237 let validity = self.committee.validity_threshold();
1238
1239 debug!(
1240 ?tx_digest,
1241 quorum_threshold = threshold,
1242 validity_threshold = validity,
1243 ?timeout_after_quorum,
1244 "Broadcasting certificate to authorities"
1245 );
1246 let committee: Arc<Committee> = self.committee.clone();
1247 let authority_clients = self.authority_clients.clone();
1248 let metrics = self.metrics.clone();
1249 let metrics_clone = metrics.clone();
1250 let validator_display_names = self.validator_display_names.clone();
1251 let (result, mut remaining_tasks) = quorum_map_then_reduce_with_timeout(
1252 committee.clone(),
1253 authority_clients.clone(),
1254 state,
1255 move |name, client| {
1256 Box::pin(async move {
1257 let _guard = GaugeGuard::acquire(&metrics_clone.inflight_certificate_requests);
1258 let concise_name = name.concise_owned();
1259 if request_ref.include_input_objects || request_ref.include_output_objects {
1260
1261 let req = if validators_to_sample.contains(&name) {
1263 request_ref
1264 } else {
1265 HandleCertificateRequestV3 {
1266 include_input_objects: false,
1267 include_output_objects: false,
1268 include_auxiliary_data: false,
1269 ..request_ref
1270 }
1271 };
1272
1273 client
1274 .handle_certificate_v3(req, client_addr)
1275 .instrument(trace_span!("handle_certificate_v3", authority =? concise_name))
1276 .await
1277 } else {
1278 client
1279 .handle_certificate_v2(request_ref.certificate, client_addr)
1280 .instrument(trace_span!("handle_certificate_v2", authority =? concise_name))
1281 .await
1282 .map(|response| HandleCertificateResponseV3 {
1283 effects: response.signed_effects,
1284 events: Some(response.events),
1285 input_objects: None,
1286 output_objects: None,
1287 auxiliary_data: None,
1288 })
1289 }
1290 })
1291 },
1292 move |mut state, name, weight, response| {
1293 let committee_clone = committee.clone();
1294 let metrics = metrics.clone();
1295 let display_name = validator_display_names.get(&name).unwrap_or(&name.concise().to_string()).clone();
1296 Box::pin(async move {
1297 match AuthorityAggregator::<A>::handle_process_certificate_response(
1300 committee_clone,
1301 &metrics,
1302 &tx_digest, &mut state, response, name)
1303 {
1304 Ok(Some(effects)) => ReduceOutput::Success(effects),
1305 Ok(None) => {
1306 if state.non_retryable_stake >= validity {
1310 state.retryable = false;
1311 ReduceOutput::Failed(state)
1312 } else {
1313 ReduceOutput::Continue(state)
1314 }
1315 },
1316 Err(err) => {
1317 let concise_name = name.concise();
1318 debug!(?tx_digest, name=?concise_name, "Error processing certificate from validator: {:?}", err);
1319 metrics
1320 .process_cert_errors
1321 .with_label_values(&[&display_name, err.as_ref()])
1322 .inc();
1323 Self::record_rpc_error_maybe(metrics, &display_name, &err);
1324 let (retryable, categorized) = err.is_retryable();
1325 if !categorized {
1326 error!(?tx_digest, "[WATCHOUT] uncategorized tx error: {err}");
1329 }
1330 if !retryable {
1331 state.non_retryable_stake += weight;
1332 state.non_retryable_errors.push((err, vec![name], weight));
1333 } else {
1334 state.retryable_errors.push((err, vec![name], weight));
1335 }
1336 if state.non_retryable_stake >= validity {
1337 state.retryable = false;
1338 ReduceOutput::Failed(state)
1339 } else {
1340 ReduceOutput::Continue(state)
1341 }
1342 }
1343 }
1344 })
1345 },
1346 self.timeouts.pre_quorum_timeout,
1348 )
1349 .await
1350 .map_err(|state| {
1351 debug!(
1352 ?tx_digest,
1353 num_unique_effects = state.effects_map.unique_key_count(),
1354 non_retryable_stake = state.non_retryable_stake,
1355 "Received effects responses from validators"
1356 );
1357
1358 for (sui_err, _, _) in state.retryable_errors.iter().chain(state.non_retryable_errors.iter()) {
1360 self
1361 .metrics
1362 .total_aggregated_err
1363 .with_label_values(&[
1364 sui_err.as_ref(),
1365 if state.retryable {
1366 "recoverable"
1367 } else {
1368 "non-recoverable"
1369 },
1370 ])
1371 .inc();
1372 }
1373 if state.retryable {
1374 AggregatorProcessCertificateError::RetryableExecuteCertificate {
1375 retryable_errors: group_errors(state.retryable_errors),
1376 }
1377 } else {
1378 AggregatorProcessCertificateError::FatalExecuteCertificate {
1379 non_retryable_errors: group_errors(state.non_retryable_errors),
1380 }
1381 }
1382 })?;
1383
1384 let metrics = self.metrics.clone();
1385 metrics
1386 .remaining_tasks_when_reaching_cert_quorum
1387 .observe(remaining_tasks.len() as f64);
1388 if !remaining_tasks.is_empty() {
1389 spawn_monitored_task!(async move {
1391 let mut timeout = Box::pin(sleep(timeout_after_quorum));
1392 loop {
1393 tokio::select! {
1394 _ = &mut timeout => {
1395 debug!(?tx_digest, "Timed out in post quorum cert broadcasting: {:?}. Remaining tasks: {:?}", timeout_after_quorum, remaining_tasks.len());
1396 metrics.cert_broadcasting_post_quorum_timeout.inc();
1397 metrics.remaining_tasks_when_cert_broadcasting_post_quorum_timeout.observe(remaining_tasks.len() as f64);
1398 break;
1399 }
1400 res = remaining_tasks.next() => {
1401 if res.is_none() {
1402 break;
1403 }
1404 }
1405 }
1406 }
1407 });
1408 }
1409 Ok(result)
1410 }
1411
1412 fn handle_process_certificate_response(
1413 committee: Arc<Committee>,
1414 metrics: &AuthAggMetrics,
1415 tx_digest: &TransactionDigest,
1416 state: &mut ProcessCertificateState,
1417 response: SuiResult<HandleCertificateResponseV3>,
1418 name: AuthorityName,
1419 ) -> SuiResult<Option<QuorumDriverResponse>> {
1420 match response {
1421 Ok(HandleCertificateResponseV3 {
1422 effects: signed_effects,
1423 events,
1424 input_objects,
1425 output_objects,
1426 auxiliary_data,
1427 }) => {
1428 debug!(
1429 ?tx_digest,
1430 name = ?name.concise(),
1431 "Validator handled certificate successfully",
1432 );
1433
1434 if events.is_some() && state.events.is_none() {
1435 state.events = events;
1436 }
1437
1438 if input_objects.is_some() && state.input_objects.is_none() {
1439 state.input_objects = input_objects;
1440 }
1441
1442 if output_objects.is_some() && state.output_objects.is_none() {
1443 state.output_objects = output_objects;
1444 }
1445
1446 if auxiliary_data.is_some() && state.auxiliary_data.is_none() {
1447 state.auxiliary_data = auxiliary_data;
1448 }
1449
1450 let effects_digest = *signed_effects.digest();
1451 match state.effects_map.insert(
1453 (signed_effects.epoch(), effects_digest),
1454 signed_effects.clone(),
1455 ) {
1456 InsertResult::NotEnoughVotes {
1457 bad_votes,
1458 bad_authorities,
1459 } => {
1460 state.non_retryable_stake += bad_votes;
1461 if bad_votes > 0 {
1462 state.non_retryable_errors.push((
1463 SuiErrorKind::InvalidSignature {
1464 error: "Individual signature verification failed".to_string(),
1465 }
1466 .into(),
1467 bad_authorities,
1468 bad_votes,
1469 ));
1470 }
1471 Ok(None)
1472 }
1473 InsertResult::Failed { error } => Err(error),
1474 InsertResult::QuorumReached(cert_sig) => {
1475 let ct = CertifiedTransactionEffects::new_from_data_and_sig(
1476 signed_effects.into_data(),
1477 cert_sig,
1478 );
1479
1480 if (state.request.include_input_objects && state.input_objects.is_none())
1481 || (state.request.include_output_objects
1482 && state.output_objects.is_none())
1483 {
1484 metrics.quorum_reached_without_requested_objects.inc();
1485 debug!(
1486 ?tx_digest,
1487 "Quorum Reached but requested input/output objects were not returned"
1488 );
1489 }
1490
1491 ct.verify(&committee).map(|ct| {
1492 debug!(?tx_digest, "Got quorum for validators handle_certificate.");
1493 Some(QuorumDriverResponse {
1494 effects_cert: ct,
1495 events: state.events.take(),
1496 input_objects: state.input_objects.take(),
1497 output_objects: state.output_objects.take(),
1498 auxiliary_data: state.auxiliary_data.take(),
1499 })
1500 })
1501 }
1502 }
1503 }
1504 Err(err) => Err(err),
1505 }
1506 }
1507
1508 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1509 pub async fn execute_transaction_block(
1510 &self,
1511 transaction: &Transaction,
1512 client_addr: Option<SocketAddr>,
1513 ) -> Result<VerifiedCertifiedTransactionEffects, anyhow::Error> {
1514 let tx_guard = GaugeGuard::acquire(&self.metrics.inflight_transactions);
1515 let result = self
1516 .process_transaction(transaction.clone(), client_addr)
1517 .await?;
1518 let cert = match result {
1519 ProcessTransactionResult::Certified { certificate, .. } => certificate,
1520 ProcessTransactionResult::Executed(effects, _) => {
1521 return Ok(effects);
1522 }
1523 };
1524 self.metrics.total_tx_certificates_created.inc();
1525 drop(tx_guard);
1526
1527 let _cert_guard = GaugeGuard::acquire(&self.metrics.inflight_certificates);
1528 let response = self
1529 .process_certificate(
1530 HandleCertificateRequestV3 {
1531 certificate: cert.clone(),
1532 include_events: true,
1533 include_input_objects: false,
1534 include_output_objects: false,
1535 include_auxiliary_data: false,
1536 },
1537 client_addr,
1538 )
1539 .await?;
1540
1541 Ok(response.effects_cert)
1542 }
1543}
1544
1545#[derive(Default)]
1546pub struct AuthorityAggregatorBuilder<'a> {
1547 network_config: Option<&'a NetworkConfig>,
1548 genesis: Option<&'a Genesis>,
1549 committee: Option<Committee>,
1550 reference_gas_price: Option<u64>,
1551 committee_store: Option<Arc<CommitteeStore>>,
1552 registry: Option<&'a Registry>,
1553 timeouts_config: Option<TimeoutConfig>,
1554}
1555
1556impl<'a> AuthorityAggregatorBuilder<'a> {
1557 pub fn from_network_config(config: &'a NetworkConfig) -> Self {
1558 Self {
1559 network_config: Some(config),
1560 ..Default::default()
1561 }
1562 }
1563
1564 pub fn from_genesis(genesis: &'a Genesis) -> Self {
1565 Self {
1566 genesis: Some(genesis),
1567 ..Default::default()
1568 }
1569 }
1570
1571 pub fn from_committee(committee: Committee) -> Self {
1572 Self {
1573 committee: Some(committee),
1574 ..Default::default()
1575 }
1576 }
1577
1578 #[cfg(test)]
1579 pub fn from_committee_size(committee_size: usize) -> Self {
1580 let (committee, _keypairs) = Committee::new_simple_test_committee_of_size(committee_size);
1581 Self::from_committee(committee)
1582 }
1583
1584 pub fn with_committee_store(mut self, committee_store: Arc<CommitteeStore>) -> Self {
1585 self.committee_store = Some(committee_store);
1586 self
1587 }
1588
1589 pub fn with_registry(mut self, registry: &'a Registry) -> Self {
1590 self.registry = Some(registry);
1591 self
1592 }
1593
1594 pub fn with_timeouts_config(mut self, timeouts_config: TimeoutConfig) -> Self {
1595 self.timeouts_config = Some(timeouts_config);
1596 self
1597 }
1598
1599 fn get_network_committee(&self) -> CommitteeWithNetworkMetadata {
1600 self.get_genesis()
1601 .unwrap_or_else(|| panic!("need either NetworkConfig or Genesis."))
1602 .committee_with_network()
1603 }
1604
1605 fn get_committee_authority_names_to_hostnames(&self) -> HashMap<AuthorityName, String> {
1606 if let Some(genesis) = self.get_genesis() {
1607 let state = genesis
1608 .sui_system_object()
1609 .into_genesis_version_for_tooling();
1610 state
1611 .validators
1612 .active_validators
1613 .iter()
1614 .map(|v| {
1615 let metadata = v.verified_metadata();
1616 let name = metadata.sui_pubkey_bytes();
1617
1618 (name, metadata.name.clone())
1619 })
1620 .collect()
1621 } else {
1622 HashMap::new()
1623 }
1624 }
1625
1626 fn get_reference_gas_price(&self) -> u64 {
1627 self.reference_gas_price.unwrap_or_else(|| {
1628 self.get_genesis()
1629 .map(|g| g.reference_gas_price())
1630 .unwrap_or(1000)
1631 })
1632 }
1633
1634 fn get_genesis(&self) -> Option<&Genesis> {
1635 if let Some(network_config) = self.network_config {
1636 Some(&network_config.genesis)
1637 } else if let Some(genesis) = self.genesis {
1638 Some(genesis)
1639 } else {
1640 None
1641 }
1642 }
1643
1644 fn get_committee(&self) -> Committee {
1645 self.committee
1646 .clone()
1647 .unwrap_or_else(|| self.get_network_committee().committee().clone())
1648 }
1649
1650 pub fn build_network_clients(
1651 self,
1652 ) -> (
1653 AuthorityAggregator<NetworkAuthorityClient>,
1654 BTreeMap<AuthorityPublicKeyBytes, NetworkAuthorityClient>,
1655 ) {
1656 let network_committee = self.get_network_committee();
1657 let auth_clients = make_authority_clients_with_timeout_config(
1658 &network_committee,
1659 DEFAULT_CONNECT_TIMEOUT_SEC,
1660 DEFAULT_REQUEST_TIMEOUT_SEC,
1661 );
1662 let auth_agg = self.build_custom_clients(auth_clients.clone());
1663 (auth_agg, auth_clients)
1664 }
1665
1666 pub fn build_custom_clients<C: Clone>(
1667 self,
1668 authority_clients: BTreeMap<AuthorityName, C>,
1669 ) -> AuthorityAggregator<C> {
1670 let committee = self.get_committee();
1671 let validator_display_names = self.get_committee_authority_names_to_hostnames();
1672 let reference_gas_price = self.get_reference_gas_price();
1673 let registry = Registry::new();
1674 let registry = self.registry.unwrap_or(®istry);
1675 let safe_client_metrics_base = SafeClientMetricsBase::new(registry);
1676 let auth_agg_metrics = Arc::new(AuthAggMetrics::new(registry));
1677
1678 let committee_store = self
1679 .committee_store
1680 .unwrap_or_else(|| Arc::new(CommitteeStore::new_for_testing(&committee)));
1681
1682 let timeouts_config = self.timeouts_config.unwrap_or_default();
1683
1684 AuthorityAggregator::new(
1685 committee,
1686 Arc::new(validator_display_names),
1687 reference_gas_price,
1688 committee_store,
1689 authority_clients,
1690 safe_client_metrics_base,
1691 auth_agg_metrics,
1692 timeouts_config,
1693 )
1694 }
1695
1696 #[cfg(test)]
1697 pub fn build_mock_authority_aggregator(self) -> AuthorityAggregator<MockAuthorityApi> {
1698 let committee = self.get_committee();
1699 let clients = committee
1700 .names()
1701 .map(|name| {
1702 (
1703 *name,
1704 MockAuthorityApi::new(
1705 Duration::from_millis(100),
1706 Arc::new(std::sync::Mutex::new(30)),
1707 ),
1708 )
1709 })
1710 .collect();
1711 self.build_custom_clients(clients)
1712 }
1713}