1use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
5use crate::authority_aggregator::AuthorityAggregator;
6use crate::authority_client::AuthorityAPI;
7use crate::execution_cache::TransactionCacheRead;
8use arc_swap::ArcSwap;
9use mysten_metrics::LATENCY_SEC_BUCKETS;
10use prometheus::{
11 Histogram, IntCounter, Registry, register_histogram_with_registry,
12 register_int_counter_with_registry,
13};
14use std::cmp::min;
15use std::ops::Add;
16use std::sync::Arc;
17#[cfg(any(msim, test))]
18use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
19use std::time::Duration;
20use sui_types::base_types::{AuthorityName, TransactionDigest};
21use sui_types::transaction::VerifiedSignedTransaction;
22use tokio::select;
23use tokio::time::Instant;
24use tracing::{debug, error, trace};
25
/// Metrics recorded by the validator transaction finalizer.
///
/// The Prometheus collectors are always present; the `*_for_testing` atomics
/// are compiled only into test (and, for attempts, simulator) builds.
struct ValidatorTxFinalizerMetrics {
    /// Incremented once per call to `start_finalization`.
    num_finalization_attempts: IntCounter,
    /// Incremented once per call to `finalization_succeeded`.
    num_successful_finalizations: IntCounter,
    /// Seconds elapsed between `start_finalization` and `finalization_succeeded`.
    finalization_latency: Histogram,
    /// Seconds a validator waited before attempting to finalize a transaction.
    validator_tx_finalizer_attempt_delay: Histogram,
    /// Atomic mirror of `num_finalization_attempts`, bumped alongside it.
    #[cfg(any(msim, test))]
    num_finalization_attempts_for_testing: AtomicU64,
    /// Atomic mirror of `num_successful_finalizations`, bumped alongside it.
    #[cfg(test)]
    num_successful_finalizations_for_testing: AtomicU64,
}
36
37impl ValidatorTxFinalizerMetrics {
38 fn new(registry: &Registry) -> Self {
39 Self {
40 num_finalization_attempts: register_int_counter_with_registry!(
41 "validator_tx_finalizer_num_finalization_attempts",
42 "Total number of attempts to finalize a transaction",
43 registry,
44 )
45 .unwrap(),
46 num_successful_finalizations: register_int_counter_with_registry!(
47 "validator_tx_finalizer_num_successful_finalizations",
48 "Number of transactions successfully finalized",
49 registry,
50 )
51 .unwrap(),
52 finalization_latency: register_histogram_with_registry!(
53 "validator_tx_finalizer_finalization_latency",
54 "Latency of transaction finalization",
55 LATENCY_SEC_BUCKETS.to_vec(),
56 registry,
57 )
58 .unwrap(),
59 validator_tx_finalizer_attempt_delay: register_histogram_with_registry!(
60 "validator_tx_finalizer_attempt_delay",
61 "Duration that a validator in the committee waited before attempting to finalize the transaction",
62 vec![60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0],
63 registry,
64 )
65 .unwrap(),
66 #[cfg(any(msim, test))]
67 num_finalization_attempts_for_testing: AtomicU64::new(0),
68 #[cfg(test)]
69 num_successful_finalizations_for_testing: AtomicU64::new(0),
70 }
71 }
72
73 fn start_finalization(&self) -> Instant {
74 self.num_finalization_attempts.inc();
75 #[cfg(any(msim, test))]
76 self.num_finalization_attempts_for_testing
77 .fetch_add(1, Relaxed);
78 Instant::now()
79 }
80
81 fn finalization_succeeded(&self, start_time: Instant) {
82 let latency = start_time.elapsed();
83 self.num_successful_finalizations.inc();
84 self.finalization_latency.observe(latency.as_secs_f64());
85 #[cfg(test)]
86 self.num_successful_finalizations_for_testing
87 .fetch_add(1, Relaxed);
88 }
89}
90
/// Timing knobs for the validator transaction finalizer.
///
/// Derives `Debug` and `Clone` so the configuration can be logged and copied
/// by callers; all fields are plain standard-library values.
#[derive(Debug, Clone)]
pub struct ValidatorTxFinalizerConfig {
    /// Base delay every validator waits before trying to finalize a
    /// transaction it has signed.
    pub tx_finalization_delay: Duration,
    /// Upper bound on how long a single finalization attempt may run.
    pub tx_finalization_timeout: Duration,
    /// Seconds of extra delay added per position of the validator in the
    /// per-transaction committee ordering.
    pub validator_delay_increments_sec: u64,
    /// Cap applied to the total (base plus positional) delay.
    pub validator_max_delay: Duration,
}
98
99#[cfg(not(any(msim, test)))]
100impl Default for ValidatorTxFinalizerConfig {
101 fn default() -> Self {
102 Self {
103 tx_finalization_delay: Duration::from_secs(60),
108 tx_finalization_timeout: Duration::from_secs(60),
110 validator_delay_increments_sec: 10,
111 validator_max_delay: Duration::from_secs(180),
112 }
113 }
114}
115
#[cfg(any(msim, test))]
impl Default for ValidatorTxFinalizerConfig {
    /// Defaults for test and simulator builds: a 5 s base delay, a 60 s
    /// attempt timeout, 1 s per committee position, and a 15 s cap on the
    /// total delay.
    fn default() -> Self {
        let base_delay = Duration::from_secs(5);
        let attempt_timeout = Duration::from_secs(60);
        let delay_cap = Duration::from_secs(15);
        Self {
            tx_finalization_delay: base_delay,
            tx_finalization_timeout: attempt_timeout,
            validator_delay_increments_sec: 1,
            validator_max_delay: delay_cap,
        }
    }
}
127
/// Helps drive transactions signed by this validator to finality by invoking
/// the authority aggregator after a per-validator delay.
pub struct ValidatorTxFinalizer<C: Clone> {
    /// Swappable handle to the authority aggregator; it is loaded on each use,
    /// so a replacement stored by the owner is observed by later calls.
    agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
    /// Name of this validator, used to find its position in the committee ordering.
    name: AuthorityName,
    /// Delay and timeout settings.
    config: Arc<ValidatorTxFinalizerConfig>,
    /// Metrics recorded around finalization attempts.
    metrics: Arc<ValidatorTxFinalizerMetrics>,
}
138
139impl<C: Clone> ValidatorTxFinalizer<C> {
140 pub fn new(
141 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
142 name: AuthorityName,
143 registry: &Registry,
144 ) -> Self {
145 Self {
146 agg,
147 name,
148 config: Arc::new(ValidatorTxFinalizerConfig::default()),
149 metrics: Arc::new(ValidatorTxFinalizerMetrics::new(registry)),
150 }
151 }
152
153 #[cfg(test)]
154 pub(crate) fn new_for_testing(
155 agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
156 name: AuthorityName,
157 ) -> Self {
158 Self::new(agg, name, &Registry::new())
159 }
160
161 #[cfg(test)]
162 pub(crate) fn auth_agg(&self) -> &Arc<ArcSwap<AuthorityAggregator<C>>> {
163 &self.agg
164 }
165
166 #[cfg(any(msim, test))]
167 pub fn num_finalization_attempts_for_testing(&self) -> u64 {
168 self.metrics
169 .num_finalization_attempts_for_testing
170 .load(Relaxed)
171 }
172}
173
174impl<C> ValidatorTxFinalizer<C>
175where
176 C: Clone + AuthorityAPI + Send + Sync + 'static,
177{
178 pub async fn track_signed_tx(
179 &self,
180 cache_read: Arc<dyn TransactionCacheRead>,
181 epoch_store: &Arc<AuthorityPerEpochStore>,
182 tx: VerifiedSignedTransaction,
183 ) {
184 let tx_digest = *tx.digest();
185 trace!(?tx_digest, "Tracking signed transaction");
186 match self
187 .delay_and_finalize_tx(cache_read, epoch_store, tx)
188 .await
189 {
190 Ok(did_run) => {
191 if did_run {
192 debug!(?tx_digest, "Transaction finalized");
193 }
194 }
195 Err(err) => {
196 debug!(?tx_digest, "Failed to finalize transaction: {err}");
197 }
198 }
199 }
200
201 async fn delay_and_finalize_tx(
202 &self,
203 cache_read: Arc<dyn TransactionCacheRead>,
204 epoch_store: &Arc<AuthorityPerEpochStore>,
205 tx: VerifiedSignedTransaction,
206 ) -> anyhow::Result<bool> {
207 let tx_digest = *tx.digest();
208 let Some(tx_finalization_delay) = self.determine_finalization_delay(&tx_digest) else {
209 return Ok(false);
210 };
211 let digests = [tx_digest];
212 select! {
213 _ = tokio::time::sleep(tx_finalization_delay) => {
214 trace!(?tx_digest, "Waking up to finalize transaction");
215 }
216 _ = cache_read.notify_read_executed_effects_digests(
217 "ValidatorTxFinalizer::notify_read_executed_effects_digests",
218 &digests,
219 ) => {
220 trace!(?tx_digest, "Transaction already finalized");
221 return Ok(false);
222 }
223 }
224
225 if epoch_store.is_pending_consensus_certificate(&tx_digest) {
226 trace!(
227 ?tx_digest,
228 "Transaction has been submitted to consensus, no need to help drive finality"
229 );
230 return Ok(false);
231 }
232
233 self.metrics
234 .validator_tx_finalizer_attempt_delay
235 .observe(tx_finalization_delay.as_secs_f64());
236 let start_time = self.metrics.start_finalization();
237 debug!(
238 ?tx_digest,
239 "Invoking authority aggregator to finalize transaction"
240 );
241 tokio::time::timeout(
242 self.config.tx_finalization_timeout,
243 self.agg
244 .load()
245 .execute_transaction_block(tx.into_unsigned().inner(), None),
246 )
247 .await??;
248 self.metrics.finalization_succeeded(start_time);
249 Ok(true)
250 }
251
252 fn determine_finalization_delay(&self, tx_digest: &TransactionDigest) -> Option<Duration> {
259 let agg = self.agg.load();
260 let order = agg.committee.shuffle_by_stake_from_tx_digest(tx_digest);
261 let Some(position) = order.iter().position(|&name| name == self.name) else {
262 error!("Validator {} not found in the committee", self.name);
265 return None;
266 };
267 let extra_delay = position as u64 * self.config.validator_delay_increments_sec;
269 let delay = self
270 .config
271 .tx_finalization_delay
272 .add(Duration::from_secs(extra_delay));
273 Some(min(delay, self.config.validator_max_delay))
274 }
275}
276
277#[cfg(test)]
278mod tests {
279 use crate::authority::test_authority_builder::TestAuthorityBuilder;
280 use crate::authority::{AuthorityState, ExecutionEnv};
281 use crate::authority_aggregator::{AuthorityAggregator, AuthorityAggregatorBuilder};
282 use crate::authority_client::AuthorityAPI;
283 use crate::execution_scheduler::SchedulingSource;
284 use crate::validator_tx_finalizer::ValidatorTxFinalizer;
285 use arc_swap::ArcSwap;
286 use async_trait::async_trait;
287 use std::cmp::min;
288 use std::collections::BTreeMap;
289 use std::iter;
290 use std::net::SocketAddr;
291 use std::num::NonZeroUsize;
292 use std::sync::Arc;
293 use std::sync::atomic::AtomicBool;
294 use std::sync::atomic::Ordering::Relaxed;
295 use sui_macros::sim_test;
296 use sui_swarm_config::network_config_builder::ConfigBuilder;
297 use sui_test_transaction_builder::TestTransactionBuilder;
298 use sui_types::base_types::{AuthorityName, ObjectID, SuiAddress, TransactionDigest};
299 use sui_types::committee::{CommitteeTrait, StakeUnit};
300 use sui_types::crypto::{AccountKeyPair, get_account_key_pair};
301 use sui_types::effects::{TransactionEffectsAPI, TransactionEvents};
302 use sui_types::error::{SuiError, SuiErrorKind};
303 use sui_types::executable_transaction::VerifiedExecutableTransaction;
304 use sui_types::messages_checkpoint::{
305 CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
306 };
307 use sui_types::messages_grpc::{
308 HandleCertificateRequestV3, HandleCertificateResponseV2, HandleCertificateResponseV3,
309 HandleSoftBundleCertificatesRequestV3, HandleSoftBundleCertificatesResponseV3,
310 HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SubmitTxRequest,
311 SubmitTxResponse, SystemStateRequest, TransactionInfoRequest, TransactionInfoResponse,
312 ValidatorHealthRequest, ValidatorHealthResponse, WaitForEffectsRequest,
313 WaitForEffectsResponse,
314 };
315 use sui_types::object::Object;
316 use sui_types::sui_system_state::SuiSystemState;
317 use sui_types::transaction::{
318 CertifiedTransaction, SignedTransaction, Transaction, VerifiedCertificate,
319 VerifiedSignedTransaction, VerifiedTransaction,
320 };
321 use sui_types::utils::to_sender_signed_transaction;
322
/// In-process `AuthorityAPI` test double backed by a local `AuthorityState`.
#[derive(Clone)]
struct MockAuthorityClient {
    /// Authority that serves the requests this mock implements.
    authority: Arc<AuthorityState>,
    /// When set, `handle_transaction` returns a timeout error instead of
    /// calling into `authority`.
    inject_fault: Arc<AtomicBool>,
}
328
/// Only `handle_transaction`, `handle_certificate_v2` and `validator_health`
/// are implemented; every other method panics via `unimplemented!`.
#[async_trait]
impl AuthorityAPI for MockAuthorityClient {
    /// Not supported by this mock.
    async fn submit_transaction(
        &self,
        _request: SubmitTxRequest,
        _client_addr: Option<SocketAddr>,
    ) -> Result<SubmitTxResponse, SuiError> {
        unimplemented!();
    }

    /// Not supported by this mock.
    async fn wait_for_effects(
        &self,
        _request: WaitForEffectsRequest,
        _client_addr: Option<SocketAddr>,
    ) -> Result<WaitForEffectsResponse, SuiError> {
        unimplemented!()
    }

    /// Forwards the transaction to the wrapped authority, or fails with a
    /// timeout error when `inject_fault` is set.
    async fn handle_transaction(
        &self,
        transaction: Transaction,
        _client_addr: Option<SocketAddr>,
    ) -> Result<HandleTransactionResponse, SuiError> {
        if self.inject_fault.load(Relaxed) {
            return Err(SuiErrorKind::TimeoutError.into());
        }
        let epoch_store = self.authority.epoch_store_for_testing();
        self.authority
            .handle_transaction(
                &epoch_store,
                VerifiedTransaction::new_unchecked(transaction),
            )
            .await
    }

    /// Executes the certificate on the wrapped authority and returns the
    /// signed effects together with any emitted events.
    ///
    /// Panics if execution fails (the result is unwrapped).
    async fn handle_certificate_v2(
        &self,
        certificate: CertifiedTransaction,
        _client_addr: Option<SocketAddr>,
    ) -> Result<HandleCertificateResponseV2, SuiError> {
        let epoch_store = self.authority.epoch_store_for_testing();
        let (effects, _) = self
            .authority
            .try_execute_immediately(
                &VerifiedExecutableTransaction::new_from_certificate(
                    VerifiedCertificate::new_unchecked(certificate),
                ),
                ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
                &epoch_store,
            )
            .await
            .unwrap();
        // Events are looked up only when the effects carry an events digest.
        let events = if effects.events_digest().is_some() {
            self.authority
                .get_transaction_events(effects.transaction_digest())?
        } else {
            TransactionEvents::default()
        };
        let signed_effects = self
            .authority
            .sign_effects(effects, &epoch_store)?
            .into_inner();
        Ok(HandleCertificateResponseV2 {
            signed_effects,
            events,
            fastpath_input_objects: vec![],
        })
    }

    /// Not supported by this mock.
    async fn handle_certificate_v3(
        &self,
        _request: HandleCertificateRequestV3,
        _client_addr: Option<SocketAddr>,
    ) -> Result<HandleCertificateResponseV3, SuiError> {
        unimplemented!()
    }

    /// Not supported by this mock.
    async fn handle_soft_bundle_certificates_v3(
        &self,
        _request: HandleSoftBundleCertificatesRequestV3,
        _client_addr: Option<SocketAddr>,
    ) -> Result<HandleSoftBundleCertificatesResponseV3, SuiError> {
        unimplemented!()
    }

    /// Not supported by this mock.
    async fn handle_object_info_request(
        &self,
        _request: ObjectInfoRequest,
    ) -> Result<ObjectInfoResponse, SuiError> {
        unimplemented!()
    }

    /// Not supported by this mock.
    async fn handle_transaction_info_request(
        &self,
        _request: TransactionInfoRequest,
    ) -> Result<TransactionInfoResponse, SuiError> {
        unimplemented!()
    }

    /// Not supported by this mock.
    async fn handle_checkpoint(
        &self,
        _request: CheckpointRequest,
    ) -> Result<CheckpointResponse, SuiError> {
        unimplemented!()
    }

    /// Not supported by this mock.
    async fn handle_checkpoint_v2(
        &self,
        _request: CheckpointRequestV2,
    ) -> Result<CheckpointResponseV2, SuiError> {
        unimplemented!()
    }

    /// Not supported by this mock.
    async fn handle_system_state_object(
        &self,
        _request: SystemStateRequest,
    ) -> Result<SuiSystemState, SuiError> {
        unimplemented!()
    }

    /// Returns a fixed health response; all fields other than the two set
    /// below take their default values.
    async fn validator_health(
        &self,
        _request: ValidatorHealthRequest,
    ) -> Result<ValidatorHealthResponse, SuiError> {
        Ok(ValidatorHealthResponse {
            last_committed_leader_round: 1000,
            last_locally_built_checkpoint: 500,
            ..Default::default()
        })
    }
}
460
461 #[sim_test]
462 async fn test_validator_tx_finalizer_basic_flow() {
463 telemetry_subscribers::init_for_testing();
464 let (sender, keypair) = get_account_key_pair();
465 let gas_object = Object::with_owner_for_testing(sender);
466 let gas_object_id = gas_object.id();
467 let (states, auth_agg, clients) = create_validators(gas_object).await;
468 let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
469 let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
470 let tx_digest = *signed_tx.digest();
471 let cache_read = states[0].get_transaction_cache_reader().clone();
472 let epoch_store = states[0].epoch_store_for_testing();
473 let metrics = finalizer1.metrics.clone();
474 let handle = tokio::spawn(async move {
475 finalizer1
476 .track_signed_tx(cache_read, &epoch_store, signed_tx)
477 .await;
478 });
479 handle.await.unwrap();
480 check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
481 assert_eq!(
482 metrics.num_finalization_attempts_for_testing.load(Relaxed),
483 1
484 );
485 assert_eq!(
486 metrics
487 .num_successful_finalizations_for_testing
488 .load(Relaxed),
489 1
490 );
491 }
492
/// Reconfigures the first validator while its finalizer is still tracking a
/// transaction, and checks that no finalization attempt is ever made.
#[tokio::test]
async fn test_validator_tx_finalizer_new_epoch() {
    let (sender, keypair) = get_account_key_pair();
    let gas_object = Object::with_owner_for_testing(sender);
    let gas_object_id = gas_object.id();
    let (states, auth_agg, clients) = create_validators(gas_object).await;
    let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
    let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
    let tx_digest = *signed_tx.digest();
    let epoch_store = states[0].epoch_store_for_testing();
    let cache_read = states[0].get_transaction_cache_reader().clone();

    let metrics = finalizer1.metrics.clone();
    // The tracking future is wrapped in `within_alive_epoch`; its result is
    // deliberately ignored.
    // NOTE(review): presumably the wrapper drops the future once the epoch
    // ends, which is what the zero-attempt assertions below rely on; confirm.
    let handle = tokio::spawn(async move {
        let _ = epoch_store
            .within_alive_epoch(finalizer1.track_signed_tx(cache_read, &epoch_store, signed_tx))
            .await;
    });
    // Reconfigure only after the task has been spawned, then join it.
    states[0].reconfigure_for_testing().await;
    handle.await.unwrap();
    // The transaction must not have reached quorum execution.
    check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
    assert_eq!(
        metrics.num_finalization_attempts_for_testing.load(Relaxed),
        0
    );
    assert_eq!(
        metrics
            .num_successful_finalizations_for_testing
            .load(Relaxed),
        0
    );
}
525
526 #[tokio::test]
527 async fn test_validator_tx_finalizer_auth_agg_reconfig() {
528 let (sender, _) = get_account_key_pair();
529 let gas_object = Object::with_owner_for_testing(sender);
530 let (states, auth_agg, _clients) = create_validators(gas_object).await;
531 let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
532 let mut new_auth_agg = (**auth_agg.load()).clone();
533 let mut new_committee = (*new_auth_agg.committee).clone();
534 new_committee.epoch = 100;
535 new_auth_agg.committee = Arc::new(new_committee);
536 auth_agg.store(Arc::new(new_auth_agg));
537 assert_eq!(
538 finalizer1.auth_agg().load().committee.epoch,
539 100,
540 "AuthorityAggregator not updated"
541 );
542 }
543
/// Executes the transaction through the aggregator while the finalizer is
/// still tracking it, and checks that the finalizer makes no attempt of its own.
#[tokio::test]
async fn test_validator_tx_finalizer_already_executed() {
    telemetry_subscribers::init_for_testing();
    let (sender, keypair) = get_account_key_pair();
    let gas_object = Object::with_owner_for_testing(sender);
    let gas_object_id = gas_object.id();
    let (states, auth_agg, clients) = create_validators(gas_object).await;
    let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
    let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
    let tx_digest = *signed_tx.digest();
    let cache_read = states[0].get_transaction_cache_reader().clone();
    let epoch_store = states[0].epoch_store_for_testing();

    let metrics = finalizer1.metrics.clone();
    // The tracker consumes a clone; the original is executed directly below.
    let signed_tx_clone = signed_tx.clone();
    let handle = tokio::spawn(async move {
        finalizer1
            .track_signed_tx(cache_read, &epoch_store, signed_tx_clone)
            .await;
    });
    // Drive the transaction to execution ourselves after the tracker has been
    // spawned, then wait for the tracker to finish.
    auth_agg
        .load()
        .execute_transaction_block(&signed_tx.into_inner().into_unsigned(), None)
        .await
        .unwrap();
    handle.await.unwrap();
    check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
    // Quorum execution came from the direct call, not from the finalizer.
    assert_eq!(
        metrics.num_finalization_attempts_for_testing.load(Relaxed),
        0
    );
    assert_eq!(
        metrics
            .num_successful_finalizations_for_testing
            .load(Relaxed),
        0
    );
}
582
583 #[tokio::test]
584 async fn test_validator_tx_finalizer_timeout() {
585 telemetry_subscribers::init_for_testing();
586 let (sender, keypair) = get_account_key_pair();
587 let gas_object = Object::with_owner_for_testing(sender);
588 let gas_object_id = gas_object.id();
589 let (states, auth_agg, clients) = create_validators(gas_object).await;
590 let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
591 let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
592 let tx_digest = *signed_tx.digest();
593 let cache_read = states[0].get_transaction_cache_reader().clone();
594 let epoch_store = states[0].epoch_store_for_testing();
595 for client in clients.values() {
596 client.inject_fault.store(true, Relaxed);
597 }
598
599 let metrics = finalizer1.metrics.clone();
600 let signed_tx_clone = signed_tx.clone();
601 let handle = tokio::spawn(async move {
602 finalizer1
603 .track_signed_tx(cache_read, &epoch_store, signed_tx_clone)
604 .await;
605 });
606 handle.await.unwrap();
607 check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
608 assert_eq!(
609 metrics.num_finalization_attempts_for_testing.load(Relaxed),
610 1
611 );
612 assert_eq!(
613 metrics
614 .num_successful_finalizations_for_testing
615 .load(Relaxed),
616 0
617 );
618 }
619
620 #[tokio::test]
621 async fn test_validator_tx_finalizer_determine_finalization_delay() {
622 const COMMITTEE_SIZE: usize = 15;
623 let network_config = ConfigBuilder::new_with_temp_dir()
624 .committee_size(NonZeroUsize::new(COMMITTEE_SIZE).unwrap())
625 .build();
626 let (auth_agg, _) = AuthorityAggregatorBuilder::from_network_config(&network_config)
627 .build_network_clients();
628 let auth_agg = Arc::new(auth_agg);
629 let finalizers = (0..COMMITTEE_SIZE)
630 .map(|idx| {
631 ValidatorTxFinalizer::new_for_testing(
632 Arc::new(ArcSwap::new(auth_agg.clone())),
633 auth_agg.committee.voting_rights[idx].0,
634 )
635 })
636 .collect::<Vec<_>>();
637 let config = finalizers[0].config.clone();
638 for _ in 0..100 {
639 let tx_digest = TransactionDigest::random();
640 let mut delays: Vec<_> = finalizers
641 .iter()
642 .map(|finalizer| {
643 finalizer
644 .determine_finalization_delay(&tx_digest)
645 .map(|delay| delay.as_secs())
646 .unwrap()
647 })
648 .collect();
649 delays.sort();
650 for (idx, delay) in delays.iter().enumerate() {
651 assert_eq!(
652 *delay,
653 min(
654 config.validator_max_delay.as_secs(),
655 config.tx_finalization_delay.as_secs()
656 + idx as u64 * config.validator_delay_increments_sec
657 )
658 );
659 }
660 }
661 }
662
663 async fn create_validators(
664 gas_object: Object,
665 ) -> (
666 Vec<Arc<AuthorityState>>,
667 Arc<ArcSwap<AuthorityAggregator<MockAuthorityClient>>>,
668 BTreeMap<AuthorityName, MockAuthorityClient>,
669 ) {
670 let network_config = ConfigBuilder::new_with_temp_dir()
671 .committee_size(NonZeroUsize::new(4).unwrap())
672 .with_objects(iter::once(gas_object))
673 .build();
674 let mut authority_states = vec![];
675 for idx in 0..4 {
676 let state = TestAuthorityBuilder::new()
677 .with_network_config(&network_config, idx)
678 .build()
679 .await;
680 authority_states.push(state);
681 }
682 let clients: BTreeMap<_, _> = authority_states
683 .iter()
684 .map(|state| {
685 (
686 state.name,
687 MockAuthorityClient {
688 authority: state.clone(),
689 inject_fault: Arc::new(AtomicBool::new(false)),
690 },
691 )
692 })
693 .collect();
694 let auth_agg = AuthorityAggregatorBuilder::from_network_config(&network_config)
695 .build_custom_clients(clients.clone());
696 (
697 authority_states,
698 Arc::new(ArcSwap::new(Arc::new(auth_agg))),
699 clients,
700 )
701 }
702
703 async fn create_tx(
704 clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
705 state: &Arc<AuthorityState>,
706 sender: SuiAddress,
707 keypair: &AccountKeyPair,
708 gas_object_id: ObjectID,
709 ) -> VerifiedSignedTransaction {
710 let gas_object_ref = state
711 .get_object(&gas_object_id)
712 .await
713 .unwrap()
714 .compute_object_reference();
715 let tx_data = TestTransactionBuilder::new(
716 sender,
717 gas_object_ref,
718 state.reference_gas_price_for_testing().unwrap(),
719 )
720 .transfer_sui(None, sender)
721 .build();
722 let tx = to_sender_signed_transaction(tx_data, keypair);
723 let response = clients
724 .get(&state.name)
725 .unwrap()
726 .handle_transaction(tx.clone(), None)
727 .await
728 .unwrap();
729 VerifiedSignedTransaction::new_unchecked(SignedTransaction::new_from_data_and_sig(
730 tx.into_data(),
731 response.status.into_signed_for_testing(),
732 ))
733 }
734
735 fn check_quorum_execution(
736 auth_agg: &Arc<AuthorityAggregator<MockAuthorityClient>>,
737 clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
738 tx_digest: &TransactionDigest,
739 expected: bool,
740 ) {
741 let quorum = auth_agg.committee.quorum_threshold();
742 let executed_weight: StakeUnit = clients
743 .iter()
744 .filter_map(|(name, client)| {
745 client
746 .authority
747 .is_tx_already_executed(tx_digest)
748 .then_some(auth_agg.committee.weight(name))
749 })
750 .sum();
751 assert_eq!(executed_weight >= quorum, expected);
752 }
753}