sui_core/
consensus_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet, VecDeque},
6    hash::Hash,
7    num::NonZeroUsize,
8    sync::Arc,
9    time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use arc_swap::ArcSwap;
13use consensus_config::Committee as ConsensusCommittee;
14use consensus_core::{CertifiedBlocksOutput, CommitConsumerMonitor, CommitIndex};
15use consensus_types::block::TransactionIndex;
16use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
17use itertools::Itertools as _;
18use lru::LruCache;
19use mysten_common::{
20    assert_reachable, assert_sometimes, debug_fatal, random_util::randomize_cache_capacity_in_tests,
21};
22use mysten_metrics::{
23    monitored_future,
24    monitored_mpsc::{self, UnboundedReceiver},
25    monitored_scope, spawn_monitored_task,
26};
27use parking_lot::RwLockWriteGuard;
28use serde::{Deserialize, Serialize};
29use sui_macros::{fail_point, fail_point_arg, fail_point_if};
30use sui_protocol_config::{PerObjectCongestionControlMode, ProtocolConfig};
31use sui_types::{
32    SUI_RANDOMNESS_STATE_OBJECT_ID,
33    authenticator_state::ActiveJwk,
34    base_types::{
35        AuthorityName, ConciseableName, ConsensusObjectSequenceKey, ObjectID, ObjectRef,
36        SequenceNumber, TransactionDigest,
37    },
38    crypto::RandomnessRound,
39    digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest},
40    executable_transaction::{
41        TrustedExecutableTransaction, VerifiedExecutableTransaction,
42        VerifiedExecutableTransactionWithAliases,
43    },
44    messages_checkpoint::CheckpointSignatureMessage,
45    messages_consensus::{
46        AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments,
47        ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
48        ExecutionTimeObservation,
49    },
50    sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
51    transaction::{
52        InputObjectKind, SenderSignedData, TransactionDataAPI, TransactionKey, VerifiedCertificate,
53        VerifiedTransaction, WithAliases,
54    },
55};
56use tokio::task::JoinSet;
57use tracing::{debug, error, info, instrument, trace, warn};
58
59use crate::{
60    authority::{
61        AuthorityMetrics, AuthorityState, ExecutionEnv,
62        authority_per_epoch_store::{
63            AuthorityPerEpochStore, CancelConsensusCertificateReason, ConsensusStats,
64            ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStats,
65            consensus_quarantine::ConsensusCommitOutput,
66        },
67        backpressure::{BackpressureManager, BackpressureSubscriber},
68        consensus_tx_status_cache::ConsensusTxStatus,
69        epoch_start_configuration::EpochStartConfigTrait,
70        execution_time_estimator::ExecutionTimeEstimator,
71        shared_object_congestion_tracker::SharedObjectCongestionTracker,
72        shared_object_version_manager::{AssignedTxAndVersions, Schedulable},
73        transaction_deferral::{DeferralKey, DeferralReason, transaction_deferral_within_limit},
74    },
75    checkpoints::{
76        CheckpointService, CheckpointServiceNotify, PendingCheckpoint, PendingCheckpointInfo,
77    },
78    consensus_adapter::ConsensusAdapter,
79    consensus_throughput_calculator::ConsensusThroughputCalculator,
80    consensus_types::consensus_output_api::{ConsensusCommitAPI, parse_block_transactions},
81    epoch::{
82        randomness::{DkgStatus, RandomnessManager},
83        reconfiguration::ReconfigState,
84    },
85    execution_cache::ObjectCacheRead,
86    execution_scheduler::{ExecutionScheduler, SchedulingSource},
87    post_consensus_tx_reorder::PostConsensusTxReorder,
88    scoring_decision::update_low_scoring_authorities,
89    traffic_controller::{TrafficController, policies::TrafficTally},
90};
91
92/// Output from filtering consensus transactions.
93/// Contains the filtered transactions and any owned object locks acquired post-consensus.
94struct FilteredConsensusOutput {
95    transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
96    owned_object_locks: HashMap<ObjectRef, TransactionDigest>,
97}
98
99pub struct ConsensusHandlerInitializer {
100    state: Arc<AuthorityState>,
101    checkpoint_service: Arc<CheckpointService>,
102    epoch_store: Arc<AuthorityPerEpochStore>,
103    consensus_adapter: Arc<ConsensusAdapter>,
104    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
105    throughput_calculator: Arc<ConsensusThroughputCalculator>,
106    backpressure_manager: Arc<BackpressureManager>,
107}
108
109impl ConsensusHandlerInitializer {
110    pub fn new(
111        state: Arc<AuthorityState>,
112        checkpoint_service: Arc<CheckpointService>,
113        epoch_store: Arc<AuthorityPerEpochStore>,
114        consensus_adapter: Arc<ConsensusAdapter>,
115        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
116        throughput_calculator: Arc<ConsensusThroughputCalculator>,
117        backpressure_manager: Arc<BackpressureManager>,
118    ) -> Self {
119        Self {
120            state,
121            checkpoint_service,
122            epoch_store,
123            consensus_adapter,
124            low_scoring_authorities,
125            throughput_calculator,
126            backpressure_manager,
127        }
128    }
129
130    #[cfg(test)]
131    pub(crate) fn new_for_testing(
132        state: Arc<AuthorityState>,
133        checkpoint_service: Arc<CheckpointService>,
134    ) -> Self {
135        use crate::consensus_test_utils::make_consensus_adapter_for_test;
136        use std::collections::HashSet;
137
138        let backpressure_manager = BackpressureManager::new_for_tests();
139        let consensus_adapter =
140            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
141        Self {
142            state: state.clone(),
143            checkpoint_service,
144            epoch_store: state.epoch_store_for_testing().clone(),
145            consensus_adapter,
146            low_scoring_authorities: Arc::new(Default::default()),
147            throughput_calculator: Arc::new(ConsensusThroughputCalculator::new(
148                None,
149                state.metrics.clone(),
150            )),
151            backpressure_manager,
152        }
153    }
154
155    pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
156        let new_epoch_start_state = self.epoch_store.epoch_start_state();
157        let consensus_committee = new_epoch_start_state.get_consensus_committee();
158
159        ConsensusHandler::new(
160            self.epoch_store.clone(),
161            self.checkpoint_service.clone(),
162            self.state.execution_scheduler().clone(),
163            self.consensus_adapter.clone(),
164            self.state.get_object_cache_reader().clone(),
165            self.low_scoring_authorities.clone(),
166            consensus_committee,
167            self.state.metrics.clone(),
168            self.throughput_calculator.clone(),
169            self.backpressure_manager.subscribe(),
170            self.state.traffic_controller.clone(),
171        )
172    }
173
174    pub(crate) fn metrics(&self) -> &Arc<AuthorityMetrics> {
175        &self.state.metrics
176    }
177
178    pub(crate) fn backpressure_subscriber(&self) -> BackpressureSubscriber {
179        self.backpressure_manager.subscribe()
180    }
181}
182
183mod additional_consensus_state {
184    use std::marker::PhantomData;
185
186    use consensus_core::CommitRef;
187    use fastcrypto::hash::HashFunction as _;
188    use sui_types::{crypto::DefaultHash, digests::Digest};
189
190    use super::*;
191    /// AdditionalConsensusState tracks any in-memory state that is retained by ConsensusHandler
192    /// between consensus commits. Because of crash recovery, using such data is inherently risky.
193    /// In order to do this safely, we must store data from a fixed number of previous commits.
194    /// Then, at start-up, that same fixed number of already processed commits is replayed to
195    /// reconstruct the state.
196    ///
197    /// To make sure that bugs in this process appear immediately, we record the digest of this
198    /// state in ConsensusCommitPrologue, so that any deviation causes an immediate fork.
199    #[derive(Serialize, Deserialize)]
200    pub(super) struct AdditionalConsensusState {
201        commit_interval_observer: CommitIntervalObserver,
202    }
203
204    impl AdditionalConsensusState {
205        pub fn new(additional_consensus_state_window_size: u32) -> Self {
206            Self {
207                commit_interval_observer: CommitIntervalObserver::new(
208                    additional_consensus_state_window_size,
209                ),
210            }
211        }
212
213        /// Update all internal state based on the new commit
214        pub(crate) fn observe_commit(
215            &mut self,
216            protocol_config: &ProtocolConfig,
217            epoch_start_time: u64,
218            consensus_commit: &impl ConsensusCommitAPI,
219        ) -> ConsensusCommitInfo {
220            self.commit_interval_observer
221                .observe_commit_time(consensus_commit);
222
223            let estimated_commit_period = self
224                .commit_interval_observer
225                .commit_interval_estimate()
226                .unwrap_or(Duration::from_millis(
227                    protocol_config.min_checkpoint_interval_ms(),
228                ));
229
230            info!("estimated commit rate: {:?}", estimated_commit_period);
231
232            self.commit_info_impl(
233                epoch_start_time,
234                consensus_commit,
235                Some(estimated_commit_period),
236            )
237        }
238
239        fn commit_info_impl(
240            &self,
241            epoch_start_time: u64,
242            consensus_commit: &impl ConsensusCommitAPI,
243            estimated_commit_period: Option<Duration>,
244        ) -> ConsensusCommitInfo {
245            let leader_author = consensus_commit.leader_author_index();
246            let timestamp = consensus_commit.commit_timestamp_ms();
247
248            let timestamp = if timestamp < epoch_start_time {
249                error!(
250                    "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start_time}, author {leader_author:?}"
251                );
252                epoch_start_time
253            } else {
254                timestamp
255            };
256
257            ConsensusCommitInfo {
258                _phantom: PhantomData,
259                round: consensus_commit.leader_round(),
260                timestamp,
261                leader_author,
262                consensus_commit_ref: consensus_commit.commit_ref(),
263                rejected_transactions_digest: consensus_commit.rejected_transactions_digest(),
264                additional_state_digest: Some(self.digest()),
265                estimated_commit_period,
266                skip_consensus_commit_prologue_in_test: false,
267            }
268        }
269
270        /// Get the digest of the current state.
271        fn digest(&self) -> AdditionalConsensusStateDigest {
272            let mut hash = DefaultHash::new();
273            bcs::serialize_into(&mut hash, self).unwrap();
274            AdditionalConsensusStateDigest::new(hash.finalize().into())
275        }
276    }
277
278    pub struct ConsensusCommitInfo {
279        // prevent public construction
280        _phantom: PhantomData<()>,
281
282        pub round: u64,
283        pub timestamp: u64,
284        pub leader_author: AuthorityIndex,
285        pub consensus_commit_ref: CommitRef,
286        pub rejected_transactions_digest: Digest,
287
288        additional_state_digest: Option<AdditionalConsensusStateDigest>,
289        estimated_commit_period: Option<Duration>,
290
291        pub skip_consensus_commit_prologue_in_test: bool,
292    }
293
294    impl ConsensusCommitInfo {
295        pub fn new_for_test(
296            commit_round: u64,
297            commit_timestamp: u64,
298            estimated_commit_period: Option<Duration>,
299            skip_consensus_commit_prologue_in_test: bool,
300        ) -> Self {
301            Self {
302                _phantom: PhantomData,
303                round: commit_round,
304                timestamp: commit_timestamp,
305                leader_author: 0,
306                consensus_commit_ref: CommitRef::default(),
307                rejected_transactions_digest: Digest::default(),
308                additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
309                estimated_commit_period,
310                skip_consensus_commit_prologue_in_test,
311            }
312        }
313
314        pub fn new_for_congestion_test(
315            commit_round: u64,
316            commit_timestamp: u64,
317            estimated_commit_period: Duration,
318        ) -> Self {
319            Self::new_for_test(
320                commit_round,
321                commit_timestamp,
322                Some(estimated_commit_period),
323                true,
324            )
325        }
326
327        pub fn additional_state_digest(&self) -> AdditionalConsensusStateDigest {
328            // this method cannot be called if stateless_commit_info is used
329            self.additional_state_digest
330                .expect("additional_state_digest is not available")
331        }
332
333        pub fn estimated_commit_period(&self) -> Duration {
334            // this method cannot be called if stateless_commit_info is used
335            self.estimated_commit_period
336                .expect("estimated commit period is not available")
337        }
338
339        fn consensus_commit_digest(&self) -> ConsensusCommitDigest {
340            ConsensusCommitDigest::new(self.consensus_commit_ref.digest.into_inner())
341        }
342
343        fn consensus_commit_prologue_transaction(
344            &self,
345            epoch: u64,
346        ) -> VerifiedExecutableTransaction {
347            let transaction = VerifiedTransaction::new_consensus_commit_prologue(
348                epoch,
349                self.round,
350                self.timestamp,
351            );
352            VerifiedExecutableTransaction::new_system(transaction, epoch)
353        }
354
355        fn consensus_commit_prologue_v2_transaction(
356            &self,
357            epoch: u64,
358        ) -> VerifiedExecutableTransaction {
359            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v2(
360                epoch,
361                self.round,
362                self.timestamp,
363                self.consensus_commit_digest(),
364            );
365            VerifiedExecutableTransaction::new_system(transaction, epoch)
366        }
367
368        fn consensus_commit_prologue_v3_transaction(
369            &self,
370            epoch: u64,
371            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
372        ) -> VerifiedExecutableTransaction {
373            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v3(
374                epoch,
375                self.round,
376                self.timestamp,
377                self.consensus_commit_digest(),
378                consensus_determined_version_assignments,
379            );
380            VerifiedExecutableTransaction::new_system(transaction, epoch)
381        }
382
383        fn consensus_commit_prologue_v4_transaction(
384            &self,
385            epoch: u64,
386            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
387            additional_state_digest: AdditionalConsensusStateDigest,
388        ) -> VerifiedExecutableTransaction {
389            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v4(
390                epoch,
391                self.round,
392                self.timestamp,
393                self.consensus_commit_digest(),
394                consensus_determined_version_assignments,
395                additional_state_digest,
396            );
397            VerifiedExecutableTransaction::new_system(transaction, epoch)
398        }
399
400        pub fn create_consensus_commit_prologue_transaction(
401            &self,
402            epoch: u64,
403            protocol_config: &ProtocolConfig,
404            cancelled_txn_version_assignment: Vec<(
405                TransactionDigest,
406                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
407            )>,
408            commit_info: &ConsensusCommitInfo,
409            indirect_state_observer: IndirectStateObserver,
410        ) -> VerifiedExecutableTransaction {
411            let version_assignments = if protocol_config
412                .record_consensus_determined_version_assignments_in_prologue_v2()
413            {
414                Some(
415                    ConsensusDeterminedVersionAssignments::CancelledTransactionsV2(
416                        cancelled_txn_version_assignment,
417                    ),
418                )
419            } else if protocol_config.record_consensus_determined_version_assignments_in_prologue()
420            {
421                Some(
422                    ConsensusDeterminedVersionAssignments::CancelledTransactions(
423                        cancelled_txn_version_assignment
424                            .into_iter()
425                            .map(|(tx_digest, versions)| {
426                                (
427                                    tx_digest,
428                                    versions.into_iter().map(|(id, v)| (id.0, v)).collect(),
429                                )
430                            })
431                            .collect(),
432                    ),
433                )
434            } else {
435                None
436            };
437
438            if protocol_config.record_additional_state_digest_in_prologue() {
439                let additional_state_digest =
440                    if protocol_config.additional_consensus_digest_indirect_state() {
441                        let d1 = commit_info.additional_state_digest();
442                        indirect_state_observer.fold_with(d1)
443                    } else {
444                        commit_info.additional_state_digest()
445                    };
446
447                self.consensus_commit_prologue_v4_transaction(
448                    epoch,
449                    version_assignments.unwrap(),
450                    additional_state_digest,
451                )
452            } else if let Some(version_assignments) = version_assignments {
453                self.consensus_commit_prologue_v3_transaction(epoch, version_assignments)
454            } else if protocol_config.include_consensus_digest_in_prologue() {
455                self.consensus_commit_prologue_v2_transaction(epoch)
456            } else {
457                self.consensus_commit_prologue_transaction(epoch)
458            }
459        }
460    }
461
462    #[derive(Default)]
463    pub struct IndirectStateObserver {
464        hash: DefaultHash,
465    }
466
467    impl IndirectStateObserver {
468        pub fn new() -> Self {
469            Self::default()
470        }
471
472        pub fn observe_indirect_state<T: Serialize>(&mut self, state: &T) {
473            bcs::serialize_into(&mut self.hash, state).unwrap();
474        }
475
476        pub fn fold_with(
477            self,
478            d1: AdditionalConsensusStateDigest,
479        ) -> AdditionalConsensusStateDigest {
480            let hash = self.hash.finalize();
481            let d2 = AdditionalConsensusStateDigest::new(hash.into());
482
483            let mut hasher = DefaultHash::new();
484            bcs::serialize_into(&mut hasher, &d1).unwrap();
485            bcs::serialize_into(&mut hasher, &d2).unwrap();
486            AdditionalConsensusStateDigest::new(hasher.finalize().into())
487        }
488    }
489
490    #[test]
491    fn test_additional_consensus_state() {
492        use crate::consensus_test_utils::TestConsensusCommit;
493
494        fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
495            let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
496            state.observe_commit(
497                &protocol_config,
498                100,
499                &TestConsensusCommit::empty(round, timestamp, 0),
500            );
501        }
502
503        let mut s1 = AdditionalConsensusState::new(3);
504        observe(&mut s1, 1, 1000);
505        observe(&mut s1, 2, 2000);
506        observe(&mut s1, 3, 3000);
507        observe(&mut s1, 4, 4000);
508
509        let mut s2 = AdditionalConsensusState::new(3);
510        // Because state uses a ring buffer, we should get the same digest
511        // even though we only added the 3 latest observations.
512        observe(&mut s2, 2, 2000);
513        observe(&mut s2, 3, 3000);
514        observe(&mut s2, 4, 4000);
515
516        assert_eq!(s1.digest(), s2.digest());
517
518        observe(&mut s1, 5, 5000);
519        observe(&mut s2, 5, 5000);
520
521        assert_eq!(s1.digest(), s2.digest());
522    }
523}
524use additional_consensus_state::AdditionalConsensusState;
525pub(crate) use additional_consensus_state::{ConsensusCommitInfo, IndirectStateObserver};
526
527pub struct ConsensusHandler<C> {
528    /// A store created for each epoch. ConsensusHandler is recreated each epoch, with the
529    /// corresponding store. This store is also used to get the current epoch ID.
530    epoch_store: Arc<AuthorityPerEpochStore>,
531    /// Holds the indices, hash and stats after the last consensus commit
532    /// It is used for avoiding replaying already processed transactions,
533    /// checking chain consistency, and accumulating per-epoch consensus output stats.
534    last_consensus_stats: ExecutionIndicesWithStats,
535    checkpoint_service: Arc<C>,
536    /// cache reader is needed when determining the next version to assign for shared objects.
537    cache_reader: Arc<dyn ObjectCacheRead>,
538    /// Reputation scores used by consensus adapter that we update, forwarded from consensus
539    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
540    /// The consensus committee used to do stake computations for deciding set of low scoring authorities
541    committee: ConsensusCommittee,
542    // TODO: ConsensusHandler doesn't really share metrics with AuthorityState. We could define
543    // a new metrics type here if we want to.
544    metrics: Arc<AuthorityMetrics>,
545    /// Lru cache to quickly discard transactions processed by consensus
546    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
547    /// Enqueues transactions to the execution scheduler via a separate task.
548    execution_scheduler_sender: ExecutionSchedulerSender,
549    /// Consensus adapter for submitting transactions to consensus
550    consensus_adapter: Arc<ConsensusAdapter>,
551
552    /// Using the throughput calculator to record the current consensus throughput
553    throughput_calculator: Arc<ConsensusThroughputCalculator>,
554
555    additional_consensus_state: AdditionalConsensusState,
556
557    backpressure_subscriber: BackpressureSubscriber,
558
559    traffic_controller: Option<Arc<TrafficController>>,
560}
561
562const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
563
564impl<C> ConsensusHandler<C> {
565    pub(crate) fn new(
566        epoch_store: Arc<AuthorityPerEpochStore>,
567        checkpoint_service: Arc<C>,
568        execution_scheduler: Arc<ExecutionScheduler>,
569        consensus_adapter: Arc<ConsensusAdapter>,
570        cache_reader: Arc<dyn ObjectCacheRead>,
571        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
572        committee: ConsensusCommittee,
573        metrics: Arc<AuthorityMetrics>,
574        throughput_calculator: Arc<ConsensusThroughputCalculator>,
575        backpressure_subscriber: BackpressureSubscriber,
576        traffic_controller: Option<Arc<TrafficController>>,
577    ) -> Self {
578        assert!(
579            matches!(
580                epoch_store
581                    .protocol_config()
582                    .per_object_congestion_control_mode(),
583                PerObjectCongestionControlMode::ExecutionTimeEstimate(_)
584            ),
585            "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
586        );
587
588        // Recover last_consensus_stats so it is consistent across validators.
589        let mut last_consensus_stats = epoch_store
590            .get_last_consensus_stats()
591            .expect("Should be able to read last consensus index");
592        // stats is empty at the beginning of epoch.
593        if !last_consensus_stats.stats.is_initialized() {
594            last_consensus_stats.stats = ConsensusStats::new(committee.size());
595        }
596        let execution_scheduler_sender =
597            ExecutionSchedulerSender::start(execution_scheduler, epoch_store.clone());
598        let commit_rate_estimate_window_size = epoch_store
599            .protocol_config()
600            .get_consensus_commit_rate_estimation_window_size();
601        Self {
602            epoch_store,
603            last_consensus_stats,
604            checkpoint_service,
605            cache_reader,
606            low_scoring_authorities,
607            committee,
608            metrics,
609            processed_cache: LruCache::new(
610                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
611            ),
612            execution_scheduler_sender,
613            consensus_adapter,
614            throughput_calculator,
615            additional_consensus_state: AdditionalConsensusState::new(
616                commit_rate_estimate_window_size,
617            ),
618            backpressure_subscriber,
619            traffic_controller,
620        }
621    }
622
623    /// Returns the last subdag index processed by the handler.
624    pub(crate) fn last_processed_subdag_index(&self) -> u64 {
625        self.last_consensus_stats.index.sub_dag_index
626    }
627
628    pub(crate) fn execution_scheduler_sender(&self) -> &ExecutionSchedulerSender {
629        &self.execution_scheduler_sender
630    }
631
632    pub(crate) fn new_for_testing(
633        epoch_store: Arc<AuthorityPerEpochStore>,
634        checkpoint_service: Arc<C>,
635        execution_scheduler_sender: ExecutionSchedulerSender,
636        consensus_adapter: Arc<ConsensusAdapter>,
637        cache_reader: Arc<dyn ObjectCacheRead>,
638        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
639        committee: ConsensusCommittee,
640        metrics: Arc<AuthorityMetrics>,
641        throughput_calculator: Arc<ConsensusThroughputCalculator>,
642        backpressure_subscriber: BackpressureSubscriber,
643        traffic_controller: Option<Arc<TrafficController>>,
644        last_consensus_stats: ExecutionIndicesWithStats,
645    ) -> Self {
646        let commit_rate_estimate_window_size = epoch_store
647            .protocol_config()
648            .get_consensus_commit_rate_estimation_window_size();
649        Self {
650            epoch_store,
651            last_consensus_stats,
652            checkpoint_service,
653            cache_reader,
654            low_scoring_authorities,
655            committee,
656            metrics,
657            processed_cache: LruCache::new(
658                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
659            ),
660            execution_scheduler_sender,
661            consensus_adapter,
662            throughput_calculator,
663            additional_consensus_state: AdditionalConsensusState::new(
664                commit_rate_estimate_window_size,
665            ),
666            backpressure_subscriber,
667            traffic_controller,
668        }
669    }
670}
671
672#[derive(Default)]
673struct CommitHandlerInput {
674    user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
675    capability_notifications: Vec<AuthorityCapabilitiesV2>,
676    execution_time_observations: Vec<ExecutionTimeObservation>,
677    checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
678    randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
679    randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
680    end_of_publish_transactions: Vec<AuthorityName>,
681    new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
682}
683
684struct CommitHandlerState {
685    dkg_failed: bool,
686    randomness_round: Option<RandomnessRound>,
687    output: ConsensusCommitOutput,
688    indirect_state_observer: Option<IndirectStateObserver>,
689    initial_reconfig_state: ReconfigState,
690}
691
692impl CommitHandlerState {
693    fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
694        self.output
695            .get_consensus_messages_processed()
696            .cloned()
697            .collect()
698    }
699
700    fn init_randomness<'a, 'epoch>(
701        &'a mut self,
702        epoch_store: &'epoch AuthorityPerEpochStore,
703        commit_info: &'a ConsensusCommitInfo,
704    ) -> Option<tokio::sync::MutexGuard<'epoch, RandomnessManager>> {
705        let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
706            rm.try_lock()
707                .expect("should only ever be called from the commit handler thread")
708        });
709
710        let mut dkg_failed = false;
711        let randomness_round = if epoch_store.randomness_state_enabled() {
712            let randomness_manager = randomness_manager
713                .as_mut()
714                .expect("randomness manager should exist if randomness is enabled");
715            match randomness_manager.dkg_status() {
716                DkgStatus::Pending => None,
717                DkgStatus::Failed => {
718                    dkg_failed = true;
719                    None
720                }
721                DkgStatus::Successful => {
722                    // Generate randomness for this commit if DKG is successful and we are still
723                    // accepting certs.
724                    if self.initial_reconfig_state.should_accept_tx() {
725                        randomness_manager
726                            // TODO: make infallible
727                            .reserve_next_randomness(commit_info.timestamp, &mut self.output)
728                            .expect("epoch ended")
729                    } else {
730                        None
731                    }
732                }
733            }
734        } else {
735            None
736        };
737
738        if randomness_round.is_some() {
739            assert!(!dkg_failed); // invariant check
740        }
741
742        self.randomness_round = randomness_round;
743        self.dkg_failed = dkg_failed;
744
745        randomness_manager
746    }
747}
748
749impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
750    /// Called during startup to allow us to observe commits we previously processed, for crash recovery.
751    /// Any state computed here must be a pure function of the commits observed, it cannot depend on any
752    /// state recorded in the epoch db.
753    fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
754        assert!(
755            self.epoch_store
756                .protocol_config()
757                .record_additional_state_digest_in_prologue()
758        );
759        let protocol_config = self.epoch_store.protocol_config();
760        let epoch_start_time = self
761            .epoch_store
762            .epoch_start_config()
763            .epoch_start_timestamp_ms();
764
765        self.additional_consensus_state.observe_commit(
766            protocol_config,
767            epoch_start_time,
768            &consensus_commit,
769        );
770    }
771
772    #[cfg(test)]
773    pub(crate) async fn handle_consensus_commit_for_test(
774        &mut self,
775        consensus_commit: impl ConsensusCommitAPI,
776    ) {
777        self.handle_consensus_commit(consensus_commit).await;
778    }
779
780    #[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))]
781    pub(crate) async fn handle_consensus_commit(
782        &mut self,
783        consensus_commit: impl ConsensusCommitAPI,
784    ) {
785        let protocol_config = self.epoch_store.protocol_config();
786
787        // Assert all protocol config settings for which we don't support old behavior.
788        assert!(protocol_config.ignore_execution_time_observations_after_certs_closed());
789        assert!(protocol_config.record_time_estimate_processed());
790        assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints());
791        assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest());
792        assert!(protocol_config.authority_capabilities_v2());
793        assert!(protocol_config.cancel_for_failed_dkg_early());
794
795        // This may block until one of two conditions happens:
796        // - Number of uncommitted transactions in the writeback cache goes below the
797        //   backpressure threshold.
798        // - The highest executed checkpoint catches up to the highest certified checkpoint.
799        self.backpressure_subscriber.await_no_backpressure().await;
800
801        let epoch = self.epoch_store.epoch();
802
803        let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit");
804
805        let last_committed_round = self.last_consensus_stats.index.last_committed_round;
806
807        if let Some(consensus_tx_status_cache) = self.epoch_store.consensus_tx_status_cache.as_ref()
808        {
809            consensus_tx_status_cache
810                .update_last_committed_leader_round(last_committed_round as u32)
811                .await;
812        }
813        if let Some(tx_reject_reason_cache) = self.epoch_store.tx_reject_reason_cache.as_ref() {
814            tx_reject_reason_cache.set_last_committed_leader_round(last_committed_round as u32);
815        }
816
817        let commit_info = self.additional_consensus_state.observe_commit(
818            protocol_config,
819            self.epoch_store
820                .epoch_start_config()
821                .epoch_start_timestamp_ms(),
822            &consensus_commit,
823        );
824        assert!(commit_info.round > last_committed_round);
825
826        let (timestamp, leader_author, commit_sub_dag_index) =
827            self.gather_commit_metadata(&consensus_commit);
828
829        info!(
830            %consensus_commit,
831            "Received consensus output {}. Rejected transactions {}",
832            consensus_commit.commit_ref(),
833            consensus_commit.rejected_transactions_debug_string(),
834        );
835
836        self.last_consensus_stats.index = ExecutionIndices {
837            last_committed_round: commit_info.round,
838            sub_dag_index: commit_sub_dag_index,
839            transaction_index: 0_u64,
840        };
841
842        update_low_scoring_authorities(
843            self.low_scoring_authorities.clone(),
844            self.epoch_store.committee(),
845            &self.committee,
846            consensus_commit.reputation_score_sorted_desc(),
847            &self.metrics,
848            protocol_config.consensus_bad_nodes_stake_threshold(),
849        );
850
851        self.metrics
852            .consensus_committed_subdags
853            .with_label_values(&[&leader_author.to_string()])
854            .inc();
855
856        let mut state = CommitHandlerState {
857            output: ConsensusCommitOutput::new(commit_info.round),
858            dkg_failed: false,
859            randomness_round: None,
860            indirect_state_observer: Some(IndirectStateObserver::new()),
861            initial_reconfig_state: self
862                .epoch_store
863                .get_reconfig_state_read_lock_guard()
864                .clone(),
865        };
866
867        let FilteredConsensusOutput {
868            transactions,
869            owned_object_locks,
870        } = self.filter_consensus_txns(
871            state.initial_reconfig_state.clone(),
872            &commit_info,
873            &consensus_commit,
874        );
875        // Buffer owned object locks for batch write when preconsensus locking is disabled
876        if !owned_object_locks.is_empty() {
877            state.output.set_owned_object_locks(owned_object_locks);
878        }
879        let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions);
880
881        let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info);
882
883        let CommitHandlerInput {
884            user_transactions,
885            capability_notifications,
886            execution_time_observations,
887            checkpoint_signature_messages,
888            randomness_dkg_messages,
889            randomness_dkg_confirmations,
890            end_of_publish_transactions,
891            new_jwks,
892        } = self.build_commit_handler_input(transactions);
893
894        self.process_jwks(&mut state, &commit_info, new_jwks);
895        self.process_capability_notifications(capability_notifications);
896        self.process_execution_time_observations(&mut state, execution_time_observations);
897        self.process_checkpoint_signature_messages(checkpoint_signature_messages);
898
899        self.process_dkg_updates(
900            &mut state,
901            &commit_info,
902            randomness_manager.as_deref_mut(),
903            randomness_dkg_messages,
904            randomness_dkg_confirmations,
905        )
906        .await;
907
908        let mut execution_time_estimator = self
909            .epoch_store
910            .execution_time_estimator
911            .try_lock()
912            .expect("should only ever be called from the commit handler thread");
913
914        let authenticator_state_update_transaction =
915            self.create_authenticator_state_update(last_committed_round, &commit_info);
916
917        let (schedulables, randomness_schedulables, assigned_versions) = self.process_transactions(
918            &mut state,
919            &mut execution_time_estimator,
920            &commit_info,
921            authenticator_state_update_transaction,
922            user_transactions,
923        );
924
925        let (should_accept_tx, lock, final_round) =
926            self.handle_eop(&mut state, end_of_publish_transactions);
927
928        let make_checkpoint = should_accept_tx || final_round;
929        if !make_checkpoint {
930            // No need for any further processing
931            return;
932        }
933
934        // If this is the final round, record execution time observations for storage in the
935        // end-of-epoch tx.
936        if final_round {
937            self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
938        }
939
940        self.create_pending_checkpoints(
941            &mut state,
942            &commit_info,
943            &schedulables,
944            &randomness_schedulables,
945            final_round,
946        );
947
948        let notifications = state.get_notifications();
949
950        state
951            .output
952            .record_consensus_commit_stats(self.last_consensus_stats.clone());
953
954        self.record_deferral_deletion(&mut state);
955
956        self.epoch_store
957            .consensus_quarantine
958            .write()
959            .push_consensus_output(state.output, &self.epoch_store)
960            .expect("push_consensus_output should not fail");
961
962        // Only after batch is written, notify checkpoint service to start building any new
963        // pending checkpoints.
964        debug!(
965            ?commit_info.round,
966            "Notifying checkpoint service about new pending checkpoint(s)",
967        );
968        self.checkpoint_service
969            .notify_checkpoint()
970            .expect("failed to notify checkpoint service");
971
972        if let Some(randomness_round) = state.randomness_round {
973            randomness_manager
974                .as_ref()
975                .expect("randomness manager should exist if randomness round is provided")
976                .generate_randomness(epoch, randomness_round);
977        }
978
979        self.epoch_store.process_notifications(notifications.iter());
980
981        // pass lock by value to ensure that it is held until this point
982        self.log_final_round(lock, final_round);
983
984        // update the calculated throughput
985        self.throughput_calculator
986            .add_transactions(timestamp, schedulables.len() as u64);
987
988        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
989            let key = [commit_sub_dag_index, epoch];
990            if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
991                sui_simulator::task::kill_current_node(None);
992            }
993        });
994
995        fail_point!("crash"); // for tests that produce random crashes
996
997        let mut schedulables = schedulables;
998        schedulables.extend(randomness_schedulables);
999        self.execution_scheduler_sender.send(
1000            schedulables,
1001            assigned_versions,
1002            SchedulingSource::NonFastPath,
1003        );
1004
1005        self.send_end_of_publish_if_needed().await;
1006    }
1007
1008    fn handle_eop(
1009        &self,
1010        state: &mut CommitHandlerState,
1011        end_of_publish_transactions: Vec<AuthorityName>,
1012    ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
1013        let collected_eop =
1014            self.process_end_of_publish_transactions(state, end_of_publish_transactions);
1015        if collected_eop {
1016            let (lock, final_round) = self.advance_eop_state_machine(state);
1017            (lock.should_accept_tx(), Some(lock), final_round)
1018        } else {
1019            (true, None, false)
1020        }
1021    }
1022
1023    fn record_end_of_epoch_execution_time_observations(
1024        &self,
1025        estimator: &mut ExecutionTimeEstimator,
1026    ) {
1027        self.epoch_store
1028            .end_of_epoch_execution_time_observations
1029            .set(estimator.take_observations())
1030            .expect("`stored_execution_time_observations` should only be set once at end of epoch");
1031    }
1032
1033    fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
1034        let mut deferred_transactions = self
1035            .epoch_store
1036            .consensus_output_cache
1037            .deferred_transactions
1038            .lock();
1039        for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
1040            deferred_transactions.remove(&deleted_deferred_key);
1041        }
1042    }
1043
1044    fn log_final_round(&self, lock: Option<RwLockWriteGuard<ReconfigState>>, final_round: bool) {
1045        if final_round {
1046            let epoch = self.epoch_store.epoch();
1047            info!(
1048                ?epoch,
1049                lock=?lock.as_ref(),
1050                final_round=?final_round,
1051                "Notified last checkpoint"
1052            );
1053            self.epoch_store.record_end_of_message_quorum_time_metric();
1054        }
1055    }
1056
1057    fn create_pending_checkpoints(
1058        &self,
1059        state: &mut CommitHandlerState,
1060        commit_info: &ConsensusCommitInfo,
1061        schedulables: &[Schedulable],
1062        randomness_schedulables: &[Schedulable],
1063        final_round: bool,
1064    ) {
1065        let checkpoint_height = self
1066            .epoch_store
1067            .calculate_pending_checkpoint_height(commit_info.round);
1068
1069        // Determine whether to write pending checkpoint for user tx with randomness.
1070        // - If randomness is not generated for this commit, we will skip the
1071        //   checkpoint with the associated height. Therefore checkpoint heights may
1072        //   not be contiguous.
1073        // - Exception: if DKG fails, we always need to write out a PendingCheckpoint
1074        //   for randomness tx that are canceled.
1075        let should_write_random_checkpoint = state.randomness_round.is_some()
1076            || (state.dkg_failed && !randomness_schedulables.is_empty());
1077
1078        let pending_checkpoint = PendingCheckpoint {
1079            roots: schedulables.iter().map(|s| s.key()).collect(),
1080            details: PendingCheckpointInfo {
1081                timestamp_ms: commit_info.timestamp,
1082                last_of_epoch: final_round && !should_write_random_checkpoint,
1083                checkpoint_height,
1084                consensus_commit_ref: commit_info.consensus_commit_ref,
1085                rejected_transactions_digest: commit_info.rejected_transactions_digest,
1086            },
1087        };
1088        self.epoch_store
1089            .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1090            .expect("failed to write pending checkpoint");
1091
1092        info!(
1093            "Written pending checkpoint: {:?}",
1094            pending_checkpoint.details,
1095        );
1096
1097        if should_write_random_checkpoint {
1098            let pending_checkpoint = PendingCheckpoint {
1099                roots: randomness_schedulables.iter().map(|s| s.key()).collect(),
1100                details: PendingCheckpointInfo {
1101                    timestamp_ms: commit_info.timestamp,
1102                    last_of_epoch: final_round,
1103                    checkpoint_height: checkpoint_height + 1,
1104                    consensus_commit_ref: commit_info.consensus_commit_ref,
1105                    rejected_transactions_digest: commit_info.rejected_transactions_digest,
1106                },
1107            };
1108            self.epoch_store
1109                .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1110                .expect("failed to write pending checkpoint");
1111        }
1112    }
1113
1114    fn process_transactions(
1115        &self,
1116        state: &mut CommitHandlerState,
1117        execution_time_estimator: &mut ExecutionTimeEstimator,
1118        commit_info: &ConsensusCommitInfo,
1119        authenticator_state_update_transaction: Option<VerifiedExecutableTransactionWithAliases>,
1120        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1121    ) -> (Vec<Schedulable>, Vec<Schedulable>, AssignedTxAndVersions) {
1122        let protocol_config = self.epoch_store.protocol_config();
1123        let epoch = self.epoch_store.epoch();
1124
1125        // Get the ordered set of all transactions to process, which includes deferred and
1126        // newly arrived transactions.
1127        let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) =
1128            self.merge_and_reorder_transactions(state, commit_info, user_transactions);
1129
1130        let mut shared_object_congestion_tracker =
1131            self.init_congestion_tracker(commit_info, false, &ordered_txns);
1132        let mut shared_object_using_randomness_congestion_tracker =
1133            self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns);
1134
1135        let randomness_state_update_transaction = state
1136            .randomness_round
1137            .map(|round| Schedulable::RandomnessStateUpdate(epoch, round));
1138        debug!(
1139            "Randomness state update transaction: {:?}",
1140            randomness_state_update_transaction
1141                .as_ref()
1142                .map(|t| t.key())
1143        );
1144
1145        let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len());
1146        let mut randomness_transactions_to_schedule =
1147            Vec::with_capacity(ordered_randomness_txns.len());
1148        let mut deferred_txns = BTreeMap::new();
1149        let mut cancelled_txns = BTreeMap::new();
1150
1151        for transaction in ordered_txns {
1152            self.handle_deferral_and_cancellation(
1153                state,
1154                &mut cancelled_txns,
1155                &mut deferred_txns,
1156                &mut transactions_to_schedule,
1157                protocol_config,
1158                commit_info,
1159                transaction,
1160                &mut shared_object_congestion_tracker,
1161                &previously_deferred_tx_digests,
1162                execution_time_estimator,
1163            );
1164        }
1165
1166        for transaction in ordered_randomness_txns {
1167            if state.dkg_failed {
1168                debug!(
1169                    "Canceling randomness-using transaction {:?} because DKG failed",
1170                    transaction.tx().digest(),
1171                );
1172                cancelled_txns.insert(
1173                    *transaction.tx().digest(),
1174                    CancelConsensusCertificateReason::DkgFailed,
1175                );
1176                randomness_transactions_to_schedule.push(transaction);
1177                continue;
1178            }
1179            self.handle_deferral_and_cancellation(
1180                state,
1181                &mut cancelled_txns,
1182                &mut deferred_txns,
1183                &mut randomness_transactions_to_schedule,
1184                protocol_config,
1185                commit_info,
1186                transaction,
1187                &mut shared_object_using_randomness_congestion_tracker,
1188                &previously_deferred_tx_digests,
1189                execution_time_estimator,
1190            );
1191        }
1192
1193        let mut total_deferred_txns = 0;
1194        {
1195            let mut deferred_transactions = self
1196                .epoch_store
1197                .consensus_output_cache
1198                .deferred_transactions
1199                .lock();
1200            for (key, txns) in deferred_txns.into_iter() {
1201                total_deferred_txns += txns.len();
1202                deferred_transactions.insert(key, txns.clone());
1203                state.output.defer_transactions(key, txns);
1204            }
1205        }
1206
1207        self.metrics
1208            .consensus_handler_deferred_transactions
1209            .inc_by(total_deferred_txns as u64);
1210        self.metrics
1211            .consensus_handler_cancelled_transactions
1212            .inc_by(cancelled_txns.len() as u64);
1213        self.metrics
1214            .consensus_handler_max_object_costs
1215            .with_label_values(&["regular_commit"])
1216            .set(shared_object_congestion_tracker.max_cost() as i64);
1217        self.metrics
1218            .consensus_handler_max_object_costs
1219            .with_label_values(&["randomness_commit"])
1220            .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);
1221
1222        let object_debts = shared_object_congestion_tracker.accumulated_debts(commit_info);
1223        let randomness_object_debts =
1224            shared_object_using_randomness_congestion_tracker.accumulated_debts(commit_info);
1225        if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get()
1226            && let Err(e) = tx_object_debts.try_send(
1227                object_debts
1228                    .iter()
1229                    .chain(randomness_object_debts.iter())
1230                    .map(|(id, _)| *id)
1231                    .collect(),
1232            )
1233        {
1234            info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
1235        }
1236
1237        state
1238            .output
1239            .set_congestion_control_object_debts(object_debts);
1240        state
1241            .output
1242            .set_congestion_control_randomness_object_debts(randomness_object_debts);
1243
1244        let mut settlement = None;
1245        let mut randomness_settlement = None;
1246        if self.epoch_store.accumulators_enabled() {
1247            let checkpoint_height = self
1248                .epoch_store
1249                .calculate_pending_checkpoint_height(commit_info.round);
1250
1251            settlement = Some(Schedulable::AccumulatorSettlement(epoch, checkpoint_height));
1252
1253            if state.randomness_round.is_some() || !randomness_transactions_to_schedule.is_empty() {
1254                randomness_settlement = Some(Schedulable::AccumulatorSettlement(
1255                    epoch,
1256                    checkpoint_height + 1,
1257                ));
1258            }
1259        }
1260
1261        let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
1262            .then_some(Schedulable::ConsensusCommitPrologue(
1263                epoch,
1264                commit_info.round,
1265                commit_info.consensus_commit_ref.index,
1266            ));
1267
1268        let schedulables: Vec<_> = itertools::chain!(
1269            consensus_commit_prologue.into_iter(),
1270            authenticator_state_update_transaction
1271                .into_iter()
1272                .map(Schedulable::Transaction),
1273            transactions_to_schedule
1274                .into_iter()
1275                .map(Schedulable::Transaction),
1276            settlement,
1277        )
1278        .collect();
1279
1280        let randomness_schedulables: Vec<_> = randomness_state_update_transaction
1281            .into_iter()
1282            .chain(
1283                randomness_transactions_to_schedule
1284                    .into_iter()
1285                    .map(Schedulable::Transaction),
1286            )
1287            .chain(randomness_settlement)
1288            .collect();
1289
1290        let assigned_versions = self
1291            .epoch_store
1292            .process_consensus_transaction_shared_object_versions(
1293                self.cache_reader.as_ref(),
1294                schedulables.iter(),
1295                randomness_schedulables.iter(),
1296                &cancelled_txns,
1297                &mut state.output,
1298            )
1299            .expect("failed to assign shared object versions");
1300
1301        let consensus_commit_prologue =
1302            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);
1303
1304        let mut schedulables = schedulables;
1305        let mut assigned_versions = assigned_versions;
1306        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
1307            assert!(matches!(
1308                schedulables[0],
1309                Schedulable::ConsensusCommitPrologue(..)
1310            ));
1311            assert!(matches!(
1312                assigned_versions.0[0].0,
1313                TransactionKey::ConsensusCommitPrologue(..)
1314            ));
1315            assigned_versions.0[0].0 =
1316                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
1317            schedulables[0] = Schedulable::Transaction(consensus_commit_prologue);
1318        }
1319
1320        self.epoch_store
1321            .process_user_signatures(schedulables.iter().chain(randomness_schedulables.iter()));
1322
1323        // After this point we can throw away alias version information.
1324        let schedulables: Vec<Schedulable> = schedulables.into_iter().map(|s| s.into()).collect();
1325        let randomness_schedulables: Vec<Schedulable> = randomness_schedulables
1326            .into_iter()
1327            .map(|s| s.into())
1328            .collect();
1329
1330        (schedulables, randomness_schedulables, assigned_versions)
1331    }
1332
1333    // Adds the consensus commit prologue transaction to the beginning of input `transactions` to update
1334    // the system clock used in all transactions in the current consensus commit.
1335    // Returns the root of the consensus commit prologue transaction if it was added to the input.
1336    fn add_consensus_commit_prologue_transaction<'a>(
1337        &'a self,
1338        state: &'a mut CommitHandlerState,
1339        commit_info: &'a ConsensusCommitInfo,
1340        assigned_versions: &AssignedTxAndVersions,
1341    ) -> Option<VerifiedExecutableTransactionWithAliases> {
1342        {
1343            if commit_info.skip_consensus_commit_prologue_in_test {
1344                return None;
1345            }
1346        }
1347
1348        let mut cancelled_txn_version_assignment = Vec::new();
1349
1350        let protocol_config = self.epoch_store.protocol_config();
1351
1352        for (txn_key, assigned_versions) in assigned_versions.0.iter() {
1353            let Some(d) = txn_key.as_digest() else {
1354                continue;
1355            };
1356
1357            if !protocol_config.include_cancelled_randomness_txns_in_prologue()
1358                && assigned_versions
1359                    .shared_object_versions
1360                    .iter()
1361                    .any(|((id, _), _)| *id == SUI_RANDOMNESS_STATE_OBJECT_ID)
1362            {
1363                continue;
1364            }
1365
1366            if assigned_versions
1367                .shared_object_versions
1368                .iter()
1369                .any(|(_, version)| version.is_cancelled())
1370            {
1371                assert_reachable!("cancelled transactions");
1372                cancelled_txn_version_assignment
1373                    .push((*d, assigned_versions.shared_object_versions.clone()));
1374            }
1375        }
1376
1377        fail_point_arg!(
1378            "additional_cancelled_txns_for_tests",
1379            |additional_cancelled_txns: Vec<(
1380                TransactionDigest,
1381                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
1382            )>| {
1383                cancelled_txn_version_assignment.extend(additional_cancelled_txns);
1384            }
1385        );
1386
1387        let transaction = commit_info.create_consensus_commit_prologue_transaction(
1388            self.epoch_store.epoch(),
1389            self.epoch_store.protocol_config(),
1390            cancelled_txn_version_assignment,
1391            commit_info,
1392            state.indirect_state_observer.take().unwrap(),
1393        );
1394        Some(VerifiedExecutableTransactionWithAliases::no_aliases(
1395            transaction,
1396        ))
1397    }
1398
1399    fn handle_deferral_and_cancellation(
1400        &self,
1401        state: &mut CommitHandlerState,
1402        cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
1403        deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>,
1404        scheduled_txns: &mut Vec<VerifiedExecutableTransactionWithAliases>,
1405        protocol_config: &ProtocolConfig,
1406        commit_info: &ConsensusCommitInfo,
1407        transaction: VerifiedExecutableTransactionWithAliases,
1408        shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
1409        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
1410        execution_time_estimator: &ExecutionTimeEstimator,
1411    ) {
1412        let tx_cost = shared_object_congestion_tracker.get_tx_cost(
1413            execution_time_estimator,
1414            transaction.tx(),
1415            state.indirect_state_observer.as_mut().unwrap(),
1416        );
1417
1418        let deferral_info = self.epoch_store.should_defer(
1419            transaction.tx(),
1420            commit_info,
1421            state.dkg_failed,
1422            state.randomness_round.is_some(),
1423            previously_deferred_tx_digests,
1424            shared_object_congestion_tracker,
1425        );
1426
1427        if let Some((deferral_key, deferral_reason)) = deferral_info {
1428            debug!(
1429                "Deferring consensus certificate for transaction {:?} until {:?}",
1430                transaction.tx().digest(),
1431                deferral_key
1432            );
1433
1434            match deferral_reason {
1435                DeferralReason::RandomnessNotReady => {
1436                    deferred_txns
1437                        .entry(deferral_key)
1438                        .or_default()
1439                        .push(transaction);
1440                }
1441                DeferralReason::SharedObjectCongestion(congested_objects) => {
1442                    self.metrics.consensus_handler_congested_transactions.inc();
1443                    if transaction_deferral_within_limit(
1444                        &deferral_key,
1445                        protocol_config.max_deferral_rounds_for_congestion_control(),
1446                    ) {
1447                        deferred_txns
1448                            .entry(deferral_key)
1449                            .or_default()
1450                            .push(transaction);
1451                    } else {
1452                        assert_sometimes!(
1453                            transaction.tx().data().transaction_data().uses_randomness(),
1454                            "cancelled randomness-using transaction"
1455                        );
1456                        assert_sometimes!(
1457                            !transaction.tx().data().transaction_data().uses_randomness(),
1458                            "cancelled non-randomness-using transaction"
1459                        );
1460
1461                        // Cancel the transaction that has been deferred for too long.
1462                        debug!(
1463                            "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
1464                            transaction.tx().digest(),
1465                            deferral_key,
1466                            congested_objects
1467                        );
1468                        cancelled_txns.insert(
1469                            *transaction.tx().digest(),
1470                            CancelConsensusCertificateReason::CongestionOnObjects(
1471                                congested_objects,
1472                            ),
1473                        );
1474                        scheduled_txns.push(transaction);
1475                    }
1476                }
1477            }
1478        } else {
1479            // Update object execution cost for all scheduled transactions
1480            shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, transaction.tx());
1481            scheduled_txns.push(transaction);
1482        }
1483    }
1484
1485    fn merge_and_reorder_transactions(
1486        &self,
1487        state: &mut CommitHandlerState,
1488        commit_info: &ConsensusCommitInfo,
1489        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1490    ) -> (
1491        Vec<VerifiedExecutableTransactionWithAliases>,
1492        Vec<VerifiedExecutableTransactionWithAliases>,
1493        HashMap<TransactionDigest, DeferralKey>,
1494    ) {
1495        let protocol_config = self.epoch_store.protocol_config();
1496
1497        let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
1498            self.load_deferred_transactions(state, commit_info);
1499
1500        txns.reserve(user_transactions.len());
1501        randomness_txns.reserve(user_transactions.len());
1502
1503        // There may be randomness transactions in `txns`, which were deferred due to congestion.
1504        // They must be placed back into `randomness_txns`.
1505        let mut txns: Vec<_> = txns
1506            .into_iter()
1507            .filter_map(|tx| {
1508                if tx.tx().transaction_data().uses_randomness() {
1509                    randomness_txns.push(tx);
1510                    None
1511                } else {
1512                    Some(tx)
1513                }
1514            })
1515            .collect();
1516
1517        for txn in user_transactions {
1518            if txn.tx().transaction_data().uses_randomness() {
1519                randomness_txns.push(txn);
1520            } else {
1521                txns.push(txn);
1522            }
1523        }
1524
1525        PostConsensusTxReorder::reorder(
1526            &mut txns,
1527            protocol_config.consensus_transaction_ordering(),
1528        );
1529        PostConsensusTxReorder::reorder(
1530            &mut randomness_txns,
1531            protocol_config.consensus_transaction_ordering(),
1532        );
1533
1534        (txns, randomness_txns, previously_deferred_tx_digests)
1535    }
1536
1537    fn load_deferred_transactions(
1538        &self,
1539        state: &mut CommitHandlerState,
1540        commit_info: &ConsensusCommitInfo,
1541    ) -> (
1542        Vec<VerifiedExecutableTransactionWithAliases>,
1543        Vec<VerifiedExecutableTransactionWithAliases>,
1544        HashMap<TransactionDigest, DeferralKey>,
1545    ) {
1546        let mut previously_deferred_tx_digests = HashMap::new();
1547
1548        let deferred_txs: Vec<_> = self
1549            .epoch_store
1550            .load_deferred_transactions_for_up_to_consensus_round_v2(
1551                &mut state.output,
1552                commit_info.round,
1553            )
1554            .expect("db error")
1555            .into_iter()
1556            .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
1557            .map(|(key, tx)| {
1558                previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
1559                tx
1560            })
1561            .collect();
1562        trace!(
1563            "loading deferred transactions: {:?}",
1564            deferred_txs.iter().map(|tx| tx.tx().digest())
1565        );
1566
1567        let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
1568            let txns: Vec<_> = self
1569                .epoch_store
1570                .load_deferred_transactions_for_randomness_v2(&mut state.output)
1571                .expect("db error")
1572                .into_iter()
1573                .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
1574                .map(|(key, tx)| {
1575                    previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
1576                    tx
1577                })
1578                .collect();
1579            trace!(
1580                "loading deferred randomness transactions: {:?}",
1581                txns.iter().map(|tx| tx.tx().digest())
1582            );
1583            txns
1584        } else {
1585            vec![]
1586        };
1587
1588        (
1589            deferred_txs,
1590            deferred_randomness_txs,
1591            previously_deferred_tx_digests,
1592        )
1593    }
1594
1595    fn init_congestion_tracker(
1596        &self,
1597        commit_info: &ConsensusCommitInfo,
1598        for_randomness: bool,
1599        txns: &[VerifiedExecutableTransactionWithAliases],
1600    ) -> SharedObjectCongestionTracker {
1601        #[allow(unused_mut)]
1602        let mut ret = SharedObjectCongestionTracker::from_protocol_config(
1603            self.epoch_store
1604                .consensus_quarantine
1605                .read()
1606                .load_initial_object_debts(
1607                    &self.epoch_store,
1608                    commit_info.round,
1609                    for_randomness,
1610                    txns,
1611                )
1612                .expect("db error"),
1613            self.epoch_store.protocol_config(),
1614            for_randomness,
1615        );
1616
1617        fail_point_arg!(
1618            "initial_congestion_tracker",
1619            |tracker: SharedObjectCongestionTracker| {
1620                info!(
1621                    "Initialize shared_object_congestion_tracker to  {:?}",
1622                    tracker
1623                );
1624                ret = tracker;
1625            }
1626        );
1627
1628        ret
1629    }
1630
1631    fn process_jwks(
1632        &self,
1633        state: &mut CommitHandlerState,
1634        commit_info: &ConsensusCommitInfo,
1635        new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
1636    ) {
1637        for (authority_name, jwk_id, jwk) in new_jwks {
1638            self.epoch_store.record_jwk_vote(
1639                &mut state.output,
1640                commit_info.round,
1641                authority_name,
1642                &jwk_id,
1643                &jwk,
1644            );
1645        }
1646    }
1647
1648    fn process_capability_notifications(
1649        &self,
1650        capability_notifications: Vec<AuthorityCapabilitiesV2>,
1651    ) {
1652        for capabilities in capability_notifications {
1653            self.epoch_store
1654                .record_capabilities_v2(&capabilities)
1655                .expect("db error");
1656        }
1657    }
1658
1659    fn process_execution_time_observations(
1660        &self,
1661        state: &mut CommitHandlerState,
1662        execution_time_observations: Vec<ExecutionTimeObservation>,
1663    ) {
1664        let mut execution_time_estimator = self
1665            .epoch_store
1666            .execution_time_estimator
1667            .try_lock()
1668            .expect("should only ever be called from the commit handler thread");
1669
1670        for ExecutionTimeObservation {
1671            authority,
1672            generation,
1673            estimates,
1674        } in execution_time_observations
1675        {
1676            let authority_index = self
1677                .epoch_store
1678                .committee()
1679                .authority_index(&authority)
1680                .unwrap();
1681            execution_time_estimator.process_observations_from_consensus(
1682                authority_index,
1683                Some(generation),
1684                &estimates,
1685            );
1686            state
1687                .output
1688                .insert_execution_time_observation(authority_index, generation, estimates);
1689        }
1690    }
1691
1692    fn process_checkpoint_signature_messages(
1693        &self,
1694        checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
1695    ) {
1696        for checkpoint_signature_message in checkpoint_signature_messages {
1697            self.checkpoint_service
1698                .notify_checkpoint_signature(&self.epoch_store, &checkpoint_signature_message)
1699                .expect("db error");
1700        }
1701    }
1702
1703    async fn process_dkg_updates(
1704        &self,
1705        state: &mut CommitHandlerState,
1706        commit_info: &ConsensusCommitInfo,
1707        randomness_manager: Option<&mut RandomnessManager>,
1708        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
1709        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
1710    ) {
1711        if !self.epoch_store.randomness_state_enabled() {
1712            let num_dkg_messages = randomness_dkg_messages.len();
1713            let num_dkg_confirmations = randomness_dkg_confirmations.len();
1714            if num_dkg_messages + num_dkg_confirmations > 0 {
1715                debug_fatal!(
1716                    "received {} RandomnessDkgMessage and {} RandomnessDkgConfirmation messages when randomness is not enabled",
1717                    num_dkg_messages,
1718                    num_dkg_confirmations
1719                );
1720            }
1721            return;
1722        }
1723
1724        let randomness_manager =
1725            randomness_manager.expect("randomness manager should exist if randomness is enabled");
1726
1727        let randomness_dkg_updates =
1728            self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);
1729
1730        let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
1731            state,
1732            randomness_manager,
1733            randomness_dkg_confirmations,
1734        );
1735
1736        if randomness_dkg_updates || randomness_dkg_confirmation_updates {
1737            randomness_manager
1738                .advance_dkg(&mut state.output, commit_info.round)
1739                .await
1740                .expect("epoch ended");
1741        }
1742    }
1743
1744    fn process_randomness_dkg_messages(
1745        &self,
1746        randomness_manager: &mut RandomnessManager,
1747        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
1748    ) -> bool /* randomness state updated */ {
1749        if randomness_dkg_messages.is_empty() {
1750            return false;
1751        }
1752
1753        let mut randomness_state_updated = false;
1754        for (authority, bytes) in randomness_dkg_messages {
1755            match bcs::from_bytes(&bytes) {
1756                Ok(message) => {
1757                    randomness_manager
1758                        .add_message(&authority, message)
1759                        // TODO: make infallible
1760                        .expect("epoch ended");
1761                    randomness_state_updated = true;
1762                }
1763
1764                Err(e) => {
1765                    warn!(
1766                        "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
1767                        authority.concise(),
1768                    );
1769                }
1770            }
1771        }
1772
1773        randomness_state_updated
1774    }
1775
1776    fn process_randomness_dkg_confirmations(
1777        &self,
1778        state: &mut CommitHandlerState,
1779        randomness_manager: &mut RandomnessManager,
1780        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
1781    ) -> bool /* randomness state updated */ {
1782        if randomness_dkg_confirmations.is_empty() {
1783            return false;
1784        }
1785
1786        let mut randomness_state_updated = false;
1787        for (authority, bytes) in randomness_dkg_confirmations {
1788            match bcs::from_bytes(&bytes) {
1789                Ok(message) => {
1790                    randomness_manager
1791                        .add_confirmation(&mut state.output, &authority, message)
1792                        // TODO: make infallible
1793                        .expect("epoch ended");
1794                    randomness_state_updated = true;
1795                }
1796                Err(e) => {
1797                    warn!(
1798                        "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
1799                        authority.concise(),
1800                    );
1801                }
1802            }
1803        }
1804
1805        randomness_state_updated
1806    }
1807
1808    /// Returns true if we have collected a quorum of end of publish messages (either in this round or a previous round).
1809    fn process_end_of_publish_transactions(
1810        &self,
1811        state: &mut CommitHandlerState,
1812        end_of_publish_transactions: Vec<AuthorityName>,
1813    ) -> bool {
1814        let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
1815            "No contention on end_of_publish as it is only accessed from consensus handler",
1816        );
1817
1818        if eop_aggregator.has_quorum() {
1819            return true;
1820        }
1821
1822        if end_of_publish_transactions.is_empty() {
1823            return false;
1824        }
1825
1826        for authority in end_of_publish_transactions {
1827            info!("Received EndOfPublish from {:?}", authority.concise());
1828
1829            // It is ok to just release lock here as this function is the only place that transition into RejectAllCerts state
1830            // And this function itself is always executed from consensus task
1831            state.output.insert_end_of_publish(authority);
1832            if eop_aggregator
1833                .insert_generic(authority, ())
1834                .is_quorum_reached()
1835            {
1836                debug!(
1837                    "Collected enough end_of_publish messages with last message from validator {:?}",
1838                    authority.concise(),
1839                );
1840                return true;
1841            }
1842        }
1843
1844        false
1845    }
1846
1847    /// After we have collected 2f+1 EndOfPublish messages, we call this function every round until the epoch
1848    /// ends.
1849    fn advance_eop_state_machine(
1850        &self,
1851        state: &mut CommitHandlerState,
1852    ) -> (
1853        RwLockWriteGuard<'_, ReconfigState>,
1854        bool, // true if final round
1855    ) {
1856        let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
1857        let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
1858
1859        reconfig_state.close_all_certs();
1860
1861        let commit_has_deferred_txns = state.output.has_deferred_transactions();
1862        let previous_commits_have_deferred_txns = !self.epoch_store.deferred_transactions_empty();
1863
1864        if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
1865            if !start_state_is_reject_all_tx {
1866                info!("Transitioning to RejectAllTx");
1867            }
1868            reconfig_state.close_all_tx();
1869        } else {
1870            debug!(
1871                "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
1872                previous_commits_have_deferred_txns, commit_has_deferred_txns,
1873            );
1874        }
1875
1876        state.output.store_reconfig_state(reconfig_state.clone());
1877
1878        if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
1879            (reconfig_state, true)
1880        } else {
1881            (reconfig_state, false)
1882        }
1883    }
1884
1885    fn gather_commit_metadata(
1886        &self,
1887        consensus_commit: &impl ConsensusCommitAPI,
1888    ) -> (u64, AuthorityIndex, u64) {
1889        let timestamp = consensus_commit.commit_timestamp_ms();
1890        let leader_author = consensus_commit.leader_author_index();
1891        let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
1892
1893        let system_time_ms = SystemTime::now()
1894            .duration_since(UNIX_EPOCH)
1895            .unwrap()
1896            .as_millis() as i64;
1897
1898        let consensus_timestamp_bias_ms = system_time_ms - (timestamp as i64);
1899        let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
1900        self.metrics
1901            .consensus_timestamp_bias
1902            .observe(consensus_timestamp_bias_seconds);
1903
1904        let epoch_start = self
1905            .epoch_store
1906            .epoch_start_config()
1907            .epoch_start_timestamp_ms();
1908        let timestamp = if timestamp < epoch_start {
1909            error!(
1910                "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}"
1911            );
1912            epoch_start
1913        } else {
1914            timestamp
1915        };
1916
1917        (timestamp, leader_author, commit_sub_dag_index)
1918    }
1919
1920    fn create_authenticator_state_update(
1921        &self,
1922        last_committed_round: u64,
1923        commit_info: &ConsensusCommitInfo,
1924    ) -> Option<VerifiedExecutableTransactionWithAliases> {
1925        // Load all jwks that became active in the previous round, and commit them in this round.
1926        // We want to delay one round because none of the transactions in the previous round could
1927        // have been authenticated with the jwks that became active in that round.
1928        //
1929        // Because of this delay, jwks that become active in the last round of the epoch will
1930        // never be committed. That is ok, because in the new epoch, the validators should
1931        // immediately re-submit these jwks, and they can become active then.
1932        let new_jwks = self
1933            .epoch_store
1934            .get_new_jwks(last_committed_round)
1935            .expect("Unrecoverable error in consensus handler");
1936
1937        if !new_jwks.is_empty() {
1938            let authenticator_state_update_transaction = authenticator_state_update_transaction(
1939                &self.epoch_store,
1940                commit_info.round,
1941                new_jwks,
1942            );
1943            debug!(
1944                "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
1945                authenticator_state_update_transaction.digest(),
1946                authenticator_state_update_transaction,
1947            );
1948
1949            Some(VerifiedExecutableTransactionWithAliases::no_aliases(
1950                authenticator_state_update_transaction,
1951            ))
1952        } else {
1953            None
1954        }
1955    }
1956
1957    // Filters out rejected or deprecated transactions.
1958    // Returns FilteredConsensusOutput containing transactions and owned_object_locks
1959    // (collected when preconsensus locking is disabled).
1960    #[instrument(level = "trace", skip_all)]
1961    fn filter_consensus_txns(
1962        &mut self,
1963        initial_reconfig_state: ReconfigState,
1964        commit_info: &ConsensusCommitInfo,
1965        consensus_commit: &impl ConsensusCommitAPI,
1966    ) -> FilteredConsensusOutput {
1967        let mut transactions = Vec::new();
1968        let mut owned_object_locks = HashMap::new();
1969        let epoch = self.epoch_store.epoch();
1970        let mut num_finalized_user_transactions = vec![0; self.committee.size()];
1971        let mut num_rejected_user_transactions = vec![0; self.committee.size()];
1972        for (block, parsed_transactions) in consensus_commit.transactions() {
1973            let author = block.author.value();
1974            // TODO: consider only messages within 1~3 rounds of the leader?
1975            self.last_consensus_stats.stats.inc_num_messages(author);
1976
1977            // Set the "ping" transaction status for this block. This is necessary as there might be some ping requests waiting for the ping transaction to be certified.
1978            self.epoch_store.set_consensus_tx_status(
1979                ConsensusPosition::ping(epoch, block),
1980                ConsensusTxStatus::Finalized,
1981            );
1982
1983            for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
1984                let position = ConsensusPosition {
1985                    epoch,
1986                    block,
1987                    index: tx_index as TransactionIndex,
1988                };
1989
1990                // Transaction has appeared in consensus output, we can increment the submission count
1991                // for this tx for DoS protection.
1992                if self.epoch_store.protocol_config().mysticeti_fastpath()
1993                    && let Some(tx) = parsed.transaction.kind.as_user_transaction()
1994                {
1995                    let digest = tx.digest();
1996                    if let Some((spam_weight, submitter_client_addrs)) = self
1997                        .epoch_store
1998                        .submitted_transaction_cache
1999                        .increment_submission_count(digest)
2000                    {
2001                        if let Some(ref traffic_controller) = self.traffic_controller {
2002                            debug!(
2003                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
2004                                submitter_client_addrs.len()
2005                            );
2006
2007                            // Apply spam weight to all client addresses that submitted this transaction
2008                            for addr in submitter_client_addrs {
2009                                traffic_controller.tally(TrafficTally::new(
2010                                    Some(addr),
2011                                    None,
2012                                    None,
2013                                    spam_weight.clone(),
2014                                ));
2015                            }
2016                        } else {
2017                            warn!(
2018                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
2019                                submitter_client_addrs.len()
2020                            );
2021                        }
2022                    }
2023                }
2024
2025                if parsed.rejected {
2026                    // TODO(fastpath): Add metrics for rejected transactions.
2027                    if matches!(
2028                        parsed.transaction.kind,
2029                        ConsensusTransactionKind::UserTransaction(_)
2030                            | ConsensusTransactionKind::UserTransactionV2(_)
2031                    ) {
2032                        self.epoch_store
2033                            .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
2034                        num_rejected_user_transactions[author] += 1;
2035                    }
2036                    // Skip processing rejected transactions.
2037                    // TODO(fastpath): Handle unlocking.
2038                    continue;
2039                }
2040                // Set Finalized status for user transactions.
2041                // For UserTransactionV2 with disable_preconsensus_locking, we defer setting
2042                // Finalized until after successful lock acquisition (see below).
2043                let defer_finalized_status = self
2044                    .epoch_store
2045                    .protocol_config()
2046                    .disable_preconsensus_locking()
2047                    && matches!(
2048                        parsed.transaction.kind,
2049                        ConsensusTransactionKind::UserTransactionV2(_)
2050                    );
2051                if !defer_finalized_status
2052                    && matches!(
2053                        parsed.transaction.kind,
2054                        ConsensusTransactionKind::UserTransaction(_)
2055                            | ConsensusTransactionKind::UserTransactionV2(_)
2056                    )
2057                {
2058                    self.epoch_store
2059                        .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
2060                    num_finalized_user_transactions[author] += 1;
2061                }
2062                let kind = classify(&parsed.transaction);
2063                self.metrics
2064                    .consensus_handler_processed
2065                    .with_label_values(&[kind])
2066                    .inc();
2067                self.metrics
2068                    .consensus_handler_transaction_sizes
2069                    .with_label_values(&[kind])
2070                    .observe(parsed.serialized_len as f64);
2071                // UserTransaction exists only when mysticeti_fastpath is enabled in protocol config.
2072                if matches!(
2073                    &parsed.transaction.kind,
2074                    ConsensusTransactionKind::CertifiedTransaction(_)
2075                        | ConsensusTransactionKind::UserTransaction(_)
2076                        | ConsensusTransactionKind::UserTransactionV2(_)
2077                ) {
2078                    self.last_consensus_stats
2079                        .stats
2080                        .inc_num_user_transactions(author);
2081                }
2082
2083                if !initial_reconfig_state.should_accept_consensus_certs() {
2084                    // (Note: we no longer need to worry about the previously deferred condition, since we are only
2085                    // processing newly-received transactions at this time).
2086                    match &parsed.transaction.kind {
2087                        ConsensusTransactionKind::UserTransaction(_)
2088                        | ConsensusTransactionKind::UserTransactionV2(_)
2089                        | ConsensusTransactionKind::CertifiedTransaction(_)
2090                        // deprecated and ignore later, but added for exhaustive match
2091                        | ConsensusTransactionKind::CapabilityNotification(_)
2092                        | ConsensusTransactionKind::CapabilityNotificationV2(_)
2093                        | ConsensusTransactionKind::EndOfPublish(_)
2094                        // Note: we no longer have to check protocol_config.ignore_execution_time_observations_after_certs_closed()
2095                        | ConsensusTransactionKind::ExecutionTimeObservation(_)
2096                        | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
2097                            debug!(
2098                                "Ignoring consensus transaction {:?} because of end of epoch",
2099                                parsed.transaction.key()
2100                            );
2101                            continue;
2102                        }
2103
2104                        // These are the message types that are still processed even if !should_accept_consensus_certs()
2105                        ConsensusTransactionKind::CheckpointSignature(_)
2106                        | ConsensusTransactionKind::CheckpointSignatureV2(_)
2107                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2108                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
2109                        | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
2110                    }
2111                }
2112
2113                if !initial_reconfig_state.should_accept_tx() {
2114                    match &parsed.transaction.kind {
2115                        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
2116                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
2117                        _ => {}
2118                    }
2119                }
2120
2121                if parsed.transaction.is_mfp_transaction()
2122                    && !self.epoch_store.protocol_config().mysticeti_fastpath()
2123                {
2124                    debug!(
2125                        "Ignoring MFP transaction {:?} because MFP is disabled",
2126                        parsed.transaction.key()
2127                    );
2128                    continue;
2129                }
2130
2131                if let ConsensusTransactionKind::CertifiedTransaction(certificate) =
2132                    &parsed.transaction.kind
2133                    && certificate.epoch() != epoch
2134                {
2135                    debug!(
2136                        "Certificate epoch ({:?}) doesn't match the current epoch ({:?})",
2137                        certificate.epoch(),
2138                        epoch
2139                    );
2140                    continue;
2141                }
2142
2143                // Handle deprecated messages
2144                match &parsed.transaction.kind {
2145                    ConsensusTransactionKind::CapabilityNotification(_)
2146                    | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2147                    | ConsensusTransactionKind::CheckpointSignature(_) => {
2148                        debug_fatal!(
2149                            "BUG: saw deprecated tx {:?}for commit round {}",
2150                            parsed.transaction.key(),
2151                            commit_info.round
2152                        );
2153                        continue;
2154                    }
2155                    _ => {}
2156                }
2157
2158                if matches!(
2159                    &parsed.transaction.kind,
2160                    ConsensusTransactionKind::UserTransaction(_)
2161                        | ConsensusTransactionKind::UserTransactionV2(_)
2162                        | ConsensusTransactionKind::CertifiedTransaction(_)
2163                ) {
2164                    let author_name = self
2165                        .epoch_store
2166                        .committee()
2167                        .authority_by_index(author as u32)
2168                        .unwrap();
2169                    if self
2170                        .epoch_store
2171                        .has_received_end_of_publish_from(author_name)
2172                    {
2173                        // In some edge cases, consensus might resend previously seen certificate after EndOfPublish
2174                        // An honest validator should not send a new transaction after EndOfPublish. Whether the
2175                        // transaction is duplicate or not, we filter it out here.
2176                        warn!(
2177                            "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
2178                            author_name.concise(),
2179                            parsed.transaction.key(),
2180                        );
2181                        continue;
2182                    }
2183                }
2184
2185                // When preconsensus locking is disabled, perform post-consensus owned object
2186                // conflict detection. If lock acquisition fails, the transaction has
2187                // invalid/conflicting owned inputs and should be dropped.
2188                // This must happen AFTER all filtering checks above to avoid acquiring locks
2189                // for transactions that will be dropped (e.g., during epoch change).
2190                // Only applies to UserTransactionV2 - other transaction types don't need lock acquisition.
2191                if self
2192                    .epoch_store
2193                    .protocol_config()
2194                    .disable_preconsensus_locking()
2195                    && let ConsensusTransactionKind::UserTransactionV2(tx_with_claims) =
2196                        &parsed.transaction.kind
2197                {
2198                    let immutable_object_ids: HashSet<ObjectID> =
2199                        tx_with_claims.get_immutable_objects().into_iter().collect();
2200                    let tx = tx_with_claims.tx();
2201
2202                    let Ok(input_objects) = tx.transaction_data().input_objects() else {
2203                        debug_fatal!("Invalid input objects for transaction {}", tx.digest());
2204                        continue;
2205                    };
2206
2207                    // Filter ImmOrOwnedMoveObject inputs, excluding those claimed to be immutable.
2208                    // Immutable objects don't need lock acquisition as they can be used concurrently.
2209                    let owned_object_refs: Vec<_> = input_objects
2210                        .iter()
2211                        .filter_map(|obj| match obj {
2212                            InputObjectKind::ImmOrOwnedMoveObject(obj_ref)
2213                                if !immutable_object_ids.contains(&obj_ref.0) =>
2214                            {
2215                                Some(*obj_ref)
2216                            }
2217                            _ => None,
2218                        })
2219                        .collect();
2220
2221                    match self
2222                        .epoch_store
2223                        .try_acquire_owned_object_locks_post_consensus(
2224                            &owned_object_refs,
2225                            *tx.digest(),
2226                            &owned_object_locks,
2227                        ) {
2228                        Ok(new_locks) => {
2229                            owned_object_locks.extend(new_locks.into_iter());
2230                            // Lock acquisition succeeded - now set Finalized status
2231                            self.epoch_store
2232                                .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
2233                            num_finalized_user_transactions[author] += 1;
2234                        }
2235                        Err(e) => {
2236                            debug!("Dropping transaction {}: {}", tx.digest(), e);
2237                            self.epoch_store
2238                                .set_consensus_tx_status(position, ConsensusTxStatus::Dropped);
2239                            self.epoch_store.set_rejection_vote_reason(position, &e);
2240                            continue;
2241                        }
2242                    }
2243                }
2244
2245                let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
2246                transactions.push((transaction, author as u32));
2247            }
2248        }
2249
2250        for (i, authority) in self.committee.authorities() {
2251            let hostname = &authority.hostname;
2252            self.metrics
2253                .consensus_committed_messages
2254                .with_label_values(&[hostname])
2255                .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
2256            self.metrics
2257                .consensus_committed_user_transactions
2258                .with_label_values(&[hostname])
2259                .set(
2260                    self.last_consensus_stats
2261                        .stats
2262                        .get_num_user_transactions(i.value()) as i64,
2263                );
2264            self.metrics
2265                .consensus_finalized_user_transactions
2266                .with_label_values(&[hostname])
2267                .add(num_finalized_user_transactions[i.value()] as i64);
2268            self.metrics
2269                .consensus_rejected_user_transactions
2270                .with_label_values(&[hostname])
2271                .add(num_rejected_user_transactions[i.value()] as i64);
2272        }
2273
2274        FilteredConsensusOutput {
2275            transactions,
2276            owned_object_locks,
2277        }
2278    }
2279
2280    fn deduplicate_consensus_txns(
2281        &mut self,
2282        state: &mut CommitHandlerState,
2283        commit_info: &ConsensusCommitInfo,
2284        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
2285    ) -> Vec<VerifiedSequencedConsensusTransaction> {
2286        // We need a set here as well, since the processed_cache is a LRU cache and can drop
2287        // entries while we're iterating over the sequenced transactions.
2288        let mut processed_set = HashSet::new();
2289
2290        let mut all_transactions = Vec::new();
2291
2292        // All of these TODOs are handled here in the new code, whereas in the old code, they were
2293        // each handled separately. The key thing to see is that all messages are marked as processed
2294        // here, except for ones that are filtered out earlier (e.g. due to !should_accept_consensus_certs()).
2295
2296        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
2297            // SequencedConsensusTransaction for commit prologue any more.
2298            // In process_consensus_transactions_and_commit_boundary(), we will add a system consensus commit
2299            // prologue transaction, which will be the first transaction in this consensus commit batch.
2300            // Therefore, the transaction sequence number starts from 1 here.
2301            let current_tx_index = ExecutionIndices {
2302                last_committed_round: commit_info.round,
2303                sub_dag_index: commit_info.consensus_commit_ref.index.into(),
2304                transaction_index: (seq + 1) as u64,
2305            };
2306
2307            self.last_consensus_stats.index = current_tx_index;
2308
2309            let certificate_author = *self
2310                .epoch_store
2311                .committee()
2312                .authority_by_index(cert_origin)
2313                .unwrap();
2314
2315            let sequenced_transaction = SequencedConsensusTransaction {
2316                certificate_author_index: cert_origin,
2317                certificate_author,
2318                consensus_index: current_tx_index,
2319                transaction,
2320            };
2321
2322            let Some(verified_transaction) = self
2323                .epoch_store
2324                .verify_consensus_transaction(sequenced_transaction)
2325            else {
2326                continue;
2327            };
2328
2329            let key = verified_transaction.0.key();
2330
2331            if let Some(tx_digest) = key.user_transaction_digest() {
2332                self.epoch_store
2333                    .cache_recently_finalized_transaction(tx_digest);
2334            }
2335
2336            let in_set = !processed_set.insert(key.clone());
2337            let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
2338            if in_set || in_cache {
2339                self.metrics.skipped_consensus_txns_cache_hit.inc();
2340                continue;
2341            }
2342            if self
2343                .epoch_store
2344                .is_consensus_message_processed(&key)
2345                .expect("db error")
2346            {
2347                self.metrics.skipped_consensus_txns.inc();
2348                continue;
2349            }
2350
2351            state.output.record_consensus_message_processed(key);
2352
2353            all_transactions.push(verified_transaction);
2354        }
2355
2356        all_transactions
2357    }
2358
2359    fn build_commit_handler_input(
2360        &self,
2361        transactions: Vec<VerifiedSequencedConsensusTransaction>,
2362    ) -> CommitHandlerInput {
2363        let epoch = self.epoch_store.epoch();
2364        let mut commit_handler_input = CommitHandlerInput::default();
2365
2366        for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
2367            match transaction.transaction {
2368                SequencedConsensusTransactionKind::External(consensus_transaction) => {
2369                    match consensus_transaction.kind {
2370                        // === User transactions ===
2371                        ConsensusTransactionKind::CertifiedTransaction(cert) => {
2372                            // Safe because signatures are verified when consensus called into SuiTxValidator::validate_batch.
2373                            let cert = VerifiedCertificate::new_unchecked(*cert);
2374                            let transaction =
2375                                VerifiedExecutableTransaction::new_from_certificate(cert);
2376                            commit_handler_input.user_transactions.push(
2377                                VerifiedExecutableTransactionWithAliases::no_aliases(transaction),
2378                            );
2379                        }
2380                        ConsensusTransactionKind::UserTransaction(tx) => {
2381                            // Safe because transactions are certified by consensus.
2382                            let tx = VerifiedTransaction::new_unchecked(*tx);
2383                            // TODO(fastpath): accept position in consensus, after plumbing consensus round, authority index, and transaction index here.
2384                            let transaction =
2385                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
2386                            commit_handler_input
2387                                .user_transactions
2388                                // Use of v1 UserTransaction implies commitment to no aliases.
2389                                .push(VerifiedExecutableTransactionWithAliases::no_aliases(
2390                                    transaction,
2391                                ));
2392                        }
2393                        ConsensusTransactionKind::UserTransactionV2(tx) => {
2394                            // Extract the aliases claim (required) from the claims
2395                            let used_alias_versions = tx.aliases();
2396                            let inner_tx = tx.into_tx();
2397                            // Safe because transactions are certified by consensus.
2398                            let tx = VerifiedTransaction::new_unchecked(inner_tx);
2399                            // TODO(fastpath): accept position in consensus, after plumbing consensus round, authority index, and transaction index here.
2400                            let transaction =
2401                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
2402                            if let Some(used_alias_versions) = used_alias_versions {
2403                                commit_handler_input
2404                                    .user_transactions
2405                                    .push(WithAliases::new(transaction, used_alias_versions));
2406                            } else {
2407                                commit_handler_input.user_transactions.push(
2408                                    VerifiedExecutableTransactionWithAliases::no_aliases(
2409                                        transaction,
2410                                    ),
2411                                );
2412                            }
2413                        }
2414
2415                        // === State machines ===
2416                        ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => {
2417                            commit_handler_input
2418                                .end_of_publish_transactions
2419                                .push(authority_public_key_bytes);
2420                        }
2421                        ConsensusTransactionKind::NewJWKFetched(
2422                            authority_public_key_bytes,
2423                            jwk_id,
2424                            jwk,
2425                        ) => {
2426                            commit_handler_input.new_jwks.push((
2427                                authority_public_key_bytes,
2428                                jwk_id,
2429                                jwk,
2430                            ));
2431                        }
2432                        ConsensusTransactionKind::RandomnessDkgMessage(
2433                            authority_public_key_bytes,
2434                            items,
2435                        ) => {
2436                            commit_handler_input
2437                                .randomness_dkg_messages
2438                                .push((authority_public_key_bytes, items));
2439                        }
2440                        ConsensusTransactionKind::RandomnessDkgConfirmation(
2441                            authority_public_key_bytes,
2442                            items,
2443                        ) => {
2444                            commit_handler_input
2445                                .randomness_dkg_confirmations
2446                                .push((authority_public_key_bytes, items));
2447                        }
2448                        ConsensusTransactionKind::CapabilityNotificationV2(
2449                            authority_capabilities_v2,
2450                        ) => {
2451                            commit_handler_input
2452                                .capability_notifications
2453                                .push(authority_capabilities_v2);
2454                        }
2455                        ConsensusTransactionKind::ExecutionTimeObservation(
2456                            execution_time_observation,
2457                        ) => {
2458                            commit_handler_input
2459                                .execution_time_observations
2460                                .push(execution_time_observation);
2461                        }
2462                        ConsensusTransactionKind::CheckpointSignatureV2(
2463                            checkpoint_signature_message,
2464                        ) => {
2465                            commit_handler_input
2466                                .checkpoint_signature_messages
2467                                .push(*checkpoint_signature_message);
2468                        }
2469
2470                        // Deprecated messages, filtered earlier by filter_consensus_txns()
2471                        ConsensusTransactionKind::CheckpointSignature(_)
2472                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2473                        | ConsensusTransactionKind::CapabilityNotification(_) => {
2474                            unreachable!("filtered earlier")
2475                        }
2476                    }
2477                }
2478                // TODO: I think we can delete this, it was only used to inject randomness state update into the tx stream.
2479                SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
2480            }
2481        }
2482
2483        commit_handler_input
2484    }
2485
2486    async fn send_end_of_publish_if_needed(&self) {
2487        if !self.epoch_store.should_send_end_of_publish() {
2488            return;
2489        }
2490
2491        let end_of_publish = ConsensusTransaction::new_end_of_publish(self.epoch_store.name);
2492        if let Err(err) =
2493            self.consensus_adapter
2494                .submit(end_of_publish, None, &self.epoch_store, None, None)
2495        {
2496            warn!(
2497                "Error when sending EndOfPublish message from ConsensusHandler: {:?}",
2498                err
2499            );
2500        } else {
2501            info!(epoch=?self.epoch_store.epoch(), "Sending EndOfPublish message to consensus");
2502        }
2503    }
2504}
2505
2506/// Sends transactions to the execution scheduler in a separate task,
2507/// to avoid blocking consensus handler.
2508#[derive(Clone)]
2509pub(crate) struct ExecutionSchedulerSender {
2510    // Using unbounded channel to avoid blocking consensus commit and transaction handler.
2511    sender: monitored_mpsc::UnboundedSender<(
2512        Vec<Schedulable>,
2513        AssignedTxAndVersions,
2514        SchedulingSource,
2515    )>,
2516}
2517
2518impl ExecutionSchedulerSender {
2519    fn start(
2520        execution_scheduler: Arc<ExecutionScheduler>,
2521        epoch_store: Arc<AuthorityPerEpochStore>,
2522    ) -> Self {
2523        let (sender, recv) = monitored_mpsc::unbounded_channel("execution_scheduler_sender");
2524        spawn_monitored_task!(Self::run(recv, execution_scheduler, epoch_store));
2525        Self { sender }
2526    }
2527
2528    pub(crate) fn new_for_testing(
2529        sender: monitored_mpsc::UnboundedSender<(
2530            Vec<Schedulable>,
2531            AssignedTxAndVersions,
2532            SchedulingSource,
2533        )>,
2534    ) -> Self {
2535        Self { sender }
2536    }
2537
2538    fn send(
2539        &self,
2540        transactions: Vec<Schedulable>,
2541        assigned_versions: AssignedTxAndVersions,
2542        scheduling_source: SchedulingSource,
2543    ) {
2544        let _ = self
2545            .sender
2546            .send((transactions, assigned_versions, scheduling_source));
2547    }
2548
2549    async fn run(
2550        mut recv: monitored_mpsc::UnboundedReceiver<(
2551            Vec<Schedulable>,
2552            AssignedTxAndVersions,
2553            SchedulingSource,
2554        )>,
2555        execution_scheduler: Arc<ExecutionScheduler>,
2556        epoch_store: Arc<AuthorityPerEpochStore>,
2557    ) {
2558        while let Some((transactions, assigned_versions, scheduling_source)) = recv.recv().await {
2559            let _guard = monitored_scope("ConsensusHandler::enqueue");
2560            let assigned_versions = assigned_versions.into_map();
2561            let txns = transactions
2562                .into_iter()
2563                .map(|txn| {
2564                    let key = txn.key();
2565                    (
2566                        txn,
2567                        ExecutionEnv::new()
2568                            .with_scheduling_source(scheduling_source)
2569                            .with_assigned_versions(
2570                                assigned_versions.get(&key).cloned().unwrap_or_default(),
2571                            ),
2572                    )
2573                })
2574                .collect();
2575            execution_scheduler.enqueue(txns, &epoch_store);
2576        }
2577    }
2578}
2579
2580/// Manages the lifetime of tasks handling the commits and transactions output by consensus.
2581pub(crate) struct MysticetiConsensusHandler {
2582    tasks: JoinSet<()>,
2583}
2584
2585impl MysticetiConsensusHandler {
2586    pub(crate) fn new(
2587        last_processed_commit_at_startup: CommitIndex,
2588        mut consensus_handler: ConsensusHandler<CheckpointService>,
2589        consensus_block_handler: ConsensusBlockHandler,
2590        mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
2591        mut block_receiver: UnboundedReceiver<consensus_core::CertifiedBlocksOutput>,
2592        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
2593    ) -> Self {
2594        let mut tasks = JoinSet::new();
2595        tasks.spawn(monitored_future!(async move {
2596            // TODO: pause when execution is overloaded, so consensus can detect the backpressure.
2597            while let Some(consensus_commit) = commit_receiver.recv().await {
2598                let commit_index = consensus_commit.commit_ref.index;
2599                if commit_index <= last_processed_commit_at_startup {
2600                    consensus_handler.handle_prior_consensus_commit(consensus_commit);
2601                } else {
2602                    consensus_handler
2603                        .handle_consensus_commit(consensus_commit)
2604                        .await;
2605                }
2606                commit_consumer_monitor.set_highest_handled_commit(commit_index);
2607            }
2608        }));
2609        if consensus_block_handler.enabled() {
2610            tasks.spawn(monitored_future!(async move {
2611                while let Some(blocks) = block_receiver.recv().await {
2612                    consensus_block_handler
2613                        .handle_certified_blocks(blocks)
2614                        .await;
2615                }
2616            }));
2617        }
2618        Self { tasks }
2619    }
2620
2621    pub(crate) async fn abort(&mut self) {
2622        self.tasks.shutdown().await;
2623    }
2624}
2625
2626fn authenticator_state_update_transaction(
2627    epoch_store: &AuthorityPerEpochStore,
2628    round: u64,
2629    mut new_active_jwks: Vec<ActiveJwk>,
2630) -> VerifiedExecutableTransaction {
2631    let epoch = epoch_store.epoch();
2632    new_active_jwks.sort();
2633
2634    info!("creating authenticator state update transaction");
2635    assert!(epoch_store.authenticator_state_enabled());
2636    let transaction = VerifiedTransaction::new_authenticator_state_update(
2637        epoch,
2638        round,
2639        new_active_jwks,
2640        epoch_store
2641            .epoch_start_config()
2642            .authenticator_obj_initial_shared_version()
2643            .expect("authenticator state obj must exist"),
2644    );
2645    VerifiedExecutableTransaction::new_system(transaction, epoch)
2646}
2647
2648pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
2649    match &transaction.kind {
2650        ConsensusTransactionKind::CertifiedTransaction(certificate) => {
2651            if certificate.is_consensus_tx() {
2652                "shared_certificate"
2653            } else {
2654                "owned_certificate"
2655            }
2656        }
2657        ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
2658        ConsensusTransactionKind::CheckpointSignatureV2(_) => "checkpoint_signature",
2659        ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
2660        ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
2661        ConsensusTransactionKind::CapabilityNotificationV2(_) => "capability_notification_v2",
2662        ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
2663        ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
2664        ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
2665        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
2666        ConsensusTransactionKind::UserTransaction(tx) => {
2667            if tx.is_consensus_tx() {
2668                "shared_user_transaction"
2669            } else {
2670                "owned_user_transaction"
2671            }
2672        }
2673        ConsensusTransactionKind::UserTransactionV2(tx) => {
2674            if tx.tx().is_consensus_tx() {
2675                "shared_user_transaction_v2"
2676            } else {
2677                "owned_user_transaction_v2"
2678            }
2679        }
2680        ConsensusTransactionKind::ExecutionTimeObservation(_) => "execution_time_observation",
2681    }
2682}
2683
2684#[derive(Debug, Clone, Serialize, Deserialize)]
2685pub struct SequencedConsensusTransaction {
2686    pub certificate_author_index: AuthorityIndex,
2687    pub certificate_author: AuthorityName,
2688    pub consensus_index: ExecutionIndices,
2689    pub transaction: SequencedConsensusTransactionKind,
2690}
2691
2692#[derive(Debug, Clone)]
2693#[allow(clippy::large_enum_variant)]
2694pub enum SequencedConsensusTransactionKind {
2695    External(ConsensusTransaction),
2696    System(VerifiedExecutableTransaction),
2697}
2698
2699impl Serialize for SequencedConsensusTransactionKind {
2700    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2701        let serializable = SerializableSequencedConsensusTransactionKind::from(self);
2702        serializable.serialize(serializer)
2703    }
2704}
2705
2706impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
2707    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
2708        let serializable =
2709            SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
2710        Ok(serializable.into())
2711    }
2712}
2713
2714// We can't serialize SequencedConsensusTransactionKind directly because it contains a
2715// VerifiedExecutableTransaction, which is not serializable (by design). This wrapper allows us to
2716// convert to a serializable format easily.
2717#[derive(Debug, Clone, Serialize, Deserialize)]
2718#[allow(clippy::large_enum_variant)]
2719enum SerializableSequencedConsensusTransactionKind {
2720    External(ConsensusTransaction),
2721    System(TrustedExecutableTransaction),
2722}
2723
2724impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
2725    fn from(kind: &SequencedConsensusTransactionKind) -> Self {
2726        match kind {
2727            SequencedConsensusTransactionKind::External(ext) => {
2728                SerializableSequencedConsensusTransactionKind::External(ext.clone())
2729            }
2730            SequencedConsensusTransactionKind::System(txn) => {
2731                SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
2732            }
2733        }
2734    }
2735}
2736
2737impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
2738    fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
2739        match kind {
2740            SerializableSequencedConsensusTransactionKind::External(ext) => {
2741                SequencedConsensusTransactionKind::External(ext)
2742            }
2743            SerializableSequencedConsensusTransactionKind::System(txn) => {
2744                SequencedConsensusTransactionKind::System(txn.into())
2745            }
2746        }
2747    }
2748}
2749
2750#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
2751pub enum SequencedConsensusTransactionKey {
2752    External(ConsensusTransactionKey),
2753    System(TransactionDigest),
2754}
2755
2756impl SequencedConsensusTransactionKey {
2757    pub fn user_transaction_digest(&self) -> Option<TransactionDigest> {
2758        match self {
2759            SequencedConsensusTransactionKey::External(key) => match key {
2760                ConsensusTransactionKey::Certificate(digest) => Some(*digest),
2761                _ => None,
2762            },
2763            SequencedConsensusTransactionKey::System(_) => None,
2764        }
2765    }
2766}
2767
2768impl SequencedConsensusTransactionKind {
2769    pub fn key(&self) -> SequencedConsensusTransactionKey {
2770        match self {
2771            SequencedConsensusTransactionKind::External(ext) => {
2772                SequencedConsensusTransactionKey::External(ext.key())
2773            }
2774            SequencedConsensusTransactionKind::System(txn) => {
2775                SequencedConsensusTransactionKey::System(*txn.digest())
2776            }
2777        }
2778    }
2779
2780    pub fn get_tracking_id(&self) -> u64 {
2781        match self {
2782            SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
2783            SequencedConsensusTransactionKind::System(_txn) => 0,
2784        }
2785    }
2786
2787    pub fn is_executable_transaction(&self) -> bool {
2788        match self {
2789            SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
2790            SequencedConsensusTransactionKind::System(_) => true,
2791        }
2792    }
2793
2794    pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
2795        match self {
2796            SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
2797                ConsensusTransactionKind::CertifiedTransaction(txn) => Some(*txn.digest()),
2798                ConsensusTransactionKind::UserTransaction(txn) => Some(*txn.digest()),
2799                ConsensusTransactionKind::UserTransactionV2(txn) => Some(*txn.tx().digest()),
2800                _ => None,
2801            },
2802            SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
2803        }
2804    }
2805
2806    pub fn is_end_of_publish(&self) -> bool {
2807        match self {
2808            SequencedConsensusTransactionKind::External(ext) => {
2809                matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
2810            }
2811            SequencedConsensusTransactionKind::System(_) => false,
2812        }
2813    }
2814}
2815
2816impl SequencedConsensusTransaction {
2817    pub fn sender_authority(&self) -> AuthorityName {
2818        self.certificate_author
2819    }
2820
2821    pub fn key(&self) -> SequencedConsensusTransactionKey {
2822        self.transaction.key()
2823    }
2824
2825    pub fn is_end_of_publish(&self) -> bool {
2826        if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
2827            matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
2828        } else {
2829            false
2830        }
2831    }
2832
2833    pub fn try_take_execution_time_observation(&mut self) -> Option<ExecutionTimeObservation> {
2834        if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
2835            kind: ConsensusTransactionKind::ExecutionTimeObservation(observation),
2836            ..
2837        }) = &mut self.transaction
2838        {
2839            Some(std::mem::take(observation))
2840        } else {
2841            None
2842        }
2843    }
2844
2845    pub fn is_system(&self) -> bool {
2846        matches!(
2847            self.transaction,
2848            SequencedConsensusTransactionKind::System(_)
2849        )
2850    }
2851
2852    pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
2853        if !randomness_state_enabled {
2854            // If randomness is disabled, these should be processed same as a tx without randomness,
2855            // which will eventually fail when the randomness state object is not found.
2856            return false;
2857        }
2858        match &self.transaction {
2859            SequencedConsensusTransactionKind::External(ConsensusTransaction {
2860                kind: ConsensusTransactionKind::CertifiedTransaction(cert),
2861                ..
2862            }) => cert.transaction_data().uses_randomness(),
2863            SequencedConsensusTransactionKind::External(ConsensusTransaction {
2864                kind: ConsensusTransactionKind::UserTransaction(txn),
2865                ..
2866            }) => txn.transaction_data().uses_randomness(),
2867            SequencedConsensusTransactionKind::External(ConsensusTransaction {
2868                kind: ConsensusTransactionKind::UserTransactionV2(txn),
2869                ..
2870            }) => txn.tx().transaction_data().uses_randomness(),
2871            _ => false,
2872        }
2873    }
2874
2875    pub fn as_consensus_txn(&self) -> Option<&SenderSignedData> {
2876        match &self.transaction {
2877            SequencedConsensusTransactionKind::External(ConsensusTransaction {
2878                kind: ConsensusTransactionKind::CertifiedTransaction(certificate),
2879                ..
2880            }) if certificate.is_consensus_tx() => Some(certificate.data()),
2881            SequencedConsensusTransactionKind::External(ConsensusTransaction {
2882                kind: ConsensusTransactionKind::UserTransaction(txn),
2883                ..
2884            }) if txn.is_consensus_tx() => Some(txn.data()),
2885            SequencedConsensusTransactionKind::External(ConsensusTransaction {
2886                kind: ConsensusTransactionKind::UserTransactionV2(txn),
2887                ..
2888            }) if txn.tx().is_consensus_tx() => Some(txn.tx().data()),
2889            SequencedConsensusTransactionKind::System(txn) if txn.is_consensus_tx() => {
2890                Some(txn.data())
2891            }
2892            _ => None,
2893        }
2894    }
2895}
2896
2897#[derive(Debug, Clone, Serialize, Deserialize)]
2898pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
2899
2900#[cfg(test)]
2901impl VerifiedSequencedConsensusTransaction {
2902    pub fn new_test(transaction: ConsensusTransaction) -> Self {
2903        Self(SequencedConsensusTransaction::new_test(transaction))
2904    }
2905}
2906
2907impl SequencedConsensusTransaction {
2908    pub fn new_test(transaction: ConsensusTransaction) -> Self {
2909        Self {
2910            certificate_author_index: 0,
2911            certificate_author: AuthorityName::ZERO,
2912            consensus_index: Default::default(),
2913            transaction: SequencedConsensusTransactionKind::External(transaction),
2914        }
2915    }
2916}
2917
2918/// Handles certified and rejected transactions output by consensus.
2919pub(crate) struct ConsensusBlockHandler {
2920    /// Whether to enable handling certified transactions.
2921    enabled: bool,
2922    /// Per-epoch store.
2923    epoch_store: Arc<AuthorityPerEpochStore>,
2924    /// Enqueues transactions to the execution scheduler via a separate task.
2925    execution_scheduler_sender: ExecutionSchedulerSender,
2926    /// Backpressure subscriber to wait for backpressure to be resolved.
2927    backpressure_subscriber: BackpressureSubscriber,
2928    /// Metrics for consensus transaction handling.
2929    metrics: Arc<AuthorityMetrics>,
2930}
2931
2932impl ConsensusBlockHandler {
2933    pub fn new(
2934        epoch_store: Arc<AuthorityPerEpochStore>,
2935        execution_scheduler_sender: ExecutionSchedulerSender,
2936        backpressure_subscriber: BackpressureSubscriber,
2937        metrics: Arc<AuthorityMetrics>,
2938    ) -> Self {
2939        Self {
2940            // Disable mysticeti fastpath execution when preconsensus locking is disabled,
2941            // ensuring all transactions go through normal consensus commit path
2942            // where post-consensus conflict detection runs.
2943            enabled: epoch_store.protocol_config().mysticeti_fastpath()
2944                && !epoch_store.protocol_config().disable_preconsensus_locking(),
2945            epoch_store,
2946            execution_scheduler_sender,
2947            backpressure_subscriber,
2948            metrics,
2949        }
2950    }
2951
2952    pub fn enabled(&self) -> bool {
2953        self.enabled
2954    }
2955
2956    #[instrument(level = "debug", skip_all)]
2957    async fn handle_certified_blocks(&self, blocks_output: CertifiedBlocksOutput) {
2958        self.backpressure_subscriber.await_no_backpressure().await;
2959
2960        let _scope = monitored_scope("ConsensusBlockHandler::handle_certified_blocks");
2961
2962        // Avoid triggering fastpath execution or setting transaction status to fastpath certified, during reconfiguration.
2963        let reconfiguration_lock = self.epoch_store.get_reconfig_state_read_lock_guard();
2964        if !reconfiguration_lock.should_accept_user_certs() {
2965            debug!(
2966                "Skipping fastpath execution because epoch {} is closing user transactions: {}",
2967                self.epoch_store.epoch(),
2968                blocks_output
2969                    .blocks
2970                    .iter()
2971                    .map(|b| b.block.reference().to_string())
2972                    .join(", "),
2973            );
2974            return;
2975        }
2976
2977        self.metrics.consensus_block_handler_block_processed.inc();
2978        let epoch = self.epoch_store.epoch();
2979        let parsed_transactions = blocks_output
2980            .blocks
2981            .into_iter()
2982            .map(|certified_block| {
2983                let block_ref = certified_block.block.reference();
2984                let transactions =
2985                    parse_block_transactions(&certified_block.block, &certified_block.rejected);
2986                (block_ref, transactions)
2987            })
2988            .collect::<Vec<_>>();
2989        let mut executable_transactions = vec![];
2990        for (block, transactions) in parsed_transactions.into_iter() {
2991            // Set the "ping" transaction status for this block. This is ncecessary as there might be some ping requests waiting for the ping transaction to be certified.
2992            self.epoch_store.set_consensus_tx_status(
2993                ConsensusPosition::ping(epoch, block),
2994                ConsensusTxStatus::FastpathCertified,
2995            );
2996
2997            for (txn_idx, parsed) in transactions.into_iter().enumerate() {
2998                let position = ConsensusPosition {
2999                    epoch,
3000                    block,
3001                    index: txn_idx as TransactionIndex,
3002                };
3003
3004                let status_str = if parsed.rejected {
3005                    "rejected"
3006                } else {
3007                    "certified"
3008                };
3009                if let Some(tx) = parsed.transaction.kind.as_user_transaction() {
3010                    debug!(
3011                        "User Transaction in position: {:} with digest {:} is {:}",
3012                        position,
3013                        tx.digest(),
3014                        status_str
3015                    );
3016                } else {
3017                    debug!(
3018                        "System Transaction in position: {:} is {:}",
3019                        position, status_str
3020                    );
3021                }
3022
3023                if parsed.rejected {
3024                    // TODO(fastpath): avoid parsing blocks twice between handling commit and fastpath transactions?
3025                    self.epoch_store
3026                        .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
3027                    self.metrics
3028                        .consensus_block_handler_txn_processed
3029                        .with_label_values(&["rejected"])
3030                        .inc();
3031                    continue;
3032                }
3033
3034                self.metrics
3035                    .consensus_block_handler_txn_processed
3036                    .with_label_values(&["certified"])
3037                    .inc();
3038
3039                if let Some(tx) = parsed.transaction.kind.into_user_transaction() {
3040                    if tx.is_consensus_tx() {
3041                        continue;
3042                    }
3043                    // Only set fastpath certified status on transactions intended for fastpath execution.
3044                    self.epoch_store
3045                        .set_consensus_tx_status(position, ConsensusTxStatus::FastpathCertified);
3046                    let tx = VerifiedTransaction::new_unchecked(tx);
3047                    executable_transactions.push(Schedulable::Transaction(
3048                        VerifiedExecutableTransaction::new_from_consensus(
3049                            tx,
3050                            self.epoch_store.epoch(),
3051                        ),
3052                    ));
3053                }
3054            }
3055        }
3056
3057        if executable_transactions.is_empty() {
3058            return;
3059        }
3060        self.metrics
3061            .consensus_block_handler_fastpath_executions
3062            .inc_by(executable_transactions.len() as u64);
3063
3064        self.execution_scheduler_sender.send(
3065            executable_transactions,
3066            Default::default(),
3067            SchedulingSource::MysticetiFastPath,
3068        );
3069    }
3070}
3071
3072#[derive(Serialize, Deserialize)]
3073pub(crate) struct CommitIntervalObserver {
3074    ring_buffer: VecDeque<u64>,
3075}
3076
3077impl CommitIntervalObserver {
3078    pub fn new(window_size: u32) -> Self {
3079        Self {
3080            ring_buffer: VecDeque::with_capacity(window_size as usize),
3081        }
3082    }
3083
3084    pub fn observe_commit_time(&mut self, consensus_commit: &impl ConsensusCommitAPI) {
3085        let commit_time = consensus_commit.commit_timestamp_ms();
3086        if self.ring_buffer.len() == self.ring_buffer.capacity() {
3087            self.ring_buffer.pop_front();
3088        }
3089        self.ring_buffer.push_back(commit_time);
3090    }
3091
3092    pub fn commit_interval_estimate(&self) -> Option<Duration> {
3093        if self.ring_buffer.len() <= 1 {
3094            None
3095        } else {
3096            let first = self.ring_buffer.front().unwrap();
3097            let last = self.ring_buffer.back().unwrap();
3098            let duration = last.saturating_sub(*first);
3099            let num_commits = self.ring_buffer.len() as u64;
3100            Some(Duration::from_millis(duration.div_ceil(num_commits)))
3101        }
3102    }
3103}
3104
3105#[cfg(test)]
3106mod tests {
3107    use std::collections::HashSet;
3108
3109    use consensus_core::{
3110        BlockAPI, CertifiedBlock, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction,
3111        VerifiedBlock,
3112    };
3113    use consensus_types::block::TransactionIndex;
3114    use futures::pin_mut;
3115    use prometheus::Registry;
3116    use sui_protocol_config::{ConsensusTransactionOrdering, ProtocolConfig};
3117    use sui_types::{
3118        base_types::ExecutionDigests,
3119        base_types::{AuthorityName, FullObjectRef, ObjectID, SuiAddress, random_object_ref},
3120        committee::Committee,
3121        crypto::deterministic_random_account_key,
3122        gas::GasCostSummary,
3123        message_envelope::Message,
3124        messages_checkpoint::{
3125            CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
3126            SignedCheckpointSummary,
3127        },
3128        messages_consensus::ConsensusTransaction,
3129        object::Object,
3130        transaction::{
3131            CertifiedTransaction, SenderSignedData, TransactionData, TransactionDataAPI,
3132            VerifiedCertificate,
3133        },
3134    };
3135
3136    use super::*;
3137    use crate::{
3138        authority::{
3139            authority_per_epoch_store::ConsensusStatsAPI,
3140            test_authority_builder::TestAuthorityBuilder,
3141        },
3142        checkpoints::CheckpointServiceNoop,
3143        consensus_adapter::consensus_tests::test_user_transaction,
3144        consensus_test_utils::make_consensus_adapter_for_test,
3145        post_consensus_tx_reorder::PostConsensusTxReorder,
3146    };
3147
3148    #[tokio::test(flavor = "current_thread", start_paused = true)]
3149    async fn test_consensus_commit_handler() {
3150        telemetry_subscribers::init_for_testing();
3151
3152        // GIVEN
3153        // 1 account keypair
3154        let (sender, keypair) = deterministic_random_account_key();
3155        // 12 gas objects.
3156        let gas_objects: Vec<Object> = (0..12)
3157            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3158            .collect();
3159        // 4 owned objects.
3160        let owned_objects: Vec<Object> = (0..4)
3161            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3162            .collect();
3163        // 6 shared objects.
3164        let shared_objects: Vec<Object> = (0..6)
3165            .map(|_| Object::shared_for_testing())
3166            .collect::<Vec<_>>();
3167        let mut all_objects = gas_objects.clone();
3168        all_objects.extend(owned_objects.clone());
3169        all_objects.extend(shared_objects.clone());
3170
3171        let network_config =
3172            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
3173                .with_objects(all_objects.clone())
3174                .build();
3175
3176        let state = TestAuthorityBuilder::new()
3177            .with_network_config(&network_config, 0)
3178            .build()
3179            .await;
3180
3181        let epoch_store = state.epoch_store_for_testing().clone();
3182        let new_epoch_start_state = epoch_store.epoch_start_state();
3183        let consensus_committee = new_epoch_start_state.get_consensus_committee();
3184
3185        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3186
3187        let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
3188
3189        let backpressure_manager = BackpressureManager::new_for_tests();
3190        let consensus_adapter =
3191            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3192        let mut consensus_handler = ConsensusHandler::new(
3193            epoch_store,
3194            Arc::new(CheckpointServiceNoop {}),
3195            state.execution_scheduler().clone(),
3196            consensus_adapter,
3197            state.get_object_cache_reader().clone(),
3198            Arc::new(ArcSwap::default()),
3199            consensus_committee.clone(),
3200            metrics,
3201            Arc::new(throughput_calculator),
3202            backpressure_manager.subscribe(),
3203            state.traffic_controller.clone(),
3204        );
3205
3206        // AND create test user transactions alternating between owned and shared input.
3207        let mut user_transactions = vec![];
3208        for (i, gas_object) in gas_objects[0..8].iter().enumerate() {
3209            let input_object = if i % 2 == 0 {
3210                owned_objects.get(i / 2).unwrap().clone()
3211            } else {
3212                shared_objects.get(i / 2).unwrap().clone()
3213            };
3214            let transaction = test_user_transaction(
3215                &state,
3216                sender,
3217                &keypair,
3218                gas_object.clone(),
3219                vec![input_object],
3220            )
3221            .await;
3222            user_transactions.push(transaction);
3223        }
3224
3225        // AND create 4 more user transactions with remaining gas objects and 2 shared objects.
3226        // Having more txns on the same shared object may get deferred.
3227        for (i, gas_object) in gas_objects[8..12].iter().enumerate() {
3228            let shared_object = if i < 2 {
3229                shared_objects[4].clone()
3230            } else {
3231                shared_objects[5].clone()
3232            };
3233            let transaction = test_user_transaction(
3234                &state,
3235                sender,
3236                &keypair,
3237                gas_object.clone(),
3238                vec![shared_object],
3239            )
3240            .await;
3241            user_transactions.push(transaction);
3242        }
3243
3244        // AND create block for each user transaction
3245        let mut blocks = Vec::new();
3246        for (i, consensus_transaction) in user_transactions
3247            .iter()
3248            .cloned()
3249            .map(|t| ConsensusTransaction::new_user_transaction_v2_message(&state.name, t.into()))
3250            .enumerate()
3251        {
3252            let transaction_bytes = bcs::to_bytes(&consensus_transaction).unwrap();
3253            let block = VerifiedBlock::new_for_test(
3254                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
3255                    .set_transactions(vec![Transaction::new(transaction_bytes)])
3256                    .build(),
3257            );
3258
3259            blocks.push(block);
3260        }
3261
3262        // AND create the consensus commit
3263        let leader_block = blocks[0].clone();
3264        let committed_sub_dag = CommittedSubDag::new(
3265            leader_block.reference(),
3266            blocks.clone(),
3267            leader_block.timestamp_ms(),
3268            CommitRef::new(10, CommitDigest::MIN),
3269        );
3270
3271        // Test that the consensus handler respects backpressure.
3272        backpressure_manager.set_backpressure(true);
3273        // Default watermarks are 0,0 which will suppress the backpressure.
3274        backpressure_manager.update_highest_certified_checkpoint(1);
3275
3276        // AND process the consensus commit once
3277        {
3278            let waiter = consensus_handler.handle_consensus_commit(committed_sub_dag.clone());
3279            pin_mut!(waiter);
3280
3281            // waiter should not complete within 5 seconds
3282            tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
3283                .await
3284                .unwrap_err();
3285
3286            // lift backpressure
3287            backpressure_manager.set_backpressure(false);
3288
3289            // waiter completes now.
3290            tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
3291                .await
3292                .unwrap();
3293        }
3294
3295        // THEN check the consensus stats
3296        let num_blocks = blocks.len();
3297        let num_transactions = user_transactions.len();
3298        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
3299        assert_eq!(
3300            last_consensus_stats_1.index.transaction_index,
3301            num_transactions as u64
3302        );
3303        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
3304        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
3305        assert_eq!(last_consensus_stats_1.hash, 0);
3306        assert_eq!(
3307            last_consensus_stats_1.stats.get_num_messages(0),
3308            num_blocks as u64
3309        );
3310        assert_eq!(
3311            last_consensus_stats_1.stats.get_num_user_transactions(0),
3312            num_transactions as u64
3313        );
3314
3315        // THEN check for execution status of user transactions.
3316        for (i, t) in user_transactions.iter().enumerate() {
3317            let digest = t.tx().digest();
3318            if let Ok(Ok(_)) = tokio::time::timeout(
3319                std::time::Duration::from_secs(10),
3320                state.notify_read_effects("", *digest),
3321            )
3322            .await
3323            {
3324                // Effects exist as expected.
3325            } else {
3326                panic!("User transaction {} {} did not execute", i, digest);
3327            }
3328        }
3329
3330        // THEN check for no inflight or suspended transactions.
3331        state.execution_scheduler().check_empty_for_testing();
3332    }
3333
3334    #[tokio::test]
3335    async fn test_consensus_block_handler() {
3336        // GIVEN
3337        // 1 account keypair
3338        let (sender, keypair) = deterministic_random_account_key();
3339        // 8 gas objects.
3340        let gas_objects: Vec<Object> = (0..8)
3341            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3342            .collect();
3343        // 4 owned objects.
3344        let owned_objects: Vec<Object> = (0..4)
3345            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3346            .collect();
3347        // 4 shared objects.
3348        let shared_objects: Vec<Object> = (0..4)
3349            .map(|_| Object::shared_for_testing())
3350            .collect::<Vec<_>>();
3351        let mut all_objects = gas_objects.clone();
3352        all_objects.extend(owned_objects.clone());
3353        all_objects.extend(shared_objects.clone());
3354
3355        let network_config =
3356            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
3357                .with_objects(all_objects.clone())
3358                .build();
3359
3360        let state = TestAuthorityBuilder::new()
3361            .with_network_config(&network_config, 0)
3362            .build()
3363            .await;
3364        let epoch_store = state.epoch_store_for_testing().clone();
3365        let execution_scheduler_sender = ExecutionSchedulerSender::start(
3366            state.execution_scheduler().clone(),
3367            epoch_store.clone(),
3368        );
3369
3370        let backpressure_manager = BackpressureManager::new_for_tests();
3371        let block_handler = ConsensusBlockHandler::new(
3372            epoch_store.clone(),
3373            execution_scheduler_sender,
3374            backpressure_manager.subscribe(),
3375            state.metrics.clone(),
3376        );
3377
3378        // AND create test transactions alternating between owned and shared input.
3379        let mut transactions = vec![];
3380        for (i, gas_object) in gas_objects.iter().enumerate() {
3381            let input_object = if i % 2 == 0 {
3382                owned_objects.get(i / 2).unwrap().clone()
3383            } else {
3384                shared_objects.get(i / 2).unwrap().clone()
3385            };
3386            let transaction = test_user_transaction(
3387                &state,
3388                sender,
3389                &keypair,
3390                gas_object.clone(),
3391                vec![input_object],
3392            )
3393            .await;
3394            transactions.push(transaction);
3395        }
3396
3397        let serialized_transactions: Vec<_> = transactions
3398            .iter()
3399            .cloned()
3400            .map(|t| {
3401                Transaction::new(
3402                    bcs::to_bytes(&ConsensusTransaction::new_user_transaction_v2_message(
3403                        &state.name,
3404                        t.into(),
3405                    ))
3406                    .unwrap(),
3407                )
3408            })
3409            .collect();
3410
3411        // AND create block for all transactions
3412        let block = VerifiedBlock::new_for_test(
3413            TestBlock::new(100, 1)
3414                .set_transactions(serialized_transactions.clone())
3415                .build(),
3416        );
3417
3418        // AND set rejected transactions.
3419        let rejected_transactions = vec![0, 3, 4];
3420
3421        // AND process the transactions from consensus output.
3422        block_handler
3423            .handle_certified_blocks(CertifiedBlocksOutput {
3424                blocks: vec![CertifiedBlock {
3425                    block: block.clone(),
3426                    rejected: rejected_transactions.clone(),
3427                }],
3428            })
3429            .await;
3430
3431        // Ensure the correct consensus status is set for the correct consensus position
3432        let consensus_tx_status_cache = epoch_store.consensus_tx_status_cache.as_ref().unwrap();
3433        for txn_idx in 0..transactions.len() {
3434            let position = ConsensusPosition {
3435                epoch: epoch_store.epoch(),
3436                block: block.reference(),
3437                index: txn_idx as TransactionIndex,
3438            };
3439            if rejected_transactions.contains(&(txn_idx as TransactionIndex)) {
3440                // Expect rejected transactions to be marked as such.
3441                assert_eq!(
3442                    consensus_tx_status_cache.get_transaction_status(&position),
3443                    Some(ConsensusTxStatus::Rejected)
3444                );
3445            } else if txn_idx % 2 == 0 {
3446                // Expect owned object transactions to be marked as fastpath certified.
3447                assert_eq!(
3448                    consensus_tx_status_cache.get_transaction_status(&position),
3449                    Some(ConsensusTxStatus::FastpathCertified),
3450                );
3451            } else {
3452                // Expect shared object transactions to be marked as fastpath certified.
3453                assert_eq!(
3454                    consensus_tx_status_cache.get_transaction_status(&position),
3455                    None,
3456                );
3457            }
3458        }
3459
3460        // THEN check for status of transactions that should have been executed.
3461        for (i, t) in transactions.iter().enumerate() {
3462            // Do not expect shared transactions or rejected transactions to be executed.
3463            if i % 2 == 1 || rejected_transactions.contains(&(i as TransactionIndex)) {
3464                continue;
3465            }
3466            let digest = t.tx().digest();
3467            if tokio::time::timeout(
3468                std::time::Duration::from_secs(10),
3469                state
3470                    .get_transaction_cache_reader()
3471                    .notify_read_fastpath_transaction_outputs(&[*digest]),
3472            )
3473            .await
3474            .is_err()
3475            {
3476                panic!("Transaction {} {} did not execute", i, digest);
3477            }
3478        }
3479
3480        // THEN check for no inflight or suspended transactions.
3481        state.execution_scheduler().check_empty_for_testing();
3482
3483        // THEN check that rejected transactions are not executed.
3484        for (i, t) in transactions.iter().enumerate() {
3485            // Expect shared transactions or rejected transactions to not have executed.
3486            if i % 2 == 0 && !rejected_transactions.contains(&(i as TransactionIndex)) {
3487                continue;
3488            }
3489            let digest = t.tx().digest();
3490            assert!(
3491                !state.is_tx_already_executed(digest),
3492                "Rejected transaction {} {} should not have been executed",
3493                i,
3494                digest
3495            );
3496        }
3497    }
3498
3499    fn to_short_strings(txs: Vec<VerifiedExecutableTransactionWithAliases>) -> Vec<String> {
3500        txs.into_iter()
3501            .map(|tx| format!("transaction({})", tx.tx().transaction_data().gas_price()))
3502            .collect()
3503    }
3504
3505    #[test]
3506    fn test_order_by_gas_price() {
3507        let mut v = vec![user_txn(42), user_txn(100)];
3508        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3509        assert_eq!(
3510            to_short_strings(v),
3511            vec![
3512                "transaction(100)".to_string(),
3513                "transaction(42)".to_string(),
3514            ]
3515        );
3516
3517        let mut v = vec![
3518            user_txn(1200),
3519            user_txn(12),
3520            user_txn(1000),
3521            user_txn(42),
3522            user_txn(100),
3523            user_txn(1000),
3524        ];
3525        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3526        assert_eq!(
3527            to_short_strings(v),
3528            vec![
3529                "transaction(1200)".to_string(),
3530                "transaction(1000)".to_string(),
3531                "transaction(1000)".to_string(),
3532                "transaction(100)".to_string(),
3533                "transaction(42)".to_string(),
3534                "transaction(12)".to_string(),
3535            ]
3536        );
3537    }
3538
3539    #[tokio::test(flavor = "current_thread")]
3540    async fn test_checkpoint_signature_dedup() {
3541        telemetry_subscribers::init_for_testing();
3542
3543        let network_config =
3544            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3545        let state = TestAuthorityBuilder::new()
3546            .with_network_config(&network_config, 0)
3547            .build()
3548            .await;
3549
3550        let epoch_store = state.epoch_store_for_testing().clone();
3551        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3552
3553        let make_signed = || {
3554            let epoch = epoch_store.epoch();
3555            let contents =
3556                CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
3557            let summary = CheckpointSummary::new(
3558                &ProtocolConfig::get_for_max_version_UNSAFE(),
3559                epoch,
3560                42, // sequence number
3561                10, // network_total_transactions
3562                &contents,
3563                None, // previous_digest
3564                GasCostSummary::default(),
3565                None,       // end_of_epoch_data
3566                0,          // timestamp
3567                Vec::new(), // randomness_rounds
3568                Vec::new(), // checkpoint_artifact_digests
3569            );
3570            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name)
3571        };
3572
3573        // Prepare V2 pair: same (authority, seq), different digests => different keys
3574        let v2_s1 = make_signed();
3575        let v2_s1_clone = v2_s1.clone();
3576        let v2_digest_a = v2_s1.data().digest();
3577        let v2_a =
3578            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3579                summary: v2_s1,
3580            });
3581
3582        let v2_s2 = make_signed();
3583        let v2_digest_b = v2_s2.data().digest();
3584        let v2_b =
3585            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3586                summary: v2_s2,
3587            });
3588
3589        assert_ne!(v2_digest_a, v2_digest_b);
3590
3591        // Create an exact duplicate with same digest to exercise valid dedup
3592        assert_eq!(v2_s1_clone.data().digest(), v2_digest_a);
3593        let v2_dup =
3594            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3595                summary: v2_s1_clone,
3596            });
3597
3598        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
3599        let block = VerifiedBlock::new_for_test(
3600            TestBlock::new(100, 0)
3601                .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)])
3602                .build(),
3603        );
3604        let commit = CommittedSubDag::new(
3605            block.reference(),
3606            vec![block.clone()],
3607            block.timestamp_ms(),
3608            CommitRef::new(10, CommitDigest::MIN),
3609        );
3610
3611        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3612        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
3613        let backpressure = BackpressureManager::new_for_tests();
3614        let consensus_adapter =
3615            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3616        let mut handler = ConsensusHandler::new(
3617            epoch_store.clone(),
3618            Arc::new(CheckpointServiceNoop {}),
3619            state.execution_scheduler().clone(),
3620            consensus_adapter,
3621            state.get_object_cache_reader().clone(),
3622            Arc::new(ArcSwap::default()),
3623            consensus_committee.clone(),
3624            metrics,
3625            Arc::new(throughput),
3626            backpressure.subscribe(),
3627            state.traffic_controller.clone(),
3628        );
3629
3630        handler.handle_consensus_commit(commit).await;
3631
3632        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
3633        use sui_types::messages_consensus::ConsensusTransactionKey as CK;
3634
3635        // V2 distinct digests: both must be processed. If these were collapsed to one CheckpointSeq num, only one would process.
3636        let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a));
3637        let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b));
3638        assert!(
3639            epoch_store
3640                .is_consensus_message_processed(&v2_key_a)
3641                .unwrap()
3642        );
3643        assert!(
3644            epoch_store
3645                .is_consensus_message_processed(&v2_key_b)
3646                .unwrap()
3647        );
3648    }
3649
3650    #[tokio::test(flavor = "current_thread")]
3651    async fn test_verify_consensus_transaction_filters_mismatched_authorities() {
3652        telemetry_subscribers::init_for_testing();
3653
3654        let network_config =
3655            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3656        let state = TestAuthorityBuilder::new()
3657            .with_network_config(&network_config, 0)
3658            .build()
3659            .await;
3660
3661        let epoch_store = state.epoch_store_for_testing().clone();
3662        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3663
3664        // Create a different authority than our test authority
3665        use fastcrypto::traits::KeyPair;
3666        let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair();
3667        let wrong_authority: AuthorityName = wrong_keypair.public().into();
3668
3669        // Create EndOfPublish transaction with mismatched authority
3670        let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority);
3671
3672        // Create valid EndOfPublish transaction with correct authority
3673        let valid_eop = ConsensusTransaction::new_end_of_publish(state.name);
3674
3675        // Create CheckpointSignature with mismatched authority
3676        let epoch = epoch_store.epoch();
3677        let contents =
3678            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
3679        let summary = CheckpointSummary::new(
3680            &ProtocolConfig::get_for_max_version_UNSAFE(),
3681            epoch,
3682            42, // sequence number
3683            10, // network_total_transactions
3684            &contents,
3685            None, // previous_digest
3686            GasCostSummary::default(),
3687            None,       // end_of_epoch_data
3688            0,          // timestamp
3689            Vec::new(), // randomness_rounds
3690            Vec::new(), // checkpoint commitments
3691        );
3692
3693        // Create a signed checkpoint with the wrong authority
3694        let mismatched_checkpoint_signed =
3695            SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority);
3696        let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest();
3697        let mismatched_checkpoint =
3698            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3699                summary: mismatched_checkpoint_signed,
3700            });
3701
3702        // Create a valid checkpoint signature with correct authority
3703        let valid_checkpoint_signed =
3704            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name);
3705        let valid_checkpoint_digest = valid_checkpoint_signed.data().digest();
3706        let valid_checkpoint =
3707            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3708                summary: valid_checkpoint_signed,
3709            });
3710
3711        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
3712
3713        // Create a block with both valid and invalid transactions
3714        let block = VerifiedBlock::new_for_test(
3715            TestBlock::new(100, 0)
3716                .set_transactions(vec![
3717                    to_tx(&mismatched_eop),
3718                    to_tx(&valid_eop),
3719                    to_tx(&mismatched_checkpoint),
3720                    to_tx(&valid_checkpoint),
3721                ])
3722                .build(),
3723        );
3724        let commit = CommittedSubDag::new(
3725            block.reference(),
3726            vec![block.clone()],
3727            block.timestamp_ms(),
3728            CommitRef::new(10, CommitDigest::MIN),
3729        );
3730
3731        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3732        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
3733        let backpressure = BackpressureManager::new_for_tests();
3734        let consensus_adapter =
3735            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3736        let mut handler = ConsensusHandler::new(
3737            epoch_store.clone(),
3738            Arc::new(CheckpointServiceNoop {}),
3739            state.execution_scheduler().clone(),
3740            consensus_adapter,
3741            state.get_object_cache_reader().clone(),
3742            Arc::new(ArcSwap::default()),
3743            consensus_committee.clone(),
3744            metrics,
3745            Arc::new(throughput),
3746            backpressure.subscribe(),
3747            state.traffic_controller.clone(),
3748        );
3749
3750        handler.handle_consensus_commit(commit).await;
3751
3752        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
3753        use sui_types::messages_consensus::ConsensusTransactionKey as CK;
3754
3755        // Check that valid transactions were processed
3756        let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
3757        assert!(
3758            epoch_store
3759                .is_consensus_message_processed(&valid_eop_key)
3760                .unwrap(),
3761            "Valid EndOfPublish should have been processed"
3762        );
3763
3764        let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
3765            state.name,
3766            42,
3767            valid_checkpoint_digest,
3768        ));
3769        assert!(
3770            epoch_store
3771                .is_consensus_message_processed(&valid_checkpoint_key)
3772                .unwrap(),
3773            "Valid CheckpointSignature should have been processed"
3774        );
3775
3776        // Check that mismatched authority transactions were NOT processed (filtered out by verify_consensus_transaction)
3777        let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
3778        assert!(
3779            !epoch_store
3780                .is_consensus_message_processed(&mismatched_eop_key)
3781                .unwrap(),
3782            "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
3783        );
3784
3785        let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
3786            wrong_authority,
3787            42,
3788            mismatched_checkpoint_digest,
3789        ));
3790        assert!(
3791            !epoch_store
3792                .is_consensus_message_processed(&mismatched_checkpoint_key)
3793                .unwrap(),
3794            "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
3795        );
3796    }
3797
3798    fn user_txn(gas_price: u64) -> VerifiedExecutableTransactionWithAliases {
3799        let (committee, keypairs) = Committee::new_simple_test_committee();
3800        let data = SenderSignedData::new(
3801            TransactionData::new_transfer(
3802                SuiAddress::default(),
3803                FullObjectRef::from_fastpath_ref(random_object_ref()),
3804                SuiAddress::default(),
3805                random_object_ref(),
3806                1000 * gas_price,
3807                gas_price,
3808            ),
3809            vec![],
3810        );
3811        let tx = VerifiedExecutableTransaction::new_from_certificate(
3812            VerifiedCertificate::new_unchecked(
3813                CertifiedTransaction::new_from_keypairs_for_testing(data, &keypairs, &committee),
3814            ),
3815        );
3816        VerifiedExecutableTransactionWithAliases::no_aliases(tx)
3817    }
3818}