sui_core/
consensus_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet, VecDeque},
6    hash::Hash,
7    num::NonZeroUsize,
8    sync::Arc,
9    time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use arc_swap::ArcSwap;
13use consensus_config::Committee as ConsensusCommittee;
14use consensus_core::{CertifiedBlocksOutput, CommitConsumerMonitor, CommitIndex};
15use consensus_types::block::TransactionIndex;
16use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
17use itertools::Itertools as _;
18use lru::LruCache;
19use mysten_common::{
20    assert_reachable, assert_sometimes, debug_fatal, random_util::randomize_cache_capacity_in_tests,
21};
22use mysten_metrics::{
23    monitored_future,
24    monitored_mpsc::{self, UnboundedReceiver},
25    monitored_scope, spawn_monitored_task,
26};
27use parking_lot::RwLockWriteGuard;
28use serde::{Deserialize, Serialize};
29use sui_macros::{fail_point, fail_point_arg, fail_point_if};
30use sui_protocol_config::ProtocolConfig;
31use sui_types::{
32    SUI_RANDOMNESS_STATE_OBJECT_ID,
33    authenticator_state::ActiveJwk,
34    base_types::{
35        AuthorityName, ConciseableName, ConsensusObjectSequenceKey, SequenceNumber,
36        TransactionDigest,
37    },
38    crypto::RandomnessRound,
39    digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest},
40    executable_transaction::{
41        TrustedExecutableTransaction, VerifiedExecutableTransaction,
42        VerifiedExecutableTransactionWithAliases,
43    },
44    messages_checkpoint::CheckpointSignatureMessage,
45    messages_consensus::{
46        AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments,
47        ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
48        ExecutionTimeObservation,
49    },
50    sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
51    transaction::{
52        SenderSignedData, TransactionKey, VerifiedCertificate, VerifiedTransaction, WithAliases,
53    },
54};
55use tokio::task::JoinSet;
56use tracing::{debug, error, info, instrument, trace, warn};
57
58use crate::{
59    authority::{
60        AuthorityMetrics, AuthorityState, ExecutionEnv,
61        authority_per_epoch_store::{
62            AuthorityPerEpochStore, CancelConsensusCertificateReason, ConsensusStats,
63            ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStats,
64            consensus_quarantine::ConsensusCommitOutput,
65        },
66        backpressure::{BackpressureManager, BackpressureSubscriber},
67        consensus_tx_status_cache::ConsensusTxStatus,
68        epoch_start_configuration::EpochStartConfigTrait,
69        execution_time_estimator::ExecutionTimeEstimator,
70        shared_object_congestion_tracker::SharedObjectCongestionTracker,
71        shared_object_version_manager::{AssignedTxAndVersions, Schedulable},
72        transaction_deferral::{DeferralKey, DeferralReason, transaction_deferral_within_limit},
73    },
74    checkpoints::{
75        CheckpointService, CheckpointServiceNotify, PendingCheckpoint, PendingCheckpointInfo,
76    },
77    consensus_adapter::ConsensusAdapter,
78    consensus_throughput_calculator::ConsensusThroughputCalculator,
79    consensus_types::consensus_output_api::{ConsensusCommitAPI, parse_block_transactions},
80    epoch::{
81        randomness::{DkgStatus, RandomnessManager},
82        reconfiguration::ReconfigState,
83    },
84    execution_cache::ObjectCacheRead,
85    execution_scheduler::{ExecutionScheduler, SchedulingSource},
86    post_consensus_tx_reorder::PostConsensusTxReorder,
87    scoring_decision::update_low_scoring_authorities,
88    traffic_controller::{TrafficController, policies::TrafficTally},
89};
90
91pub struct ConsensusHandlerInitializer {
92    state: Arc<AuthorityState>,
93    checkpoint_service: Arc<CheckpointService>,
94    epoch_store: Arc<AuthorityPerEpochStore>,
95    consensus_adapter: Arc<ConsensusAdapter>,
96    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
97    throughput_calculator: Arc<ConsensusThroughputCalculator>,
98    backpressure_manager: Arc<BackpressureManager>,
99}
100
101impl ConsensusHandlerInitializer {
102    pub fn new(
103        state: Arc<AuthorityState>,
104        checkpoint_service: Arc<CheckpointService>,
105        epoch_store: Arc<AuthorityPerEpochStore>,
106        consensus_adapter: Arc<ConsensusAdapter>,
107        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
108        throughput_calculator: Arc<ConsensusThroughputCalculator>,
109        backpressure_manager: Arc<BackpressureManager>,
110    ) -> Self {
111        Self {
112            state,
113            checkpoint_service,
114            epoch_store,
115            consensus_adapter,
116            low_scoring_authorities,
117            throughput_calculator,
118            backpressure_manager,
119        }
120    }
121
122    #[cfg(test)]
123    pub(crate) fn new_for_testing(
124        state: Arc<AuthorityState>,
125        checkpoint_service: Arc<CheckpointService>,
126    ) -> Self {
127        use crate::consensus_test_utils::make_consensus_adapter_for_test;
128        use std::collections::HashSet;
129
130        let backpressure_manager = BackpressureManager::new_for_tests();
131        let consensus_adapter =
132            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
133        Self {
134            state: state.clone(),
135            checkpoint_service,
136            epoch_store: state.epoch_store_for_testing().clone(),
137            consensus_adapter,
138            low_scoring_authorities: Arc::new(Default::default()),
139            throughput_calculator: Arc::new(ConsensusThroughputCalculator::new(
140                None,
141                state.metrics.clone(),
142            )),
143            backpressure_manager,
144        }
145    }
146
147    pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
148        let new_epoch_start_state = self.epoch_store.epoch_start_state();
149        let consensus_committee = new_epoch_start_state.get_consensus_committee();
150
151        ConsensusHandler::new(
152            self.epoch_store.clone(),
153            self.checkpoint_service.clone(),
154            self.state.execution_scheduler().clone(),
155            self.consensus_adapter.clone(),
156            self.state.get_object_cache_reader().clone(),
157            self.low_scoring_authorities.clone(),
158            consensus_committee,
159            self.state.metrics.clone(),
160            self.throughput_calculator.clone(),
161            self.backpressure_manager.subscribe(),
162            self.state.traffic_controller.clone(),
163        )
164    }
165
166    pub(crate) fn metrics(&self) -> &Arc<AuthorityMetrics> {
167        &self.state.metrics
168    }
169
170    pub(crate) fn backpressure_subscriber(&self) -> BackpressureSubscriber {
171        self.backpressure_manager.subscribe()
172    }
173}
174
175mod additional_consensus_state {
176    use std::marker::PhantomData;
177
178    use consensus_core::CommitRef;
179    use fastcrypto::hash::HashFunction as _;
180    use sui_types::{crypto::DefaultHash, digests::Digest};
181
182    use super::*;
183    /// AdditionalConsensusState tracks any in-memory state that is retained by ConsensusHandler
184    /// between consensus commits. Because of crash recovery, using such data is inherently risky.
185    /// In order to do this safely, we must store data from a fixed number of previous commits.
186    /// Then, at start-up, that same fixed number of already processed commits is replayed to
187    /// reconstruct the state.
188    ///
189    /// To make sure that bugs in this process appear immediately, we record the digest of this
190    /// state in ConsensusCommitPrologue, so that any deviation causes an immediate fork.
191    #[derive(Serialize, Deserialize)]
192    pub(super) struct AdditionalConsensusState {
193        commit_interval_observer: CommitIntervalObserver,
194    }
195
196    impl AdditionalConsensusState {
197        pub fn new(additional_consensus_state_window_size: u32) -> Self {
198            Self {
199                commit_interval_observer: CommitIntervalObserver::new(
200                    additional_consensus_state_window_size,
201                ),
202            }
203        }
204
205        /// Update all internal state based on the new commit
206        pub(crate) fn observe_commit(
207            &mut self,
208            protocol_config: &ProtocolConfig,
209            epoch_start_time: u64,
210            consensus_commit: &impl ConsensusCommitAPI,
211        ) -> ConsensusCommitInfo {
212            self.commit_interval_observer
213                .observe_commit_time(consensus_commit);
214
215            let estimated_commit_period = self
216                .commit_interval_observer
217                .commit_interval_estimate()
218                .unwrap_or(Duration::from_millis(
219                    protocol_config.min_checkpoint_interval_ms(),
220                ));
221
222            info!("estimated commit rate: {:?}", estimated_commit_period);
223
224            self.commit_info_impl(
225                epoch_start_time,
226                consensus_commit,
227                Some(estimated_commit_period),
228            )
229        }
230
231        fn commit_info_impl(
232            &self,
233            epoch_start_time: u64,
234            consensus_commit: &impl ConsensusCommitAPI,
235            estimated_commit_period: Option<Duration>,
236        ) -> ConsensusCommitInfo {
237            let leader_author = consensus_commit.leader_author_index();
238            let timestamp = consensus_commit.commit_timestamp_ms();
239
240            let timestamp = if timestamp < epoch_start_time {
241                error!(
242                    "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start_time}, author {leader_author:?}"
243                );
244                epoch_start_time
245            } else {
246                timestamp
247            };
248
249            ConsensusCommitInfo {
250                _phantom: PhantomData,
251                round: consensus_commit.leader_round(),
252                timestamp,
253                leader_author,
254                consensus_commit_ref: consensus_commit.commit_ref(),
255                rejected_transactions_digest: consensus_commit.rejected_transactions_digest(),
256                additional_state_digest: Some(self.digest()),
257                estimated_commit_period,
258                skip_consensus_commit_prologue_in_test: false,
259            }
260        }
261
262        /// Get the digest of the current state.
263        fn digest(&self) -> AdditionalConsensusStateDigest {
264            let mut hash = DefaultHash::new();
265            bcs::serialize_into(&mut hash, self).unwrap();
266            AdditionalConsensusStateDigest::new(hash.finalize().into())
267        }
268    }
269
270    pub struct ConsensusCommitInfo {
271        // prevent public construction
272        _phantom: PhantomData<()>,
273
274        pub round: u64,
275        pub timestamp: u64,
276        pub leader_author: AuthorityIndex,
277        pub consensus_commit_ref: CommitRef,
278        pub rejected_transactions_digest: Digest,
279
280        additional_state_digest: Option<AdditionalConsensusStateDigest>,
281        estimated_commit_period: Option<Duration>,
282
283        pub skip_consensus_commit_prologue_in_test: bool,
284    }
285
286    impl ConsensusCommitInfo {
287        pub fn new_for_test(
288            commit_round: u64,
289            commit_timestamp: u64,
290            estimated_commit_period: Option<Duration>,
291            skip_consensus_commit_prologue_in_test: bool,
292        ) -> Self {
293            Self {
294                _phantom: PhantomData,
295                round: commit_round,
296                timestamp: commit_timestamp,
297                leader_author: 0,
298                consensus_commit_ref: CommitRef::default(),
299                rejected_transactions_digest: Digest::default(),
300                additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
301                estimated_commit_period,
302                skip_consensus_commit_prologue_in_test,
303            }
304        }
305
306        pub fn new_for_congestion_test(
307            commit_round: u64,
308            commit_timestamp: u64,
309            estimated_commit_period: Duration,
310        ) -> Self {
311            Self::new_for_test(
312                commit_round,
313                commit_timestamp,
314                Some(estimated_commit_period),
315                true,
316            )
317        }
318
319        pub fn additional_state_digest(&self) -> AdditionalConsensusStateDigest {
320            // this method cannot be called if stateless_commit_info is used
321            self.additional_state_digest
322                .expect("additional_state_digest is not available")
323        }
324
325        pub fn estimated_commit_period(&self) -> Duration {
326            // this method cannot be called if stateless_commit_info is used
327            self.estimated_commit_period
328                .expect("estimated commit period is not available")
329        }
330
331        fn consensus_commit_digest(&self) -> ConsensusCommitDigest {
332            ConsensusCommitDigest::new(self.consensus_commit_ref.digest.into_inner())
333        }
334
335        fn consensus_commit_prologue_transaction(
336            &self,
337            epoch: u64,
338        ) -> VerifiedExecutableTransaction {
339            let transaction = VerifiedTransaction::new_consensus_commit_prologue(
340                epoch,
341                self.round,
342                self.timestamp,
343            );
344            VerifiedExecutableTransaction::new_system(transaction, epoch)
345        }
346
347        fn consensus_commit_prologue_v2_transaction(
348            &self,
349            epoch: u64,
350        ) -> VerifiedExecutableTransaction {
351            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v2(
352                epoch,
353                self.round,
354                self.timestamp,
355                self.consensus_commit_digest(),
356            );
357            VerifiedExecutableTransaction::new_system(transaction, epoch)
358        }
359
360        fn consensus_commit_prologue_v3_transaction(
361            &self,
362            epoch: u64,
363            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
364        ) -> VerifiedExecutableTransaction {
365            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v3(
366                epoch,
367                self.round,
368                self.timestamp,
369                self.consensus_commit_digest(),
370                consensus_determined_version_assignments,
371            );
372            VerifiedExecutableTransaction::new_system(transaction, epoch)
373        }
374
375        fn consensus_commit_prologue_v4_transaction(
376            &self,
377            epoch: u64,
378            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
379            additional_state_digest: AdditionalConsensusStateDigest,
380        ) -> VerifiedExecutableTransaction {
381            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v4(
382                epoch,
383                self.round,
384                self.timestamp,
385                self.consensus_commit_digest(),
386                consensus_determined_version_assignments,
387                additional_state_digest,
388            );
389            VerifiedExecutableTransaction::new_system(transaction, epoch)
390        }
391
392        pub fn create_consensus_commit_prologue_transaction(
393            &self,
394            epoch: u64,
395            protocol_config: &ProtocolConfig,
396            cancelled_txn_version_assignment: Vec<(
397                TransactionDigest,
398                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
399            )>,
400            commit_info: &ConsensusCommitInfo,
401            indirect_state_observer: IndirectStateObserver,
402        ) -> VerifiedExecutableTransaction {
403            let version_assignments = if protocol_config
404                .record_consensus_determined_version_assignments_in_prologue_v2()
405            {
406                Some(
407                    ConsensusDeterminedVersionAssignments::CancelledTransactionsV2(
408                        cancelled_txn_version_assignment,
409                    ),
410                )
411            } else if protocol_config.record_consensus_determined_version_assignments_in_prologue()
412            {
413                Some(
414                    ConsensusDeterminedVersionAssignments::CancelledTransactions(
415                        cancelled_txn_version_assignment
416                            .into_iter()
417                            .map(|(tx_digest, versions)| {
418                                (
419                                    tx_digest,
420                                    versions.into_iter().map(|(id, v)| (id.0, v)).collect(),
421                                )
422                            })
423                            .collect(),
424                    ),
425                )
426            } else {
427                None
428            };
429
430            if protocol_config.record_additional_state_digest_in_prologue() {
431                let additional_state_digest =
432                    if protocol_config.additional_consensus_digest_indirect_state() {
433                        let d1 = commit_info.additional_state_digest();
434                        indirect_state_observer.fold_with(d1)
435                    } else {
436                        commit_info.additional_state_digest()
437                    };
438
439                self.consensus_commit_prologue_v4_transaction(
440                    epoch,
441                    version_assignments.unwrap(),
442                    additional_state_digest,
443                )
444            } else if let Some(version_assignments) = version_assignments {
445                self.consensus_commit_prologue_v3_transaction(epoch, version_assignments)
446            } else if protocol_config.include_consensus_digest_in_prologue() {
447                self.consensus_commit_prologue_v2_transaction(epoch)
448            } else {
449                self.consensus_commit_prologue_transaction(epoch)
450            }
451        }
452    }
453
454    #[derive(Default)]
455    pub struct IndirectStateObserver {
456        hash: DefaultHash,
457    }
458
459    impl IndirectStateObserver {
460        pub fn new() -> Self {
461            Self::default()
462        }
463
464        pub fn observe_indirect_state<T: Serialize>(&mut self, state: &T) {
465            bcs::serialize_into(&mut self.hash, state).unwrap();
466        }
467
468        pub fn fold_with(
469            self,
470            d1: AdditionalConsensusStateDigest,
471        ) -> AdditionalConsensusStateDigest {
472            let hash = self.hash.finalize();
473            let d2 = AdditionalConsensusStateDigest::new(hash.into());
474
475            let mut hasher = DefaultHash::new();
476            bcs::serialize_into(&mut hasher, &d1).unwrap();
477            bcs::serialize_into(&mut hasher, &d2).unwrap();
478            AdditionalConsensusStateDigest::new(hasher.finalize().into())
479        }
480    }
481
482    #[test]
483    fn test_additional_consensus_state() {
484        use crate::consensus_test_utils::TestConsensusCommit;
485
486        fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
487            let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
488            state.observe_commit(
489                &protocol_config,
490                100,
491                &TestConsensusCommit::empty(round, timestamp, 0),
492            );
493        }
494
495        let mut s1 = AdditionalConsensusState::new(3);
496        observe(&mut s1, 1, 1000);
497        observe(&mut s1, 2, 2000);
498        observe(&mut s1, 3, 3000);
499        observe(&mut s1, 4, 4000);
500
501        let mut s2 = AdditionalConsensusState::new(3);
502        // Because state uses a ring buffer, we should get the same digest
503        // even though we only added the 3 latest observations.
504        observe(&mut s2, 2, 2000);
505        observe(&mut s2, 3, 3000);
506        observe(&mut s2, 4, 4000);
507
508        assert_eq!(s1.digest(), s2.digest());
509
510        observe(&mut s1, 5, 5000);
511        observe(&mut s2, 5, 5000);
512
513        assert_eq!(s1.digest(), s2.digest());
514    }
515}
516use additional_consensus_state::AdditionalConsensusState;
517pub(crate) use additional_consensus_state::{ConsensusCommitInfo, IndirectStateObserver};
518
519pub struct ConsensusHandler<C> {
520    /// A store created for each epoch. ConsensusHandler is recreated each epoch, with the
521    /// corresponding store. This store is also used to get the current epoch ID.
522    epoch_store: Arc<AuthorityPerEpochStore>,
523    /// Holds the indices, hash and stats after the last consensus commit
524    /// It is used for avoiding replaying already processed transactions,
525    /// checking chain consistency, and accumulating per-epoch consensus output stats.
526    last_consensus_stats: ExecutionIndicesWithStats,
527    checkpoint_service: Arc<C>,
528    /// cache reader is needed when determining the next version to assign for shared objects.
529    cache_reader: Arc<dyn ObjectCacheRead>,
530    /// Reputation scores used by consensus adapter that we update, forwarded from consensus
531    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
532    /// The consensus committee used to do stake computations for deciding set of low scoring authorities
533    committee: ConsensusCommittee,
534    // TODO: ConsensusHandler doesn't really share metrics with AuthorityState. We could define
535    // a new metrics type here if we want to.
536    metrics: Arc<AuthorityMetrics>,
537    /// Lru cache to quickly discard transactions processed by consensus
538    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
539    /// Enqueues transactions to the execution scheduler via a separate task.
540    execution_scheduler_sender: ExecutionSchedulerSender,
541    /// Consensus adapter for submitting transactions to consensus
542    consensus_adapter: Arc<ConsensusAdapter>,
543
544    /// Using the throughput calculator to record the current consensus throughput
545    throughput_calculator: Arc<ConsensusThroughputCalculator>,
546
547    additional_consensus_state: AdditionalConsensusState,
548
549    backpressure_subscriber: BackpressureSubscriber,
550
551    traffic_controller: Option<Arc<TrafficController>>,
552}
553
554const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
555
556impl<C> ConsensusHandler<C> {
557    pub(crate) fn new(
558        epoch_store: Arc<AuthorityPerEpochStore>,
559        checkpoint_service: Arc<C>,
560        execution_scheduler: Arc<ExecutionScheduler>,
561        consensus_adapter: Arc<ConsensusAdapter>,
562        cache_reader: Arc<dyn ObjectCacheRead>,
563        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
564        committee: ConsensusCommittee,
565        metrics: Arc<AuthorityMetrics>,
566        throughput_calculator: Arc<ConsensusThroughputCalculator>,
567        backpressure_subscriber: BackpressureSubscriber,
568        traffic_controller: Option<Arc<TrafficController>>,
569    ) -> Self {
570        // Recover last_consensus_stats so it is consistent across validators.
571        let mut last_consensus_stats = epoch_store
572            .get_last_consensus_stats()
573            .expect("Should be able to read last consensus index");
574        // stats is empty at the beginning of epoch.
575        if !last_consensus_stats.stats.is_initialized() {
576            last_consensus_stats.stats = ConsensusStats::new(committee.size());
577        }
578        let execution_scheduler_sender =
579            ExecutionSchedulerSender::start(execution_scheduler, epoch_store.clone());
580        let commit_rate_estimate_window_size = epoch_store
581            .protocol_config()
582            .get_consensus_commit_rate_estimation_window_size();
583        Self {
584            epoch_store,
585            last_consensus_stats,
586            checkpoint_service,
587            cache_reader,
588            low_scoring_authorities,
589            committee,
590            metrics,
591            processed_cache: LruCache::new(
592                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
593            ),
594            execution_scheduler_sender,
595            consensus_adapter,
596            throughput_calculator,
597            additional_consensus_state: AdditionalConsensusState::new(
598                commit_rate_estimate_window_size,
599            ),
600            backpressure_subscriber,
601            traffic_controller,
602        }
603    }
604
605    /// Returns the last subdag index processed by the handler.
606    pub(crate) fn last_processed_subdag_index(&self) -> u64 {
607        self.last_consensus_stats.index.sub_dag_index
608    }
609
610    pub(crate) fn execution_scheduler_sender(&self) -> &ExecutionSchedulerSender {
611        &self.execution_scheduler_sender
612    }
613
614    pub(crate) fn new_for_testing(
615        epoch_store: Arc<AuthorityPerEpochStore>,
616        checkpoint_service: Arc<C>,
617        execution_scheduler_sender: ExecutionSchedulerSender,
618        consensus_adapter: Arc<ConsensusAdapter>,
619        cache_reader: Arc<dyn ObjectCacheRead>,
620        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
621        committee: ConsensusCommittee,
622        metrics: Arc<AuthorityMetrics>,
623        throughput_calculator: Arc<ConsensusThroughputCalculator>,
624        backpressure_subscriber: BackpressureSubscriber,
625        traffic_controller: Option<Arc<TrafficController>>,
626        last_consensus_stats: ExecutionIndicesWithStats,
627    ) -> Self {
628        let commit_rate_estimate_window_size = epoch_store
629            .protocol_config()
630            .get_consensus_commit_rate_estimation_window_size();
631        Self {
632            epoch_store,
633            last_consensus_stats,
634            checkpoint_service,
635            cache_reader,
636            low_scoring_authorities,
637            committee,
638            metrics,
639            processed_cache: LruCache::new(
640                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
641            ),
642            execution_scheduler_sender,
643            consensus_adapter,
644            throughput_calculator,
645            additional_consensus_state: AdditionalConsensusState::new(
646                commit_rate_estimate_window_size,
647            ),
648            backpressure_subscriber,
649            traffic_controller,
650        }
651    }
652}
653
654#[derive(Default)]
655struct CommitHandlerInput {
656    user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
657    capability_notifications: Vec<AuthorityCapabilitiesV2>,
658    execution_time_observations: Vec<ExecutionTimeObservation>,
659    checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
660    randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
661    randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
662    end_of_publish_transactions: Vec<AuthorityName>,
663    new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
664}
665
666struct CommitHandlerState {
667    dkg_failed: bool,
668    randomness_round: Option<RandomnessRound>,
669    output: ConsensusCommitOutput,
670    indirect_state_observer: Option<IndirectStateObserver>,
671    initial_reconfig_state: ReconfigState,
672}
673
674impl CommitHandlerState {
675    fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
676        self.output
677            .get_consensus_messages_processed()
678            .cloned()
679            .collect()
680    }
681
682    fn init_randomness<'a, 'epoch>(
683        &'a mut self,
684        epoch_store: &'epoch AuthorityPerEpochStore,
685        commit_info: &'a ConsensusCommitInfo,
686    ) -> Option<tokio::sync::MutexGuard<'epoch, RandomnessManager>> {
687        let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
688            rm.try_lock()
689                .expect("should only ever be called from the commit handler thread")
690        });
691
692        let mut dkg_failed = false;
693        let randomness_round = if epoch_store.randomness_state_enabled() {
694            let randomness_manager = randomness_manager
695                .as_mut()
696                .expect("randomness manager should exist if randomness is enabled");
697            match randomness_manager.dkg_status() {
698                DkgStatus::Pending => None,
699                DkgStatus::Failed => {
700                    dkg_failed = true;
701                    None
702                }
703                DkgStatus::Successful => {
704                    // Generate randomness for this commit if DKG is successful and we are still
705                    // accepting certs.
706                    if self.initial_reconfig_state.should_accept_tx() {
707                        randomness_manager
708                            // TODO: make infallible
709                            .reserve_next_randomness(commit_info.timestamp, &mut self.output)
710                            .expect("epoch ended")
711                    } else {
712                        None
713                    }
714                }
715            }
716        } else {
717            None
718        };
719
720        if randomness_round.is_some() {
721            assert!(!dkg_failed); // invariant check
722        }
723
724        self.randomness_round = randomness_round;
725        self.dkg_failed = dkg_failed;
726
727        randomness_manager
728    }
729}
730
731impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
732    /// Called during startup to allow us to observe commits we previously processed, for crash recovery.
733    /// Any state computed here must be a pure function of the commits observed, it cannot depend on any
734    /// state recorded in the epoch db.
735    fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
736        assert!(
737            self.epoch_store
738                .protocol_config()
739                .record_additional_state_digest_in_prologue()
740        );
741        let protocol_config = self.epoch_store.protocol_config();
742        let epoch_start_time = self
743            .epoch_store
744            .epoch_start_config()
745            .epoch_start_timestamp_ms();
746
747        self.additional_consensus_state.observe_commit(
748            protocol_config,
749            epoch_start_time,
750            &consensus_commit,
751        );
752    }
753
754    #[cfg(test)]
755    pub(crate) async fn handle_consensus_commit_for_test(
756        &mut self,
757        consensus_commit: impl ConsensusCommitAPI,
758    ) {
759        self.handle_consensus_commit(consensus_commit).await;
760    }
761
762    #[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))]
763    pub(crate) async fn handle_consensus_commit(
764        &mut self,
765        consensus_commit: impl ConsensusCommitAPI,
766    ) {
767        let protocol_config = self.epoch_store.protocol_config();
768
769        // Assert all protocol config settings for which we don't support old behavior.
770        assert!(protocol_config.ignore_execution_time_observations_after_certs_closed());
771        assert!(protocol_config.record_time_estimate_processed());
772        assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints());
773        assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest());
774        assert!(protocol_config.authority_capabilities_v2());
775        assert!(protocol_config.cancel_for_failed_dkg_early());
776
777        // This may block until one of two conditions happens:
778        // - Number of uncommitted transactions in the writeback cache goes below the
779        //   backpressure threshold.
780        // - The highest executed checkpoint catches up to the highest certified checkpoint.
781        self.backpressure_subscriber.await_no_backpressure().await;
782
783        let epoch = self.epoch_store.epoch();
784
785        let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit");
786
787        let last_committed_round = self.last_consensus_stats.index.last_committed_round;
788
789        if let Some(consensus_tx_status_cache) = self.epoch_store.consensus_tx_status_cache.as_ref()
790        {
791            consensus_tx_status_cache
792                .update_last_committed_leader_round(last_committed_round as u32)
793                .await;
794        }
795        if let Some(tx_reject_reason_cache) = self.epoch_store.tx_reject_reason_cache.as_ref() {
796            tx_reject_reason_cache.set_last_committed_leader_round(last_committed_round as u32);
797        }
798
799        let commit_info = self.additional_consensus_state.observe_commit(
800            protocol_config,
801            self.epoch_store
802                .epoch_start_config()
803                .epoch_start_timestamp_ms(),
804            &consensus_commit,
805        );
806        assert!(commit_info.round > last_committed_round);
807
808        let (timestamp, leader_author, commit_sub_dag_index) =
809            self.gather_commit_metadata(&consensus_commit);
810
811        info!(
812            %consensus_commit,
813            "Received consensus output. Rejected transactions: {}",
814            consensus_commit.rejected_transactions_debug_string(),
815        );
816
817        self.last_consensus_stats.index = ExecutionIndices {
818            last_committed_round: commit_info.round,
819            sub_dag_index: commit_sub_dag_index,
820            transaction_index: 0_u64,
821        };
822
823        update_low_scoring_authorities(
824            self.low_scoring_authorities.clone(),
825            self.epoch_store.committee(),
826            &self.committee,
827            consensus_commit.reputation_score_sorted_desc(),
828            &self.metrics,
829            protocol_config.consensus_bad_nodes_stake_threshold(),
830        );
831
832        self.metrics
833            .consensus_committed_subdags
834            .with_label_values(&[&leader_author.to_string()])
835            .inc();
836
837        let mut state = CommitHandlerState {
838            output: ConsensusCommitOutput::new(commit_info.round),
839            dkg_failed: false,
840            randomness_round: None,
841            indirect_state_observer: Some(IndirectStateObserver::new()),
842            initial_reconfig_state: self
843                .epoch_store
844                .get_reconfig_state_read_lock_guard()
845                .clone(),
846        };
847
848        let transactions = self.filter_consensus_txns(
849            state.initial_reconfig_state.clone(),
850            &commit_info,
851            &consensus_commit,
852        );
853        let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions);
854
855        let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info);
856
857        let CommitHandlerInput {
858            user_transactions,
859            capability_notifications,
860            execution_time_observations,
861            checkpoint_signature_messages,
862            randomness_dkg_messages,
863            randomness_dkg_confirmations,
864            end_of_publish_transactions,
865            new_jwks,
866        } = self.build_commit_handler_input(transactions);
867
868        self.process_jwks(&mut state, &commit_info, new_jwks);
869        self.process_capability_notifications(capability_notifications);
870        self.process_execution_time_observations(&mut state, execution_time_observations);
871        self.process_checkpoint_signature_messages(checkpoint_signature_messages);
872
873        self.process_dkg_updates(
874            &mut state,
875            &commit_info,
876            randomness_manager.as_deref_mut(),
877            randomness_dkg_messages,
878            randomness_dkg_confirmations,
879        )
880        .await;
881
882        let mut execution_time_estimator = self
883            .epoch_store
884            .execution_time_estimator
885            .try_lock()
886            .expect("should only ever be called from the commit handler thread");
887
888        let authenticator_state_update_transaction =
889            self.create_authenticator_state_update(last_committed_round, &commit_info);
890
891        let (schedulables, randomness_schedulables, assigned_versions) = self.process_transactions(
892            &mut state,
893            &mut execution_time_estimator,
894            &commit_info,
895            authenticator_state_update_transaction,
896            user_transactions,
897        );
898
899        let (should_accept_tx, lock, final_round) =
900            self.handle_eop(&mut state, end_of_publish_transactions);
901
902        let make_checkpoint = should_accept_tx || final_round;
903        if !make_checkpoint {
904            // No need for any further processing
905            return;
906        }
907
908        // If this is the final round, record execution time observations for storage in the
909        // end-of-epoch tx.
910        if final_round {
911            self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
912        }
913
914        self.create_pending_checkpoints(
915            &mut state,
916            &commit_info,
917            &schedulables,
918            &randomness_schedulables,
919            final_round,
920        );
921
922        let notifications = state.get_notifications();
923
924        state
925            .output
926            .record_consensus_commit_stats(self.last_consensus_stats.clone());
927
928        self.record_deferral_deletion(&mut state);
929
930        self.epoch_store
931            .consensus_quarantine
932            .write()
933            .push_consensus_output(state.output, &self.epoch_store)
934            .expect("push_consensus_output should not fail");
935
936        // Only after batch is written, notify checkpoint service to start building any new
937        // pending checkpoints.
938        debug!(
939            ?commit_info.round,
940            "Notifying checkpoint service about new pending checkpoint(s)",
941        );
942        self.checkpoint_service
943            .notify_checkpoint()
944            .expect("failed to notify checkpoint service");
945
946        if let Some(randomness_round) = state.randomness_round {
947            randomness_manager
948                .as_ref()
949                .expect("randomness manager should exist if randomness round is provided")
950                .generate_randomness(epoch, randomness_round);
951        }
952
953        self.epoch_store.process_notifications(notifications.iter());
954
955        // pass lock by value to ensure that it is held until this point
956        self.log_final_round(lock, final_round);
957
958        // update the calculated throughput
959        self.throughput_calculator
960            .add_transactions(timestamp, schedulables.len() as u64);
961
962        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
963            let key = [commit_sub_dag_index, epoch];
964            if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
965                sui_simulator::task::kill_current_node(None);
966            }
967        });
968
969        fail_point!("crash"); // for tests that produce random crashes
970
971        let mut schedulables = schedulables;
972        schedulables.extend(randomness_schedulables);
973        self.execution_scheduler_sender.send(
974            schedulables,
975            assigned_versions,
976            SchedulingSource::NonFastPath,
977        );
978
979        self.send_end_of_publish_if_needed().await;
980    }
981
982    fn handle_eop(
983        &self,
984        state: &mut CommitHandlerState,
985        end_of_publish_transactions: Vec<AuthorityName>,
986    ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
987        let collected_eop =
988            self.process_end_of_publish_transactions(state, end_of_publish_transactions);
989        if collected_eop {
990            let (lock, final_round) = self.advance_eop_state_machine(state);
991            (lock.should_accept_tx(), Some(lock), final_round)
992        } else {
993            (true, None, false)
994        }
995    }
996
997    fn record_end_of_epoch_execution_time_observations(
998        &self,
999        estimator: &mut ExecutionTimeEstimator,
1000    ) {
1001        self.epoch_store
1002            .end_of_epoch_execution_time_observations
1003            .set(estimator.take_observations())
1004            .expect("`stored_execution_time_observations` should only be set once at end of epoch");
1005    }
1006
1007    fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
1008        let mut deferred_transactions = self
1009            .epoch_store
1010            .consensus_output_cache
1011            .deferred_transactions_v2
1012            .lock();
1013        for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
1014            deferred_transactions.remove(&deleted_deferred_key);
1015        }
1016    }
1017
1018    fn log_final_round(&self, lock: Option<RwLockWriteGuard<ReconfigState>>, final_round: bool) {
1019        if final_round {
1020            let epoch = self.epoch_store.epoch();
1021            info!(
1022                ?epoch,
1023                lock=?lock.as_ref(),
1024                final_round=?final_round,
1025                "Notified last checkpoint"
1026            );
1027            self.epoch_store.record_end_of_message_quorum_time_metric();
1028        }
1029    }
1030
1031    fn create_pending_checkpoints(
1032        &self,
1033        state: &mut CommitHandlerState,
1034        commit_info: &ConsensusCommitInfo,
1035        schedulables: &[Schedulable],
1036        randomness_schedulables: &[Schedulable],
1037        final_round: bool,
1038    ) {
1039        let checkpoint_height = self
1040            .epoch_store
1041            .calculate_pending_checkpoint_height(commit_info.round);
1042
1043        // Determine whether to write pending checkpoint for user tx with randomness.
1044        // - If randomness is not generated for this commit, we will skip the
1045        //   checkpoint with the associated height. Therefore checkpoint heights may
1046        //   not be contiguous.
1047        // - Exception: if DKG fails, we always need to write out a PendingCheckpoint
1048        //   for randomness tx that are canceled.
1049        let should_write_random_checkpoint = state.randomness_round.is_some()
1050            || (state.dkg_failed && !randomness_schedulables.is_empty());
1051
1052        let pending_checkpoint = PendingCheckpoint {
1053            roots: schedulables.iter().map(|s| s.key()).collect(),
1054            details: PendingCheckpointInfo {
1055                timestamp_ms: commit_info.timestamp,
1056                last_of_epoch: final_round && !should_write_random_checkpoint,
1057                checkpoint_height,
1058                consensus_commit_ref: commit_info.consensus_commit_ref,
1059                rejected_transactions_digest: commit_info.rejected_transactions_digest,
1060            },
1061        };
1062        self.epoch_store
1063            .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1064            .expect("failed to write pending checkpoint");
1065
1066        info!(
1067            "Written pending checkpoint: {:?}",
1068            pending_checkpoint.details,
1069        );
1070
1071        if should_write_random_checkpoint {
1072            let pending_checkpoint = PendingCheckpoint {
1073                roots: randomness_schedulables.iter().map(|s| s.key()).collect(),
1074                details: PendingCheckpointInfo {
1075                    timestamp_ms: commit_info.timestamp,
1076                    last_of_epoch: final_round,
1077                    checkpoint_height: checkpoint_height + 1,
1078                    consensus_commit_ref: commit_info.consensus_commit_ref,
1079                    rejected_transactions_digest: commit_info.rejected_transactions_digest,
1080                },
1081            };
1082            self.epoch_store
1083                .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1084                .expect("failed to write pending checkpoint");
1085        }
1086    }
1087
1088    fn process_transactions(
1089        &self,
1090        state: &mut CommitHandlerState,
1091        execution_time_estimator: &mut ExecutionTimeEstimator,
1092        commit_info: &ConsensusCommitInfo,
1093        authenticator_state_update_transaction: Option<VerifiedExecutableTransactionWithAliases>,
1094        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1095    ) -> (Vec<Schedulable>, Vec<Schedulable>, AssignedTxAndVersions) {
1096        let protocol_config = self.epoch_store.protocol_config();
1097        let epoch = self.epoch_store.epoch();
1098
1099        // Get the ordered set of all transactions to process, which includes deferred and
1100        // newly arrived transactions.
1101        let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) =
1102            self.merge_and_reorder_transactions(state, commit_info, user_transactions);
1103
1104        let mut shared_object_congestion_tracker =
1105            self.init_congestion_tracker(commit_info, false, &ordered_txns);
1106        let mut shared_object_using_randomness_congestion_tracker =
1107            self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns);
1108
1109        let randomness_state_update_transaction = state
1110            .randomness_round
1111            .map(|round| Schedulable::RandomnessStateUpdate(epoch, round));
1112        debug!(
1113            "Randomness state update transaction: {:?}",
1114            randomness_state_update_transaction
1115                .as_ref()
1116                .map(|t| t.key())
1117        );
1118
1119        let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len());
1120        let mut randomness_transactions_to_schedule =
1121            Vec::with_capacity(ordered_randomness_txns.len());
1122        let mut deferred_txns = BTreeMap::new();
1123        let mut cancelled_txns = BTreeMap::new();
1124
1125        for transaction in ordered_txns {
1126            self.handle_deferral_and_cancellation(
1127                state,
1128                &mut cancelled_txns,
1129                &mut deferred_txns,
1130                &mut transactions_to_schedule,
1131                protocol_config,
1132                commit_info,
1133                transaction,
1134                &mut shared_object_congestion_tracker,
1135                &previously_deferred_tx_digests,
1136                execution_time_estimator,
1137            );
1138        }
1139
1140        for transaction in ordered_randomness_txns {
1141            if state.dkg_failed {
1142                debug!(
1143                    "Canceling randomness-using transaction {:?} because DKG failed",
1144                    transaction.tx().digest(),
1145                );
1146                cancelled_txns.insert(
1147                    *transaction.tx().digest(),
1148                    CancelConsensusCertificateReason::DkgFailed,
1149                );
1150                randomness_transactions_to_schedule.push(transaction);
1151                continue;
1152            }
1153            self.handle_deferral_and_cancellation(
1154                state,
1155                &mut cancelled_txns,
1156                &mut deferred_txns,
1157                &mut randomness_transactions_to_schedule,
1158                protocol_config,
1159                commit_info,
1160                transaction,
1161                &mut shared_object_using_randomness_congestion_tracker,
1162                &previously_deferred_tx_digests,
1163                execution_time_estimator,
1164            );
1165        }
1166
1167        let mut total_deferred_txns = 0;
1168        {
1169            let mut deferred_transactions = self
1170                .epoch_store
1171                .consensus_output_cache
1172                .deferred_transactions_v2
1173                .lock();
1174            for (key, txns) in deferred_txns.into_iter() {
1175                total_deferred_txns += txns.len();
1176                deferred_transactions.insert(key, txns.clone());
1177                state.output.defer_transactions(key, txns);
1178            }
1179        }
1180
1181        self.metrics
1182            .consensus_handler_deferred_transactions
1183            .inc_by(total_deferred_txns as u64);
1184        self.metrics
1185            .consensus_handler_cancelled_transactions
1186            .inc_by(cancelled_txns.len() as u64);
1187        self.metrics
1188            .consensus_handler_max_object_costs
1189            .with_label_values(&["regular_commit"])
1190            .set(shared_object_congestion_tracker.max_cost() as i64);
1191        self.metrics
1192            .consensus_handler_max_object_costs
1193            .with_label_values(&["randomness_commit"])
1194            .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);
1195
1196        let object_debts = shared_object_congestion_tracker.accumulated_debts(commit_info);
1197        let randomness_object_debts =
1198            shared_object_using_randomness_congestion_tracker.accumulated_debts(commit_info);
1199        if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get()
1200            && let Err(e) = tx_object_debts.try_send(
1201                object_debts
1202                    .iter()
1203                    .chain(randomness_object_debts.iter())
1204                    .map(|(id, _)| *id)
1205                    .collect(),
1206            )
1207        {
1208            info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
1209        }
1210
1211        state
1212            .output
1213            .set_congestion_control_object_debts(object_debts);
1214        state
1215            .output
1216            .set_congestion_control_randomness_object_debts(randomness_object_debts);
1217
1218        let mut settlement = None;
1219        let mut randomness_settlement = None;
1220        if self.epoch_store.accumulators_enabled() {
1221            let checkpoint_height = self
1222                .epoch_store
1223                .calculate_pending_checkpoint_height(commit_info.round);
1224
1225            settlement = Some(Schedulable::AccumulatorSettlement(epoch, checkpoint_height));
1226
1227            if state.randomness_round.is_some() || !randomness_transactions_to_schedule.is_empty() {
1228                randomness_settlement = Some(Schedulable::AccumulatorSettlement(
1229                    epoch,
1230                    checkpoint_height + 1,
1231                ));
1232            }
1233        }
1234
1235        let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
1236            .then_some(Schedulable::ConsensusCommitPrologue(
1237                epoch,
1238                commit_info.round,
1239                commit_info.consensus_commit_ref.index,
1240            ));
1241
1242        let schedulables: Vec<_> = itertools::chain!(
1243            consensus_commit_prologue.into_iter(),
1244            authenticator_state_update_transaction
1245                .into_iter()
1246                .map(Schedulable::Transaction),
1247            transactions_to_schedule
1248                .into_iter()
1249                .map(Schedulable::Transaction),
1250            settlement,
1251        )
1252        .collect();
1253
1254        let randomness_schedulables: Vec<_> = randomness_state_update_transaction
1255            .into_iter()
1256            .chain(
1257                randomness_transactions_to_schedule
1258                    .into_iter()
1259                    .map(Schedulable::Transaction),
1260            )
1261            .chain(randomness_settlement)
1262            .collect();
1263
1264        let assigned_versions = self
1265            .epoch_store
1266            .process_consensus_transaction_shared_object_versions(
1267                self.cache_reader.as_ref(),
1268                schedulables.iter(),
1269                randomness_schedulables.iter(),
1270                &cancelled_txns,
1271                &mut state.output,
1272            )
1273            .expect("failed to assign shared object versions");
1274
1275        let consensus_commit_prologue =
1276            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);
1277
1278        let mut schedulables = schedulables;
1279        let mut assigned_versions = assigned_versions;
1280        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
1281            assert!(matches!(
1282                schedulables[0],
1283                Schedulable::ConsensusCommitPrologue(..)
1284            ));
1285            assert!(matches!(
1286                assigned_versions.0[0].0,
1287                TransactionKey::ConsensusCommitPrologue(..)
1288            ));
1289            assigned_versions.0[0].0 =
1290                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
1291            schedulables[0] = Schedulable::Transaction(consensus_commit_prologue);
1292        }
1293
1294        self.epoch_store
1295            .process_user_signatures(schedulables.iter().chain(randomness_schedulables.iter()));
1296
1297        // After this point we can throw away alias version information.
1298        let schedulables: Vec<Schedulable> = schedulables.into_iter().map(|s| s.into()).collect();
1299        let randomness_schedulables: Vec<Schedulable> = randomness_schedulables
1300            .into_iter()
1301            .map(|s| s.into())
1302            .collect();
1303
1304        (schedulables, randomness_schedulables, assigned_versions)
1305    }
1306
1307    // Adds the consensus commit prologue transaction to the beginning of input `transactions` to update
1308    // the system clock used in all transactions in the current consensus commit.
1309    // Returns the root of the consensus commit prologue transaction if it was added to the input.
1310    fn add_consensus_commit_prologue_transaction<'a>(
1311        &'a self,
1312        state: &'a mut CommitHandlerState,
1313        commit_info: &'a ConsensusCommitInfo,
1314        assigned_versions: &AssignedTxAndVersions,
1315    ) -> Option<VerifiedExecutableTransactionWithAliases> {
1316        {
1317            if commit_info.skip_consensus_commit_prologue_in_test {
1318                return None;
1319            }
1320        }
1321
1322        let mut cancelled_txn_version_assignment = Vec::new();
1323
1324        let protocol_config = self.epoch_store.protocol_config();
1325
1326        for (txn_key, assigned_versions) in assigned_versions.0.iter() {
1327            let Some(d) = txn_key.as_digest() else {
1328                continue;
1329            };
1330
1331            if !protocol_config.include_cancelled_randomness_txns_in_prologue()
1332                && assigned_versions
1333                    .shared_object_versions
1334                    .iter()
1335                    .any(|((id, _), _)| *id == SUI_RANDOMNESS_STATE_OBJECT_ID)
1336            {
1337                continue;
1338            }
1339
1340            if assigned_versions
1341                .shared_object_versions
1342                .iter()
1343                .any(|(_, version)| version.is_cancelled())
1344            {
1345                assert_reachable!("cancelled transactions");
1346                cancelled_txn_version_assignment
1347                    .push((*d, assigned_versions.shared_object_versions.clone()));
1348            }
1349        }
1350
1351        fail_point_arg!(
1352            "additional_cancelled_txns_for_tests",
1353            |additional_cancelled_txns: Vec<(
1354                TransactionDigest,
1355                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
1356            )>| {
1357                cancelled_txn_version_assignment.extend(additional_cancelled_txns);
1358            }
1359        );
1360
1361        let transaction = commit_info.create_consensus_commit_prologue_transaction(
1362            self.epoch_store.epoch(),
1363            self.epoch_store.protocol_config(),
1364            cancelled_txn_version_assignment,
1365            commit_info,
1366            state.indirect_state_observer.take().unwrap(),
1367        );
1368        Some(VerifiedExecutableTransactionWithAliases::no_aliases(
1369            transaction,
1370        ))
1371    }
1372
1373    fn handle_deferral_and_cancellation(
1374        &self,
1375        state: &mut CommitHandlerState,
1376        cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
1377        deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>,
1378        scheduled_txns: &mut Vec<VerifiedExecutableTransactionWithAliases>,
1379        protocol_config: &ProtocolConfig,
1380        commit_info: &ConsensusCommitInfo,
1381        transaction: VerifiedExecutableTransactionWithAliases,
1382        shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
1383        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
1384        execution_time_estimator: &ExecutionTimeEstimator,
1385    ) {
1386        let tx_cost = shared_object_congestion_tracker.get_tx_cost(
1387            execution_time_estimator,
1388            transaction.tx(),
1389            state.indirect_state_observer.as_mut().unwrap(),
1390        );
1391
1392        let deferral_info = self.epoch_store.should_defer(
1393            transaction.tx(),
1394            commit_info,
1395            state.dkg_failed,
1396            state.randomness_round.is_some(),
1397            previously_deferred_tx_digests,
1398            shared_object_congestion_tracker,
1399        );
1400
1401        if let Some((deferral_key, deferral_reason)) = deferral_info {
1402            debug!(
1403                "Deferring consensus certificate for transaction {:?} until {:?}",
1404                transaction.tx().digest(),
1405                deferral_key
1406            );
1407
1408            match deferral_reason {
1409                DeferralReason::RandomnessNotReady => {
1410                    deferred_txns
1411                        .entry(deferral_key)
1412                        .or_default()
1413                        .push(transaction);
1414                }
1415                DeferralReason::SharedObjectCongestion(congested_objects) => {
1416                    self.metrics.consensus_handler_congested_transactions.inc();
1417                    if transaction_deferral_within_limit(
1418                        &deferral_key,
1419                        protocol_config.max_deferral_rounds_for_congestion_control(),
1420                    ) {
1421                        deferred_txns
1422                            .entry(deferral_key)
1423                            .or_default()
1424                            .push(transaction);
1425                    } else {
1426                        assert_sometimes!(
1427                            transaction.tx().data().transaction_data().uses_randomness(),
1428                            "cancelled randomness-using transaction"
1429                        );
1430                        assert_sometimes!(
1431                            !transaction.tx().data().transaction_data().uses_randomness(),
1432                            "cancelled non-randomness-using transaction"
1433                        );
1434
1435                        // Cancel the transaction that has been deferred for too long.
1436                        debug!(
1437                            "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
1438                            transaction.tx().digest(),
1439                            deferral_key,
1440                            congested_objects
1441                        );
1442                        cancelled_txns.insert(
1443                            *transaction.tx().digest(),
1444                            CancelConsensusCertificateReason::CongestionOnObjects(
1445                                congested_objects,
1446                            ),
1447                        );
1448                        scheduled_txns.push(transaction);
1449                    }
1450                }
1451            }
1452        } else {
1453            // Update object execution cost for all scheduled transactions
1454            shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, transaction.tx());
1455            scheduled_txns.push(transaction);
1456        }
1457    }
1458
1459    fn merge_and_reorder_transactions(
1460        &self,
1461        state: &mut CommitHandlerState,
1462        commit_info: &ConsensusCommitInfo,
1463        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1464    ) -> (
1465        Vec<VerifiedExecutableTransactionWithAliases>,
1466        Vec<VerifiedExecutableTransactionWithAliases>,
1467        HashMap<TransactionDigest, DeferralKey>,
1468    ) {
1469        let protocol_config = self.epoch_store.protocol_config();
1470
1471        let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
1472            self.load_deferred_transactions(state, commit_info);
1473
1474        txns.reserve(user_transactions.len());
1475        randomness_txns.reserve(user_transactions.len());
1476
1477        // There may be randomness transactions in `txns`, which were deferred due to congestion.
1478        // They must be placed back into `randomness_txns`.
1479        let mut txns: Vec<_> = txns
1480            .into_iter()
1481            .filter_map(|tx| {
1482                if tx.tx().transaction_data().uses_randomness() {
1483                    randomness_txns.push(tx);
1484                    None
1485                } else {
1486                    Some(tx)
1487                }
1488            })
1489            .collect();
1490
1491        for txn in user_transactions {
1492            if txn.tx().transaction_data().uses_randomness() {
1493                randomness_txns.push(txn);
1494            } else {
1495                txns.push(txn);
1496            }
1497        }
1498
1499        PostConsensusTxReorder::reorder(
1500            &mut txns,
1501            protocol_config.consensus_transaction_ordering(),
1502        );
1503        PostConsensusTxReorder::reorder(
1504            &mut randomness_txns,
1505            protocol_config.consensus_transaction_ordering(),
1506        );
1507
1508        (txns, randomness_txns, previously_deferred_tx_digests)
1509    }
1510
1511    fn load_deferred_transactions(
1512        &self,
1513        state: &mut CommitHandlerState,
1514        commit_info: &ConsensusCommitInfo,
1515    ) -> (
1516        Vec<VerifiedExecutableTransactionWithAliases>,
1517        Vec<VerifiedExecutableTransactionWithAliases>,
1518        HashMap<TransactionDigest, DeferralKey>,
1519    ) {
1520        let mut previously_deferred_tx_digests = HashMap::new();
1521
1522        let deferred_txs: Vec<_> = self
1523            .epoch_store
1524            .load_deferred_transactions_for_up_to_consensus_round_v2(
1525                &mut state.output,
1526                commit_info.round,
1527            )
1528            .expect("db error")
1529            .into_iter()
1530            .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
1531            .map(|(key, tx)| {
1532                previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
1533                tx
1534            })
1535            .collect();
1536        trace!(
1537            "loading deferred transactions: {:?}",
1538            deferred_txs.iter().map(|tx| tx.tx().digest())
1539        );
1540
1541        let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
1542            let txns: Vec<_> = self
1543                .epoch_store
1544                .load_deferred_transactions_for_randomness_v2(&mut state.output)
1545                .expect("db error")
1546                .into_iter()
1547                .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
1548                .map(|(key, tx)| {
1549                    previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
1550                    tx
1551                })
1552                .collect();
1553            trace!(
1554                "loading deferred randomness transactions: {:?}",
1555                txns.iter().map(|tx| tx.tx().digest())
1556            );
1557            txns
1558        } else {
1559            vec![]
1560        };
1561
1562        (
1563            deferred_txs,
1564            deferred_randomness_txs,
1565            previously_deferred_tx_digests,
1566        )
1567    }
1568
1569    fn init_congestion_tracker(
1570        &self,
1571        commit_info: &ConsensusCommitInfo,
1572        for_randomness: bool,
1573        txns: &[VerifiedExecutableTransactionWithAliases],
1574    ) -> SharedObjectCongestionTracker {
1575        #[allow(unused_mut)]
1576        let mut ret = SharedObjectCongestionTracker::from_protocol_config(
1577            self.epoch_store
1578                .consensus_quarantine
1579                .read()
1580                .load_initial_object_debts(
1581                    &self.epoch_store,
1582                    commit_info.round,
1583                    for_randomness,
1584                    txns,
1585                )
1586                .expect("db error"),
1587            self.epoch_store.protocol_config(),
1588            for_randomness,
1589        );
1590
1591        fail_point_arg!(
1592            "initial_congestion_tracker",
1593            |tracker: SharedObjectCongestionTracker| {
1594                info!(
1595                    "Initialize shared_object_congestion_tracker to  {:?}",
1596                    tracker
1597                );
1598                ret = tracker;
1599            }
1600        );
1601
1602        ret
1603    }
1604
1605    fn process_jwks(
1606        &self,
1607        state: &mut CommitHandlerState,
1608        commit_info: &ConsensusCommitInfo,
1609        new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
1610    ) {
1611        for (authority_name, jwk_id, jwk) in new_jwks {
1612            self.epoch_store.record_jwk_vote(
1613                &mut state.output,
1614                commit_info.round,
1615                authority_name,
1616                &jwk_id,
1617                &jwk,
1618            );
1619        }
1620    }
1621
1622    fn process_capability_notifications(
1623        &self,
1624        capability_notifications: Vec<AuthorityCapabilitiesV2>,
1625    ) {
1626        for capabilities in capability_notifications {
1627            self.epoch_store
1628                .record_capabilities_v2(&capabilities)
1629                .expect("db error");
1630        }
1631    }
1632
1633    fn process_execution_time_observations(
1634        &self,
1635        state: &mut CommitHandlerState,
1636        execution_time_observations: Vec<ExecutionTimeObservation>,
1637    ) {
1638        let mut execution_time_estimator = self
1639            .epoch_store
1640            .execution_time_estimator
1641            .try_lock()
1642            .expect("should only ever be called from the commit handler thread");
1643
1644        for ExecutionTimeObservation {
1645            authority,
1646            generation,
1647            estimates,
1648        } in execution_time_observations
1649        {
1650            let authority_index = self
1651                .epoch_store
1652                .committee()
1653                .authority_index(&authority)
1654                .unwrap();
1655            execution_time_estimator.process_observations_from_consensus(
1656                authority_index,
1657                Some(generation),
1658                &estimates,
1659            );
1660            state
1661                .output
1662                .insert_execution_time_observation(authority_index, generation, estimates);
1663        }
1664    }
1665
1666    fn process_checkpoint_signature_messages(
1667        &self,
1668        checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
1669    ) {
1670        for checkpoint_signature_message in checkpoint_signature_messages {
1671            self.checkpoint_service
1672                .notify_checkpoint_signature(&self.epoch_store, &checkpoint_signature_message)
1673                .expect("db error");
1674        }
1675    }
1676
1677    async fn process_dkg_updates(
1678        &self,
1679        state: &mut CommitHandlerState,
1680        commit_info: &ConsensusCommitInfo,
1681        randomness_manager: Option<&mut RandomnessManager>,
1682        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
1683        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
1684    ) {
1685        if !self.epoch_store.randomness_state_enabled() {
1686            let num_dkg_messages = randomness_dkg_messages.len();
1687            let num_dkg_confirmations = randomness_dkg_confirmations.len();
1688            if num_dkg_messages + num_dkg_confirmations > 0 {
1689                debug_fatal!(
1690                    "received {} RandomnessDkgMessage and {} RandomnessDkgConfirmation messages when randomness is not enabled",
1691                    num_dkg_messages,
1692                    num_dkg_confirmations
1693                );
1694            }
1695            return;
1696        }
1697
1698        let randomness_manager =
1699            randomness_manager.expect("randomness manager should exist if randomness is enabled");
1700
1701        let randomness_dkg_updates =
1702            self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);
1703
1704        let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
1705            state,
1706            randomness_manager,
1707            randomness_dkg_confirmations,
1708        );
1709
1710        if randomness_dkg_updates || randomness_dkg_confirmation_updates {
1711            randomness_manager
1712                .advance_dkg(&mut state.output, commit_info.round)
1713                .await
1714                .expect("epoch ended");
1715        }
1716    }
1717
1718    fn process_randomness_dkg_messages(
1719        &self,
1720        randomness_manager: &mut RandomnessManager,
1721        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
1722    ) -> bool /* randomness state updated */ {
1723        if randomness_dkg_messages.is_empty() {
1724            return false;
1725        }
1726
1727        let mut randomness_state_updated = false;
1728        for (authority, bytes) in randomness_dkg_messages {
1729            match bcs::from_bytes(&bytes) {
1730                Ok(message) => {
1731                    randomness_manager
1732                        .add_message(&authority, message)
1733                        // TODO: make infallible
1734                        .expect("epoch ended");
1735                    randomness_state_updated = true;
1736                }
1737
1738                Err(e) => {
1739                    warn!(
1740                        "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
1741                        authority.concise(),
1742                    );
1743                }
1744            }
1745        }
1746
1747        randomness_state_updated
1748    }
1749
1750    fn process_randomness_dkg_confirmations(
1751        &self,
1752        state: &mut CommitHandlerState,
1753        randomness_manager: &mut RandomnessManager,
1754        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
1755    ) -> bool /* randomness state updated */ {
1756        if randomness_dkg_confirmations.is_empty() {
1757            return false;
1758        }
1759
1760        let mut randomness_state_updated = false;
1761        for (authority, bytes) in randomness_dkg_confirmations {
1762            match bcs::from_bytes(&bytes) {
1763                Ok(message) => {
1764                    randomness_manager
1765                        .add_confirmation(&mut state.output, &authority, message)
1766                        // TODO: make infallible
1767                        .expect("epoch ended");
1768                    randomness_state_updated = true;
1769                }
1770                Err(e) => {
1771                    warn!(
1772                        "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
1773                        authority.concise(),
1774                    );
1775                }
1776            }
1777        }
1778
1779        randomness_state_updated
1780    }
1781
1782    /// Returns true if we have collected a quorum of end of publish messages (either in this round or a previous round).
1783    fn process_end_of_publish_transactions(
1784        &self,
1785        state: &mut CommitHandlerState,
1786        end_of_publish_transactions: Vec<AuthorityName>,
1787    ) -> bool {
1788        let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
1789            "No contention on end_of_publish as it is only accessed from consensus handler",
1790        );
1791
1792        if eop_aggregator.has_quorum() {
1793            return true;
1794        }
1795
1796        if end_of_publish_transactions.is_empty() {
1797            return false;
1798        }
1799
1800        for authority in end_of_publish_transactions {
1801            info!("Received EndOfPublish from {:?}", authority.concise());
1802
1803            // It is ok to just release lock here as this function is the only place that transition into RejectAllCerts state
1804            // And this function itself is always executed from consensus task
1805            state.output.insert_end_of_publish(authority);
1806            if eop_aggregator
1807                .insert_generic(authority, ())
1808                .is_quorum_reached()
1809            {
1810                debug!(
1811                    "Collected enough end_of_publish messages with last message from validator {:?}",
1812                    authority.concise(),
1813                );
1814                return true;
1815            }
1816        }
1817
1818        false
1819    }
1820
1821    /// After we have collected 2f+1 EndOfPublish messages, we call this function every round until the epoch
1822    /// ends.
1823    fn advance_eop_state_machine(
1824        &self,
1825        state: &mut CommitHandlerState,
1826    ) -> (
1827        RwLockWriteGuard<'_, ReconfigState>,
1828        bool, // true if final round
1829    ) {
1830        let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
1831        let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
1832
1833        reconfig_state.close_all_certs();
1834
1835        let commit_has_deferred_txns = state.output.has_deferred_transactions();
1836        let previous_commits_have_deferred_txns =
1837            !self.epoch_store.deferred_transactions_empty_v2();
1838
1839        if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
1840            if !start_state_is_reject_all_tx {
1841                info!("Transitioning to RejectAllTx");
1842            }
1843            reconfig_state.close_all_tx();
1844        } else {
1845            debug!(
1846                "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
1847                previous_commits_have_deferred_txns, commit_has_deferred_txns,
1848            );
1849        }
1850
1851        state.output.store_reconfig_state(reconfig_state.clone());
1852
1853        if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
1854            (reconfig_state, true)
1855        } else {
1856            (reconfig_state, false)
1857        }
1858    }
1859
1860    fn gather_commit_metadata(
1861        &self,
1862        consensus_commit: &impl ConsensusCommitAPI,
1863    ) -> (u64, AuthorityIndex, u64) {
1864        let timestamp = consensus_commit.commit_timestamp_ms();
1865        let leader_author = consensus_commit.leader_author_index();
1866        let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
1867
1868        let system_time_ms = SystemTime::now()
1869            .duration_since(UNIX_EPOCH)
1870            .unwrap()
1871            .as_millis() as i64;
1872
1873        let consensus_timestamp_bias_ms = system_time_ms - (timestamp as i64);
1874        let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
1875        self.metrics
1876            .consensus_timestamp_bias
1877            .observe(consensus_timestamp_bias_seconds);
1878
1879        let epoch_start = self
1880            .epoch_store
1881            .epoch_start_config()
1882            .epoch_start_timestamp_ms();
1883        let timestamp = if timestamp < epoch_start {
1884            error!(
1885                "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}"
1886            );
1887            epoch_start
1888        } else {
1889            timestamp
1890        };
1891
1892        (timestamp, leader_author, commit_sub_dag_index)
1893    }
1894
1895    fn create_authenticator_state_update(
1896        &self,
1897        last_committed_round: u64,
1898        commit_info: &ConsensusCommitInfo,
1899    ) -> Option<VerifiedExecutableTransactionWithAliases> {
1900        // Load all jwks that became active in the previous round, and commit them in this round.
1901        // We want to delay one round because none of the transactions in the previous round could
1902        // have been authenticated with the jwks that became active in that round.
1903        //
1904        // Because of this delay, jwks that become active in the last round of the epoch will
1905        // never be committed. That is ok, because in the new epoch, the validators should
1906        // immediately re-submit these jwks, and they can become active then.
1907        let new_jwks = self
1908            .epoch_store
1909            .get_new_jwks(last_committed_round)
1910            .expect("Unrecoverable error in consensus handler");
1911
1912        if !new_jwks.is_empty() {
1913            let authenticator_state_update_transaction = authenticator_state_update_transaction(
1914                &self.epoch_store,
1915                commit_info.round,
1916                new_jwks,
1917            );
1918            debug!(
1919                "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
1920                authenticator_state_update_transaction.digest(),
1921                authenticator_state_update_transaction,
1922            );
1923
1924            Some(VerifiedExecutableTransactionWithAliases::no_aliases(
1925                authenticator_state_update_transaction,
1926            ))
1927        } else {
1928            None
1929        }
1930    }
1931
1932    // Filters out rejected or deprecated transactions.
1933    #[instrument(level = "trace", skip_all)]
1934    fn filter_consensus_txns(
1935        &mut self,
1936        initial_reconfig_state: ReconfigState,
1937        commit_info: &ConsensusCommitInfo,
1938        consensus_commit: &impl ConsensusCommitAPI,
1939    ) -> Vec<(SequencedConsensusTransactionKind, u32)> {
1940        let mut transactions = Vec::new();
1941        let epoch = self.epoch_store.epoch();
1942        let mut num_finalized_user_transactions = vec![0; self.committee.size()];
1943        let mut num_rejected_user_transactions = vec![0; self.committee.size()];
1944        for (block, parsed_transactions) in consensus_commit.transactions() {
1945            let author = block.author.value();
1946            // TODO: consider only messages within 1~3 rounds of the leader?
1947            self.last_consensus_stats.stats.inc_num_messages(author);
1948
1949            // Set the "ping" transaction status for this block. This is necessary as there might be some ping requests waiting for the ping transaction to be certified.
1950            self.epoch_store.set_consensus_tx_status(
1951                ConsensusPosition::ping(epoch, block),
1952                ConsensusTxStatus::Finalized,
1953            );
1954
1955            for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
1956                let position = ConsensusPosition {
1957                    epoch,
1958                    block,
1959                    index: tx_index as TransactionIndex,
1960                };
1961
1962                // Transaction has appeared in consensus output, we can increment the submission count
1963                // for this tx for DoS protection.
1964                if self.epoch_store.protocol_config().mysticeti_fastpath()
1965                    && let Some(tx) = parsed.transaction.kind.as_user_transaction()
1966                {
1967                    let digest = tx.digest();
1968                    if let Some((spam_weight, submitter_client_addrs)) = self
1969                        .epoch_store
1970                        .submitted_transaction_cache
1971                        .increment_submission_count(digest)
1972                    {
1973                        if let Some(ref traffic_controller) = self.traffic_controller {
1974                            debug!(
1975                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
1976                                submitter_client_addrs.len()
1977                            );
1978
1979                            // Apply spam weight to all client addresses that submitted this transaction
1980                            for addr in submitter_client_addrs {
1981                                traffic_controller.tally(TrafficTally::new(
1982                                    Some(addr),
1983                                    None,
1984                                    None,
1985                                    spam_weight.clone(),
1986                                ));
1987                            }
1988                        } else {
1989                            warn!(
1990                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
1991                                submitter_client_addrs.len()
1992                            );
1993                        }
1994                    }
1995                }
1996
1997                if parsed.rejected {
1998                    // TODO(fastpath): Add metrics for rejected transactions.
1999                    if matches!(
2000                        parsed.transaction.kind,
2001                        ConsensusTransactionKind::UserTransaction(_)
2002                            | ConsensusTransactionKind::UserTransactionV2(_)
2003                    ) {
2004                        self.epoch_store
2005                            .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
2006                        num_rejected_user_transactions[author] += 1;
2007                    }
2008                    // Skip processing rejected transactions.
2009                    // TODO(fastpath): Handle unlocking.
2010                    continue;
2011                }
2012                if matches!(
2013                    parsed.transaction.kind,
2014                    ConsensusTransactionKind::UserTransaction(_)
2015                        | ConsensusTransactionKind::UserTransactionV2(_)
2016                ) {
2017                    self.epoch_store
2018                        .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
2019                    num_finalized_user_transactions[author] += 1;
2020                }
2021                let kind = classify(&parsed.transaction);
2022                self.metrics
2023                    .consensus_handler_processed
2024                    .with_label_values(&[kind])
2025                    .inc();
2026                self.metrics
2027                    .consensus_handler_transaction_sizes
2028                    .with_label_values(&[kind])
2029                    .observe(parsed.serialized_len as f64);
2030                // UserTransaction exists only when mysticeti_fastpath is enabled in protocol config.
2031                if matches!(
2032                    &parsed.transaction.kind,
2033                    ConsensusTransactionKind::CertifiedTransaction(_)
2034                        | ConsensusTransactionKind::UserTransaction(_)
2035                        | ConsensusTransactionKind::UserTransactionV2(_)
2036                ) {
2037                    self.last_consensus_stats
2038                        .stats
2039                        .inc_num_user_transactions(author);
2040                }
2041
2042                if !initial_reconfig_state.should_accept_consensus_certs() {
2043                    // (Note: we no longer need to worry about the previously deferred condition, since we are only
2044                    // processing newly-received transactions at this time).
2045                    match &parsed.transaction.kind {
2046                        ConsensusTransactionKind::UserTransaction(_)
2047                        | ConsensusTransactionKind::UserTransactionV2(_)
2048                        | ConsensusTransactionKind::CertifiedTransaction(_)
2049                        // deprecated and ignore later, but added for exhaustive match
2050                        | ConsensusTransactionKind::CapabilityNotification(_)
2051                        | ConsensusTransactionKind::CapabilityNotificationV2(_)
2052                        | ConsensusTransactionKind::EndOfPublish(_)
2053                        // Note: we no longer have to check protocol_config.ignore_execution_time_observations_after_certs_closed()
2054                        | ConsensusTransactionKind::ExecutionTimeObservation(_)
2055                        | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
2056                            debug!(
2057                                "Ignoring consensus transaction {:?} because of end of epoch",
2058                                parsed.transaction.key()
2059                            );
2060                            continue;
2061                        }
2062
2063                        // These are the message types that are still processed even if !should_accept_consensus_certs()
2064                        ConsensusTransactionKind::CheckpointSignature(_)
2065                        | ConsensusTransactionKind::CheckpointSignatureV2(_)
2066                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2067                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
2068                        | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
2069                    }
2070                }
2071
2072                if !initial_reconfig_state.should_accept_tx() {
2073                    match &parsed.transaction.kind {
2074                        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
2075                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
2076                        _ => {}
2077                    }
2078                }
2079
2080                if parsed.transaction.is_mfp_transaction()
2081                    && !self.epoch_store.protocol_config().mysticeti_fastpath()
2082                {
2083                    debug!(
2084                        "Ignoring MFP transaction {:?} because MFP is disabled",
2085                        parsed.transaction.key()
2086                    );
2087                    continue;
2088                }
2089
2090                if let ConsensusTransactionKind::CertifiedTransaction(certificate) =
2091                    &parsed.transaction.kind
2092                    && certificate.epoch() != epoch
2093                {
2094                    debug!(
2095                        "Certificate epoch ({:?}) doesn't match the current epoch ({:?})",
2096                        certificate.epoch(),
2097                        epoch
2098                    );
2099                    continue;
2100                }
2101
2102                // Handle deprecated messages
2103                match &parsed.transaction.kind {
2104                    ConsensusTransactionKind::CapabilityNotification(_)
2105                    | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2106                    | ConsensusTransactionKind::CheckpointSignature(_) => {
2107                        debug_fatal!(
2108                            "BUG: saw deprecated tx {:?}for commit round {}",
2109                            parsed.transaction.key(),
2110                            commit_info.round
2111                        );
2112                        continue;
2113                    }
2114                    _ => {}
2115                }
2116
2117                if matches!(
2118                    &parsed.transaction.kind,
2119                    ConsensusTransactionKind::UserTransaction(_)
2120                        | ConsensusTransactionKind::UserTransactionV2(_)
2121                        | ConsensusTransactionKind::CertifiedTransaction(_)
2122                ) {
2123                    let author_name = self
2124                        .epoch_store
2125                        .committee()
2126                        .authority_by_index(author as u32)
2127                        .unwrap();
2128                    if self
2129                        .epoch_store
2130                        .has_received_end_of_publish_from(author_name)
2131                    {
2132                        // In some edge cases, consensus might resend previously seen certificate after EndOfPublish
2133                        // An honest validator should not send a new transaction after EndOfPublish. Whether the
2134                        // transaction is duplicate or not, we filter it out here.
2135                        warn!(
2136                            "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
2137                            author_name.concise(),
2138                            parsed.transaction.key(),
2139                        );
2140                        continue;
2141                    }
2142                }
2143
2144                let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
2145                transactions.push((transaction, author as u32));
2146            }
2147        }
2148
2149        for (i, authority) in self.committee.authorities() {
2150            let hostname = &authority.hostname;
2151            self.metrics
2152                .consensus_committed_messages
2153                .with_label_values(&[hostname])
2154                .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
2155            self.metrics
2156                .consensus_committed_user_transactions
2157                .with_label_values(&[hostname])
2158                .set(
2159                    self.last_consensus_stats
2160                        .stats
2161                        .get_num_user_transactions(i.value()) as i64,
2162                );
2163            self.metrics
2164                .consensus_finalized_user_transactions
2165                .with_label_values(&[hostname])
2166                .add(num_finalized_user_transactions[i.value()] as i64);
2167            self.metrics
2168                .consensus_rejected_user_transactions
2169                .with_label_values(&[hostname])
2170                .add(num_rejected_user_transactions[i.value()] as i64);
2171        }
2172
2173        transactions
2174    }
2175
2176    fn deduplicate_consensus_txns(
2177        &mut self,
2178        state: &mut CommitHandlerState,
2179        commit_info: &ConsensusCommitInfo,
2180        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
2181    ) -> Vec<VerifiedSequencedConsensusTransaction> {
2182        // We need a set here as well, since the processed_cache is a LRU cache and can drop
2183        // entries while we're iterating over the sequenced transactions.
2184        let mut processed_set = HashSet::new();
2185
2186        let mut all_transactions = Vec::new();
2187
2188        // All of these TODOs are handled here in the new code, whereas in the old code, they were
2189        // each handled separately. The key thing to see is that all messages are marked as processed
2190        // here, except for ones that are filtered out earlier (e.g. due to !should_accept_consensus_certs()).
2191
2192        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
2193            // SequencedConsensusTransaction for commit prologue any more.
2194            // In process_consensus_transactions_and_commit_boundary(), we will add a system consensus commit
2195            // prologue transaction, which will be the first transaction in this consensus commit batch.
2196            // Therefore, the transaction sequence number starts from 1 here.
2197            let current_tx_index = ExecutionIndices {
2198                last_committed_round: commit_info.round,
2199                sub_dag_index: commit_info.consensus_commit_ref.index.into(),
2200                transaction_index: (seq + 1) as u64,
2201            };
2202
2203            self.last_consensus_stats.index = current_tx_index;
2204
2205            let certificate_author = *self
2206                .epoch_store
2207                .committee()
2208                .authority_by_index(cert_origin)
2209                .unwrap();
2210
2211            let sequenced_transaction = SequencedConsensusTransaction {
2212                certificate_author_index: cert_origin,
2213                certificate_author,
2214                consensus_index: current_tx_index,
2215                transaction,
2216            };
2217
2218            let Some(verified_transaction) = self
2219                .epoch_store
2220                .verify_consensus_transaction(sequenced_transaction)
2221            else {
2222                continue;
2223            };
2224
2225            let key = verified_transaction.0.key();
2226
2227            if let Some(tx_digest) = key.user_transaction_digest() {
2228                self.epoch_store
2229                    .cache_recently_finalized_transaction(tx_digest);
2230            }
2231
2232            let in_set = !processed_set.insert(key.clone());
2233            let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
2234            if in_set || in_cache {
2235                self.metrics.skipped_consensus_txns_cache_hit.inc();
2236                continue;
2237            }
2238            if self
2239                .epoch_store
2240                .is_consensus_message_processed(&key)
2241                .expect("db error")
2242            {
2243                self.metrics.skipped_consensus_txns.inc();
2244                continue;
2245            }
2246
2247            state.output.record_consensus_message_processed(key);
2248
2249            all_transactions.push(verified_transaction);
2250        }
2251
2252        all_transactions
2253    }
2254
2255    fn build_commit_handler_input(
2256        &self,
2257        transactions: Vec<VerifiedSequencedConsensusTransaction>,
2258    ) -> CommitHandlerInput {
2259        let epoch = self.epoch_store.epoch();
2260        let mut commit_handler_input = CommitHandlerInput::default();
2261
2262        for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
2263            match transaction.transaction {
2264                SequencedConsensusTransactionKind::External(consensus_transaction) => {
2265                    match consensus_transaction.kind {
2266                        // === User transactions ===
2267                        ConsensusTransactionKind::CertifiedTransaction(cert) => {
2268                            // Safe because signatures are verified when consensus called into SuiTxValidator::validate_batch.
2269                            let cert = VerifiedCertificate::new_unchecked(*cert);
2270                            let transaction =
2271                                VerifiedExecutableTransaction::new_from_certificate(cert);
2272                            commit_handler_input.user_transactions.push(
2273                                VerifiedExecutableTransactionWithAliases::no_aliases(transaction),
2274                            );
2275                        }
2276                        ConsensusTransactionKind::UserTransaction(tx) => {
2277                            // Safe because transactions are certified by consensus.
2278                            let tx = VerifiedTransaction::new_unchecked(*tx);
2279                            // TODO(fastpath): accept position in consensus, after plumbing consensus round, authority index, and transaction index here.
2280                            let transaction =
2281                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
2282                            commit_handler_input
2283                                .user_transactions
2284                                // Use of v1 UserTransaction implies commitment to no aliases.
2285                                .push(VerifiedExecutableTransactionWithAliases::no_aliases(
2286                                    transaction,
2287                                ));
2288                        }
2289                        ConsensusTransactionKind::UserTransactionV2(tx) => {
2290                            let (tx, used_alias_versions) = tx.into_inner();
2291                            // Safe because transactions are certified by consensus.
2292                            let tx = VerifiedTransaction::new_unchecked(tx);
2293                            // TODO(fastpath): accept position in consensus, after plumbing consensus round, authority index, and transaction index here.
2294                            let transaction =
2295                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
2296                            commit_handler_input
2297                                .user_transactions
2298                                .push(WithAliases::new(transaction, used_alias_versions));
2299                        }
2300
2301                        // === State machines ===
2302                        ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => {
2303                            commit_handler_input
2304                                .end_of_publish_transactions
2305                                .push(authority_public_key_bytes);
2306                        }
2307                        ConsensusTransactionKind::NewJWKFetched(
2308                            authority_public_key_bytes,
2309                            jwk_id,
2310                            jwk,
2311                        ) => {
2312                            commit_handler_input.new_jwks.push((
2313                                authority_public_key_bytes,
2314                                jwk_id,
2315                                jwk,
2316                            ));
2317                        }
2318                        ConsensusTransactionKind::RandomnessDkgMessage(
2319                            authority_public_key_bytes,
2320                            items,
2321                        ) => {
2322                            commit_handler_input
2323                                .randomness_dkg_messages
2324                                .push((authority_public_key_bytes, items));
2325                        }
2326                        ConsensusTransactionKind::RandomnessDkgConfirmation(
2327                            authority_public_key_bytes,
2328                            items,
2329                        ) => {
2330                            commit_handler_input
2331                                .randomness_dkg_confirmations
2332                                .push((authority_public_key_bytes, items));
2333                        }
2334                        ConsensusTransactionKind::CapabilityNotificationV2(
2335                            authority_capabilities_v2,
2336                        ) => {
2337                            commit_handler_input
2338                                .capability_notifications
2339                                .push(authority_capabilities_v2);
2340                        }
2341                        ConsensusTransactionKind::ExecutionTimeObservation(
2342                            execution_time_observation,
2343                        ) => {
2344                            commit_handler_input
2345                                .execution_time_observations
2346                                .push(execution_time_observation);
2347                        }
2348                        ConsensusTransactionKind::CheckpointSignatureV2(
2349                            checkpoint_signature_message,
2350                        ) => {
2351                            commit_handler_input
2352                                .checkpoint_signature_messages
2353                                .push(*checkpoint_signature_message);
2354                        }
2355
2356                        // Deprecated messages, filtered earlier by filter_consensus_txns()
2357                        ConsensusTransactionKind::CheckpointSignature(_)
2358                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2359                        | ConsensusTransactionKind::CapabilityNotification(_) => {
2360                            unreachable!("filtered earlier")
2361                        }
2362                    }
2363                }
2364                // TODO: I think we can delete this, it was only used to inject randomness state update into the tx stream.
2365                SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
2366            }
2367        }
2368
2369        commit_handler_input
2370    }
2371
2372    async fn send_end_of_publish_if_needed(&self) {
2373        if !self.epoch_store.should_send_end_of_publish() {
2374            return;
2375        }
2376
2377        let end_of_publish = ConsensusTransaction::new_end_of_publish(self.epoch_store.name);
2378        if let Err(err) =
2379            self.consensus_adapter
2380                .submit(end_of_publish, None, &self.epoch_store, None, None)
2381        {
2382            warn!(
2383                "Error when sending EndOfPublish message from ConsensusHandler: {:?}",
2384                err
2385            );
2386        } else {
2387            info!(epoch=?self.epoch_store.epoch(), "Sending EndOfPublish message to consensus");
2388        }
2389    }
2390}
2391
2392/// Sends transactions to the execution scheduler in a separate task,
2393/// to avoid blocking consensus handler.
2394#[derive(Clone)]
2395pub(crate) struct ExecutionSchedulerSender {
2396    // Using unbounded channel to avoid blocking consensus commit and transaction handler.
2397    sender: monitored_mpsc::UnboundedSender<(
2398        Vec<Schedulable>,
2399        AssignedTxAndVersions,
2400        SchedulingSource,
2401    )>,
2402}
2403
2404impl ExecutionSchedulerSender {
2405    fn start(
2406        execution_scheduler: Arc<ExecutionScheduler>,
2407        epoch_store: Arc<AuthorityPerEpochStore>,
2408    ) -> Self {
2409        let (sender, recv) = monitored_mpsc::unbounded_channel("execution_scheduler_sender");
2410        spawn_monitored_task!(Self::run(recv, execution_scheduler, epoch_store));
2411        Self { sender }
2412    }
2413
2414    pub(crate) fn new_for_testing(
2415        sender: monitored_mpsc::UnboundedSender<(
2416            Vec<Schedulable>,
2417            AssignedTxAndVersions,
2418            SchedulingSource,
2419        )>,
2420    ) -> Self {
2421        Self { sender }
2422    }
2423
2424    fn send(
2425        &self,
2426        transactions: Vec<Schedulable>,
2427        assigned_versions: AssignedTxAndVersions,
2428        scheduling_source: SchedulingSource,
2429    ) {
2430        let _ = self
2431            .sender
2432            .send((transactions, assigned_versions, scheduling_source));
2433    }
2434
2435    async fn run(
2436        mut recv: monitored_mpsc::UnboundedReceiver<(
2437            Vec<Schedulable>,
2438            AssignedTxAndVersions,
2439            SchedulingSource,
2440        )>,
2441        execution_scheduler: Arc<ExecutionScheduler>,
2442        epoch_store: Arc<AuthorityPerEpochStore>,
2443    ) {
2444        while let Some((transactions, assigned_versions, scheduling_source)) = recv.recv().await {
2445            let _guard = monitored_scope("ConsensusHandler::enqueue");
2446            let assigned_versions = assigned_versions.into_map();
2447            let txns = transactions
2448                .into_iter()
2449                .map(|txn| {
2450                    let key = txn.key();
2451                    (
2452                        txn,
2453                        ExecutionEnv::new()
2454                            .with_scheduling_source(scheduling_source)
2455                            .with_assigned_versions(
2456                                assigned_versions.get(&key).cloned().unwrap_or_default(),
2457                            ),
2458                    )
2459                })
2460                .collect();
2461            execution_scheduler.enqueue(txns, &epoch_store);
2462        }
2463    }
2464}
2465
2466/// Manages the lifetime of tasks handling the commits and transactions output by consensus.
2467pub(crate) struct MysticetiConsensusHandler {
2468    tasks: JoinSet<()>,
2469}
2470
2471impl MysticetiConsensusHandler {
2472    pub(crate) fn new(
2473        last_processed_commit_at_startup: CommitIndex,
2474        mut consensus_handler: ConsensusHandler<CheckpointService>,
2475        consensus_block_handler: ConsensusBlockHandler,
2476        mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
2477        mut block_receiver: UnboundedReceiver<consensus_core::CertifiedBlocksOutput>,
2478        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
2479    ) -> Self {
2480        let mut tasks = JoinSet::new();
2481        tasks.spawn(monitored_future!(async move {
2482            // TODO: pause when execution is overloaded, so consensus can detect the backpressure.
2483            while let Some(consensus_commit) = commit_receiver.recv().await {
2484                let commit_index = consensus_commit.commit_ref.index;
2485                if commit_index <= last_processed_commit_at_startup {
2486                    consensus_handler.handle_prior_consensus_commit(consensus_commit);
2487                } else {
2488                    consensus_handler
2489                        .handle_consensus_commit(consensus_commit)
2490                        .await;
2491                }
2492                commit_consumer_monitor.set_highest_handled_commit(commit_index);
2493            }
2494        }));
2495        if consensus_block_handler.enabled() {
2496            tasks.spawn(monitored_future!(async move {
2497                while let Some(blocks) = block_receiver.recv().await {
2498                    consensus_block_handler
2499                        .handle_certified_blocks(blocks)
2500                        .await;
2501                }
2502            }));
2503        }
2504        Self { tasks }
2505    }
2506
2507    pub(crate) async fn abort(&mut self) {
2508        self.tasks.shutdown().await;
2509    }
2510}
2511
2512fn authenticator_state_update_transaction(
2513    epoch_store: &AuthorityPerEpochStore,
2514    round: u64,
2515    mut new_active_jwks: Vec<ActiveJwk>,
2516) -> VerifiedExecutableTransaction {
2517    let epoch = epoch_store.epoch();
2518    new_active_jwks.sort();
2519
2520    info!("creating authenticator state update transaction");
2521    assert!(epoch_store.authenticator_state_enabled());
2522    let transaction = VerifiedTransaction::new_authenticator_state_update(
2523        epoch,
2524        round,
2525        new_active_jwks,
2526        epoch_store
2527            .epoch_start_config()
2528            .authenticator_obj_initial_shared_version()
2529            .expect("authenticator state obj must exist"),
2530    );
2531    VerifiedExecutableTransaction::new_system(transaction, epoch)
2532}
2533
2534pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
2535    match &transaction.kind {
2536        ConsensusTransactionKind::CertifiedTransaction(certificate) => {
2537            if certificate.is_consensus_tx() {
2538                "shared_certificate"
2539            } else {
2540                "owned_certificate"
2541            }
2542        }
2543        ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
2544        ConsensusTransactionKind::CheckpointSignatureV2(_) => "checkpoint_signature",
2545        ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
2546        ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
2547        ConsensusTransactionKind::CapabilityNotificationV2(_) => "capability_notification_v2",
2548        ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
2549        ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
2550        ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
2551        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
2552        ConsensusTransactionKind::UserTransaction(tx) => {
2553            if tx.is_consensus_tx() {
2554                "shared_user_transaction"
2555            } else {
2556                "owned_user_transaction"
2557            }
2558        }
2559        ConsensusTransactionKind::UserTransactionV2(tx) => {
2560            if tx.tx().is_consensus_tx() {
2561                "shared_user_transaction_v2"
2562            } else {
2563                "owned_user_transaction_v2"
2564            }
2565        }
2566        ConsensusTransactionKind::ExecutionTimeObservation(_) => "execution_time_observation",
2567    }
2568}
2569
2570#[derive(Debug, Clone, Serialize, Deserialize)]
2571pub struct SequencedConsensusTransaction {
2572    pub certificate_author_index: AuthorityIndex,
2573    pub certificate_author: AuthorityName,
2574    pub consensus_index: ExecutionIndices,
2575    pub transaction: SequencedConsensusTransactionKind,
2576}
2577
2578#[derive(Debug, Clone)]
2579#[allow(clippy::large_enum_variant)]
2580pub enum SequencedConsensusTransactionKind {
2581    External(ConsensusTransaction),
2582    System(VerifiedExecutableTransaction),
2583}
2584
2585impl Serialize for SequencedConsensusTransactionKind {
2586    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2587        let serializable = SerializableSequencedConsensusTransactionKind::from(self);
2588        serializable.serialize(serializer)
2589    }
2590}
2591
2592impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
2593    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
2594        let serializable =
2595            SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
2596        Ok(serializable.into())
2597    }
2598}
2599
2600// We can't serialize SequencedConsensusTransactionKind directly because it contains a
2601// VerifiedExecutableTransaction, which is not serializable (by design). This wrapper allows us to
2602// convert to a serializable format easily.
2603#[derive(Debug, Clone, Serialize, Deserialize)]
2604#[allow(clippy::large_enum_variant)]
2605enum SerializableSequencedConsensusTransactionKind {
2606    External(ConsensusTransaction),
2607    System(TrustedExecutableTransaction),
2608}
2609
2610impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
2611    fn from(kind: &SequencedConsensusTransactionKind) -> Self {
2612        match kind {
2613            SequencedConsensusTransactionKind::External(ext) => {
2614                SerializableSequencedConsensusTransactionKind::External(ext.clone())
2615            }
2616            SequencedConsensusTransactionKind::System(txn) => {
2617                SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
2618            }
2619        }
2620    }
2621}
2622
2623impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
2624    fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
2625        match kind {
2626            SerializableSequencedConsensusTransactionKind::External(ext) => {
2627                SequencedConsensusTransactionKind::External(ext)
2628            }
2629            SerializableSequencedConsensusTransactionKind::System(txn) => {
2630                SequencedConsensusTransactionKind::System(txn.into())
2631            }
2632        }
2633    }
2634}
2635
2636#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
2637pub enum SequencedConsensusTransactionKey {
2638    External(ConsensusTransactionKey),
2639    System(TransactionDigest),
2640}
2641
2642impl SequencedConsensusTransactionKey {
2643    pub fn user_transaction_digest(&self) -> Option<TransactionDigest> {
2644        match self {
2645            SequencedConsensusTransactionKey::External(key) => match key {
2646                ConsensusTransactionKey::Certificate(digest) => Some(*digest),
2647                _ => None,
2648            },
2649            SequencedConsensusTransactionKey::System(_) => None,
2650        }
2651    }
2652}
2653
2654impl SequencedConsensusTransactionKind {
2655    pub fn key(&self) -> SequencedConsensusTransactionKey {
2656        match self {
2657            SequencedConsensusTransactionKind::External(ext) => {
2658                SequencedConsensusTransactionKey::External(ext.key())
2659            }
2660            SequencedConsensusTransactionKind::System(txn) => {
2661                SequencedConsensusTransactionKey::System(*txn.digest())
2662            }
2663        }
2664    }
2665
2666    pub fn get_tracking_id(&self) -> u64 {
2667        match self {
2668            SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
2669            SequencedConsensusTransactionKind::System(_txn) => 0,
2670        }
2671    }
2672
2673    pub fn is_executable_transaction(&self) -> bool {
2674        match self {
2675            SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
2676            SequencedConsensusTransactionKind::System(_) => true,
2677        }
2678    }
2679
2680    pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
2681        match self {
2682            SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
2683                ConsensusTransactionKind::CertifiedTransaction(txn) => Some(*txn.digest()),
2684                ConsensusTransactionKind::UserTransaction(txn) => Some(*txn.digest()),
2685                ConsensusTransactionKind::UserTransactionV2(txn) => Some(*txn.tx().digest()),
2686                _ => None,
2687            },
2688            SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
2689        }
2690    }
2691
2692    pub fn is_end_of_publish(&self) -> bool {
2693        match self {
2694            SequencedConsensusTransactionKind::External(ext) => {
2695                matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
2696            }
2697            SequencedConsensusTransactionKind::System(_) => false,
2698        }
2699    }
2700}
2701
2702impl SequencedConsensusTransaction {
2703    pub fn sender_authority(&self) -> AuthorityName {
2704        self.certificate_author
2705    }
2706
2707    pub fn key(&self) -> SequencedConsensusTransactionKey {
2708        self.transaction.key()
2709    }
2710
2711    pub fn is_end_of_publish(&self) -> bool {
2712        if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
2713            matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
2714        } else {
2715            false
2716        }
2717    }
2718
2719    pub fn try_take_execution_time_observation(&mut self) -> Option<ExecutionTimeObservation> {
2720        if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
2721            kind: ConsensusTransactionKind::ExecutionTimeObservation(observation),
2722            ..
2723        }) = &mut self.transaction
2724        {
2725            Some(std::mem::take(observation))
2726        } else {
2727            None
2728        }
2729    }
2730
2731    pub fn is_system(&self) -> bool {
2732        matches!(
2733            self.transaction,
2734            SequencedConsensusTransactionKind::System(_)
2735        )
2736    }
2737
2738    pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
2739        if !randomness_state_enabled {
2740            // If randomness is disabled, these should be processed same as a tx without randomness,
2741            // which will eventually fail when the randomness state object is not found.
2742            return false;
2743        }
2744        match &self.transaction {
2745            SequencedConsensusTransactionKind::External(ConsensusTransaction {
2746                kind: ConsensusTransactionKind::CertifiedTransaction(cert),
2747                ..
2748            }) => cert.transaction_data().uses_randomness(),
2749            SequencedConsensusTransactionKind::External(ConsensusTransaction {
2750                kind: ConsensusTransactionKind::UserTransaction(txn),
2751                ..
2752            }) => txn.transaction_data().uses_randomness(),
2753            SequencedConsensusTransactionKind::External(ConsensusTransaction {
2754                kind: ConsensusTransactionKind::UserTransactionV2(txn),
2755                ..
2756            }) => txn.tx().transaction_data().uses_randomness(),
2757            _ => false,
2758        }
2759    }
2760
2761    pub fn as_consensus_txn(&self) -> Option<&SenderSignedData> {
2762        match &self.transaction {
2763            SequencedConsensusTransactionKind::External(ConsensusTransaction {
2764                kind: ConsensusTransactionKind::CertifiedTransaction(certificate),
2765                ..
2766            }) if certificate.is_consensus_tx() => Some(certificate.data()),
2767            SequencedConsensusTransactionKind::External(ConsensusTransaction {
2768                kind: ConsensusTransactionKind::UserTransaction(txn),
2769                ..
2770            }) if txn.is_consensus_tx() => Some(txn.data()),
2771            SequencedConsensusTransactionKind::External(ConsensusTransaction {
2772                kind: ConsensusTransactionKind::UserTransactionV2(txn),
2773                ..
2774            }) if txn.tx().is_consensus_tx() => Some(txn.tx().data()),
2775            SequencedConsensusTransactionKind::System(txn) if txn.is_consensus_tx() => {
2776                Some(txn.data())
2777            }
2778            _ => None,
2779        }
2780    }
2781}
2782
2783#[derive(Debug, Clone, Serialize, Deserialize)]
2784pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
2785
2786#[cfg(test)]
2787impl VerifiedSequencedConsensusTransaction {
2788    pub fn new_test(transaction: ConsensusTransaction) -> Self {
2789        Self(SequencedConsensusTransaction::new_test(transaction))
2790    }
2791}
2792
2793impl SequencedConsensusTransaction {
2794    pub fn new_test(transaction: ConsensusTransaction) -> Self {
2795        Self {
2796            certificate_author_index: 0,
2797            certificate_author: AuthorityName::ZERO,
2798            consensus_index: Default::default(),
2799            transaction: SequencedConsensusTransactionKind::External(transaction),
2800        }
2801    }
2802}
2803
2804/// Handles certified and rejected transactions output by consensus.
2805pub(crate) struct ConsensusBlockHandler {
2806    /// Whether to enable handling certified transactions.
2807    enabled: bool,
2808    /// Per-epoch store.
2809    epoch_store: Arc<AuthorityPerEpochStore>,
2810    /// Enqueues transactions to the execution scheduler via a separate task.
2811    execution_scheduler_sender: ExecutionSchedulerSender,
2812    /// Backpressure subscriber to wait for backpressure to be resolved.
2813    backpressure_subscriber: BackpressureSubscriber,
2814    /// Metrics for consensus transaction handling.
2815    metrics: Arc<AuthorityMetrics>,
2816}
2817
2818impl ConsensusBlockHandler {
2819    pub fn new(
2820        epoch_store: Arc<AuthorityPerEpochStore>,
2821        execution_scheduler_sender: ExecutionSchedulerSender,
2822        backpressure_subscriber: BackpressureSubscriber,
2823        metrics: Arc<AuthorityMetrics>,
2824    ) -> Self {
2825        Self {
2826            enabled: epoch_store.protocol_config().mysticeti_fastpath(),
2827            epoch_store,
2828            execution_scheduler_sender,
2829            backpressure_subscriber,
2830            metrics,
2831        }
2832    }
2833
2834    pub fn enabled(&self) -> bool {
2835        self.enabled
2836    }
2837
2838    #[instrument(level = "debug", skip_all)]
2839    async fn handle_certified_blocks(&self, blocks_output: CertifiedBlocksOutput) {
2840        self.backpressure_subscriber.await_no_backpressure().await;
2841
2842        let _scope = monitored_scope("ConsensusBlockHandler::handle_certified_blocks");
2843
2844        // Avoid triggering fastpath execution or setting transaction status to fastpath certified, during reconfiguration.
2845        let reconfiguration_lock = self.epoch_store.get_reconfig_state_read_lock_guard();
2846        if !reconfiguration_lock.should_accept_user_certs() {
2847            debug!(
2848                "Skipping fastpath execution because epoch {} is closing user transactions: {}",
2849                self.epoch_store.epoch(),
2850                blocks_output
2851                    .blocks
2852                    .iter()
2853                    .map(|b| b.block.reference().to_string())
2854                    .join(", "),
2855            );
2856            return;
2857        }
2858
2859        self.metrics.consensus_block_handler_block_processed.inc();
2860        let epoch = self.epoch_store.epoch();
2861        let parsed_transactions = blocks_output
2862            .blocks
2863            .into_iter()
2864            .map(|certified_block| {
2865                let block_ref = certified_block.block.reference();
2866                let transactions =
2867                    parse_block_transactions(&certified_block.block, &certified_block.rejected);
2868                (block_ref, transactions)
2869            })
2870            .collect::<Vec<_>>();
2871        let mut executable_transactions = vec![];
2872        for (block, transactions) in parsed_transactions.into_iter() {
2873            // Set the "ping" transaction status for this block. This is ncecessary as there might be some ping requests waiting for the ping transaction to be certified.
2874            self.epoch_store.set_consensus_tx_status(
2875                ConsensusPosition::ping(epoch, block),
2876                ConsensusTxStatus::FastpathCertified,
2877            );
2878
2879            for (txn_idx, parsed) in transactions.into_iter().enumerate() {
2880                let position = ConsensusPosition {
2881                    epoch,
2882                    block,
2883                    index: txn_idx as TransactionIndex,
2884                };
2885
2886                let status_str = if parsed.rejected {
2887                    "rejected"
2888                } else {
2889                    "certified"
2890                };
2891                if let Some(tx) = parsed.transaction.kind.as_user_transaction() {
2892                    debug!(
2893                        "User Transaction in position: {:} with digest {:} is {:}",
2894                        position,
2895                        tx.digest(),
2896                        status_str
2897                    );
2898                } else {
2899                    debug!(
2900                        "System Transaction in position: {:} is {:}",
2901                        position, status_str
2902                    );
2903                }
2904
2905                if parsed.rejected {
2906                    // TODO(fastpath): avoid parsing blocks twice between handling commit and fastpath transactions?
2907                    self.epoch_store
2908                        .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
2909                    self.metrics
2910                        .consensus_block_handler_txn_processed
2911                        .with_label_values(&["rejected"])
2912                        .inc();
2913                    continue;
2914                }
2915
2916                self.metrics
2917                    .consensus_block_handler_txn_processed
2918                    .with_label_values(&["certified"])
2919                    .inc();
2920
2921                if let Some(tx) = parsed.transaction.kind.into_user_transaction() {
2922                    if tx.is_consensus_tx() {
2923                        continue;
2924                    }
2925                    // Only set fastpath certified status on transactions intended for fastpath execution.
2926                    self.epoch_store
2927                        .set_consensus_tx_status(position, ConsensusTxStatus::FastpathCertified);
2928                    let tx = VerifiedTransaction::new_unchecked(tx);
2929                    executable_transactions.push(Schedulable::Transaction(
2930                        VerifiedExecutableTransaction::new_from_consensus(
2931                            tx,
2932                            self.epoch_store.epoch(),
2933                        ),
2934                    ));
2935                }
2936            }
2937        }
2938
2939        if executable_transactions.is_empty() {
2940            return;
2941        }
2942        self.metrics
2943            .consensus_block_handler_fastpath_executions
2944            .inc_by(executable_transactions.len() as u64);
2945
2946        self.execution_scheduler_sender.send(
2947            executable_transactions,
2948            Default::default(),
2949            SchedulingSource::MysticetiFastPath,
2950        );
2951    }
2952}
2953
2954#[derive(Serialize, Deserialize)]
2955pub(crate) struct CommitIntervalObserver {
2956    ring_buffer: VecDeque<u64>,
2957}
2958
2959impl CommitIntervalObserver {
2960    pub fn new(window_size: u32) -> Self {
2961        Self {
2962            ring_buffer: VecDeque::with_capacity(window_size as usize),
2963        }
2964    }
2965
2966    pub fn observe_commit_time(&mut self, consensus_commit: &impl ConsensusCommitAPI) {
2967        let commit_time = consensus_commit.commit_timestamp_ms();
2968        if self.ring_buffer.len() == self.ring_buffer.capacity() {
2969            self.ring_buffer.pop_front();
2970        }
2971        self.ring_buffer.push_back(commit_time);
2972    }
2973
2974    pub fn commit_interval_estimate(&self) -> Option<Duration> {
2975        if self.ring_buffer.len() <= 1 {
2976            None
2977        } else {
2978            let first = self.ring_buffer.front().unwrap();
2979            let last = self.ring_buffer.back().unwrap();
2980            let duration = last.saturating_sub(*first);
2981            let num_commits = self.ring_buffer.len() as u64;
2982            Some(Duration::from_millis(duration.div_ceil(num_commits)))
2983        }
2984    }
2985}
2986
2987#[cfg(test)]
2988mod tests {
2989    use std::collections::HashSet;
2990
2991    use consensus_core::{
2992        BlockAPI, CertifiedBlock, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction,
2993        VerifiedBlock,
2994    };
2995    use consensus_types::block::TransactionIndex;
2996    use futures::pin_mut;
2997    use prometheus::Registry;
2998    use sui_protocol_config::{ConsensusTransactionOrdering, ProtocolConfig};
2999    use sui_types::{
3000        base_types::ExecutionDigests,
3001        base_types::{AuthorityName, FullObjectRef, ObjectID, SuiAddress, random_object_ref},
3002        committee::Committee,
3003        crypto::deterministic_random_account_key,
3004        gas::GasCostSummary,
3005        message_envelope::Message,
3006        messages_checkpoint::{
3007            CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
3008            SignedCheckpointSummary,
3009        },
3010        messages_consensus::ConsensusTransaction,
3011        object::Object,
3012        transaction::{
3013            CertifiedTransaction, SenderSignedData, TransactionData, TransactionDataAPI,
3014        },
3015    };
3016
3017    use super::*;
3018    use crate::{
3019        authority::{
3020            authority_per_epoch_store::ConsensusStatsAPI,
3021            test_authority_builder::TestAuthorityBuilder,
3022        },
3023        checkpoints::CheckpointServiceNoop,
3024        consensus_adapter::consensus_tests::{
3025            test_certificates_with_gas_objects, test_user_transaction,
3026        },
3027        consensus_test_utils::make_consensus_adapter_for_test,
3028        post_consensus_tx_reorder::PostConsensusTxReorder,
3029    };
3030
3031    #[tokio::test(flavor = "current_thread", start_paused = true)]
3032    async fn test_consensus_commit_handler() {
3033        telemetry_subscribers::init_for_testing();
3034
3035        // GIVEN
3036        // 1 account keypair
3037        let (sender, keypair) = deterministic_random_account_key();
3038        // 12 gas objects.
3039        let gas_objects: Vec<Object> = (0..12)
3040            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3041            .collect();
3042        // 4 owned objects.
3043        let owned_objects: Vec<Object> = (0..4)
3044            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3045            .collect();
3046        // 6 shared objects.
3047        let shared_objects: Vec<Object> = (0..6)
3048            .map(|_| Object::shared_for_testing())
3049            .collect::<Vec<_>>();
3050        let mut all_objects = gas_objects.clone();
3051        all_objects.extend(owned_objects.clone());
3052        all_objects.extend(shared_objects.clone());
3053
3054        let network_config =
3055            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
3056                .with_objects(all_objects.clone())
3057                .build();
3058
3059        let state = TestAuthorityBuilder::new()
3060            .with_network_config(&network_config, 0)
3061            .build()
3062            .await;
3063
3064        let epoch_store = state.epoch_store_for_testing().clone();
3065        let new_epoch_start_state = epoch_store.epoch_start_state();
3066        let consensus_committee = new_epoch_start_state.get_consensus_committee();
3067
3068        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3069
3070        let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
3071
3072        let backpressure_manager = BackpressureManager::new_for_tests();
3073        let consensus_adapter =
3074            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3075        let mut consensus_handler = ConsensusHandler::new(
3076            epoch_store,
3077            Arc::new(CheckpointServiceNoop {}),
3078            state.execution_scheduler().clone(),
3079            consensus_adapter,
3080            state.get_object_cache_reader().clone(),
3081            Arc::new(ArcSwap::default()),
3082            consensus_committee.clone(),
3083            metrics,
3084            Arc::new(throughput_calculator),
3085            backpressure_manager.subscribe(),
3086            state.traffic_controller.clone(),
3087        );
3088
3089        // AND create test user transactions alternating between owned and shared input.
3090        let mut user_transactions = vec![];
3091        for (i, gas_object) in gas_objects[0..8].iter().enumerate() {
3092            let input_object = if i % 2 == 0 {
3093                owned_objects.get(i / 2).unwrap().clone()
3094            } else {
3095                shared_objects.get(i / 2).unwrap().clone()
3096            };
3097            let transaction = test_user_transaction(
3098                &state,
3099                sender,
3100                &keypair,
3101                gas_object.clone(),
3102                vec![input_object],
3103            )
3104            .await;
3105            user_transactions.push(transaction);
3106        }
3107
3108        // AND create 4 certified transactions with remaining gas objects and 2 shared objects.
3109        // Having more txns on the same shared object may get deferred.
3110        let certified_transactions = [
3111            test_certificates_with_gas_objects(
3112                &state,
3113                &gas_objects[8..10],
3114                shared_objects[4].clone(),
3115            )
3116            .await,
3117            test_certificates_with_gas_objects(
3118                &state,
3119                &gas_objects[10..12],
3120                shared_objects[5].clone(),
3121            )
3122            .await,
3123        ]
3124        .concat();
3125
3126        // AND create block for each user and certified transaction
3127        let mut blocks = Vec::new();
3128        for (i, consensus_transaction) in user_transactions
3129            .iter()
3130            .cloned()
3131            .map(|t| ConsensusTransaction::new_user_transaction_v2_message(&state.name, t.into()))
3132            .chain(
3133                certified_transactions
3134                    .iter()
3135                    .map(|t| ConsensusTransaction::new_certificate_message(&state.name, t.clone())),
3136            )
3137            .enumerate()
3138        {
3139            let transaction_bytes = bcs::to_bytes(&consensus_transaction).unwrap();
3140            let block = VerifiedBlock::new_for_test(
3141                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
3142                    .set_transactions(vec![Transaction::new(transaction_bytes)])
3143                    .build(),
3144            );
3145
3146            blocks.push(block);
3147        }
3148
3149        // AND create the consensus commit
3150        let leader_block = blocks[0].clone();
3151        let committed_sub_dag = CommittedSubDag::new(
3152            leader_block.reference(),
3153            blocks.clone(),
3154            leader_block.timestamp_ms(),
3155            CommitRef::new(10, CommitDigest::MIN),
3156        );
3157
3158        // Test that the consensus handler respects backpressure.
3159        backpressure_manager.set_backpressure(true);
3160        // Default watermarks are 0,0 which will suppress the backpressure.
3161        backpressure_manager.update_highest_certified_checkpoint(1);
3162
3163        // AND process the consensus commit once
3164        {
3165            let waiter = consensus_handler.handle_consensus_commit(committed_sub_dag.clone());
3166            pin_mut!(waiter);
3167
3168            // waiter should not complete within 5 seconds
3169            tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
3170                .await
3171                .unwrap_err();
3172
3173            // lift backpressure
3174            backpressure_manager.set_backpressure(false);
3175
3176            // waiter completes now.
3177            tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
3178                .await
3179                .unwrap();
3180        }
3181
3182        // THEN check the consensus stats
3183        let num_blocks = blocks.len();
3184        let num_transactions = user_transactions.len() + certified_transactions.len();
3185        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
3186        assert_eq!(
3187            last_consensus_stats_1.index.transaction_index,
3188            num_transactions as u64
3189        );
3190        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
3191        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
3192        assert_eq!(last_consensus_stats_1.hash, 0);
3193        assert_eq!(
3194            last_consensus_stats_1.stats.get_num_messages(0),
3195            num_blocks as u64
3196        );
3197        assert_eq!(
3198            last_consensus_stats_1.stats.get_num_user_transactions(0),
3199            num_transactions as u64
3200        );
3201
3202        // THEN check for execution status of user transactions.
3203        for (i, t) in user_transactions.iter().enumerate() {
3204            let digest = t.tx().digest();
3205            if let Ok(Ok(_)) = tokio::time::timeout(
3206                std::time::Duration::from_secs(10),
3207                state.notify_read_effects("", *digest),
3208            )
3209            .await
3210            {
3211                // Effects exist as expected.
3212            } else {
3213                panic!("User transaction {} {} did not execute", i, digest);
3214            }
3215        }
3216
3217        // THEN check for execution status of certified transactions.
3218        for (i, t) in certified_transactions.iter().enumerate() {
3219            let digest = t.digest();
3220            if let Ok(Ok(_)) = tokio::time::timeout(
3221                std::time::Duration::from_secs(10),
3222                state.notify_read_effects("", *digest),
3223            )
3224            .await
3225            {
3226                // Effects exist as expected.
3227            } else {
3228                panic!("Certified transaction {} {} did not execute", i, digest);
3229            }
3230        }
3231
3232        // THEN check for no inflight or suspended transactions.
3233        state.execution_scheduler().check_empty_for_testing();
3234    }
3235
3236    #[tokio::test]
3237    async fn test_consensus_block_handler() {
3238        // GIVEN
3239        // 1 account keypair
3240        let (sender, keypair) = deterministic_random_account_key();
3241        // 8 gas objects.
3242        let gas_objects: Vec<Object> = (0..8)
3243            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3244            .collect();
3245        // 4 owned objects.
3246        let owned_objects: Vec<Object> = (0..4)
3247            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3248            .collect();
3249        // 4 shared objects.
3250        let shared_objects: Vec<Object> = (0..4)
3251            .map(|_| Object::shared_for_testing())
3252            .collect::<Vec<_>>();
3253        let mut all_objects = gas_objects.clone();
3254        all_objects.extend(owned_objects.clone());
3255        all_objects.extend(shared_objects.clone());
3256
3257        let network_config =
3258            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
3259                .with_objects(all_objects.clone())
3260                .build();
3261
3262        let state = TestAuthorityBuilder::new()
3263            .with_network_config(&network_config, 0)
3264            .build()
3265            .await;
3266        let epoch_store = state.epoch_store_for_testing().clone();
3267        let execution_scheduler_sender = ExecutionSchedulerSender::start(
3268            state.execution_scheduler().clone(),
3269            epoch_store.clone(),
3270        );
3271
3272        let backpressure_manager = BackpressureManager::new_for_tests();
3273        let block_handler = ConsensusBlockHandler::new(
3274            epoch_store.clone(),
3275            execution_scheduler_sender,
3276            backpressure_manager.subscribe(),
3277            state.metrics.clone(),
3278        );
3279
3280        // AND create test transactions alternating between owned and shared input.
3281        let mut transactions = vec![];
3282        for (i, gas_object) in gas_objects.iter().enumerate() {
3283            let input_object = if i % 2 == 0 {
3284                owned_objects.get(i / 2).unwrap().clone()
3285            } else {
3286                shared_objects.get(i / 2).unwrap().clone()
3287            };
3288            let transaction = test_user_transaction(
3289                &state,
3290                sender,
3291                &keypair,
3292                gas_object.clone(),
3293                vec![input_object],
3294            )
3295            .await;
3296            transactions.push(transaction);
3297        }
3298
3299        let serialized_transactions: Vec<_> = transactions
3300            .iter()
3301            .cloned()
3302            .map(|t| {
3303                Transaction::new(
3304                    bcs::to_bytes(&ConsensusTransaction::new_user_transaction_v2_message(
3305                        &state.name,
3306                        t.into(),
3307                    ))
3308                    .unwrap(),
3309                )
3310            })
3311            .collect();
3312
3313        // AND create block for all transactions
3314        let block = VerifiedBlock::new_for_test(
3315            TestBlock::new(100, 1)
3316                .set_transactions(serialized_transactions.clone())
3317                .build(),
3318        );
3319
3320        // AND set rejected transactions.
3321        let rejected_transactions = vec![0, 3, 4];
3322
3323        // AND process the transactions from consensus output.
3324        block_handler
3325            .handle_certified_blocks(CertifiedBlocksOutput {
3326                blocks: vec![CertifiedBlock {
3327                    block: block.clone(),
3328                    rejected: rejected_transactions.clone(),
3329                }],
3330            })
3331            .await;
3332
3333        // Ensure the correct consensus status is set for the correct consensus position
3334        let consensus_tx_status_cache = epoch_store.consensus_tx_status_cache.as_ref().unwrap();
3335        for txn_idx in 0..transactions.len() {
3336            let position = ConsensusPosition {
3337                epoch: epoch_store.epoch(),
3338                block: block.reference(),
3339                index: txn_idx as TransactionIndex,
3340            };
3341            if rejected_transactions.contains(&(txn_idx as TransactionIndex)) {
3342                // Expect rejected transactions to be marked as such.
3343                assert_eq!(
3344                    consensus_tx_status_cache.get_transaction_status(&position),
3345                    Some(ConsensusTxStatus::Rejected)
3346                );
3347            } else if txn_idx % 2 == 0 {
3348                // Expect owned object transactions to be marked as fastpath certified.
3349                assert_eq!(
3350                    consensus_tx_status_cache.get_transaction_status(&position),
3351                    Some(ConsensusTxStatus::FastpathCertified),
3352                );
3353            } else {
3354                // Expect shared object transactions to be marked as fastpath certified.
3355                assert_eq!(
3356                    consensus_tx_status_cache.get_transaction_status(&position),
3357                    None,
3358                );
3359            }
3360        }
3361
3362        // THEN check for status of transactions that should have been executed.
3363        for (i, t) in transactions.iter().enumerate() {
3364            // Do not expect shared transactions or rejected transactions to be executed.
3365            if i % 2 == 1 || rejected_transactions.contains(&(i as TransactionIndex)) {
3366                continue;
3367            }
3368            let digest = t.tx().digest();
3369            if tokio::time::timeout(
3370                std::time::Duration::from_secs(10),
3371                state
3372                    .get_transaction_cache_reader()
3373                    .notify_read_fastpath_transaction_outputs(&[*digest]),
3374            )
3375            .await
3376            .is_err()
3377            {
3378                panic!("Transaction {} {} did not execute", i, digest);
3379            }
3380        }
3381
3382        // THEN check for no inflight or suspended transactions.
3383        state.execution_scheduler().check_empty_for_testing();
3384
3385        // THEN check that rejected transactions are not executed.
3386        for (i, t) in transactions.iter().enumerate() {
3387            // Expect shared transactions or rejected transactions to not have executed.
3388            if i % 2 == 0 && !rejected_transactions.contains(&(i as TransactionIndex)) {
3389                continue;
3390            }
3391            let digest = t.tx().digest();
3392            assert!(
3393                !state.is_tx_already_executed(digest),
3394                "Rejected transaction {} {} should not have been executed",
3395                i,
3396                digest
3397            );
3398        }
3399    }
3400
3401    fn to_short_strings(txs: Vec<VerifiedExecutableTransactionWithAliases>) -> Vec<String> {
3402        txs.into_iter()
3403            .map(|tx| format!("transaction({})", tx.tx().transaction_data().gas_price()))
3404            .collect()
3405    }
3406
3407    #[test]
3408    fn test_order_by_gas_price() {
3409        let mut v = vec![user_txn(42), user_txn(100)];
3410        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3411        assert_eq!(
3412            to_short_strings(v),
3413            vec![
3414                "transaction(100)".to_string(),
3415                "transaction(42)".to_string(),
3416            ]
3417        );
3418
3419        let mut v = vec![
3420            user_txn(1200),
3421            user_txn(12),
3422            user_txn(1000),
3423            user_txn(42),
3424            user_txn(100),
3425            user_txn(1000),
3426        ];
3427        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3428        assert_eq!(
3429            to_short_strings(v),
3430            vec![
3431                "transaction(1200)".to_string(),
3432                "transaction(1000)".to_string(),
3433                "transaction(1000)".to_string(),
3434                "transaction(100)".to_string(),
3435                "transaction(42)".to_string(),
3436                "transaction(12)".to_string(),
3437            ]
3438        );
3439    }
3440
3441    #[tokio::test(flavor = "current_thread")]
3442    async fn test_checkpoint_signature_dedup() {
3443        telemetry_subscribers::init_for_testing();
3444
3445        let network_config =
3446            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3447        let state = TestAuthorityBuilder::new()
3448            .with_network_config(&network_config, 0)
3449            .build()
3450            .await;
3451
3452        let epoch_store = state.epoch_store_for_testing().clone();
3453        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3454
3455        let make_signed = || {
3456            let epoch = epoch_store.epoch();
3457            let contents =
3458                CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
3459            let summary = CheckpointSummary::new(
3460                &ProtocolConfig::get_for_max_version_UNSAFE(),
3461                epoch,
3462                42, // sequence number
3463                10, // network_total_transactions
3464                &contents,
3465                None, // previous_digest
3466                GasCostSummary::default(),
3467                None,       // end_of_epoch_data
3468                0,          // timestamp
3469                Vec::new(), // randomness_rounds
3470                Vec::new(), // checkpoint_artifact_digests
3471            );
3472            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name)
3473        };
3474
3475        // Prepare V2 pair: same (authority, seq), different digests => different keys
3476        let v2_s1 = make_signed();
3477        let v2_s1_clone = v2_s1.clone();
3478        let v2_digest_a = v2_s1.data().digest();
3479        let v2_a =
3480            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3481                summary: v2_s1,
3482            });
3483
3484        let v2_s2 = make_signed();
3485        let v2_digest_b = v2_s2.data().digest();
3486        let v2_b =
3487            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3488                summary: v2_s2,
3489            });
3490
3491        assert_ne!(v2_digest_a, v2_digest_b);
3492
3493        // Create an exact duplicate with same digest to exercise valid dedup
3494        assert_eq!(v2_s1_clone.data().digest(), v2_digest_a);
3495        let v2_dup =
3496            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3497                summary: v2_s1_clone,
3498            });
3499
3500        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
3501        let block = VerifiedBlock::new_for_test(
3502            TestBlock::new(100, 0)
3503                .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)])
3504                .build(),
3505        );
3506        let commit = CommittedSubDag::new(
3507            block.reference(),
3508            vec![block.clone()],
3509            block.timestamp_ms(),
3510            CommitRef::new(10, CommitDigest::MIN),
3511        );
3512
3513        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3514        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
3515        let backpressure = BackpressureManager::new_for_tests();
3516        let consensus_adapter =
3517            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3518        let mut handler = ConsensusHandler::new(
3519            epoch_store.clone(),
3520            Arc::new(CheckpointServiceNoop {}),
3521            state.execution_scheduler().clone(),
3522            consensus_adapter,
3523            state.get_object_cache_reader().clone(),
3524            Arc::new(ArcSwap::default()),
3525            consensus_committee.clone(),
3526            metrics,
3527            Arc::new(throughput),
3528            backpressure.subscribe(),
3529            state.traffic_controller.clone(),
3530        );
3531
3532        handler.handle_consensus_commit(commit).await;
3533
3534        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
3535        use sui_types::messages_consensus::ConsensusTransactionKey as CK;
3536
3537        // V2 distinct digests: both must be processed. If these were collapsed to one CheckpointSeq num, only one would process.
3538        let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a));
3539        let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b));
3540        assert!(
3541            epoch_store
3542                .is_consensus_message_processed(&v2_key_a)
3543                .unwrap()
3544        );
3545        assert!(
3546            epoch_store
3547                .is_consensus_message_processed(&v2_key_b)
3548                .unwrap()
3549        );
3550    }
3551
3552    #[tokio::test(flavor = "current_thread")]
3553    async fn test_verify_consensus_transaction_filters_mismatched_authorities() {
3554        telemetry_subscribers::init_for_testing();
3555
3556        let network_config =
3557            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3558        let state = TestAuthorityBuilder::new()
3559            .with_network_config(&network_config, 0)
3560            .build()
3561            .await;
3562
3563        let epoch_store = state.epoch_store_for_testing().clone();
3564        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3565
3566        // Create a different authority than our test authority
3567        use fastcrypto::traits::KeyPair;
3568        let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair();
3569        let wrong_authority: AuthorityName = wrong_keypair.public().into();
3570
3571        // Create EndOfPublish transaction with mismatched authority
3572        let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority);
3573
3574        // Create valid EndOfPublish transaction with correct authority
3575        let valid_eop = ConsensusTransaction::new_end_of_publish(state.name);
3576
3577        // Create CheckpointSignature with mismatched authority
3578        let epoch = epoch_store.epoch();
3579        let contents =
3580            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
3581        let summary = CheckpointSummary::new(
3582            &ProtocolConfig::get_for_max_version_UNSAFE(),
3583            epoch,
3584            42, // sequence number
3585            10, // network_total_transactions
3586            &contents,
3587            None, // previous_digest
3588            GasCostSummary::default(),
3589            None,       // end_of_epoch_data
3590            0,          // timestamp
3591            Vec::new(), // randomness_rounds
3592            Vec::new(), // checkpoint commitments
3593        );
3594
3595        // Create a signed checkpoint with the wrong authority
3596        let mismatched_checkpoint_signed =
3597            SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority);
3598        let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest();
3599        let mismatched_checkpoint =
3600            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3601                summary: mismatched_checkpoint_signed,
3602            });
3603
3604        // Create a valid checkpoint signature with correct authority
3605        let valid_checkpoint_signed =
3606            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name);
3607        let valid_checkpoint_digest = valid_checkpoint_signed.data().digest();
3608        let valid_checkpoint =
3609            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3610                summary: valid_checkpoint_signed,
3611            });
3612
3613        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
3614
3615        // Create a block with both valid and invalid transactions
3616        let block = VerifiedBlock::new_for_test(
3617            TestBlock::new(100, 0)
3618                .set_transactions(vec![
3619                    to_tx(&mismatched_eop),
3620                    to_tx(&valid_eop),
3621                    to_tx(&mismatched_checkpoint),
3622                    to_tx(&valid_checkpoint),
3623                ])
3624                .build(),
3625        );
3626        let commit = CommittedSubDag::new(
3627            block.reference(),
3628            vec![block.clone()],
3629            block.timestamp_ms(),
3630            CommitRef::new(10, CommitDigest::MIN),
3631        );
3632
3633        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3634        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
3635        let backpressure = BackpressureManager::new_for_tests();
3636        let consensus_adapter =
3637            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3638        let mut handler = ConsensusHandler::new(
3639            epoch_store.clone(),
3640            Arc::new(CheckpointServiceNoop {}),
3641            state.execution_scheduler().clone(),
3642            consensus_adapter,
3643            state.get_object_cache_reader().clone(),
3644            Arc::new(ArcSwap::default()),
3645            consensus_committee.clone(),
3646            metrics,
3647            Arc::new(throughput),
3648            backpressure.subscribe(),
3649            state.traffic_controller.clone(),
3650        );
3651
3652        handler.handle_consensus_commit(commit).await;
3653
3654        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
3655        use sui_types::messages_consensus::ConsensusTransactionKey as CK;
3656
3657        // Check that valid transactions were processed
3658        let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
3659        assert!(
3660            epoch_store
3661                .is_consensus_message_processed(&valid_eop_key)
3662                .unwrap(),
3663            "Valid EndOfPublish should have been processed"
3664        );
3665
3666        let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
3667            state.name,
3668            42,
3669            valid_checkpoint_digest,
3670        ));
3671        assert!(
3672            epoch_store
3673                .is_consensus_message_processed(&valid_checkpoint_key)
3674                .unwrap(),
3675            "Valid CheckpointSignature should have been processed"
3676        );
3677
3678        // Check that mismatched authority transactions were NOT processed (filtered out by verify_consensus_transaction)
3679        let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
3680        assert!(
3681            !epoch_store
3682                .is_consensus_message_processed(&mismatched_eop_key)
3683                .unwrap(),
3684            "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
3685        );
3686
3687        let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
3688            wrong_authority,
3689            42,
3690            mismatched_checkpoint_digest,
3691        ));
3692        assert!(
3693            !epoch_store
3694                .is_consensus_message_processed(&mismatched_checkpoint_key)
3695                .unwrap(),
3696            "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
3697        );
3698    }
3699
3700    fn user_txn(gas_price: u64) -> VerifiedExecutableTransactionWithAliases {
3701        let (committee, keypairs) = Committee::new_simple_test_committee();
3702        let data = SenderSignedData::new(
3703            TransactionData::new_transfer(
3704                SuiAddress::default(),
3705                FullObjectRef::from_fastpath_ref(random_object_ref()),
3706                SuiAddress::default(),
3707                random_object_ref(),
3708                1000 * gas_price,
3709                gas_price,
3710            ),
3711            vec![],
3712        );
3713        let tx = VerifiedExecutableTransaction::new_from_certificate(
3714            VerifiedCertificate::new_unchecked(
3715                CertifiedTransaction::new_from_keypairs_for_testing(data, &keypairs, &committee),
3716            ),
3717        );
3718        VerifiedExecutableTransactionWithAliases::no_aliases(tx)
3719    }
3720}