sui_core/
validator_tx_finalizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
5use crate::authority_aggregator::AuthorityAggregator;
6use crate::authority_client::AuthorityAPI;
7use crate::execution_cache::TransactionCacheRead;
8use arc_swap::ArcSwap;
9use mysten_metrics::LATENCY_SEC_BUCKETS;
10use prometheus::{
11    Histogram, IntCounter, Registry, register_histogram_with_registry,
12    register_int_counter_with_registry,
13};
14use std::cmp::min;
15use std::ops::Add;
16use std::sync::Arc;
17#[cfg(any(msim, test))]
18use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
19use std::time::Duration;
20use sui_types::base_types::{AuthorityName, TransactionDigest};
21use sui_types::transaction::VerifiedSignedTransaction;
22use tokio::select;
23use tokio::time::Instant;
24use tracing::{debug, error, trace};
25
26struct ValidatorTxFinalizerMetrics {
27    num_finalization_attempts: IntCounter,
28    num_successful_finalizations: IntCounter,
29    finalization_latency: Histogram,
30    validator_tx_finalizer_attempt_delay: Histogram,
31    #[cfg(any(msim, test))]
32    num_finalization_attempts_for_testing: AtomicU64,
33    #[cfg(test)]
34    num_successful_finalizations_for_testing: AtomicU64,
35}
36
37impl ValidatorTxFinalizerMetrics {
38    fn new(registry: &Registry) -> Self {
39        Self {
40            num_finalization_attempts: register_int_counter_with_registry!(
41                "validator_tx_finalizer_num_finalization_attempts",
42                "Total number of attempts to finalize a transaction",
43                registry,
44            )
45            .unwrap(),
46            num_successful_finalizations: register_int_counter_with_registry!(
47                "validator_tx_finalizer_num_successful_finalizations",
48                "Number of transactions successfully finalized",
49                registry,
50            )
51            .unwrap(),
52            finalization_latency: register_histogram_with_registry!(
53                "validator_tx_finalizer_finalization_latency",
54                "Latency of transaction finalization",
55                LATENCY_SEC_BUCKETS.to_vec(),
56                registry,
57            )
58            .unwrap(),
59            validator_tx_finalizer_attempt_delay: register_histogram_with_registry!(
60                "validator_tx_finalizer_attempt_delay",
61                "Duration that a validator in the committee waited before attempting to finalize the transaction",
62                vec![60.0, 70.0, 80.0, 90.0, 100.0, 110.0, 120.0, 130.0, 140.0, 150.0, 160.0, 170.0, 180.0],
63                registry,
64            )
65            .unwrap(),
66            #[cfg(any(msim, test))]
67            num_finalization_attempts_for_testing: AtomicU64::new(0),
68            #[cfg(test)]
69            num_successful_finalizations_for_testing: AtomicU64::new(0),
70        }
71    }
72
73    fn start_finalization(&self) -> Instant {
74        self.num_finalization_attempts.inc();
75        #[cfg(any(msim, test))]
76        self.num_finalization_attempts_for_testing
77            .fetch_add(1, Relaxed);
78        Instant::now()
79    }
80
81    fn finalization_succeeded(&self, start_time: Instant) {
82        let latency = start_time.elapsed();
83        self.num_successful_finalizations.inc();
84        self.finalization_latency.observe(latency.as_secs_f64());
85        #[cfg(test)]
86        self.num_successful_finalizations_for_testing
87            .fetch_add(1, Relaxed);
88    }
89}
90
/// Timing parameters that control when a validator tries to finalize a
/// transaction it has signed, and how long it keeps trying.
#[derive(Debug, Clone)]
pub struct ValidatorTxFinalizerConfig {
    /// Base delay applied before any finalization attempt is made.
    pub tx_finalization_delay: Duration,
    /// Maximum time a single finalization attempt may run before it is abandoned.
    pub tx_finalization_timeout: Duration,
    /// Incremental delay for validators to wake up to finalize a transaction.
    /// Expressed in seconds and multiplied by the validator's position in the
    /// per-transaction validator ordering.
    pub validator_delay_increments_sec: u64,
    /// Upper bound on the total delay (base delay plus positional increment).
    pub validator_max_delay: Duration,
}
98
99#[cfg(not(any(msim, test)))]
100impl Default for ValidatorTxFinalizerConfig {
101    fn default() -> Self {
102        Self {
103            // Only wake up the transaction finalization task for a given transaction
104            // after 1 mins of seeing it. This gives plenty of time for the transaction
105            // to become final in the normal way. We also don't want this delay to be too long
106            // to reduce memory usage held up by the finalizer threads.
107            tx_finalization_delay: Duration::from_secs(60),
108            // If a transaction can not be finalized within 1 min of being woken up, give up.
109            tx_finalization_timeout: Duration::from_secs(60),
110            validator_delay_increments_sec: 10,
111            validator_max_delay: Duration::from_secs(180),
112        }
113    }
114}
115
#[cfg(any(msim, test))]
impl Default for ValidatorTxFinalizerConfig {
    /// Returns the shortened timings used in simulator and unit-test builds.
    fn default() -> Self {
        const BASE_DELAY: Duration = Duration::from_secs(5);
        const ATTEMPT_TIMEOUT: Duration = Duration::from_secs(60);
        // Seconds added per position in the validator ordering.
        const DELAY_INCREMENT_SECS: u64 = 1;
        const MAX_DELAY: Duration = Duration::from_secs(15);

        Self {
            tx_finalization_delay: BASE_DELAY,
            tx_finalization_timeout: ATTEMPT_TIMEOUT,
            validator_delay_increments_sec: DELAY_INCREMENT_SECS,
            validator_max_delay: MAX_DELAY,
        }
    }
}
127
128/// The `ValidatorTxFinalizer` is responsible for finalizing transactions that
129/// have been signed by the validator. It does this by waiting for a delay
130/// after the transaction has been signed, and then attempting to finalize
131/// the transaction if it has not yet been done by a fullnode.
132pub struct ValidatorTxFinalizer<C: Clone> {
133    agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
134    name: AuthorityName,
135    config: Arc<ValidatorTxFinalizerConfig>,
136    metrics: Arc<ValidatorTxFinalizerMetrics>,
137}
138
139impl<C: Clone> ValidatorTxFinalizer<C> {
140    pub fn new(
141        agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
142        name: AuthorityName,
143        registry: &Registry,
144    ) -> Self {
145        Self {
146            agg,
147            name,
148            config: Arc::new(ValidatorTxFinalizerConfig::default()),
149            metrics: Arc::new(ValidatorTxFinalizerMetrics::new(registry)),
150        }
151    }
152
153    #[cfg(test)]
154    pub(crate) fn new_for_testing(
155        agg: Arc<ArcSwap<AuthorityAggregator<C>>>,
156        name: AuthorityName,
157    ) -> Self {
158        Self::new(agg, name, &Registry::new())
159    }
160
161    #[cfg(test)]
162    pub(crate) fn auth_agg(&self) -> &Arc<ArcSwap<AuthorityAggregator<C>>> {
163        &self.agg
164    }
165
166    #[cfg(any(msim, test))]
167    pub fn num_finalization_attempts_for_testing(&self) -> u64 {
168        self.metrics
169            .num_finalization_attempts_for_testing
170            .load(Relaxed)
171    }
172}
173
174impl<C> ValidatorTxFinalizer<C>
175where
176    C: Clone + AuthorityAPI + Send + Sync + 'static,
177{
178    pub async fn track_signed_tx(
179        &self,
180        cache_read: Arc<dyn TransactionCacheRead>,
181        epoch_store: &Arc<AuthorityPerEpochStore>,
182        tx: VerifiedSignedTransaction,
183    ) {
184        let tx_digest = *tx.digest();
185        trace!(?tx_digest, "Tracking signed transaction");
186        match self
187            .delay_and_finalize_tx(cache_read, epoch_store, tx)
188            .await
189        {
190            Ok(did_run) => {
191                if did_run {
192                    debug!(?tx_digest, "Transaction finalized");
193                }
194            }
195            Err(err) => {
196                debug!(?tx_digest, "Failed to finalize transaction: {err}");
197            }
198        }
199    }
200
201    async fn delay_and_finalize_tx(
202        &self,
203        cache_read: Arc<dyn TransactionCacheRead>,
204        epoch_store: &Arc<AuthorityPerEpochStore>,
205        tx: VerifiedSignedTransaction,
206    ) -> anyhow::Result<bool> {
207        let tx_digest = *tx.digest();
208        let Some(tx_finalization_delay) = self.determine_finalization_delay(&tx_digest) else {
209            return Ok(false);
210        };
211        let digests = [tx_digest];
212        select! {
213            _ = tokio::time::sleep(tx_finalization_delay) => {
214                trace!(?tx_digest, "Waking up to finalize transaction");
215            }
216            _ = cache_read.notify_read_executed_effects_digests(
217                "ValidatorTxFinalizer::notify_read_executed_effects_digests",
218                &digests,
219            ) => {
220                trace!(?tx_digest, "Transaction already finalized");
221                return Ok(false);
222            }
223        }
224
225        if epoch_store.is_pending_consensus_certificate(&tx_digest) {
226            trace!(
227                ?tx_digest,
228                "Transaction has been submitted to consensus, no need to help drive finality"
229            );
230            return Ok(false);
231        }
232
233        self.metrics
234            .validator_tx_finalizer_attempt_delay
235            .observe(tx_finalization_delay.as_secs_f64());
236        let start_time = self.metrics.start_finalization();
237        debug!(
238            ?tx_digest,
239            "Invoking authority aggregator to finalize transaction"
240        );
241        tokio::time::timeout(
242            self.config.tx_finalization_timeout,
243            self.agg
244                .load()
245                .execute_transaction_block(tx.into_unsigned().inner(), None),
246        )
247        .await??;
248        self.metrics.finalization_succeeded(start_time);
249        Ok(true)
250    }
251
252    // We want to avoid all validators waking up at the same time to finalize the same transaction.
253    // That can lead to a waste of resource and flood the network unnecessarily.
254    // Here we use the transaction digest to determine an order of all validators.
255    // Validators will wake up one by one with incremental delays to finalize the transaction.
256    // The hope is that the first few should be able to finalize the transaction,
257    // and the rest will see it already executed and do not need to do anything.
258    fn determine_finalization_delay(&self, tx_digest: &TransactionDigest) -> Option<Duration> {
259        let agg = self.agg.load();
260        let order = agg.committee.shuffle_by_stake_from_tx_digest(tx_digest);
261        let Some(position) = order.iter().position(|&name| name == self.name) else {
262            // Somehow the validator is not found in the committee. This should never happen.
263            // TODO: This is where we should report system invariant violation.
264            error!("Validator {} not found in the committee", self.name);
265            return None;
266        };
267        // TODO: As an optimization, we could also limit the number of validators that would do this.
268        let extra_delay = position as u64 * self.config.validator_delay_increments_sec;
269        let delay = self
270            .config
271            .tx_finalization_delay
272            .add(Duration::from_secs(extra_delay));
273        Some(min(delay, self.config.validator_max_delay))
274    }
275}
276
277#[cfg(test)]
278mod tests {
279    use crate::authority::test_authority_builder::TestAuthorityBuilder;
280    use crate::authority::{AuthorityState, ExecutionEnv};
281    use crate::authority_aggregator::{AuthorityAggregator, AuthorityAggregatorBuilder};
282    use crate::authority_client::AuthorityAPI;
283    use crate::execution_scheduler::SchedulingSource;
284    use crate::validator_tx_finalizer::ValidatorTxFinalizer;
285    use arc_swap::ArcSwap;
286    use async_trait::async_trait;
287    use std::cmp::min;
288    use std::collections::BTreeMap;
289    use std::iter;
290    use std::net::SocketAddr;
291    use std::num::NonZeroUsize;
292    use std::sync::Arc;
293    use std::sync::atomic::AtomicBool;
294    use std::sync::atomic::Ordering::Relaxed;
295    use sui_macros::sim_test;
296    use sui_swarm_config::network_config_builder::ConfigBuilder;
297    use sui_test_transaction_builder::TestTransactionBuilder;
298    use sui_types::base_types::{AuthorityName, ObjectID, SuiAddress, TransactionDigest};
299    use sui_types::committee::{CommitteeTrait, StakeUnit};
300    use sui_types::crypto::{AccountKeyPair, get_account_key_pair};
301    use sui_types::effects::{TransactionEffectsAPI, TransactionEvents};
302    use sui_types::error::{SuiError, SuiErrorKind};
303    use sui_types::executable_transaction::VerifiedExecutableTransaction;
304    use sui_types::messages_checkpoint::{
305        CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
306    };
307    use sui_types::messages_grpc::{
308        HandleCertificateRequestV3, HandleCertificateResponseV2, HandleCertificateResponseV3,
309        HandleSoftBundleCertificatesRequestV3, HandleSoftBundleCertificatesResponseV3,
310        HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SubmitTxRequest,
311        SubmitTxResponse, SystemStateRequest, TransactionInfoRequest, TransactionInfoResponse,
312        ValidatorHealthRequest, ValidatorHealthResponse, WaitForEffectsRequest,
313        WaitForEffectsResponse,
314    };
315    use sui_types::object::Object;
316    use sui_types::sui_system_state::SuiSystemState;
317    use sui_types::transaction::{
318        CertifiedTransaction, SignedTransaction, Transaction, VerifiedCertificate,
319        VerifiedSignedTransaction, VerifiedTransaction,
320    };
321    use sui_types::utils::to_sender_signed_transaction;
322
323    #[derive(Clone)]
324    struct MockAuthorityClient {
325        authority: Arc<AuthorityState>,
326        inject_fault: Arc<AtomicBool>,
327    }
328
329    #[async_trait]
330    impl AuthorityAPI for MockAuthorityClient {
331        async fn submit_transaction(
332            &self,
333            _request: SubmitTxRequest,
334            _client_addr: Option<SocketAddr>,
335        ) -> Result<SubmitTxResponse, SuiError> {
336            unimplemented!();
337        }
338
339        async fn wait_for_effects(
340            &self,
341            _request: WaitForEffectsRequest,
342            _client_addr: Option<SocketAddr>,
343        ) -> Result<WaitForEffectsResponse, SuiError> {
344            unimplemented!()
345        }
346
347        async fn handle_transaction(
348            &self,
349            transaction: Transaction,
350            _client_addr: Option<SocketAddr>,
351        ) -> Result<HandleTransactionResponse, SuiError> {
352            if self.inject_fault.load(Relaxed) {
353                return Err(SuiErrorKind::TimeoutError.into());
354            }
355            let epoch_store = self.authority.epoch_store_for_testing();
356            self.authority
357                .handle_transaction(
358                    &epoch_store,
359                    VerifiedTransaction::new_unchecked(transaction),
360                )
361                .await
362        }
363
364        async fn handle_certificate_v2(
365            &self,
366            certificate: CertifiedTransaction,
367            _client_addr: Option<SocketAddr>,
368        ) -> Result<HandleCertificateResponseV2, SuiError> {
369            let epoch_store = self.authority.epoch_store_for_testing();
370            let (effects, _) = self
371                .authority
372                .try_execute_immediately(
373                    &VerifiedExecutableTransaction::new_from_certificate(
374                        VerifiedCertificate::new_unchecked(certificate),
375                    ),
376                    ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
377                    &epoch_store,
378                )
379                .await
380                .unwrap();
381            let events = if effects.events_digest().is_some() {
382                self.authority
383                    .get_transaction_events(effects.transaction_digest())?
384            } else {
385                TransactionEvents::default()
386            };
387            let signed_effects = self
388                .authority
389                .sign_effects(effects, &epoch_store)?
390                .into_inner();
391            Ok(HandleCertificateResponseV2 {
392                signed_effects,
393                events,
394                fastpath_input_objects: vec![],
395            })
396        }
397
398        async fn handle_certificate_v3(
399            &self,
400            _request: HandleCertificateRequestV3,
401            _client_addr: Option<SocketAddr>,
402        ) -> Result<HandleCertificateResponseV3, SuiError> {
403            unimplemented!()
404        }
405
406        async fn handle_soft_bundle_certificates_v3(
407            &self,
408            _request: HandleSoftBundleCertificatesRequestV3,
409            _client_addr: Option<SocketAddr>,
410        ) -> Result<HandleSoftBundleCertificatesResponseV3, SuiError> {
411            unimplemented!()
412        }
413
414        async fn handle_object_info_request(
415            &self,
416            _request: ObjectInfoRequest,
417        ) -> Result<ObjectInfoResponse, SuiError> {
418            unimplemented!()
419        }
420
421        async fn handle_transaction_info_request(
422            &self,
423            _request: TransactionInfoRequest,
424        ) -> Result<TransactionInfoResponse, SuiError> {
425            unimplemented!()
426        }
427
428        async fn handle_checkpoint(
429            &self,
430            _request: CheckpointRequest,
431        ) -> Result<CheckpointResponse, SuiError> {
432            unimplemented!()
433        }
434
435        async fn handle_checkpoint_v2(
436            &self,
437            _request: CheckpointRequestV2,
438        ) -> Result<CheckpointResponseV2, SuiError> {
439            unimplemented!()
440        }
441
442        async fn handle_system_state_object(
443            &self,
444            _request: SystemStateRequest,
445        ) -> Result<SuiSystemState, SuiError> {
446            unimplemented!()
447        }
448
449        async fn validator_health(
450            &self,
451            _request: ValidatorHealthRequest,
452        ) -> Result<ValidatorHealthResponse, SuiError> {
453            Ok(ValidatorHealthResponse {
454                last_committed_leader_round: 1000,
455                last_locally_built_checkpoint: 500,
456                ..Default::default()
457            })
458        }
459    }
460
461    #[sim_test]
462    async fn test_validator_tx_finalizer_basic_flow() {
463        telemetry_subscribers::init_for_testing();
464        let (sender, keypair) = get_account_key_pair();
465        let gas_object = Object::with_owner_for_testing(sender);
466        let gas_object_id = gas_object.id();
467        let (states, auth_agg, clients) = create_validators(gas_object).await;
468        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
469        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
470        let tx_digest = *signed_tx.digest();
471        let cache_read = states[0].get_transaction_cache_reader().clone();
472        let epoch_store = states[0].epoch_store_for_testing();
473        let metrics = finalizer1.metrics.clone();
474        let handle = tokio::spawn(async move {
475            finalizer1
476                .track_signed_tx(cache_read, &epoch_store, signed_tx)
477                .await;
478        });
479        handle.await.unwrap();
480        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
481        assert_eq!(
482            metrics.num_finalization_attempts_for_testing.load(Relaxed),
483            1
484        );
485        assert_eq!(
486            metrics
487                .num_successful_finalizations_for_testing
488                .load(Relaxed),
489            1
490        );
491    }
492
493    #[tokio::test]
494    async fn test_validator_tx_finalizer_new_epoch() {
495        let (sender, keypair) = get_account_key_pair();
496        let gas_object = Object::with_owner_for_testing(sender);
497        let gas_object_id = gas_object.id();
498        let (states, auth_agg, clients) = create_validators(gas_object).await;
499        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
500        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
501        let tx_digest = *signed_tx.digest();
502        let epoch_store = states[0].epoch_store_for_testing();
503        let cache_read = states[0].get_transaction_cache_reader().clone();
504
505        let metrics = finalizer1.metrics.clone();
506        let handle = tokio::spawn(async move {
507            let _ = epoch_store
508                .within_alive_epoch(finalizer1.track_signed_tx(cache_read, &epoch_store, signed_tx))
509                .await;
510        });
511        states[0].reconfigure_for_testing().await;
512        handle.await.unwrap();
513        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
514        assert_eq!(
515            metrics.num_finalization_attempts_for_testing.load(Relaxed),
516            0
517        );
518        assert_eq!(
519            metrics
520                .num_successful_finalizations_for_testing
521                .load(Relaxed),
522            0
523        );
524    }
525
526    #[tokio::test]
527    async fn test_validator_tx_finalizer_auth_agg_reconfig() {
528        let (sender, _) = get_account_key_pair();
529        let gas_object = Object::with_owner_for_testing(sender);
530        let (states, auth_agg, _clients) = create_validators(gas_object).await;
531        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
532        let mut new_auth_agg = (**auth_agg.load()).clone();
533        let mut new_committee = (*new_auth_agg.committee).clone();
534        new_committee.epoch = 100;
535        new_auth_agg.committee = Arc::new(new_committee);
536        auth_agg.store(Arc::new(new_auth_agg));
537        assert_eq!(
538            finalizer1.auth_agg().load().committee.epoch,
539            100,
540            "AuthorityAggregator not updated"
541        );
542    }
543
544    #[tokio::test]
545    async fn test_validator_tx_finalizer_already_executed() {
546        telemetry_subscribers::init_for_testing();
547        let (sender, keypair) = get_account_key_pair();
548        let gas_object = Object::with_owner_for_testing(sender);
549        let gas_object_id = gas_object.id();
550        let (states, auth_agg, clients) = create_validators(gas_object).await;
551        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
552        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
553        let tx_digest = *signed_tx.digest();
554        let cache_read = states[0].get_transaction_cache_reader().clone();
555        let epoch_store = states[0].epoch_store_for_testing();
556
557        let metrics = finalizer1.metrics.clone();
558        let signed_tx_clone = signed_tx.clone();
559        let handle = tokio::spawn(async move {
560            finalizer1
561                .track_signed_tx(cache_read, &epoch_store, signed_tx_clone)
562                .await;
563        });
564        auth_agg
565            .load()
566            .execute_transaction_block(&signed_tx.into_inner().into_unsigned(), None)
567            .await
568            .unwrap();
569        handle.await.unwrap();
570        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, true);
571        assert_eq!(
572            metrics.num_finalization_attempts_for_testing.load(Relaxed),
573            0
574        );
575        assert_eq!(
576            metrics
577                .num_successful_finalizations_for_testing
578                .load(Relaxed),
579            0
580        );
581    }
582
583    #[tokio::test]
584    async fn test_validator_tx_finalizer_timeout() {
585        telemetry_subscribers::init_for_testing();
586        let (sender, keypair) = get_account_key_pair();
587        let gas_object = Object::with_owner_for_testing(sender);
588        let gas_object_id = gas_object.id();
589        let (states, auth_agg, clients) = create_validators(gas_object).await;
590        let finalizer1 = ValidatorTxFinalizer::new_for_testing(auth_agg.clone(), states[0].name);
591        let signed_tx = create_tx(&clients, &states[0], sender, &keypair, gas_object_id).await;
592        let tx_digest = *signed_tx.digest();
593        let cache_read = states[0].get_transaction_cache_reader().clone();
594        let epoch_store = states[0].epoch_store_for_testing();
595        for client in clients.values() {
596            client.inject_fault.store(true, Relaxed);
597        }
598
599        let metrics = finalizer1.metrics.clone();
600        let signed_tx_clone = signed_tx.clone();
601        let handle = tokio::spawn(async move {
602            finalizer1
603                .track_signed_tx(cache_read, &epoch_store, signed_tx_clone)
604                .await;
605        });
606        handle.await.unwrap();
607        check_quorum_execution(&auth_agg.load(), &clients, &tx_digest, false);
608        assert_eq!(
609            metrics.num_finalization_attempts_for_testing.load(Relaxed),
610            1
611        );
612        assert_eq!(
613            metrics
614                .num_successful_finalizations_for_testing
615                .load(Relaxed),
616            0
617        );
618    }
619
620    #[tokio::test]
621    async fn test_validator_tx_finalizer_determine_finalization_delay() {
622        const COMMITTEE_SIZE: usize = 15;
623        let network_config = ConfigBuilder::new_with_temp_dir()
624            .committee_size(NonZeroUsize::new(COMMITTEE_SIZE).unwrap())
625            .build();
626        let (auth_agg, _) = AuthorityAggregatorBuilder::from_network_config(&network_config)
627            .build_network_clients();
628        let auth_agg = Arc::new(auth_agg);
629        let finalizers = (0..COMMITTEE_SIZE)
630            .map(|idx| {
631                ValidatorTxFinalizer::new_for_testing(
632                    Arc::new(ArcSwap::new(auth_agg.clone())),
633                    auth_agg.committee.voting_rights[idx].0,
634                )
635            })
636            .collect::<Vec<_>>();
637        let config = finalizers[0].config.clone();
638        for _ in 0..100 {
639            let tx_digest = TransactionDigest::random();
640            let mut delays: Vec<_> = finalizers
641                .iter()
642                .map(|finalizer| {
643                    finalizer
644                        .determine_finalization_delay(&tx_digest)
645                        .map(|delay| delay.as_secs())
646                        .unwrap()
647                })
648                .collect();
649            delays.sort();
650            for (idx, delay) in delays.iter().enumerate() {
651                assert_eq!(
652                    *delay,
653                    min(
654                        config.validator_max_delay.as_secs(),
655                        config.tx_finalization_delay.as_secs()
656                            + idx as u64 * config.validator_delay_increments_sec
657                    )
658                );
659            }
660        }
661    }
662
663    async fn create_validators(
664        gas_object: Object,
665    ) -> (
666        Vec<Arc<AuthorityState>>,
667        Arc<ArcSwap<AuthorityAggregator<MockAuthorityClient>>>,
668        BTreeMap<AuthorityName, MockAuthorityClient>,
669    ) {
670        let network_config = ConfigBuilder::new_with_temp_dir()
671            .committee_size(NonZeroUsize::new(4).unwrap())
672            .with_objects(iter::once(gas_object))
673            .build();
674        let mut authority_states = vec![];
675        for idx in 0..4 {
676            let state = TestAuthorityBuilder::new()
677                .with_network_config(&network_config, idx)
678                .build()
679                .await;
680            authority_states.push(state);
681        }
682        let clients: BTreeMap<_, _> = authority_states
683            .iter()
684            .map(|state| {
685                (
686                    state.name,
687                    MockAuthorityClient {
688                        authority: state.clone(),
689                        inject_fault: Arc::new(AtomicBool::new(false)),
690                    },
691                )
692            })
693            .collect();
694        let auth_agg = AuthorityAggregatorBuilder::from_network_config(&network_config)
695            .build_custom_clients(clients.clone());
696        (
697            authority_states,
698            Arc::new(ArcSwap::new(Arc::new(auth_agg))),
699            clients,
700        )
701    }
702
703    async fn create_tx(
704        clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
705        state: &Arc<AuthorityState>,
706        sender: SuiAddress,
707        keypair: &AccountKeyPair,
708        gas_object_id: ObjectID,
709    ) -> VerifiedSignedTransaction {
710        let gas_object_ref = state
711            .get_object(&gas_object_id)
712            .await
713            .unwrap()
714            .compute_object_reference();
715        let tx_data = TestTransactionBuilder::new(
716            sender,
717            gas_object_ref,
718            state.reference_gas_price_for_testing().unwrap(),
719        )
720        .transfer_sui(None, sender)
721        .build();
722        let tx = to_sender_signed_transaction(tx_data, keypair);
723        let response = clients
724            .get(&state.name)
725            .unwrap()
726            .handle_transaction(tx.clone(), None)
727            .await
728            .unwrap();
729        VerifiedSignedTransaction::new_unchecked(SignedTransaction::new_from_data_and_sig(
730            tx.into_data(),
731            response.status.into_signed_for_testing(),
732        ))
733    }
734
735    fn check_quorum_execution(
736        auth_agg: &Arc<AuthorityAggregator<MockAuthorityClient>>,
737        clients: &BTreeMap<AuthorityName, MockAuthorityClient>,
738        tx_digest: &TransactionDigest,
739        expected: bool,
740    ) {
741        let quorum = auth_agg.committee.quorum_threshold();
742        let executed_weight: StakeUnit = clients
743            .iter()
744            .filter_map(|(name, client)| {
745                client
746                    .authority
747                    .is_tx_already_executed(tx_digest)
748                    .then_some(auth_agg.committee.weight(name))
749            })
750            .sum();
751        assert_eq!(executed_weight >= quorum, expected);
752    }
753}