sui_core/
consensus_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet, VecDeque},
6    hash::Hash,
7    num::NonZeroUsize,
8    sync::{Arc, Mutex},
9    time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use consensus_config::Committee as ConsensusCommittee;
13use consensus_core::{CommitConsumerMonitor, CommitIndex, CommitRef};
14use consensus_types::block::TransactionIndex;
15use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
16use lru::LruCache;
17use mysten_common::{
18    assert_reachable, assert_sometimes, debug_fatal, random_util::randomize_cache_capacity_in_tests,
19};
20use mysten_metrics::{
21    monitored_future,
22    monitored_mpsc::{self, UnboundedReceiver},
23    monitored_scope, spawn_monitored_task,
24};
25use nonempty::NonEmpty;
26use parking_lot::RwLockWriteGuard;
27use serde::{Deserialize, Serialize};
28use sui_config::node::CongestionLogConfig;
29use sui_macros::{fail_point, fail_point_arg, fail_point_if};
30use sui_protocol_config::{PerObjectCongestionControlMode, ProtocolConfig};
31use sui_types::{
32    SUI_RANDOMNESS_STATE_OBJECT_ID,
33    authenticator_state::ActiveJwk,
34    base_types::{
35        AuthorityName, ConciseableName, ConsensusObjectSequenceKey, ObjectID, ObjectRef,
36        SequenceNumber, TransactionDigest,
37    },
38    crypto::RandomnessRound,
39    digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest, Digest},
40    executable_transaction::{
41        TrustedExecutableTransaction, VerifiedExecutableTransaction,
42        VerifiedExecutableTransactionWithAliases,
43    },
44    messages_checkpoint::{
45        CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointTimestamp,
46    },
47    messages_consensus::{
48        AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments,
49        ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
50        ExecutionTimeObservation,
51    },
52    sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
53    transaction::{
54        InputObjectKind, SenderSignedData, TransactionDataAPI, TransactionKey, VerifiedTransaction,
55        WithAliases,
56    },
57};
58use tokio::task::JoinSet;
59use tracing::{debug, error, info, instrument, trace, warn};
60
61use crate::{
62    authority::{
63        AuthorityMetrics, AuthorityState, ExecutionEnv,
64        authority_per_epoch_store::{
65            AuthorityPerEpochStore, CancelConsensusCertificateReason, ConsensusStats,
66            ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStatsV2,
67            consensus_quarantine::ConsensusCommitOutput,
68        },
69        backpressure::{BackpressureManager, BackpressureSubscriber},
70        congestion_log::CongestionCommitLogger,
71        consensus_tx_status_cache::ConsensusTxStatus,
72        epoch_start_configuration::EpochStartConfigTrait,
73        execution_time_estimator::ExecutionTimeEstimator,
74        shared_object_congestion_tracker::SharedObjectCongestionTracker,
75        shared_object_version_manager::{AssignedTxAndVersions, AssignedVersions, Schedulable},
76        transaction_deferral::{DeferralKey, DeferralReason, transaction_deferral_within_limit},
77    },
78    checkpoints::{
79        CheckpointHeight, CheckpointRoots, CheckpointService, CheckpointServiceNotify,
80        PendingCheckpoint, PendingCheckpointInfo, PendingCheckpointV2,
81    },
82    consensus_adapter::ConsensusAdapter,
83    consensus_throughput_calculator::ConsensusThroughputCalculator,
84    consensus_types::consensus_output_api::ConsensusCommitAPI,
85    epoch::{
86        randomness::{DkgStatus, RandomnessManager},
87        reconfiguration::ReconfigState,
88    },
89    execution_cache::ObjectCacheRead,
90    execution_scheduler::{SettlementBatchInfo, SettlementScheduler},
91    gasless_rate_limiter::ConsensusGaslessCounter,
92    post_consensus_tx_reorder::PostConsensusTxReorder,
93    traffic_controller::{TrafficController, policies::TrafficTally},
94};
95
96/// Output from filtering consensus transactions.
97/// Contains the filtered transactions and any owned object locks acquired post-consensus.
98struct FilteredConsensusOutput {
99    transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
100    owned_object_locks: HashMap<ObjectRef, TransactionDigest>,
101}
102
103pub struct ConsensusHandlerInitializer {
104    state: Arc<AuthorityState>,
105    checkpoint_service: Arc<CheckpointService>,
106    epoch_store: Arc<AuthorityPerEpochStore>,
107    consensus_adapter: Arc<ConsensusAdapter>,
108    throughput_calculator: Arc<ConsensusThroughputCalculator>,
109    backpressure_manager: Arc<BackpressureManager>,
110    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
111    consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
112}
113
114impl ConsensusHandlerInitializer {
115    pub fn new(
116        state: Arc<AuthorityState>,
117        checkpoint_service: Arc<CheckpointService>,
118        epoch_store: Arc<AuthorityPerEpochStore>,
119        consensus_adapter: Arc<ConsensusAdapter>,
120        throughput_calculator: Arc<ConsensusThroughputCalculator>,
121        backpressure_manager: Arc<BackpressureManager>,
122        congestion_log_config: Option<CongestionLogConfig>,
123    ) -> Self {
124        let congestion_logger =
125            congestion_log_config.and_then(|config| match CongestionCommitLogger::new(&config) {
126                Ok(logger) => Some(Arc::new(Mutex::new(logger))),
127                Err(e) => {
128                    debug_fatal!("Failed to create congestion logger: {e}");
129                    None
130                }
131            });
132        let consensus_gasless_counter = state.consensus_gasless_counter.clone();
133        Self {
134            state,
135            checkpoint_service,
136            epoch_store,
137            consensus_adapter,
138            throughput_calculator,
139            backpressure_manager,
140            congestion_logger,
141            consensus_gasless_counter,
142        }
143    }
144
145    #[cfg(test)]
146    pub(crate) fn new_for_testing(
147        state: Arc<AuthorityState>,
148        checkpoint_service: Arc<CheckpointService>,
149    ) -> Self {
150        use crate::consensus_test_utils::make_consensus_adapter_for_test;
151        use std::collections::HashSet;
152
153        let backpressure_manager = BackpressureManager::new_for_tests();
154        let consensus_adapter =
155            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
156        let consensus_gasless_counter = state.consensus_gasless_counter.clone();
157        Self {
158            state: state.clone(),
159            checkpoint_service,
160            epoch_store: state.epoch_store_for_testing().clone(),
161            consensus_adapter,
162            throughput_calculator: Arc::new(ConsensusThroughputCalculator::new(
163                None,
164                state.metrics.clone(),
165            )),
166            backpressure_manager,
167            congestion_logger: None,
168            consensus_gasless_counter,
169        }
170    }
171
172    pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
173        let new_epoch_start_state = self.epoch_store.epoch_start_state();
174        let consensus_committee = new_epoch_start_state.get_consensus_committee();
175
176        let settlement_scheduler = SettlementScheduler::new(
177            self.state.execution_scheduler().as_ref().clone(),
178            self.state.get_transaction_cache_reader().clone(),
179            self.state.metrics.clone(),
180        );
181        ConsensusHandler::new(
182            self.epoch_store.clone(),
183            self.checkpoint_service.clone(),
184            settlement_scheduler,
185            self.consensus_adapter.clone(),
186            self.state.get_object_cache_reader().clone(),
187            consensus_committee,
188            self.state.metrics.clone(),
189            self.throughput_calculator.clone(),
190            self.backpressure_manager.subscribe(),
191            self.state.traffic_controller.clone(),
192            self.congestion_logger.clone(),
193            self.consensus_gasless_counter.clone(),
194        )
195    }
196}
197
198mod additional_consensus_state {
199    use std::marker::PhantomData;
200
201    use consensus_core::CommitRef;
202    use fastcrypto::hash::HashFunction as _;
203    use sui_types::{crypto::DefaultHash, digests::Digest};
204
205    use super::*;
206    /// AdditionalConsensusState tracks any in-memory state that is retained by ConsensusHandler
207    /// between consensus commits. Because of crash recovery, using such data is inherently risky.
208    /// In order to do this safely, we must store data from a fixed number of previous commits.
209    /// Then, at start-up, that same fixed number of already processed commits is replayed to
210    /// reconstruct the state.
211    ///
212    /// To make sure that bugs in this process appear immediately, we record the digest of this
213    /// state in ConsensusCommitPrologue, so that any deviation causes an immediate fork.
214    #[derive(Serialize, Deserialize)]
215    pub(super) struct AdditionalConsensusState {
216        commit_interval_observer: CommitIntervalObserver,
217    }
218
219    impl AdditionalConsensusState {
220        pub fn new(additional_consensus_state_window_size: u32) -> Self {
221            Self {
222                commit_interval_observer: CommitIntervalObserver::new(
223                    additional_consensus_state_window_size,
224                ),
225            }
226        }
227
228        /// Update all internal state based on the new commit
229        pub(crate) fn observe_commit(
230            &mut self,
231            protocol_config: &ProtocolConfig,
232            epoch_start_time: u64,
233            consensus_commit: &impl ConsensusCommitAPI,
234        ) -> ConsensusCommitInfo {
235            self.commit_interval_observer
236                .observe_commit_time(consensus_commit);
237
238            let estimated_commit_period = self
239                .commit_interval_observer
240                .commit_interval_estimate()
241                .unwrap_or(Duration::from_millis(
242                    protocol_config.min_checkpoint_interval_ms(),
243                ));
244
245            info!("estimated commit rate: {:?}", estimated_commit_period);
246
247            self.commit_info_impl(
248                epoch_start_time,
249                consensus_commit,
250                Some(estimated_commit_period),
251            )
252        }
253
254        fn commit_info_impl(
255            &self,
256            epoch_start_time: u64,
257            consensus_commit: &impl ConsensusCommitAPI,
258            estimated_commit_period: Option<Duration>,
259        ) -> ConsensusCommitInfo {
260            let leader_author = consensus_commit.leader_author_index();
261            let timestamp = consensus_commit.commit_timestamp_ms();
262
263            let timestamp = if timestamp < epoch_start_time {
264                error!(
265                    "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start_time}, author {leader_author:?}"
266                );
267                epoch_start_time
268            } else {
269                timestamp
270            };
271
272            ConsensusCommitInfo {
273                _phantom: PhantomData,
274                round: consensus_commit.leader_round(),
275                timestamp,
276                leader_author,
277                consensus_commit_ref: consensus_commit.commit_ref(),
278                rejected_transactions_digest: consensus_commit.rejected_transactions_digest(),
279                additional_state_digest: Some(self.digest()),
280                estimated_commit_period,
281                skip_consensus_commit_prologue_in_test: false,
282            }
283        }
284
285        /// Get the digest of the current state.
286        fn digest(&self) -> AdditionalConsensusStateDigest {
287            let mut hash = DefaultHash::new();
288            bcs::serialize_into(&mut hash, self).unwrap();
289            AdditionalConsensusStateDigest::new(hash.finalize().into())
290        }
291    }
292
293    pub struct ConsensusCommitInfo {
294        // prevent public construction
295        _phantom: PhantomData<()>,
296
297        pub round: u64,
298        pub timestamp: u64,
299        pub leader_author: AuthorityIndex,
300        pub consensus_commit_ref: CommitRef,
301        pub rejected_transactions_digest: Digest,
302
303        additional_state_digest: Option<AdditionalConsensusStateDigest>,
304        estimated_commit_period: Option<Duration>,
305
306        pub skip_consensus_commit_prologue_in_test: bool,
307    }
308
309    impl ConsensusCommitInfo {
310        pub fn new_for_test(
311            commit_round: u64,
312            commit_timestamp: u64,
313            estimated_commit_period: Option<Duration>,
314            skip_consensus_commit_prologue_in_test: bool,
315        ) -> Self {
316            Self {
317                _phantom: PhantomData,
318                round: commit_round,
319                timestamp: commit_timestamp,
320                leader_author: 0,
321                consensus_commit_ref: CommitRef::default(),
322                rejected_transactions_digest: Digest::default(),
323                additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
324                estimated_commit_period,
325                skip_consensus_commit_prologue_in_test,
326            }
327        }
328
329        pub fn new_for_congestion_test(
330            commit_round: u64,
331            commit_timestamp: u64,
332            estimated_commit_period: Duration,
333        ) -> Self {
334            Self::new_for_test(
335                commit_round,
336                commit_timestamp,
337                Some(estimated_commit_period),
338                true,
339            )
340        }
341
342        pub fn additional_state_digest(&self) -> AdditionalConsensusStateDigest {
343            // this method cannot be called if stateless_commit_info is used
344            self.additional_state_digest
345                .expect("additional_state_digest is not available")
346        }
347
348        pub fn estimated_commit_period(&self) -> Duration {
349            // this method cannot be called if stateless_commit_info is used
350            self.estimated_commit_period
351                .expect("estimated commit period is not available")
352        }
353
354        fn consensus_commit_digest(&self) -> ConsensusCommitDigest {
355            ConsensusCommitDigest::new(self.consensus_commit_ref.digest.into_inner())
356        }
357
358        fn consensus_commit_prologue_transaction(
359            &self,
360            epoch: u64,
361        ) -> VerifiedExecutableTransaction {
362            let transaction = VerifiedTransaction::new_consensus_commit_prologue(
363                epoch,
364                self.round,
365                self.timestamp,
366            );
367            VerifiedExecutableTransaction::new_system(transaction, epoch)
368        }
369
370        fn consensus_commit_prologue_v2_transaction(
371            &self,
372            epoch: u64,
373        ) -> VerifiedExecutableTransaction {
374            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v2(
375                epoch,
376                self.round,
377                self.timestamp,
378                self.consensus_commit_digest(),
379            );
380            VerifiedExecutableTransaction::new_system(transaction, epoch)
381        }
382
383        fn consensus_commit_prologue_v3_transaction(
384            &self,
385            epoch: u64,
386            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
387        ) -> VerifiedExecutableTransaction {
388            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v3(
389                epoch,
390                self.round,
391                self.timestamp,
392                self.consensus_commit_digest(),
393                consensus_determined_version_assignments,
394            );
395            VerifiedExecutableTransaction::new_system(transaction, epoch)
396        }
397
398        fn consensus_commit_prologue_v4_transaction(
399            &self,
400            epoch: u64,
401            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
402            additional_state_digest: AdditionalConsensusStateDigest,
403        ) -> VerifiedExecutableTransaction {
404            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v4(
405                epoch,
406                self.round,
407                self.timestamp,
408                self.consensus_commit_digest(),
409                consensus_determined_version_assignments,
410                additional_state_digest,
411            );
412            VerifiedExecutableTransaction::new_system(transaction, epoch)
413        }
414
415        pub fn create_consensus_commit_prologue_transaction(
416            &self,
417            epoch: u64,
418            protocol_config: &ProtocolConfig,
419            cancelled_txn_version_assignment: Vec<(
420                TransactionDigest,
421                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
422            )>,
423            commit_info: &ConsensusCommitInfo,
424            indirect_state_observer: IndirectStateObserver,
425        ) -> VerifiedExecutableTransaction {
426            let version_assignments = if protocol_config
427                .record_consensus_determined_version_assignments_in_prologue_v2()
428            {
429                Some(
430                    ConsensusDeterminedVersionAssignments::CancelledTransactionsV2(
431                        cancelled_txn_version_assignment,
432                    ),
433                )
434            } else if protocol_config.record_consensus_determined_version_assignments_in_prologue()
435            {
436                Some(
437                    ConsensusDeterminedVersionAssignments::CancelledTransactions(
438                        cancelled_txn_version_assignment
439                            .into_iter()
440                            .map(|(tx_digest, versions)| {
441                                (
442                                    tx_digest,
443                                    versions.into_iter().map(|(id, v)| (id.0, v)).collect(),
444                                )
445                            })
446                            .collect(),
447                    ),
448                )
449            } else {
450                None
451            };
452
453            if protocol_config.record_additional_state_digest_in_prologue() {
454                let additional_state_digest =
455                    if protocol_config.additional_consensus_digest_indirect_state() {
456                        let d1 = commit_info.additional_state_digest();
457                        indirect_state_observer.fold_with(d1)
458                    } else {
459                        commit_info.additional_state_digest()
460                    };
461
462                self.consensus_commit_prologue_v4_transaction(
463                    epoch,
464                    version_assignments.unwrap(),
465                    additional_state_digest,
466                )
467            } else if let Some(version_assignments) = version_assignments {
468                self.consensus_commit_prologue_v3_transaction(epoch, version_assignments)
469            } else if protocol_config.include_consensus_digest_in_prologue() {
470                self.consensus_commit_prologue_v2_transaction(epoch)
471            } else {
472                self.consensus_commit_prologue_transaction(epoch)
473            }
474        }
475    }
476
477    #[derive(Default)]
478    pub struct IndirectStateObserver {
479        hash: DefaultHash,
480    }
481
482    impl IndirectStateObserver {
483        pub fn new() -> Self {
484            Self::default()
485        }
486
487        pub fn observe_indirect_state<T: Serialize>(&mut self, state: &T) {
488            bcs::serialize_into(&mut self.hash, state).unwrap();
489        }
490
491        pub fn fold_with(
492            self,
493            d1: AdditionalConsensusStateDigest,
494        ) -> AdditionalConsensusStateDigest {
495            let hash = self.hash.finalize();
496            let d2 = AdditionalConsensusStateDigest::new(hash.into());
497
498            let mut hasher = DefaultHash::new();
499            bcs::serialize_into(&mut hasher, &d1).unwrap();
500            bcs::serialize_into(&mut hasher, &d2).unwrap();
501            AdditionalConsensusStateDigest::new(hasher.finalize().into())
502        }
503    }
504
505    #[test]
506    fn test_additional_consensus_state() {
507        use crate::consensus_test_utils::TestConsensusCommit;
508
509        fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
510            let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
511            state.observe_commit(
512                &protocol_config,
513                100,
514                &TestConsensusCommit::empty(round, timestamp, 0),
515            );
516        }
517
518        let mut s1 = AdditionalConsensusState::new(3);
519        observe(&mut s1, 1, 1000);
520        observe(&mut s1, 2, 2000);
521        observe(&mut s1, 3, 3000);
522        observe(&mut s1, 4, 4000);
523
524        let mut s2 = AdditionalConsensusState::new(3);
525        // Because state uses a ring buffer, we should get the same digest
526        // even though we only added the 3 latest observations.
527        observe(&mut s2, 2, 2000);
528        observe(&mut s2, 3, 3000);
529        observe(&mut s2, 4, 4000);
530
531        assert_eq!(s1.digest(), s2.digest());
532
533        observe(&mut s1, 5, 5000);
534        observe(&mut s2, 5, 5000);
535
536        assert_eq!(s1.digest(), s2.digest());
537    }
538}
539use additional_consensus_state::AdditionalConsensusState;
540pub(crate) use additional_consensus_state::{ConsensusCommitInfo, IndirectStateObserver};
541
542struct QueuedCheckpointRoots {
543    roots: CheckpointRoots,
544    timestamp: CheckpointTimestamp,
545    consensus_commit_ref: CommitRef,
546    rejected_transactions_digest: Digest,
547}
548
549struct Chunk<
550    T: crate::authority::shared_object_version_manager::AsTx = VerifiedExecutableTransaction,
551> {
552    schedulables: Vec<Schedulable<T>>,
553    settlement: Option<Schedulable<T>>,
554    height: CheckpointHeight,
555}
556
557impl<T: crate::authority::shared_object_version_manager::AsTx + Clone> Chunk<T> {
558    fn all_schedulables(&self) -> impl Iterator<Item = &Schedulable<T>> + Clone {
559        self.schedulables.iter().chain(self.settlement.iter())
560    }
561
562    fn all_schedulables_from(chunks: &[Self]) -> impl Iterator<Item = &Schedulable<T>> + Clone {
563        chunks.iter().flat_map(|c| c.all_schedulables())
564    }
565
566    fn to_checkpoint_roots(&self) -> CheckpointRoots {
567        let tx_roots: Vec<_> = self.schedulables.iter().map(|s| s.key()).collect();
568        let settlement_root = self.settlement.as_ref().map(|s| s.key());
569        CheckpointRoots {
570            tx_roots,
571            settlement_root,
572            height: self.height,
573        }
574    }
575}
576
577impl From<Chunk<VerifiedExecutableTransactionWithAliases>> for Chunk {
578    fn from(chunk: Chunk<VerifiedExecutableTransactionWithAliases>) -> Self {
579        Chunk {
580            schedulables: chunk.schedulables.into_iter().map(|s| s.into()).collect(),
581            settlement: chunk.settlement.map(|s| s.into()),
582            height: chunk.height,
583        }
584    }
585}
586
587// Accumulates checkpoint roots from consensus and flushes them into PendingCheckpointV2s.
588// Roots are buffered in `pending_roots` until a flush is triggered (by max_tx overflow or
589// time interval). A flush always drains all buffered roots into a single PendingCheckpointV2,
590// so the queue only ever holds roots for one pending checkpoint at a time.
591//
592// Owns an ExecutionSchedulerSender so push_chunk can send schedulables and settlement info
593// for execution in one shot.
594pub(crate) struct CheckpointQueue {
595    last_built_timestamp: CheckpointTimestamp,
596    pending_roots: VecDeque<QueuedCheckpointRoots>,
597    height: u64,
598    pending_tx_count: usize,
599    current_checkpoint_seq: CheckpointSequenceNumber,
600    max_tx: usize,
601    min_checkpoint_interval_ms: u64,
602    execution_scheduler_sender: ExecutionSchedulerSender,
603}
604
605impl CheckpointQueue {
606    pub(crate) fn new(
607        last_built_timestamp: CheckpointTimestamp,
608        checkpoint_height: u64,
609        next_checkpoint_seq: CheckpointSequenceNumber,
610        max_tx: usize,
611        min_checkpoint_interval_ms: u64,
612        execution_scheduler_sender: ExecutionSchedulerSender,
613    ) -> Self {
614        Self {
615            last_built_timestamp,
616            pending_roots: VecDeque::new(),
617            height: checkpoint_height,
618            pending_tx_count: 0,
619            current_checkpoint_seq: next_checkpoint_seq,
620            max_tx,
621            min_checkpoint_interval_ms,
622            execution_scheduler_sender,
623        }
624    }
625
626    #[cfg(test)]
627    fn new_for_testing(
628        last_built_timestamp: CheckpointTimestamp,
629        checkpoint_height: u64,
630        next_checkpoint_seq: CheckpointSequenceNumber,
631        max_tx: usize,
632        min_checkpoint_interval_ms: u64,
633    ) -> Self {
634        let (sender, _receiver) = monitored_mpsc::unbounded_channel("test_checkpoint_queue_sender");
635        Self {
636            last_built_timestamp,
637            pending_roots: VecDeque::new(),
638            height: checkpoint_height,
639            pending_tx_count: 0,
640            current_checkpoint_seq: next_checkpoint_seq,
641            max_tx,
642            min_checkpoint_interval_ms,
643            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
644        }
645    }
646
647    #[cfg(test)]
648    fn new_for_testing_with_sender(
649        last_built_timestamp: CheckpointTimestamp,
650        checkpoint_height: u64,
651        next_checkpoint_seq: CheckpointSequenceNumber,
652        max_tx: usize,
653        min_checkpoint_interval_ms: u64,
654        sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
655    ) -> Self {
656        Self {
657            last_built_timestamp,
658            pending_roots: VecDeque::new(),
659            height: checkpoint_height,
660            pending_tx_count: 0,
661            current_checkpoint_seq: next_checkpoint_seq,
662            max_tx,
663            min_checkpoint_interval_ms,
664            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
665        }
666    }
667
668    pub(crate) fn last_built_timestamp(&self) -> CheckpointTimestamp {
669        self.last_built_timestamp
670    }
671
672    pub(crate) fn is_empty(&self) -> bool {
673        self.pending_roots.is_empty()
674    }
675
676    fn next_height(&mut self) -> u64 {
677        self.height += 1;
678        self.height
679    }
680
681    fn push_chunk(
682        &mut self,
683        chunk: Chunk,
684        assigned_versions: &HashMap<TransactionKey, AssignedVersions>,
685        timestamp: CheckpointTimestamp,
686        consensus_commit_ref: CommitRef,
687        rejected_transactions_digest: Digest,
688    ) -> Vec<PendingCheckpointV2> {
689        let max_tx = self.max_tx;
690        let user_tx_count = chunk.schedulables.len();
691
692        let roots = chunk.to_checkpoint_roots();
693
694        let schedulables: Vec<_> = chunk
695            .schedulables
696            .into_iter()
697            .map(|s| {
698                let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
699                (s, versions)
700            })
701            .collect();
702
703        let mut flushed_checkpoints = Vec::new();
704
705        if self.pending_tx_count > 0
706            && self.pending_tx_count + user_tx_count > max_tx
707            && let Some(checkpoint) = self.flush_forced()
708        {
709            flushed_checkpoints.push(checkpoint);
710        }
711
712        let settlement_info = chunk.settlement.as_ref().map(|s| {
713            let settlement_key = s.key();
714            let tx_keys: Vec<_> = schedulables.iter().map(|(s, _)| s.key()).collect();
715            SettlementBatchInfo {
716                settlement_key,
717                tx_keys,
718                checkpoint_height: chunk.height,
719                checkpoint_seq: self.current_checkpoint_seq,
720                assigned_versions: assigned_versions
721                    .get(&settlement_key)
722                    .cloned()
723                    .unwrap_or_default(),
724            }
725        });
726
727        self.execution_scheduler_sender
728            .send(schedulables, settlement_info);
729
730        self.pending_tx_count += user_tx_count;
731        self.pending_roots.push_back(QueuedCheckpointRoots {
732            roots,
733            timestamp,
734            consensus_commit_ref,
735            rejected_transactions_digest,
736        });
737
738        flushed_checkpoints
739    }
740
741    pub(crate) fn flush(
742        &mut self,
743        current_timestamp: CheckpointTimestamp,
744        force: bool,
745    ) -> Option<PendingCheckpointV2> {
746        if !force && current_timestamp < self.last_built_timestamp + self.min_checkpoint_interval_ms
747        {
748            return None;
749        }
750        self.flush_forced()
751    }
752
753    fn flush_forced(&mut self) -> Option<PendingCheckpointV2> {
754        if self.pending_roots.is_empty() {
755            return None;
756        }
757
758        let to_flush: Vec<_> = self.pending_roots.drain(..).collect();
759        let last_root = to_flush.last().unwrap();
760
761        let checkpoint = PendingCheckpointV2 {
762            roots: to_flush.iter().map(|q| q.roots.clone()).collect(),
763            details: PendingCheckpointInfo {
764                timestamp_ms: last_root.timestamp,
765                last_of_epoch: false,
766                checkpoint_height: last_root.roots.height,
767                consensus_commit_ref: last_root.consensus_commit_ref,
768                rejected_transactions_digest: last_root.rejected_transactions_digest,
769                checkpoint_seq: Some(self.current_checkpoint_seq),
770            },
771        };
772
773        self.last_built_timestamp = last_root.timestamp;
774        self.pending_tx_count = 0;
775        self.current_checkpoint_seq += 1;
776
777        Some(checkpoint)
778    }
779
780    pub(crate) fn checkpoint_seq(&self) -> CheckpointSequenceNumber {
781        self.current_checkpoint_seq
782            .checked_sub(1)
783            .expect("checkpoint_seq called before any checkpoint was assigned")
784    }
785}
786
787pub struct ConsensusHandler<C> {
788    /// A store created for each epoch. ConsensusHandler is recreated each epoch, with the
789    /// corresponding store. This store is also used to get the current epoch ID.
790    epoch_store: Arc<AuthorityPerEpochStore>,
791    /// Holds the indices, hash and stats after the last consensus commit
792    /// It is used for avoiding replaying already processed transactions,
793    /// checking chain consistency, and accumulating per-epoch consensus output stats.
794    last_consensus_stats: ExecutionIndicesWithStatsV2,
795    checkpoint_service: Arc<C>,
796    /// cache reader is needed when determining the next version to assign for shared objects.
797    cache_reader: Arc<dyn ObjectCacheRead>,
798    /// The consensus committee used to do stake computations for deciding set of low scoring authorities
799    committee: ConsensusCommittee,
800    // TODO: ConsensusHandler doesn't really share metrics with AuthorityState. We could define
801    // a new metrics type here if we want to.
802    metrics: Arc<AuthorityMetrics>,
803    /// Lru cache to quickly discard transactions processed by consensus
804    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
805    /// Enqueues transactions to the execution scheduler via a separate task.
806    execution_scheduler_sender: ExecutionSchedulerSender,
807    /// Consensus adapter for submitting transactions to consensus
808    consensus_adapter: Arc<ConsensusAdapter>,
809
810    /// Using the throughput calculator to record the current consensus throughput
811    throughput_calculator: Arc<ConsensusThroughputCalculator>,
812
813    additional_consensus_state: AdditionalConsensusState,
814
815    backpressure_subscriber: BackpressureSubscriber,
816
817    traffic_controller: Option<Arc<TrafficController>>,
818
819    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
820
821    consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
822
823    checkpoint_queue: Mutex<CheckpointQueue>,
824}
825
826const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
827
828impl<C> ConsensusHandler<C> {
829    pub(crate) fn new(
830        epoch_store: Arc<AuthorityPerEpochStore>,
831        checkpoint_service: Arc<C>,
832        settlement_scheduler: SettlementScheduler,
833        consensus_adapter: Arc<ConsensusAdapter>,
834        cache_reader: Arc<dyn ObjectCacheRead>,
835        committee: ConsensusCommittee,
836        metrics: Arc<AuthorityMetrics>,
837        throughput_calculator: Arc<ConsensusThroughputCalculator>,
838        backpressure_subscriber: BackpressureSubscriber,
839        traffic_controller: Option<Arc<TrafficController>>,
840        congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
841        consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
842    ) -> Self {
843        assert!(
844            matches!(
845                epoch_store
846                    .protocol_config()
847                    .per_object_congestion_control_mode(),
848                PerObjectCongestionControlMode::ExecutionTimeEstimate(_)
849            ),
850            "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
851        );
852
853        // Recover last_consensus_stats so it is consistent across validators.
854        let mut last_consensus_stats = epoch_store
855            .get_last_consensus_stats()
856            .expect("Should be able to read last consensus index");
857        // stats is empty at the beginning of epoch.
858        if !last_consensus_stats.stats.is_initialized() {
859            last_consensus_stats.stats = ConsensusStats::new(committee.size());
860            if epoch_store
861                .protocol_config()
862                .split_checkpoints_in_consensus_handler()
863            {
864                last_consensus_stats.checkpoint_seq = epoch_store.previous_epoch_last_checkpoint();
865            }
866        }
867        let max_tx = epoch_store
868            .protocol_config()
869            .max_transactions_per_checkpoint() as usize;
870        let min_checkpoint_interval_ms = epoch_store
871            .protocol_config()
872            .min_checkpoint_interval_ms_as_option()
873            .unwrap_or_default();
874        let execution_scheduler_sender =
875            ExecutionSchedulerSender::start(settlement_scheduler, epoch_store.clone());
876        let commit_rate_estimate_window_size = epoch_store
877            .protocol_config()
878            .get_consensus_commit_rate_estimation_window_size();
879        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
880        let checkpoint_height = last_consensus_stats.height;
881        let next_checkpoint_seq = if epoch_store
882            .protocol_config()
883            .split_checkpoints_in_consensus_handler()
884        {
885            last_consensus_stats.checkpoint_seq + 1
886        } else {
887            0
888        };
889        Self {
890            epoch_store,
891            last_consensus_stats,
892            checkpoint_service,
893            cache_reader,
894            committee,
895            metrics,
896            processed_cache: LruCache::new(
897                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
898            ),
899            execution_scheduler_sender: execution_scheduler_sender.clone(),
900            consensus_adapter,
901            throughput_calculator,
902            additional_consensus_state: AdditionalConsensusState::new(
903                commit_rate_estimate_window_size,
904            ),
905            backpressure_subscriber,
906            traffic_controller,
907            congestion_logger,
908            consensus_gasless_counter,
909            checkpoint_queue: Mutex::new(CheckpointQueue::new(
910                last_built_timestamp,
911                checkpoint_height,
912                next_checkpoint_seq,
913                max_tx,
914                min_checkpoint_interval_ms,
915                execution_scheduler_sender,
916            )),
917        }
918    }
919
920    /// Returns the last subdag index processed by the handler.
921    pub(crate) fn last_processed_subdag_index(&self) -> u64 {
922        self.last_consensus_stats.index.sub_dag_index
923    }
924
925    pub(crate) fn new_for_testing(
926        epoch_store: Arc<AuthorityPerEpochStore>,
927        checkpoint_service: Arc<C>,
928        execution_scheduler_sender: ExecutionSchedulerSender,
929        consensus_adapter: Arc<ConsensusAdapter>,
930        cache_reader: Arc<dyn ObjectCacheRead>,
931        committee: ConsensusCommittee,
932        metrics: Arc<AuthorityMetrics>,
933        throughput_calculator: Arc<ConsensusThroughputCalculator>,
934        backpressure_subscriber: BackpressureSubscriber,
935        traffic_controller: Option<Arc<TrafficController>>,
936        last_consensus_stats: ExecutionIndicesWithStatsV2,
937    ) -> Self {
938        let commit_rate_estimate_window_size = epoch_store
939            .protocol_config()
940            .get_consensus_commit_rate_estimation_window_size();
941        let max_tx = epoch_store
942            .protocol_config()
943            .max_transactions_per_checkpoint() as usize;
944        let min_checkpoint_interval_ms = epoch_store
945            .protocol_config()
946            .min_checkpoint_interval_ms_as_option()
947            .unwrap_or_default();
948        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
949        let checkpoint_height = last_consensus_stats.height;
950        Self {
951            epoch_store,
952            last_consensus_stats,
953            checkpoint_service,
954            cache_reader,
955            committee,
956            metrics,
957            processed_cache: LruCache::new(
958                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
959            ),
960            execution_scheduler_sender: execution_scheduler_sender.clone(),
961            consensus_adapter,
962            throughput_calculator,
963            additional_consensus_state: AdditionalConsensusState::new(
964                commit_rate_estimate_window_size,
965            ),
966            backpressure_subscriber,
967            traffic_controller,
968            congestion_logger: None,
969            consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
970            checkpoint_queue: Mutex::new(CheckpointQueue::new(
971                last_built_timestamp,
972                checkpoint_height,
973                0,
974                max_tx,
975                min_checkpoint_interval_ms,
976                execution_scheduler_sender,
977            )),
978        }
979    }
980}
981
982#[derive(Default)]
983struct CommitHandlerInput {
984    user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
985    capability_notifications: Vec<AuthorityCapabilitiesV2>,
986    execution_time_observations: Vec<ExecutionTimeObservation>,
987    checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
988    randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
989    randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
990    end_of_publish_transactions: Vec<AuthorityName>,
991    new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
992}
993
994struct CommitHandlerState {
995    dkg_failed: bool,
996    randomness_round: Option<RandomnessRound>,
997    output: ConsensusCommitOutput,
998    indirect_state_observer: Option<IndirectStateObserver>,
999    initial_reconfig_state: ReconfigState,
1000    // Occurrence counts for user transactions, used for unpaid amplification detection.
1001    occurrence_counts: HashMap<TransactionDigest, u32>,
1002}
1003
1004impl CommitHandlerState {
1005    fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
1006        self.output
1007            .get_consensus_messages_processed()
1008            .cloned()
1009            .collect()
1010    }
1011
1012    fn init_randomness<'a, 'epoch>(
1013        &'a mut self,
1014        epoch_store: &'epoch AuthorityPerEpochStore,
1015        commit_info: &'a ConsensusCommitInfo,
1016    ) -> Option<tokio::sync::MutexGuard<'epoch, RandomnessManager>> {
1017        let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
1018            rm.try_lock()
1019                .expect("should only ever be called from the commit handler thread")
1020        });
1021
1022        let mut dkg_failed = false;
1023        let randomness_round = if epoch_store.randomness_state_enabled() {
1024            let randomness_manager = randomness_manager
1025                .as_mut()
1026                .expect("randomness manager should exist if randomness is enabled");
1027            match randomness_manager.dkg_status() {
1028                DkgStatus::Pending => None,
1029                DkgStatus::Failed => {
1030                    dkg_failed = true;
1031                    None
1032                }
1033                DkgStatus::Successful => {
1034                    // Generate randomness for this commit if DKG is successful and we are still
1035                    // accepting certs.
1036                    if self.initial_reconfig_state.should_accept_tx() {
1037                        randomness_manager
1038                            // TODO: make infallible
1039                            .reserve_next_randomness(commit_info.timestamp, &mut self.output)
1040                            .expect("epoch ended")
1041                    } else {
1042                        None
1043                    }
1044                }
1045            }
1046        } else {
1047            None
1048        };
1049
1050        if randomness_round.is_some() {
1051            assert!(!dkg_failed); // invariant check
1052        }
1053
1054        self.randomness_round = randomness_round;
1055        self.dkg_failed = dkg_failed;
1056
1057        randomness_manager
1058    }
1059}
1060
1061impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
1062    /// Called during startup to allow us to observe commits we previously processed, for crash recovery.
1063    /// Any state computed here must be a pure function of the commits observed, it cannot depend on any
1064    /// state recorded in the epoch db.
1065    fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
1066        assert!(
1067            self.epoch_store
1068                .protocol_config()
1069                .record_additional_state_digest_in_prologue()
1070        );
1071        let protocol_config = self.epoch_store.protocol_config();
1072        let epoch_start_time = self
1073            .epoch_store
1074            .epoch_start_config()
1075            .epoch_start_timestamp_ms();
1076
1077        self.additional_consensus_state.observe_commit(
1078            protocol_config,
1079            epoch_start_time,
1080            &consensus_commit,
1081        );
1082    }
1083
1084    #[cfg(test)]
1085    pub(crate) async fn handle_consensus_commit_for_test(
1086        &mut self,
1087        consensus_commit: impl ConsensusCommitAPI,
1088    ) {
1089        self.handle_consensus_commit(consensus_commit).await;
1090    }
1091
1092    #[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))]
1093    pub(crate) async fn handle_consensus_commit(
1094        &mut self,
1095        consensus_commit: impl ConsensusCommitAPI,
1096    ) {
1097        {
1098            let protocol_config = self.epoch_store.protocol_config();
1099
1100            // Assert all protocol config settings for which we don't support old behavior.
1101            assert!(protocol_config.ignore_execution_time_observations_after_certs_closed());
1102            assert!(protocol_config.record_time_estimate_processed());
1103            assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints());
1104            assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest());
1105            assert!(protocol_config.authority_capabilities_v2());
1106            assert!(protocol_config.cancel_for_failed_dkg_early());
1107        }
1108
1109        // This may block until one of two conditions happens:
1110        // - Number of uncommitted transactions in the writeback cache goes below the
1111        //   backpressure threshold.
1112        // - The highest executed checkpoint catches up to the highest certified checkpoint.
1113        self.backpressure_subscriber.await_no_backpressure().await;
1114
1115        let epoch = self.epoch_store.epoch();
1116
1117        let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit");
1118
1119        let last_committed_round = self.last_consensus_stats.index.last_committed_round;
1120
1121        self.epoch_store
1122            .consensus_tx_status_cache
1123            .update_last_committed_leader_round(last_committed_round as u32);
1124        self.epoch_store
1125            .tx_reject_reason_cache
1126            .set_last_committed_leader_round(last_committed_round as u32);
1127
1128        let commit_info = self.additional_consensus_state.observe_commit(
1129            self.epoch_store.protocol_config(),
1130            self.epoch_store
1131                .epoch_start_config()
1132                .epoch_start_timestamp_ms(),
1133            &consensus_commit,
1134        );
1135        assert!(commit_info.round > last_committed_round);
1136
1137        let (timestamp, leader_author, commit_sub_dag_index) =
1138            self.gather_commit_metadata(&consensus_commit);
1139
1140        info!(
1141            %consensus_commit,
1142            "Received consensus output {}. Rejected transactions {}",
1143            consensus_commit.commit_ref(),
1144            consensus_commit.rejected_transactions_debug_string(),
1145        );
1146
1147        self.last_consensus_stats.index = ExecutionIndices {
1148            last_committed_round: commit_info.round,
1149            sub_dag_index: commit_sub_dag_index,
1150            transaction_index: 0_u64,
1151        };
1152
1153        self.metrics
1154            .consensus_committed_subdags
1155            .with_label_values(&[&leader_author.to_string()])
1156            .inc();
1157
1158        let mut state = CommitHandlerState {
1159            output: ConsensusCommitOutput::new(commit_info.round),
1160            dkg_failed: false,
1161            randomness_round: None,
1162            indirect_state_observer: Some(IndirectStateObserver::new()),
1163            initial_reconfig_state: self
1164                .epoch_store
1165                .get_reconfig_state_read_lock_guard()
1166                .clone(),
1167            occurrence_counts: HashMap::new(),
1168        };
1169
1170        let FilteredConsensusOutput {
1171            transactions,
1172            owned_object_locks,
1173        } = self.filter_consensus_txns(
1174            state.initial_reconfig_state.clone(),
1175            &commit_info,
1176            &consensus_commit,
1177        );
1178        // Buffer owned object locks for batch write.
1179        if !owned_object_locks.is_empty() {
1180            state.output.set_owned_object_locks(owned_object_locks);
1181        }
1182        let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions);
1183
1184        let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info);
1185
1186        let CommitHandlerInput {
1187            user_transactions,
1188            capability_notifications,
1189            execution_time_observations,
1190            checkpoint_signature_messages,
1191            randomness_dkg_messages,
1192            randomness_dkg_confirmations,
1193            end_of_publish_transactions,
1194            new_jwks,
1195        } = self.build_commit_handler_input(transactions);
1196
1197        self.process_gasless_transactions(&commit_info, &user_transactions);
1198        self.process_jwks(&mut state, &commit_info, new_jwks);
1199        self.process_capability_notifications(capability_notifications);
1200        self.process_execution_time_observations(&mut state, execution_time_observations);
1201        self.process_checkpoint_signature_messages(checkpoint_signature_messages);
1202
1203        self.process_dkg_updates(
1204            &mut state,
1205            &commit_info,
1206            randomness_manager.as_deref_mut(),
1207            randomness_dkg_messages,
1208            randomness_dkg_confirmations,
1209        )
1210        .await;
1211
1212        let mut execution_time_estimator = self
1213            .epoch_store
1214            .execution_time_estimator
1215            .try_lock()
1216            .expect("should only ever be called from the commit handler thread");
1217
1218        let authenticator_state_update_transaction =
1219            self.create_authenticator_state_update(last_committed_round, &commit_info);
1220
1221        let split_checkpoints = self
1222            .epoch_store
1223            .protocol_config()
1224            .split_checkpoints_in_consensus_handler();
1225
1226        let (lock, final_round, num_schedulables, checkpoint_height) = if !split_checkpoints {
1227            let (schedulables, randomness_schedulables, assigned_versions) = self
1228                .process_transactions(
1229                    &mut state,
1230                    &mut execution_time_estimator,
1231                    &commit_info,
1232                    authenticator_state_update_transaction,
1233                    user_transactions,
1234                );
1235
1236            let (should_accept_tx, lock, final_round) =
1237                self.handle_eop(&mut state, end_of_publish_transactions);
1238
1239            let make_checkpoint = should_accept_tx || final_round;
1240            if !make_checkpoint {
1241                // No need for any further processing
1242                return;
1243            }
1244
1245            // If this is the final round, record execution time observations for storage in the
1246            // end-of-epoch tx.
1247            if final_round {
1248                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
1249            }
1250
1251            let checkpoint_height = self
1252                .epoch_store
1253                .calculate_pending_checkpoint_height(commit_info.round);
1254
1255            self.create_pending_checkpoints(
1256                &mut state,
1257                &commit_info,
1258                &schedulables,
1259                &randomness_schedulables,
1260                final_round,
1261            );
1262
1263            let num_schedulables = schedulables.len();
1264            let mut all_schedulables = schedulables;
1265            all_schedulables.extend(randomness_schedulables);
1266            let assigned_versions = assigned_versions.into_map();
1267            let paired: Vec<_> = all_schedulables
1268                .into_iter()
1269                .map(|s| {
1270                    let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
1271                    (s, versions)
1272                })
1273                .collect();
1274            self.execution_scheduler_sender.send(paired, None);
1275
1276            (lock, final_round, num_schedulables, checkpoint_height)
1277        } else {
1278            let (
1279                transactions_to_schedule,
1280                randomness_transactions_to_schedule,
1281                cancelled_txns,
1282                randomness_state_update_transaction,
1283            ) = self.collect_transactions_to_schedule(
1284                &mut state,
1285                &mut execution_time_estimator,
1286                &commit_info,
1287                user_transactions,
1288            );
1289
1290            let (should_accept_tx, lock, final_round) =
1291                self.handle_eop(&mut state, end_of_publish_transactions);
1292
1293            let make_checkpoint = should_accept_tx || final_round;
1294            if !make_checkpoint {
1295                // No need for any further processing
1296                return;
1297            }
1298
1299            // If this is the final round, record execution time observations for storage in the
1300            // end-of-epoch tx.
1301            if final_round {
1302                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
1303            }
1304
1305            let epoch = self.epoch_store.epoch();
1306            let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
1307                .then_some(Schedulable::ConsensusCommitPrologue(
1308                    epoch,
1309                    commit_info.round,
1310                    commit_info.consensus_commit_ref.index,
1311                ));
1312
1313            let schedulables: Vec<_> = itertools::chain!(
1314                consensus_commit_prologue.into_iter(),
1315                authenticator_state_update_transaction
1316                    .into_iter()
1317                    .map(Schedulable::Transaction),
1318                transactions_to_schedule
1319                    .into_iter()
1320                    .map(Schedulable::Transaction),
1321            )
1322            .collect();
1323
1324            let randomness_schedulables: Vec<_> = randomness_state_update_transaction
1325                .into_iter()
1326                .chain(
1327                    randomness_transactions_to_schedule
1328                        .into_iter()
1329                        .map(Schedulable::Transaction),
1330                )
1331                .collect();
1332
1333            let num_schedulables = schedulables.len();
1334            let checkpoint_height = self.create_pending_checkpoints_v2(
1335                &mut state,
1336                &commit_info,
1337                schedulables,
1338                randomness_schedulables,
1339                &cancelled_txns,
1340                final_round,
1341            );
1342
1343            (lock, final_round, num_schedulables, checkpoint_height)
1344        };
1345
1346        let notifications = state.get_notifications();
1347
1348        let mut stats_to_record = self.last_consensus_stats.clone();
1349        stats_to_record.height = checkpoint_height;
1350        if split_checkpoints {
1351            let queue = self.checkpoint_queue.lock().unwrap();
1352            stats_to_record.last_checkpoint_flush_timestamp = queue.last_built_timestamp();
1353            stats_to_record.checkpoint_seq = queue.checkpoint_seq();
1354        }
1355
1356        state.output.record_consensus_commit_stats(stats_to_record);
1357
1358        self.record_deferral_deletion(&mut state);
1359
1360        self.epoch_store
1361            .consensus_quarantine
1362            .write()
1363            .push_consensus_output(state.output, &self.epoch_store)
1364            .expect("push_consensus_output should not fail");
1365
1366        debug!(
1367            ?commit_info.round,
1368            "Notifying checkpoint service about new pending checkpoint(s)",
1369        );
1370        self.checkpoint_service
1371            .notify_checkpoint()
1372            .expect("failed to notify checkpoint service");
1373
1374        if let Some(randomness_round) = state.randomness_round {
1375            randomness_manager
1376                .as_ref()
1377                .expect("randomness manager should exist if randomness round is provided")
1378                .generate_randomness(epoch, randomness_round);
1379        }
1380
1381        self.epoch_store.process_notifications(notifications.iter());
1382
1383        // pass lock by value to ensure that it is held until this point
1384        self.log_final_round(lock, final_round);
1385
1386        // update the calculated throughput
1387        self.throughput_calculator
1388            .add_transactions(timestamp, num_schedulables as u64);
1389
1390        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
1391            let key = [commit_sub_dag_index, epoch];
1392            if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
1393                sui_simulator::task::kill_current_node(None);
1394            }
1395        });
1396
1397        fail_point!("crash");
1398
1399        self.send_end_of_publish_if_needed().await;
1400    }
1401
1402    fn handle_eop(
1403        &self,
1404        state: &mut CommitHandlerState,
1405        end_of_publish_transactions: Vec<AuthorityName>,
1406    ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
1407        let collected_eop =
1408            self.process_end_of_publish_transactions(state, end_of_publish_transactions);
1409        if collected_eop {
1410            let (lock, final_round) = self.advance_eop_state_machine(state);
1411            (lock.should_accept_tx(), Some(lock), final_round)
1412        } else {
1413            (true, None, false)
1414        }
1415    }
1416
1417    fn record_end_of_epoch_execution_time_observations(
1418        &self,
1419        estimator: &mut ExecutionTimeEstimator,
1420    ) {
1421        self.epoch_store
1422            .end_of_epoch_execution_time_observations
1423            .set(estimator.take_observations())
1424            .expect("`stored_execution_time_observations` should only be set once at end of epoch");
1425    }
1426
1427    fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
1428        let mut deferred_transactions = self
1429            .epoch_store
1430            .consensus_output_cache
1431            .deferred_transactions
1432            .lock();
1433        for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
1434            deferred_transactions.remove(&deleted_deferred_key);
1435        }
1436    }
1437
1438    fn log_final_round(&self, lock: Option<RwLockWriteGuard<ReconfigState>>, final_round: bool) {
1439        if final_round {
1440            let epoch = self.epoch_store.epoch();
1441            info!(
1442                ?epoch,
1443                lock=?lock.as_ref(),
1444                final_round=?final_round,
1445                "Notified last checkpoint"
1446            );
1447            self.epoch_store.record_end_of_message_quorum_time_metric();
1448        }
1449    }
1450
1451    fn create_pending_checkpoints(
1452        &self,
1453        state: &mut CommitHandlerState,
1454        commit_info: &ConsensusCommitInfo,
1455        schedulables: &[Schedulable],
1456        randomness_schedulables: &[Schedulable],
1457        final_round: bool,
1458    ) {
1459        assert!(
1460            !self
1461                .epoch_store
1462                .protocol_config()
1463                .split_checkpoints_in_consensus_handler()
1464        );
1465
1466        let checkpoint_height = self
1467            .epoch_store
1468            .calculate_pending_checkpoint_height(commit_info.round);
1469
1470        // Determine whether to write pending checkpoint for user tx with randomness.
1471        // - If randomness is not generated for this commit, we will skip the
1472        //   checkpoint with the associated height. Therefore checkpoint heights may
1473        //   not be contiguous.
1474        // - Exception: if DKG fails, we always need to write out a PendingCheckpoint
1475        //   for randomness tx that are canceled.
1476        let should_write_random_checkpoint = state.randomness_round.is_some()
1477            || (state.dkg_failed && !randomness_schedulables.is_empty());
1478
1479        let pending_checkpoint = PendingCheckpoint {
1480            roots: schedulables.iter().map(|s| s.key()).collect(),
1481            details: PendingCheckpointInfo {
1482                timestamp_ms: commit_info.timestamp,
1483                last_of_epoch: final_round && !should_write_random_checkpoint,
1484                checkpoint_height,
1485                consensus_commit_ref: commit_info.consensus_commit_ref,
1486                rejected_transactions_digest: commit_info.rejected_transactions_digest,
1487                checkpoint_seq: None,
1488            },
1489        };
1490        self.epoch_store
1491            .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1492            .expect("failed to write pending checkpoint");
1493
1494        info!(
1495            "Written pending checkpoint: {:?}",
1496            pending_checkpoint.details,
1497        );
1498
1499        if should_write_random_checkpoint {
1500            let pending_checkpoint = PendingCheckpoint {
1501                roots: randomness_schedulables.iter().map(|s| s.key()).collect(),
1502                details: PendingCheckpointInfo {
1503                    timestamp_ms: commit_info.timestamp,
1504                    last_of_epoch: final_round,
1505                    checkpoint_height: checkpoint_height + 1,
1506                    consensus_commit_ref: commit_info.consensus_commit_ref,
1507                    rejected_transactions_digest: commit_info.rejected_transactions_digest,
1508                    checkpoint_seq: None,
1509                },
1510            };
1511            self.epoch_store
1512                .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1513                .expect("failed to write pending checkpoint");
1514        }
1515    }
1516
1517    #[allow(clippy::type_complexity)]
1518    fn collect_transactions_to_schedule(
1519        &self,
1520        state: &mut CommitHandlerState,
1521        execution_time_estimator: &mut ExecutionTimeEstimator,
1522        commit_info: &ConsensusCommitInfo,
1523        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1524    ) -> (
1525        Vec<VerifiedExecutableTransactionWithAliases>,
1526        Vec<VerifiedExecutableTransactionWithAliases>,
1527        BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
1528        Option<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1529    ) {
1530        let protocol_config = self.epoch_store.protocol_config();
1531        let epoch = self.epoch_store.epoch();
1532
1533        let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) =
1534            self.merge_and_reorder_transactions(state, commit_info, user_transactions);
1535
1536        let mut shared_object_congestion_tracker =
1537            self.init_congestion_tracker(commit_info, false, &ordered_txns);
1538        let mut shared_object_using_randomness_congestion_tracker =
1539            self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns);
1540
1541        let randomness_state_update_transaction = state
1542            .randomness_round
1543            .map(|round| Schedulable::RandomnessStateUpdate(epoch, round));
1544        debug!(
1545            "Randomness state update transaction: {:?}",
1546            randomness_state_update_transaction
1547                .as_ref()
1548                .map(|t| t.key())
1549        );
1550
1551        let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len());
1552        let mut randomness_transactions_to_schedule =
1553            Vec::with_capacity(ordered_randomness_txns.len());
1554        let mut deferred_txns = BTreeMap::new();
1555        let mut cancelled_txns = BTreeMap::new();
1556
1557        for transaction in ordered_txns {
1558            self.handle_deferral_and_cancellation(
1559                state,
1560                &mut cancelled_txns,
1561                &mut deferred_txns,
1562                &mut transactions_to_schedule,
1563                protocol_config,
1564                commit_info,
1565                transaction,
1566                &mut shared_object_congestion_tracker,
1567                &previously_deferred_tx_digests,
1568                execution_time_estimator,
1569            );
1570        }
1571
1572        for transaction in ordered_randomness_txns {
1573            if state.dkg_failed {
1574                debug!(
1575                    "Canceling randomness-using transaction {:?} because DKG failed",
1576                    transaction.tx().digest(),
1577                );
1578                cancelled_txns.insert(
1579                    *transaction.tx().digest(),
1580                    CancelConsensusCertificateReason::DkgFailed,
1581                );
1582                randomness_transactions_to_schedule.push(transaction);
1583                continue;
1584            }
1585            self.handle_deferral_and_cancellation(
1586                state,
1587                &mut cancelled_txns,
1588                &mut deferred_txns,
1589                &mut randomness_transactions_to_schedule,
1590                protocol_config,
1591                commit_info,
1592                transaction,
1593                &mut shared_object_using_randomness_congestion_tracker,
1594                &previously_deferred_tx_digests,
1595                execution_time_estimator,
1596            );
1597        }
1598
1599        let mut total_deferred_txns = 0;
1600        {
1601            let mut deferred_transactions = self
1602                .epoch_store
1603                .consensus_output_cache
1604                .deferred_transactions
1605                .lock();
1606            for (key, txns) in deferred_txns.into_iter() {
1607                total_deferred_txns += txns.len();
1608                deferred_transactions.insert(key, txns.clone());
1609                state.output.defer_transactions(key, txns);
1610            }
1611        }
1612
1613        self.metrics
1614            .consensus_handler_deferred_transactions
1615            .inc_by(total_deferred_txns as u64);
1616        self.metrics
1617            .consensus_handler_cancelled_transactions
1618            .inc_by(cancelled_txns.len() as u64);
1619        self.metrics
1620            .consensus_handler_max_object_costs
1621            .with_label_values(&["regular_commit"])
1622            .set(shared_object_congestion_tracker.max_cost() as i64);
1623        self.metrics
1624            .consensus_handler_max_object_costs
1625            .with_label_values(&["randomness_commit"])
1626            .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);
1627
1628        let congestion_commit_data = shared_object_congestion_tracker.finish_commit(commit_info);
1629        let randomness_congestion_commit_data =
1630            shared_object_using_randomness_congestion_tracker.finish_commit(commit_info);
1631
1632        if let Some(logger) = &self.congestion_logger {
1633            let epoch = self.epoch_store.epoch();
1634            let mut logger = logger.lock().unwrap();
1635            logger.write_commit_log(epoch, commit_info, false, &congestion_commit_data);
1636            logger.write_commit_log(epoch, commit_info, true, &randomness_congestion_commit_data);
1637        }
1638
1639        if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get()
1640            && let Err(e) = tx_object_debts.try_send(
1641                congestion_commit_data
1642                    .accumulated_debts
1643                    .iter()
1644                    .chain(randomness_congestion_commit_data.accumulated_debts.iter())
1645                    .map(|(id, _)| *id)
1646                    .collect(),
1647            )
1648        {
1649            info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
1650        }
1651
1652        state
1653            .output
1654            .set_congestion_control_object_debts(congestion_commit_data.accumulated_debts);
1655        state.output.set_congestion_control_randomness_object_debts(
1656            randomness_congestion_commit_data.accumulated_debts,
1657        );
1658
1659        (
1660            transactions_to_schedule,
1661            randomness_transactions_to_schedule,
1662            cancelled_txns,
1663            randomness_state_update_transaction,
1664        )
1665    }
1666
1667    fn process_transactions(
1668        &self,
1669        state: &mut CommitHandlerState,
1670        execution_time_estimator: &mut ExecutionTimeEstimator,
1671        commit_info: &ConsensusCommitInfo,
1672        authenticator_state_update_transaction: Option<VerifiedExecutableTransactionWithAliases>,
1673        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1674    ) -> (Vec<Schedulable>, Vec<Schedulable>, AssignedTxAndVersions) {
1675        let protocol_config = self.epoch_store.protocol_config();
1676        assert!(!protocol_config.split_checkpoints_in_consensus_handler());
1677        let epoch = self.epoch_store.epoch();
1678
1679        let (
1680            transactions_to_schedule,
1681            randomness_transactions_to_schedule,
1682            cancelled_txns,
1683            randomness_state_update_transaction,
1684        ) = self.collect_transactions_to_schedule(
1685            state,
1686            execution_time_estimator,
1687            commit_info,
1688            user_transactions,
1689        );
1690
1691        let mut settlement = None;
1692        let mut randomness_settlement = None;
1693        if self.epoch_store.accumulators_enabled() {
1694            let checkpoint_height = self
1695                .epoch_store
1696                .calculate_pending_checkpoint_height(commit_info.round);
1697
1698            settlement = Some(Schedulable::AccumulatorSettlement(epoch, checkpoint_height));
1699
1700            if state.randomness_round.is_some() || !randomness_transactions_to_schedule.is_empty() {
1701                randomness_settlement = Some(Schedulable::AccumulatorSettlement(
1702                    epoch,
1703                    checkpoint_height + 1,
1704                ));
1705            }
1706        }
1707
1708        let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
1709            .then_some(Schedulable::ConsensusCommitPrologue(
1710                epoch,
1711                commit_info.round,
1712                commit_info.consensus_commit_ref.index,
1713            ));
1714
1715        let schedulables: Vec<_> = itertools::chain!(
1716            consensus_commit_prologue.into_iter(),
1717            authenticator_state_update_transaction
1718                .into_iter()
1719                .map(Schedulable::Transaction),
1720            transactions_to_schedule
1721                .into_iter()
1722                .map(Schedulable::Transaction),
1723            settlement,
1724        )
1725        .collect();
1726
1727        let randomness_schedulables: Vec<_> = randomness_state_update_transaction
1728            .into_iter()
1729            .chain(
1730                randomness_transactions_to_schedule
1731                    .into_iter()
1732                    .map(Schedulable::Transaction),
1733            )
1734            .chain(randomness_settlement)
1735            .collect();
1736
1737        let assigned_versions = self
1738            .epoch_store
1739            .process_consensus_transaction_shared_object_versions(
1740                self.cache_reader.as_ref(),
1741                schedulables.iter(),
1742                randomness_schedulables.iter(),
1743                &cancelled_txns,
1744                &mut state.output,
1745            )
1746            .expect("failed to assign shared object versions");
1747
1748        let consensus_commit_prologue =
1749            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);
1750
1751        let mut schedulables = schedulables;
1752        let mut assigned_versions = assigned_versions;
1753        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
1754            assert!(matches!(
1755                schedulables[0],
1756                Schedulable::ConsensusCommitPrologue(..)
1757            ));
1758            assert!(matches!(
1759                assigned_versions.0[0].0,
1760                TransactionKey::ConsensusCommitPrologue(..)
1761            ));
1762            assigned_versions.0[0].0 =
1763                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
1764            schedulables[0] = Schedulable::Transaction(consensus_commit_prologue);
1765        }
1766
1767        self.epoch_store
1768            .process_user_signatures(schedulables.iter().chain(randomness_schedulables.iter()));
1769
1770        // After this point we can throw away alias version information.
1771        let schedulables: Vec<Schedulable> = schedulables.into_iter().map(|s| s.into()).collect();
1772        let randomness_schedulables: Vec<Schedulable> = randomness_schedulables
1773            .into_iter()
1774            .map(|s| s.into())
1775            .collect();
1776
1777        (schedulables, randomness_schedulables, assigned_versions)
1778    }
1779
1780    #[allow(clippy::type_complexity)]
1781    fn create_pending_checkpoints_v2(
1782        &self,
1783        state: &mut CommitHandlerState,
1784        commit_info: &ConsensusCommitInfo,
1785        schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1786        randomness_schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1787        cancelled_txns: &BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
1788        final_round: bool,
1789    ) -> CheckpointHeight {
1790        let protocol_config = self.epoch_store.protocol_config();
1791        assert!(protocol_config.split_checkpoints_in_consensus_handler());
1792
1793        let epoch = self.epoch_store.epoch();
1794        let accumulators_enabled = self.epoch_store.accumulators_enabled();
1795        let max_transactions_per_checkpoint =
1796            protocol_config.max_transactions_per_checkpoint() as usize;
1797
1798        let should_write_random_checkpoint = state.randomness_round.is_some()
1799            || (state.dkg_failed && !randomness_schedulables.is_empty());
1800
1801        let mut checkpoint_queue = self.checkpoint_queue.lock().unwrap();
1802
1803        let build_chunks =
1804            |schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1805             queue: &mut CheckpointQueue|
1806             -> Vec<Chunk<VerifiedExecutableTransactionWithAliases>> {
1807                schedulables
1808                    .chunks(max_transactions_per_checkpoint)
1809                    .map(|chunk| {
1810                        let height = queue.next_height();
1811                        let schedulables: Vec<_> = chunk.to_vec();
1812                        let settlement = if accumulators_enabled {
1813                            Some(Schedulable::AccumulatorSettlement(epoch, height))
1814                        } else {
1815                            None
1816                        };
1817                        Chunk {
1818                            schedulables,
1819                            settlement,
1820                            height,
1821                        }
1822                    })
1823                    .collect()
1824            };
1825
1826        let num_schedulables = schedulables.len();
1827        let chunked_schedulables = build_chunks(schedulables, &mut checkpoint_queue);
1828        if chunked_schedulables.len() > 1 {
1829            info!(
1830                "Splitting transactions into {} checkpoint chunks (num_schedulables={}, max_tx={})",
1831                chunked_schedulables.len(),
1832                num_schedulables,
1833                max_transactions_per_checkpoint
1834            );
1835            assert_reachable!("checkpoint split due to transaction limit");
1836        }
1837        let chunked_randomness_schedulables = if should_write_random_checkpoint {
1838            build_chunks(randomness_schedulables, &mut checkpoint_queue)
1839        } else {
1840            vec![]
1841        };
1842
1843        let schedulables_for_version_assignment =
1844            Chunk::all_schedulables_from(&chunked_schedulables);
1845        let randomness_schedulables_for_version_assignment =
1846            Chunk::all_schedulables_from(&chunked_randomness_schedulables);
1847
1848        let assigned_versions = self
1849            .epoch_store
1850            .process_consensus_transaction_shared_object_versions(
1851                self.cache_reader.as_ref(),
1852                schedulables_for_version_assignment,
1853                randomness_schedulables_for_version_assignment,
1854                cancelled_txns,
1855                &mut state.output,
1856            )
1857            .expect("failed to assign shared object versions");
1858
1859        let consensus_commit_prologue =
1860            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);
1861
1862        let mut chunked_schedulables = chunked_schedulables;
1863        let mut assigned_versions = assigned_versions;
1864        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
1865            assert!(matches!(
1866                chunked_schedulables[0].schedulables[0],
1867                Schedulable::ConsensusCommitPrologue(..)
1868            ));
1869            assert!(matches!(
1870                assigned_versions.0[0].0,
1871                TransactionKey::ConsensusCommitPrologue(..)
1872            ));
1873            assigned_versions.0[0].0 =
1874                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
1875            chunked_schedulables[0].schedulables[0] =
1876                Schedulable::Transaction(consensus_commit_prologue);
1877        }
1878
1879        let assigned_versions = assigned_versions.into_map();
1880
1881        self.epoch_store.process_user_signatures(
1882            chunked_schedulables
1883                .iter()
1884                .flat_map(|c| c.all_schedulables())
1885                .chain(
1886                    chunked_randomness_schedulables
1887                        .iter()
1888                        .flat_map(|c| c.all_schedulables()),
1889                ),
1890        );
1891
1892        let commit_height = chunked_randomness_schedulables
1893            .last()
1894            .or(chunked_schedulables.last())
1895            .map(|c| c.height)
1896            .expect("at least one checkpoint root must be created per commit");
1897
1898        let mut pending_checkpoints = Vec::new();
1899        for chunk in chunked_schedulables {
1900            pending_checkpoints.extend(checkpoint_queue.push_chunk(
1901                chunk.into(),
1902                &assigned_versions,
1903                commit_info.timestamp,
1904                commit_info.consensus_commit_ref,
1905                commit_info.rejected_transactions_digest,
1906            ));
1907        }
1908
1909        if protocol_config.merge_randomness_into_checkpoint() {
1910            // We don't want to block checkpoint formation for non-randomness schedulables
1911            // on randomness state update. Therefore, we include randomness chunks in the
1912            // subsequent checkpoint. First we flush the queue, then enqueue randomness
1913            // for merging into the subsequent commit.
1914            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, final_round));
1915
1916            if should_write_random_checkpoint {
1917                for chunk in chunked_randomness_schedulables {
1918                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
1919                        chunk.into(),
1920                        &assigned_versions,
1921                        commit_info.timestamp,
1922                        commit_info.consensus_commit_ref,
1923                        commit_info.rejected_transactions_digest,
1924                    ));
1925                }
1926                if final_round {
1927                    pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
1928                }
1929            }
1930        } else {
1931            let force = final_round || should_write_random_checkpoint;
1932            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, force));
1933
1934            if should_write_random_checkpoint {
1935                for chunk in chunked_randomness_schedulables {
1936                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
1937                        chunk.into(),
1938                        &assigned_versions,
1939                        commit_info.timestamp,
1940                        commit_info.consensus_commit_ref,
1941                        commit_info.rejected_transactions_digest,
1942                    ));
1943                }
1944                pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
1945            }
1946        }
1947
1948        if final_round && let Some(last) = pending_checkpoints.last_mut() {
1949            last.details.last_of_epoch = true;
1950        }
1951
1952        let queue_drained = checkpoint_queue.is_empty();
1953        drop(checkpoint_queue);
1954
1955        for pending_checkpoint in pending_checkpoints {
1956            debug!(
1957                checkpoint_height = pending_checkpoint.details.checkpoint_height,
1958                roots_count = pending_checkpoint.num_roots(),
1959                "Writing pending checkpoint",
1960            );
1961            self.epoch_store
1962                .write_pending_checkpoint_v2(&mut state.output, &pending_checkpoint)
1963                .expect("failed to write pending checkpoint");
1964        }
1965
1966        state.output.set_checkpoint_queue_drained(queue_drained);
1967
1968        commit_height
1969    }
1970
1971    // Adds the consensus commit prologue transaction to the beginning of input `transactions` to update
1972    // the system clock used in all transactions in the current consensus commit.
1973    // Returns the root of the consensus commit prologue transaction if it was added to the input.
1974    fn add_consensus_commit_prologue_transaction<'a>(
1975        &'a self,
1976        state: &'a mut CommitHandlerState,
1977        commit_info: &'a ConsensusCommitInfo,
1978        assigned_versions: &AssignedTxAndVersions,
1979    ) -> Option<VerifiedExecutableTransactionWithAliases> {
1980        {
1981            if commit_info.skip_consensus_commit_prologue_in_test {
1982                return None;
1983            }
1984        }
1985
1986        let mut cancelled_txn_version_assignment = Vec::new();
1987
1988        let protocol_config = self.epoch_store.protocol_config();
1989
1990        for (txn_key, assigned_versions) in assigned_versions.0.iter() {
1991            let Some(d) = txn_key.as_digest() else {
1992                continue;
1993            };
1994
1995            if !protocol_config.include_cancelled_randomness_txns_in_prologue()
1996                && assigned_versions
1997                    .shared_object_versions
1998                    .iter()
1999                    .any(|((id, _), _)| *id == SUI_RANDOMNESS_STATE_OBJECT_ID)
2000            {
2001                continue;
2002            }
2003
2004            if assigned_versions
2005                .shared_object_versions
2006                .iter()
2007                .any(|(_, version)| version.is_cancelled())
2008            {
2009                assert_reachable!("cancelled transactions");
2010                cancelled_txn_version_assignment
2011                    .push((*d, assigned_versions.shared_object_versions.clone()));
2012            }
2013        }
2014
2015        fail_point_arg!(
2016            "additional_cancelled_txns_for_tests",
2017            |additional_cancelled_txns: Vec<(
2018                TransactionDigest,
2019                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
2020            )>| {
2021                cancelled_txn_version_assignment.extend(additional_cancelled_txns);
2022            }
2023        );
2024
2025        let transaction = commit_info.create_consensus_commit_prologue_transaction(
2026            self.epoch_store.epoch(),
2027            self.epoch_store.protocol_config(),
2028            cancelled_txn_version_assignment,
2029            commit_info,
2030            state.indirect_state_observer.take().unwrap(),
2031        );
2032        Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2033            transaction,
2034        ))
2035    }
2036
2037    fn handle_deferral_and_cancellation(
2038        &self,
2039        state: &mut CommitHandlerState,
2040        cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
2041        deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>,
2042        scheduled_txns: &mut Vec<VerifiedExecutableTransactionWithAliases>,
2043        protocol_config: &ProtocolConfig,
2044        commit_info: &ConsensusCommitInfo,
2045        transaction: VerifiedExecutableTransactionWithAliases,
2046        shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
2047        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
2048        execution_time_estimator: &ExecutionTimeEstimator,
2049    ) {
2050        // Check for unpaid amplification before other deferral checks.
2051        // SIP-45: Paid amplification allows (gas_price / RGP + 1) submissions.
2052        // Transactions with more duplicates than paid for are deferred.
2053        if protocol_config.defer_unpaid_amplification() {
2054            let occurrence_count = state
2055                .occurrence_counts
2056                .get(transaction.tx().digest())
2057                .copied()
2058                .unwrap_or(0);
2059
2060            let rgp = self.epoch_store.reference_gas_price();
2061            let gas_price = transaction.tx().transaction_data().gas_price();
2062            let allowed_count = (gas_price / rgp.max(1)) + 1;
2063
2064            if occurrence_count as u64 > allowed_count {
2065                self.metrics
2066                    .consensus_handler_unpaid_amplification_deferrals
2067                    .inc();
2068
2069                let deferred_from_round = previously_deferred_tx_digests
2070                    .get(transaction.tx().digest())
2071                    .map(|k| k.deferred_from_round())
2072                    .unwrap_or(commit_info.round);
2073
2074                let deferral_key = DeferralKey::new_for_consensus_round(
2075                    commit_info.round + 1,
2076                    deferred_from_round,
2077                );
2078
2079                if transaction_deferral_within_limit(
2080                    &deferral_key,
2081                    protocol_config.max_deferral_rounds_for_congestion_control(),
2082                ) {
2083                    assert_reachable!("unpaid amplification deferral");
2084                    debug!(
2085                        "Deferring transaction {:?} due to unpaid amplification (count={}, allowed={})",
2086                        transaction.tx().digest(),
2087                        occurrence_count,
2088                        allowed_count
2089                    );
2090                    deferred_txns
2091                        .entry(deferral_key)
2092                        .or_default()
2093                        .push(transaction);
2094                    return;
2095                }
2096            }
2097        }
2098
2099        let tx_cost = shared_object_congestion_tracker.get_tx_cost(
2100            execution_time_estimator,
2101            transaction.tx(),
2102            state.indirect_state_observer.as_mut().unwrap(),
2103        );
2104
2105        let deferral_info = self.epoch_store.should_defer(
2106            transaction.tx(),
2107            commit_info,
2108            state.dkg_failed,
2109            state.randomness_round.is_some(),
2110            previously_deferred_tx_digests,
2111            shared_object_congestion_tracker,
2112        );
2113
2114        if let Some((deferral_key, deferral_reason)) = deferral_info {
2115            debug!(
2116                "Deferring consensus certificate for transaction {:?} until {:?}",
2117                transaction.tx().digest(),
2118                deferral_key
2119            );
2120
2121            match deferral_reason {
2122                DeferralReason::RandomnessNotReady => {
2123                    deferred_txns
2124                        .entry(deferral_key)
2125                        .or_default()
2126                        .push(transaction);
2127                }
2128                DeferralReason::SharedObjectCongestion(congested_objects) => {
2129                    self.metrics.consensus_handler_congested_transactions.inc();
2130                    if transaction_deferral_within_limit(
2131                        &deferral_key,
2132                        protocol_config.max_deferral_rounds_for_congestion_control(),
2133                    ) {
2134                        deferred_txns
2135                            .entry(deferral_key)
2136                            .or_default()
2137                            .push(transaction);
2138                    } else {
2139                        assert_sometimes!(
2140                            transaction.tx().data().transaction_data().uses_randomness(),
2141                            "cancelled randomness-using transaction"
2142                        );
2143                        assert_sometimes!(
2144                            !transaction.tx().data().transaction_data().uses_randomness(),
2145                            "cancelled non-randomness-using transaction"
2146                        );
2147
2148                        // Cancel the transaction that has been deferred for too long.
2149                        debug!(
2150                            "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
2151                            transaction.tx().digest(),
2152                            deferral_key,
2153                            congested_objects
2154                        );
2155                        cancelled_txns.insert(
2156                            *transaction.tx().digest(),
2157                            CancelConsensusCertificateReason::CongestionOnObjects(
2158                                congested_objects,
2159                            ),
2160                        );
2161                        scheduled_txns.push(transaction);
2162                    }
2163                }
2164            }
2165        } else {
2166            // Update object execution cost for all scheduled transactions
2167            shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, transaction.tx());
2168            scheduled_txns.push(transaction);
2169        }
2170    }
2171
2172    fn merge_and_reorder_transactions(
2173        &self,
2174        state: &mut CommitHandlerState,
2175        commit_info: &ConsensusCommitInfo,
2176        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
2177    ) -> (
2178        Vec<VerifiedExecutableTransactionWithAliases>,
2179        Vec<VerifiedExecutableTransactionWithAliases>,
2180        HashMap<TransactionDigest, DeferralKey>,
2181    ) {
2182        let protocol_config = self.epoch_store.protocol_config();
2183
2184        let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
2185            self.load_deferred_transactions(state, commit_info);
2186
2187        txns.reserve(user_transactions.len());
2188        randomness_txns.reserve(user_transactions.len());
2189
2190        // There may be randomness transactions in `txns`, which were deferred due to congestion.
2191        // They must be placed back into `randomness_txns`.
2192        let mut txns: Vec<_> = txns
2193            .into_iter()
2194            .filter_map(|tx| {
2195                if tx.tx().transaction_data().uses_randomness() {
2196                    randomness_txns.push(tx);
2197                    None
2198                } else {
2199                    Some(tx)
2200                }
2201            })
2202            .collect();
2203
2204        for txn in user_transactions {
2205            if txn.tx().transaction_data().uses_randomness() {
2206                randomness_txns.push(txn);
2207            } else {
2208                txns.push(txn);
2209            }
2210        }
2211
2212        PostConsensusTxReorder::reorder(
2213            &mut txns,
2214            protocol_config.consensus_transaction_ordering(),
2215        );
2216        PostConsensusTxReorder::reorder(
2217            &mut randomness_txns,
2218            protocol_config.consensus_transaction_ordering(),
2219        );
2220
2221        (txns, randomness_txns, previously_deferred_tx_digests)
2222    }
2223
2224    fn load_deferred_transactions(
2225        &self,
2226        state: &mut CommitHandlerState,
2227        commit_info: &ConsensusCommitInfo,
2228    ) -> (
2229        Vec<VerifiedExecutableTransactionWithAliases>,
2230        Vec<VerifiedExecutableTransactionWithAliases>,
2231        HashMap<TransactionDigest, DeferralKey>,
2232    ) {
2233        let mut previously_deferred_tx_digests = HashMap::new();
2234
2235        let deferred_txs: Vec<_> = self
2236            .epoch_store
2237            .load_deferred_transactions_for_up_to_consensus_round_v2(
2238                &mut state.output,
2239                commit_info.round,
2240            )
2241            .expect("db error")
2242            .into_iter()
2243            .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2244            .map(|(key, tx)| {
2245                previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2246                tx
2247            })
2248            .collect();
2249        trace!(
2250            "loading deferred transactions: {:?}",
2251            deferred_txs.iter().map(|tx| tx.tx().digest())
2252        );
2253
2254        let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
2255            let txns: Vec<_> = self
2256                .epoch_store
2257                .load_deferred_transactions_for_randomness_v2(&mut state.output)
2258                .expect("db error")
2259                .into_iter()
2260                .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2261                .map(|(key, tx)| {
2262                    previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2263                    tx
2264                })
2265                .collect();
2266            trace!(
2267                "loading deferred randomness transactions: {:?}",
2268                txns.iter().map(|tx| tx.tx().digest())
2269            );
2270            txns
2271        } else {
2272            vec![]
2273        };
2274
2275        (
2276            deferred_txs,
2277            deferred_randomness_txs,
2278            previously_deferred_tx_digests,
2279        )
2280    }
2281
2282    fn init_congestion_tracker(
2283        &self,
2284        commit_info: &ConsensusCommitInfo,
2285        for_randomness: bool,
2286        txns: &[VerifiedExecutableTransactionWithAliases],
2287    ) -> SharedObjectCongestionTracker {
2288        #[allow(unused_mut)]
2289        let mut ret = SharedObjectCongestionTracker::from_protocol_config(
2290            self.epoch_store
2291                .consensus_quarantine
2292                .read()
2293                .load_initial_object_debts(
2294                    &self.epoch_store,
2295                    commit_info.round,
2296                    for_randomness,
2297                    txns,
2298                )
2299                .expect("db error"),
2300            self.epoch_store.protocol_config(),
2301            for_randomness,
2302            self.congestion_logger.is_some(),
2303        );
2304
2305        fail_point_arg!(
2306            "initial_congestion_tracker",
2307            |tracker: SharedObjectCongestionTracker| {
2308                info!(
2309                    "Initialize shared_object_congestion_tracker to  {:?}",
2310                    tracker
2311                );
2312                ret = tracker;
2313            }
2314        );
2315
2316        ret
2317    }
2318
2319    fn process_gasless_transactions(
2320        &self,
2321        commit_info: &ConsensusCommitInfo,
2322        user_transactions: &[VerifiedExecutableTransactionWithAliases],
2323    ) {
2324        let gasless_count = user_transactions
2325            .iter()
2326            .filter(|txn| txn.tx().transaction_data().is_gasless_transaction())
2327            .count() as u64;
2328        self.consensus_gasless_counter
2329            .record_commit(commit_info.timestamp, gasless_count);
2330    }
2331
2332    fn process_jwks(
2333        &self,
2334        state: &mut CommitHandlerState,
2335        commit_info: &ConsensusCommitInfo,
2336        new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
2337    ) {
2338        for (authority_name, jwk_id, jwk) in new_jwks {
2339            self.epoch_store.record_jwk_vote(
2340                &mut state.output,
2341                commit_info.round,
2342                authority_name,
2343                &jwk_id,
2344                &jwk,
2345            );
2346        }
2347    }
2348
2349    fn process_capability_notifications(
2350        &self,
2351        capability_notifications: Vec<AuthorityCapabilitiesV2>,
2352    ) {
2353        for capabilities in capability_notifications {
2354            self.epoch_store
2355                .record_capabilities_v2(&capabilities)
2356                .expect("db error");
2357        }
2358    }
2359
2360    fn process_execution_time_observations(
2361        &self,
2362        state: &mut CommitHandlerState,
2363        execution_time_observations: Vec<ExecutionTimeObservation>,
2364    ) {
2365        let mut execution_time_estimator = self
2366            .epoch_store
2367            .execution_time_estimator
2368            .try_lock()
2369            .expect("should only ever be called from the commit handler thread");
2370
2371        for ExecutionTimeObservation {
2372            authority,
2373            generation,
2374            estimates,
2375        } in execution_time_observations
2376        {
2377            let authority_index = self
2378                .epoch_store
2379                .committee()
2380                .authority_index(&authority)
2381                .unwrap();
2382            execution_time_estimator.process_observations_from_consensus(
2383                authority_index,
2384                Some(generation),
2385                &estimates,
2386            );
2387            state
2388                .output
2389                .insert_execution_time_observation(authority_index, generation, estimates);
2390        }
2391    }
2392
2393    fn process_checkpoint_signature_messages(
2394        &self,
2395        checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
2396    ) {
2397        for checkpoint_signature_message in checkpoint_signature_messages {
2398            self.checkpoint_service
2399                .notify_checkpoint_signature(&checkpoint_signature_message)
2400                .expect("db error");
2401        }
2402    }
2403
2404    async fn process_dkg_updates(
2405        &self,
2406        state: &mut CommitHandlerState,
2407        commit_info: &ConsensusCommitInfo,
2408        randomness_manager: Option<&mut RandomnessManager>,
2409        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2410        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2411    ) {
2412        if !self.epoch_store.randomness_state_enabled() {
2413            let num_dkg_messages = randomness_dkg_messages.len();
2414            let num_dkg_confirmations = randomness_dkg_confirmations.len();
2415            if num_dkg_messages + num_dkg_confirmations > 0 {
2416                debug_fatal!(
2417                    "received {} RandomnessDkgMessage and {} RandomnessDkgConfirmation messages when randomness is not enabled",
2418                    num_dkg_messages,
2419                    num_dkg_confirmations
2420                );
2421            }
2422            return;
2423        }
2424
2425        let randomness_manager =
2426            randomness_manager.expect("randomness manager should exist if randomness is enabled");
2427
2428        let randomness_dkg_updates =
2429            self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);
2430
2431        let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
2432            state,
2433            randomness_manager,
2434            randomness_dkg_confirmations,
2435        );
2436
2437        if randomness_dkg_updates || randomness_dkg_confirmation_updates {
2438            randomness_manager
2439                .advance_dkg(&mut state.output, commit_info.round)
2440                .await
2441                .expect("epoch ended");
2442        }
2443    }
2444
2445    fn process_randomness_dkg_messages(
2446        &self,
2447        randomness_manager: &mut RandomnessManager,
2448        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2449    ) -> bool /* randomness state updated */ {
2450        if randomness_dkg_messages.is_empty() {
2451            return false;
2452        }
2453
2454        let mut randomness_state_updated = false;
2455        for (authority, bytes) in randomness_dkg_messages {
2456            match bcs::from_bytes(&bytes) {
2457                Ok(message) => {
2458                    randomness_manager
2459                        .add_message(&authority, message)
2460                        // TODO: make infallible
2461                        .expect("epoch ended");
2462                    randomness_state_updated = true;
2463                }
2464
2465                Err(e) => {
2466                    warn!(
2467                        "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
2468                        authority.concise(),
2469                    );
2470                }
2471            }
2472        }
2473
2474        randomness_state_updated
2475    }
2476
2477    fn process_randomness_dkg_confirmations(
2478        &self,
2479        state: &mut CommitHandlerState,
2480        randomness_manager: &mut RandomnessManager,
2481        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2482    ) -> bool /* randomness state updated */ {
2483        if randomness_dkg_confirmations.is_empty() {
2484            return false;
2485        }
2486
2487        let mut randomness_state_updated = false;
2488        for (authority, bytes) in randomness_dkg_confirmations {
2489            match bcs::from_bytes(&bytes) {
2490                Ok(message) => {
2491                    randomness_manager
2492                        .add_confirmation(&mut state.output, &authority, message)
2493                        // TODO: make infallible
2494                        .expect("epoch ended");
2495                    randomness_state_updated = true;
2496                }
2497                Err(e) => {
2498                    warn!(
2499                        "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
2500                        authority.concise(),
2501                    );
2502                }
2503            }
2504        }
2505
2506        randomness_state_updated
2507    }
2508
2509    /// Returns true if we have collected a quorum of end of publish messages (either in this round or a previous round).
2510    fn process_end_of_publish_transactions(
2511        &self,
2512        state: &mut CommitHandlerState,
2513        end_of_publish_transactions: Vec<AuthorityName>,
2514    ) -> bool {
2515        let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
2516            "No contention on end_of_publish as it is only accessed from consensus handler",
2517        );
2518
2519        if eop_aggregator.has_quorum() {
2520            return true;
2521        }
2522
2523        if end_of_publish_transactions.is_empty() {
2524            return false;
2525        }
2526
2527        for authority in end_of_publish_transactions {
2528            info!("Received EndOfPublish from {:?}", authority.concise());
2529
2530            // It is ok to just release lock here as this function is the only place that transition into RejectAllCerts state
2531            // And this function itself is always executed from consensus task
2532            state.output.insert_end_of_publish(authority);
2533            if eop_aggregator
2534                .insert_generic(authority, ())
2535                .is_quorum_reached()
2536            {
2537                debug!(
2538                    "Collected enough end_of_publish messages with last message from validator {:?}",
2539                    authority.concise(),
2540                );
2541                return true;
2542            }
2543        }
2544
2545        false
2546    }
2547
2548    /// After we have collected 2f+1 EndOfPublish messages, we call this function every round until the epoch
2549    /// ends.
2550    fn advance_eop_state_machine(
2551        &self,
2552        state: &mut CommitHandlerState,
2553    ) -> (
2554        RwLockWriteGuard<'_, ReconfigState>,
2555        bool, // true if final round
2556    ) {
2557        let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
2558        let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
2559
2560        reconfig_state.close_all_certs();
2561
2562        let commit_has_deferred_txns = state.output.has_deferred_transactions();
2563        let previous_commits_have_deferred_txns = !self.epoch_store.deferred_transactions_empty();
2564
2565        if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
2566            if !start_state_is_reject_all_tx {
2567                info!("Transitioning to RejectAllTx");
2568            }
2569            reconfig_state.close_all_tx();
2570        } else {
2571            debug!(
2572                "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
2573                previous_commits_have_deferred_txns, commit_has_deferred_txns,
2574            );
2575        }
2576
2577        state.output.store_reconfig_state(reconfig_state.clone());
2578
2579        if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
2580            (reconfig_state, true)
2581        } else {
2582            (reconfig_state, false)
2583        }
2584    }
2585
2586    fn gather_commit_metadata(
2587        &self,
2588        consensus_commit: &impl ConsensusCommitAPI,
2589    ) -> (u64, AuthorityIndex, u64) {
2590        let timestamp = consensus_commit.commit_timestamp_ms();
2591        let leader_author = consensus_commit.leader_author_index();
2592        let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
2593
2594        let system_time_ms = SystemTime::now()
2595            .duration_since(UNIX_EPOCH)
2596            .unwrap()
2597            .as_millis() as i64;
2598
2599        let consensus_timestamp_bias_ms = system_time_ms - (timestamp as i64);
2600        let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
2601        self.metrics
2602            .consensus_timestamp_bias
2603            .observe(consensus_timestamp_bias_seconds);
2604
2605        let epoch_start = self
2606            .epoch_store
2607            .epoch_start_config()
2608            .epoch_start_timestamp_ms();
2609        let timestamp = if timestamp < epoch_start {
2610            error!(
2611                "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}"
2612            );
2613            epoch_start
2614        } else {
2615            timestamp
2616        };
2617
2618        (timestamp, leader_author, commit_sub_dag_index)
2619    }
2620
2621    fn create_authenticator_state_update(
2622        &self,
2623        last_committed_round: u64,
2624        commit_info: &ConsensusCommitInfo,
2625    ) -> Option<VerifiedExecutableTransactionWithAliases> {
2626        // Load all jwks that became active in the previous round, and commit them in this round.
2627        // We want to delay one round because none of the transactions in the previous round could
2628        // have been authenticated with the jwks that became active in that round.
2629        //
2630        // Because of this delay, jwks that become active in the last round of the epoch will
2631        // never be committed. That is ok, because in the new epoch, the validators should
2632        // immediately re-submit these jwks, and they can become active then.
2633        let new_jwks = self
2634            .epoch_store
2635            .get_new_jwks(last_committed_round)
2636            .expect("Unrecoverable error in consensus handler");
2637
2638        if !new_jwks.is_empty() {
2639            let authenticator_state_update_transaction = authenticator_state_update_transaction(
2640                &self.epoch_store,
2641                commit_info.round,
2642                new_jwks,
2643            );
2644            debug!(
2645                "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
2646                authenticator_state_update_transaction.digest(),
2647                authenticator_state_update_transaction,
2648            );
2649
2650            Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2651                authenticator_state_update_transaction,
2652            ))
2653        } else {
2654            None
2655        }
2656    }
2657
2658    // Filters out rejected or deprecated transactions.
2659    // Returns FilteredConsensusOutput containing transactions and owned_object_locks.
2660    #[instrument(level = "trace", skip_all)]
2661    fn filter_consensus_txns(
2662        &mut self,
2663        initial_reconfig_state: ReconfigState,
2664        commit_info: &ConsensusCommitInfo,
2665        consensus_commit: &impl ConsensusCommitAPI,
2666    ) -> FilteredConsensusOutput {
2667        let mut transactions = Vec::new();
2668        let mut owned_object_locks = HashMap::new();
2669        let epoch = self.epoch_store.epoch();
2670        let mut num_finalized_user_transactions = vec![0; self.committee.size()];
2671        let mut num_rejected_user_transactions = vec![0; self.committee.size()];
2672        for (block, parsed_transactions) in consensus_commit.transactions() {
2673            let author = block.author.value();
2674            // TODO: consider only messages within 1~3 rounds of the leader?
2675            self.last_consensus_stats.stats.inc_num_messages(author);
2676
2677            // Set the "ping" transaction status for this block. This is necessary as there might be some ping requests waiting for the ping transaction to be certified.
2678            self.epoch_store.set_consensus_tx_status(
2679                ConsensusPosition::ping(epoch, block),
2680                ConsensusTxStatus::Finalized,
2681            );
2682
2683            for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
2684                let position = ConsensusPosition {
2685                    epoch,
2686                    block,
2687                    index: tx_index as TransactionIndex,
2688                };
2689
2690                // Transaction has appeared in consensus output, we can increment the submission count
2691                // for this tx for DoS protection.
2692                if let Some(tx) = parsed.transaction.kind.as_user_transaction() {
2693                    let digest = tx.digest();
2694                    if let Some((spam_weight, submitter_client_addrs)) = self
2695                        .epoch_store
2696                        .submitted_transaction_cache
2697                        .increment_submission_count(digest)
2698                    {
2699                        if let Some(ref traffic_controller) = self.traffic_controller {
2700                            debug!(
2701                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
2702                                submitter_client_addrs.len()
2703                            );
2704
2705                            // Apply spam weight to all client addresses that submitted this transaction
2706                            for addr in submitter_client_addrs {
2707                                traffic_controller.tally(TrafficTally::new(
2708                                    Some(addr),
2709                                    None,
2710                                    None,
2711                                    spam_weight.clone(),
2712                                ));
2713                            }
2714                        } else {
2715                            warn!(
2716                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
2717                                submitter_client_addrs.len()
2718                            );
2719                        }
2720                    }
2721                }
2722
2723                if parsed.rejected {
2724                    // TODO(fastpath): Add metrics for rejected transactions.
2725                    if parsed.transaction.is_user_transaction() {
2726                        self.epoch_store
2727                            .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
2728                        num_rejected_user_transactions[author] += 1;
2729                    }
2730                    // Skip processing rejected transactions.
2731                    continue;
2732                }
2733
2734                let kind = classify(&parsed.transaction);
2735                self.metrics
2736                    .consensus_handler_processed
2737                    .with_label_values(&[kind])
2738                    .inc();
2739                self.metrics
2740                    .consensus_handler_transaction_sizes
2741                    .with_label_values(&[kind])
2742                    .observe(parsed.serialized_len as f64);
2743                if parsed.transaction.is_user_transaction() {
2744                    self.last_consensus_stats
2745                        .stats
2746                        .inc_num_user_transactions(author);
2747                }
2748
2749                if !initial_reconfig_state.should_accept_consensus_certs() {
2750                    // (Note: we no longer need to worry about the previously deferred condition, since we are only
2751                    // processing newly-received transactions at this time).
2752                    match &parsed.transaction.kind {
2753                        ConsensusTransactionKind::UserTransactionV2(_)
2754                        // deprecated and ignore later, but added for exhaustive match
2755                        | ConsensusTransactionKind::UserTransaction(_)
2756                        | ConsensusTransactionKind::CertifiedTransaction(_)
2757                        | ConsensusTransactionKind::CapabilityNotification(_)
2758                        | ConsensusTransactionKind::CapabilityNotificationV2(_)
2759                        | ConsensusTransactionKind::EndOfPublish(_)
2760                        // Note: we no longer have to check protocol_config.ignore_execution_time_observations_after_certs_closed()
2761                        | ConsensusTransactionKind::ExecutionTimeObservation(_)
2762                        | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
2763                            debug!(
2764                                "Ignoring consensus transaction {:?} because of end of epoch",
2765                                parsed.transaction.key()
2766                            );
2767                            continue;
2768                        }
2769
2770                        // These are the message types that are still processed even if !should_accept_consensus_certs()
2771                        ConsensusTransactionKind::CheckpointSignature(_)
2772                        | ConsensusTransactionKind::CheckpointSignatureV2(_)
2773                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2774                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
2775                        | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
2776                    }
2777                }
2778
2779                if !initial_reconfig_state.should_accept_tx() {
2780                    match &parsed.transaction.kind {
2781                        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
2782                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
2783                        _ => {}
2784                    }
2785                }
2786
2787                // Handle deprecated messages
2788                match &parsed.transaction.kind {
2789                    ConsensusTransactionKind::CapabilityNotification(_)
2790                    | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2791                    | ConsensusTransactionKind::CheckpointSignature(_) => {
2792                        debug_fatal!(
2793                            "BUG: saw deprecated tx {:?}for commit round {}",
2794                            parsed.transaction.key(),
2795                            commit_info.round
2796                        );
2797                        continue;
2798                    }
2799                    _ => {}
2800                }
2801
2802                if parsed.transaction.is_user_transaction() {
2803                    let author_name = self
2804                        .epoch_store
2805                        .committee()
2806                        .authority_by_index(author as u32)
2807                        .unwrap();
2808                    if self
2809                        .epoch_store
2810                        .has_received_end_of_publish_from(author_name)
2811                    {
2812                        // In some edge cases, consensus might resend previously seen certificate after EndOfPublish
2813                        // An honest validator should not send a new transaction after EndOfPublish. Whether the
2814                        // transaction is duplicate or not, we filter it out here.
2815                        warn!(
2816                            "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
2817                            author_name.concise(),
2818                            parsed.transaction.key(),
2819                        );
2820                        continue;
2821                    }
2822                }
2823
2824                // Perform post-consensus owned object conflict detection. If lock acquisition
2825                // fails, the transaction has invalid/conflicting owned inputs and should be dropped.
2826                // This must happen AFTER all filtering checks above to avoid acquiring locks
2827                // for transactions that will be dropped (e.g., during epoch change).
2828                // Only applies to UserTransactionV2 - other transaction types don't need lock acquisition.
2829                if let ConsensusTransactionKind::UserTransactionV2(tx_with_claims) =
2830                    &parsed.transaction.kind
2831                {
2832                    let immutable_object_ids: HashSet<ObjectID> =
2833                        tx_with_claims.get_immutable_objects().into_iter().collect();
2834                    let tx = tx_with_claims.tx();
2835
2836                    let Ok(input_objects) = tx.transaction_data().input_objects() else {
2837                        debug_fatal!("Invalid input objects for transaction {}", tx.digest());
2838                        continue;
2839                    };
2840
2841                    // Filter ImmOrOwnedMoveObject inputs, excluding those claimed to be immutable.
2842                    // Immutable objects don't need lock acquisition as they can be used concurrently.
2843                    let owned_object_refs: Vec<_> = input_objects
2844                        .iter()
2845                        .filter_map(|obj| match obj {
2846                            InputObjectKind::ImmOrOwnedMoveObject(obj_ref)
2847                                if !immutable_object_ids.contains(&obj_ref.0) =>
2848                            {
2849                                Some(*obj_ref)
2850                            }
2851                            _ => None,
2852                        })
2853                        .collect();
2854
2855                    match self
2856                        .epoch_store
2857                        .try_acquire_owned_object_locks_post_consensus(
2858                            &owned_object_refs,
2859                            *tx.digest(),
2860                            &owned_object_locks,
2861                        ) {
2862                        Ok(new_locks) => {
2863                            owned_object_locks.extend(new_locks.into_iter());
2864                            // Lock acquisition succeeded - now set Finalized status
2865                            self.epoch_store
2866                                .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
2867                            num_finalized_user_transactions[author] += 1;
2868                        }
2869                        Err(e) => {
2870                            debug!("Dropping transaction {}: {}", tx.digest(), e);
2871                            self.epoch_store
2872                                .set_consensus_tx_status(position, ConsensusTxStatus::Dropped);
2873                            self.epoch_store.set_rejection_vote_reason(position, &e);
2874                            continue;
2875                        }
2876                    }
2877                }
2878
2879                let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
2880                transactions.push((transaction, author as u32));
2881            }
2882        }
2883
2884        for (i, authority) in self.committee.authorities() {
2885            let hostname = &authority.hostname;
2886            self.metrics
2887                .consensus_committed_messages
2888                .with_label_values(&[hostname])
2889                .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
2890            self.metrics
2891                .consensus_committed_user_transactions
2892                .with_label_values(&[hostname])
2893                .set(
2894                    self.last_consensus_stats
2895                        .stats
2896                        .get_num_user_transactions(i.value()) as i64,
2897                );
2898            self.metrics
2899                .consensus_finalized_user_transactions
2900                .with_label_values(&[hostname])
2901                .add(num_finalized_user_transactions[i.value()] as i64);
2902            self.metrics
2903                .consensus_rejected_user_transactions
2904                .with_label_values(&[hostname])
2905                .add(num_rejected_user_transactions[i.value()] as i64);
2906        }
2907
2908        FilteredConsensusOutput {
2909            transactions,
2910            owned_object_locks,
2911        }
2912    }
2913
2914    fn deduplicate_consensus_txns(
2915        &mut self,
2916        state: &mut CommitHandlerState,
2917        commit_info: &ConsensusCommitInfo,
2918        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
2919    ) -> Vec<VerifiedSequencedConsensusTransaction> {
2920        let mut all_transactions = Vec::new();
2921
2922        // Track occurrence counts for each transaction key within this commit.
2923        // Also serves as the deduplication set (count > 1 means duplicate within commit).
2924        let mut occurrence_counts: HashMap<SequencedConsensusTransactionKey, u32> = HashMap::new();
2925        // Keys being seen for the first time (not duplicates from previous commits).
2926        let mut first_commit_keys: HashSet<SequencedConsensusTransactionKey> = HashSet::new();
2927
2928        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
2929            // In process_consensus_transactions_and_commit_boundary(), we will add a system consensus commit
2930            // prologue transaction, which will be the first transaction in this consensus commit batch.
2931            // Therefore, the transaction sequence number starts from 1 here.
2932            let current_tx_index = ExecutionIndices {
2933                last_committed_round: commit_info.round,
2934                sub_dag_index: commit_info.consensus_commit_ref.index.into(),
2935                transaction_index: (seq + 1) as u64,
2936            };
2937
2938            self.last_consensus_stats.index = current_tx_index;
2939
2940            let certificate_author = *self
2941                .epoch_store
2942                .committee()
2943                .authority_by_index(cert_origin)
2944                .unwrap();
2945
2946            let sequenced_transaction = SequencedConsensusTransaction {
2947                certificate_author_index: cert_origin,
2948                certificate_author,
2949                consensus_index: current_tx_index,
2950                transaction,
2951            };
2952
2953            let Some(verified_transaction) = self
2954                .epoch_store
2955                .verify_consensus_transaction(sequenced_transaction)
2956            else {
2957                continue;
2958            };
2959
2960            let key = verified_transaction.0.key();
2961
2962            if let Some(tx_digest) = key.user_transaction_digest() {
2963                self.epoch_store
2964                    .cache_recently_finalized_transaction(tx_digest);
2965            }
2966
2967            // Increment count and check if this is a duplicate within this commit.
2968            // This replaces the separate processed_set HashSet.
2969            let count = occurrence_counts.entry(key.clone()).or_insert(0);
2970            *count += 1;
2971            let in_commit = *count > 1;
2972
2973            let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
2974            if in_commit || in_cache {
2975                self.metrics.skipped_consensus_txns_cache_hit.inc();
2976                continue;
2977            }
2978            if self
2979                .epoch_store
2980                .is_consensus_message_processed(&key)
2981                .expect("db error")
2982            {
2983                self.metrics.skipped_consensus_txns.inc();
2984                continue;
2985            }
2986
2987            first_commit_keys.insert(key.clone());
2988
2989            state.output.record_consensus_message_processed(key);
2990
2991            all_transactions.push(verified_transaction);
2992        }
2993
2994        for key in first_commit_keys {
2995            if let Some(&count) = occurrence_counts.get(&key)
2996                && count > 1
2997            {
2998                self.metrics
2999                    .consensus_handler_duplicate_tx_count
3000                    .observe(count as f64);
3001            }
3002        }
3003
3004        // Copy user transaction occurrence counts to state for unpaid amplification detection.
3005        assert!(
3006            state.occurrence_counts.is_empty(),
3007            "occurrence_counts should be empty before populating"
3008        );
3009        state.occurrence_counts.reserve(occurrence_counts.len());
3010        state.occurrence_counts.extend(
3011            occurrence_counts
3012                .into_iter()
3013                .filter_map(|(key, count)| key.user_transaction_digest().map(|d| (d, count))),
3014        );
3015
3016        all_transactions
3017    }
3018
3019    fn build_commit_handler_input(
3020        &self,
3021        transactions: Vec<VerifiedSequencedConsensusTransaction>,
3022    ) -> CommitHandlerInput {
3023        let epoch = self.epoch_store.epoch();
3024        let mut commit_handler_input = CommitHandlerInput::default();
3025
3026        for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
3027            match transaction.transaction {
3028                SequencedConsensusTransactionKind::External(consensus_transaction) => {
3029                    match consensus_transaction.kind {
3030                        // === User transactions ===
3031                        ConsensusTransactionKind::UserTransactionV2(tx) => {
3032                            // Extract the aliases claim (required) from the claims
3033                            let used_alias_versions = if self
3034                                .epoch_store
3035                                .protocol_config()
3036                                .fix_checkpoint_signature_mapping()
3037                            {
3038                                tx.aliases()
3039                            } else {
3040                                // Convert V1 to V2 format using dummy signature indices
3041                                // which will be ignored with `fix_checkpoint_signature_mapping`
3042                                // disabled.
3043                                tx.aliases_v1().map(|a| {
3044                                    NonEmpty::from_vec(
3045                                        a.into_iter()
3046                                            .enumerate()
3047                                            .map(|(idx, (_, seq))| (idx as u8, seq))
3048                                            .collect(),
3049                                    )
3050                                    .unwrap()
3051                                })
3052                            };
3053                            let inner_tx = tx.into_tx();
3054                            // Safe because transactions are certified by consensus.
3055                            let tx = VerifiedTransaction::new_unchecked(inner_tx);
3056                            // TODO(fastpath): accept position in consensus, after plumbing consensus round, authority index, and transaction index here.
3057                            let transaction =
3058                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
3059                            if let Some(used_alias_versions) = used_alias_versions {
3060                                commit_handler_input
3061                                    .user_transactions
3062                                    .push(WithAliases::new(transaction, used_alias_versions));
3063                            } else {
3064                                commit_handler_input.user_transactions.push(
3065                                    VerifiedExecutableTransactionWithAliases::no_aliases(
3066                                        transaction,
3067                                    ),
3068                                );
3069                            }
3070                        }
3071
3072                        // === State machines ===
3073                        ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => {
3074                            commit_handler_input
3075                                .end_of_publish_transactions
3076                                .push(authority_public_key_bytes);
3077                        }
3078                        ConsensusTransactionKind::NewJWKFetched(
3079                            authority_public_key_bytes,
3080                            jwk_id,
3081                            jwk,
3082                        ) => {
3083                            commit_handler_input.new_jwks.push((
3084                                authority_public_key_bytes,
3085                                jwk_id,
3086                                jwk,
3087                            ));
3088                        }
3089                        ConsensusTransactionKind::RandomnessDkgMessage(
3090                            authority_public_key_bytes,
3091                            items,
3092                        ) => {
3093                            commit_handler_input
3094                                .randomness_dkg_messages
3095                                .push((authority_public_key_bytes, items));
3096                        }
3097                        ConsensusTransactionKind::RandomnessDkgConfirmation(
3098                            authority_public_key_bytes,
3099                            items,
3100                        ) => {
3101                            commit_handler_input
3102                                .randomness_dkg_confirmations
3103                                .push((authority_public_key_bytes, items));
3104                        }
3105                        ConsensusTransactionKind::CapabilityNotificationV2(
3106                            authority_capabilities_v2,
3107                        ) => {
3108                            commit_handler_input
3109                                .capability_notifications
3110                                .push(authority_capabilities_v2);
3111                        }
3112                        ConsensusTransactionKind::ExecutionTimeObservation(
3113                            execution_time_observation,
3114                        ) => {
3115                            commit_handler_input
3116                                .execution_time_observations
3117                                .push(execution_time_observation);
3118                        }
3119                        ConsensusTransactionKind::CheckpointSignatureV2(
3120                            checkpoint_signature_message,
3121                        ) => {
3122                            commit_handler_input
3123                                .checkpoint_signature_messages
3124                                .push(*checkpoint_signature_message);
3125                        }
3126
3127                        // Deprecated messages, filtered earlier by filter_consensus_txns()
3128                        // or rejected by SuiTxValidator. Kept for exhaustiveness.
3129                        ConsensusTransactionKind::CheckpointSignature(_)
3130                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
3131                        | ConsensusTransactionKind::CapabilityNotification(_)
3132                        | ConsensusTransactionKind::CertifiedTransaction(_)
3133                        | ConsensusTransactionKind::UserTransaction(_) => {
3134                            unreachable!("filtered earlier")
3135                        }
3136                    }
3137                }
3138                // TODO: I think we can delete this, it was only used to inject randomness state update into the tx stream.
3139                SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
3140            }
3141        }
3142
3143        commit_handler_input
3144    }
3145
3146    async fn send_end_of_publish_if_needed(&self) {
3147        if !self.epoch_store.should_send_end_of_publish() {
3148            return;
3149        }
3150
3151        let end_of_publish = ConsensusTransaction::new_end_of_publish(self.epoch_store.name);
3152        if let Err(err) =
3153            self.consensus_adapter
3154                .submit(end_of_publish, None, &self.epoch_store, None, None)
3155        {
3156            warn!(
3157                "Error when sending EndOfPublish message from ConsensusHandler: {:?}",
3158                err
3159            );
3160        } else {
3161            info!(epoch=?self.epoch_store.epoch(), "Sending EndOfPublish message to consensus");
3162        }
3163    }
3164}
3165
3166/// Sends transactions to the execution scheduler in a separate task,
3167/// to avoid blocking consensus handler.
3168pub(crate) type SchedulerMessage = (
3169    Vec<(Schedulable, AssignedVersions)>,
3170    Option<SettlementBatchInfo>,
3171);
3172
3173#[derive(Clone)]
3174pub(crate) struct ExecutionSchedulerSender {
3175    sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
3176}
3177
3178impl ExecutionSchedulerSender {
3179    fn start(
3180        settlement_scheduler: SettlementScheduler,
3181        epoch_store: Arc<AuthorityPerEpochStore>,
3182    ) -> Self {
3183        let (sender, recv) = monitored_mpsc::unbounded_channel("execution_scheduler_sender");
3184        spawn_monitored_task!(Self::run(recv, settlement_scheduler, epoch_store));
3185        Self { sender }
3186    }
3187
3188    pub(crate) fn new_for_testing(
3189        sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
3190    ) -> Self {
3191        Self { sender }
3192    }
3193
3194    fn send(
3195        &self,
3196        transactions: Vec<(Schedulable, AssignedVersions)>,
3197        settlement: Option<SettlementBatchInfo>,
3198    ) {
3199        let _ = self.sender.send((transactions, settlement));
3200    }
3201
3202    async fn run(
3203        mut recv: monitored_mpsc::UnboundedReceiver<SchedulerMessage>,
3204        settlement_scheduler: SettlementScheduler,
3205        epoch_store: Arc<AuthorityPerEpochStore>,
3206    ) {
3207        while let Some((transactions, settlement)) = recv.recv().await {
3208            let _guard = monitored_scope("ConsensusHandler::enqueue");
3209            let txns = transactions
3210                .into_iter()
3211                .map(|(txn, versions)| (txn, ExecutionEnv::new().with_assigned_versions(versions)))
3212                .collect();
3213            if let Some(settlement) = settlement {
3214                settlement_scheduler.enqueue_v2(txns, settlement, &epoch_store);
3215            } else {
3216                settlement_scheduler.enqueue(txns, &epoch_store);
3217            }
3218        }
3219    }
3220}
3221
3222/// Manages the lifetime of tasks handling the commits and transactions output by consensus.
3223pub(crate) struct MysticetiConsensusHandler {
3224    tasks: JoinSet<()>,
3225}
3226
3227impl MysticetiConsensusHandler {
3228    pub(crate) fn new(
3229        last_processed_commit_at_startup: CommitIndex,
3230        mut consensus_handler: ConsensusHandler<CheckpointService>,
3231        mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
3232        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
3233    ) -> Self {
3234        debug!(
3235            last_processed_commit_at_startup,
3236            "Starting consensus replay"
3237        );
3238        let mut tasks = JoinSet::new();
3239        tasks.spawn(monitored_future!(async move {
3240            // TODO: pause when execution is overloaded, so consensus can detect the backpressure.
3241            while let Some(consensus_commit) = commit_receiver.recv().await {
3242                let commit_index = consensus_commit.commit_ref.index;
3243                if commit_index <= last_processed_commit_at_startup {
3244                    consensus_handler.handle_prior_consensus_commit(consensus_commit);
3245                } else {
3246                    consensus_handler
3247                        .handle_consensus_commit(consensus_commit)
3248                        .await;
3249                }
3250                commit_consumer_monitor.set_highest_handled_commit(commit_index);
3251            }
3252        }));
3253        Self { tasks }
3254    }
3255
3256    pub(crate) async fn abort(&mut self) {
3257        self.tasks.shutdown().await;
3258    }
3259}
3260
3261fn authenticator_state_update_transaction(
3262    epoch_store: &AuthorityPerEpochStore,
3263    round: u64,
3264    mut new_active_jwks: Vec<ActiveJwk>,
3265) -> VerifiedExecutableTransaction {
3266    let epoch = epoch_store.epoch();
3267    new_active_jwks.sort();
3268
3269    info!("creating authenticator state update transaction");
3270    assert!(epoch_store.authenticator_state_enabled());
3271    let transaction = VerifiedTransaction::new_authenticator_state_update(
3272        epoch,
3273        round,
3274        new_active_jwks,
3275        epoch_store
3276            .epoch_start_config()
3277            .authenticator_obj_initial_shared_version()
3278            .expect("authenticator state obj must exist"),
3279    );
3280    VerifiedExecutableTransaction::new_system(transaction, epoch)
3281}
3282
3283pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
3284    match &transaction.kind {
3285        // Deprecated and rejected by SuiTxValidator; never classified in practice.
3286        ConsensusTransactionKind::CertifiedTransaction(_) => "_deprecated_certificate",
3287        ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
3288        ConsensusTransactionKind::CheckpointSignatureV2(_) => "checkpoint_signature",
3289        ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
3290        ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
3291        ConsensusTransactionKind::CapabilityNotificationV2(_) => "capability_notification_v2",
3292        ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
3293        ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
3294        ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
3295        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
3296        ConsensusTransactionKind::UserTransaction(_) => "_deprecated_user_transaction",
3297        ConsensusTransactionKind::UserTransactionV2(tx) => {
3298            if tx.tx().is_consensus_tx() {
3299                "shared_user_transaction_v2"
3300            } else {
3301                "owned_user_transaction_v2"
3302            }
3303        }
3304        ConsensusTransactionKind::ExecutionTimeObservation(_) => "execution_time_observation",
3305    }
3306}
3307
3308#[derive(Debug, Clone, Serialize, Deserialize)]
3309pub struct SequencedConsensusTransaction {
3310    pub certificate_author_index: AuthorityIndex,
3311    pub certificate_author: AuthorityName,
3312    pub consensus_index: ExecutionIndices,
3313    pub transaction: SequencedConsensusTransactionKind,
3314}
3315
3316#[derive(Debug, Clone)]
3317#[allow(clippy::large_enum_variant)]
3318pub enum SequencedConsensusTransactionKind {
3319    External(ConsensusTransaction),
3320    System(VerifiedExecutableTransaction),
3321}
3322
3323impl Serialize for SequencedConsensusTransactionKind {
3324    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
3325        let serializable = SerializableSequencedConsensusTransactionKind::from(self);
3326        serializable.serialize(serializer)
3327    }
3328}
3329
3330impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
3331    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
3332        let serializable =
3333            SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
3334        Ok(serializable.into())
3335    }
3336}
3337
3338// We can't serialize SequencedConsensusTransactionKind directly because it contains a
3339// VerifiedExecutableTransaction, which is not serializable (by design). This wrapper allows us to
3340// convert to a serializable format easily.
3341#[derive(Debug, Clone, Serialize, Deserialize)]
3342#[allow(clippy::large_enum_variant)]
3343enum SerializableSequencedConsensusTransactionKind {
3344    External(ConsensusTransaction),
3345    System(TrustedExecutableTransaction),
3346}
3347
3348impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
3349    fn from(kind: &SequencedConsensusTransactionKind) -> Self {
3350        match kind {
3351            SequencedConsensusTransactionKind::External(ext) => {
3352                SerializableSequencedConsensusTransactionKind::External(ext.clone())
3353            }
3354            SequencedConsensusTransactionKind::System(txn) => {
3355                SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
3356            }
3357        }
3358    }
3359}
3360
3361impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
3362    fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
3363        match kind {
3364            SerializableSequencedConsensusTransactionKind::External(ext) => {
3365                SequencedConsensusTransactionKind::External(ext)
3366            }
3367            SerializableSequencedConsensusTransactionKind::System(txn) => {
3368                SequencedConsensusTransactionKind::System(txn.into())
3369            }
3370        }
3371    }
3372}
3373
3374#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
3375pub enum SequencedConsensusTransactionKey {
3376    External(ConsensusTransactionKey),
3377    System(TransactionDigest),
3378}
3379
3380impl SequencedConsensusTransactionKey {
3381    pub fn user_transaction_digest(&self) -> Option<TransactionDigest> {
3382        match self {
3383            SequencedConsensusTransactionKey::External(key) => match key {
3384                ConsensusTransactionKey::Certificate(digest) => Some(*digest),
3385                _ => None,
3386            },
3387            SequencedConsensusTransactionKey::System(_) => None,
3388        }
3389    }
3390}
3391
3392impl SequencedConsensusTransactionKind {
3393    pub fn key(&self) -> SequencedConsensusTransactionKey {
3394        match self {
3395            SequencedConsensusTransactionKind::External(ext) => {
3396                SequencedConsensusTransactionKey::External(ext.key())
3397            }
3398            SequencedConsensusTransactionKind::System(txn) => {
3399                SequencedConsensusTransactionKey::System(*txn.digest())
3400            }
3401        }
3402    }
3403
3404    pub fn get_tracking_id(&self) -> u64 {
3405        match self {
3406            SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
3407            SequencedConsensusTransactionKind::System(_txn) => 0,
3408        }
3409    }
3410
3411    pub fn is_executable_transaction(&self) -> bool {
3412        match self {
3413            SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
3414            SequencedConsensusTransactionKind::System(_) => true,
3415        }
3416    }
3417
3418    pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
3419        match self {
3420            SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
3421                ConsensusTransactionKind::UserTransactionV2(txn) => Some(*txn.tx().digest()),
3422                _ => None,
3423            },
3424            SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
3425        }
3426    }
3427
3428    pub fn is_end_of_publish(&self) -> bool {
3429        match self {
3430            SequencedConsensusTransactionKind::External(ext) => {
3431                matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
3432            }
3433            SequencedConsensusTransactionKind::System(_) => false,
3434        }
3435    }
3436}
3437
3438impl SequencedConsensusTransaction {
3439    pub fn sender_authority(&self) -> AuthorityName {
3440        self.certificate_author
3441    }
3442
3443    pub fn key(&self) -> SequencedConsensusTransactionKey {
3444        self.transaction.key()
3445    }
3446
3447    pub fn is_end_of_publish(&self) -> bool {
3448        if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
3449            matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
3450        } else {
3451            false
3452        }
3453    }
3454
3455    pub fn try_take_execution_time_observation(&mut self) -> Option<ExecutionTimeObservation> {
3456        if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
3457            kind: ConsensusTransactionKind::ExecutionTimeObservation(observation),
3458            ..
3459        }) = &mut self.transaction
3460        {
3461            Some(std::mem::take(observation))
3462        } else {
3463            None
3464        }
3465    }
3466
3467    pub fn is_system(&self) -> bool {
3468        matches!(
3469            self.transaction,
3470            SequencedConsensusTransactionKind::System(_)
3471        )
3472    }
3473
3474    pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
3475        if !randomness_state_enabled {
3476            // If randomness is disabled, these should be processed same as a tx without randomness,
3477            // which will eventually fail when the randomness state object is not found.
3478            return false;
3479        }
3480        match &self.transaction {
3481            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3482                kind: ConsensusTransactionKind::UserTransactionV2(txn),
3483                ..
3484            }) => txn.tx().transaction_data().uses_randomness(),
3485            _ => false,
3486        }
3487    }
3488
3489    pub fn as_consensus_txn(&self) -> Option<&SenderSignedData> {
3490        match &self.transaction {
3491            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3492                kind: ConsensusTransactionKind::UserTransactionV2(txn),
3493                ..
3494            }) if txn.tx().is_consensus_tx() => Some(txn.tx().data()),
3495            SequencedConsensusTransactionKind::System(txn) if txn.is_consensus_tx() => {
3496                Some(txn.data())
3497            }
3498            _ => None,
3499        }
3500    }
3501}
3502
3503#[derive(Debug, Clone, Serialize, Deserialize)]
3504pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
3505
3506#[cfg(test)]
3507impl VerifiedSequencedConsensusTransaction {
3508    pub fn new_test(transaction: ConsensusTransaction) -> Self {
3509        Self(SequencedConsensusTransaction::new_test(transaction))
3510    }
3511}
3512
3513impl SequencedConsensusTransaction {
3514    pub fn new_test(transaction: ConsensusTransaction) -> Self {
3515        Self {
3516            certificate_author_index: 0,
3517            certificate_author: AuthorityName::ZERO,
3518            consensus_index: Default::default(),
3519            transaction: SequencedConsensusTransactionKind::External(transaction),
3520        }
3521    }
3522}
3523
3524#[derive(Serialize, Deserialize)]
3525pub(crate) struct CommitIntervalObserver {
3526    ring_buffer: VecDeque<u64>,
3527}
3528
3529impl CommitIntervalObserver {
3530    pub fn new(window_size: u32) -> Self {
3531        Self {
3532            ring_buffer: VecDeque::with_capacity(window_size as usize),
3533        }
3534    }
3535
3536    pub fn observe_commit_time(&mut self, consensus_commit: &impl ConsensusCommitAPI) {
3537        let commit_time = consensus_commit.commit_timestamp_ms();
3538        if self.ring_buffer.len() == self.ring_buffer.capacity() {
3539            self.ring_buffer.pop_front();
3540        }
3541        self.ring_buffer.push_back(commit_time);
3542    }
3543
3544    pub fn commit_interval_estimate(&self) -> Option<Duration> {
3545        if self.ring_buffer.len() <= 1 {
3546            None
3547        } else {
3548            let first = self.ring_buffer.front().unwrap();
3549            let last = self.ring_buffer.back().unwrap();
3550            let duration = last.saturating_sub(*first);
3551            let num_commits = self.ring_buffer.len() as u64;
3552            Some(Duration::from_millis(duration.div_ceil(num_commits)))
3553        }
3554    }
3555}
3556
3557#[cfg(test)]
3558mod tests {
3559    use std::collections::HashSet;
3560
3561    use consensus_core::{
3562        BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock,
3563    };
3564    use futures::pin_mut;
3565    use prometheus::Registry;
3566    use sui_protocol_config::{ConsensusTransactionOrdering, ProtocolConfig};
3567    use sui_types::{
3568        base_types::ExecutionDigests,
3569        base_types::{AuthorityName, FullObjectRef, ObjectID, SuiAddress, random_object_ref},
3570        committee::Committee,
3571        crypto::deterministic_random_account_key,
3572        gas::GasCostSummary,
3573        message_envelope::Message,
3574        messages_checkpoint::{
3575            CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
3576            SignedCheckpointSummary,
3577        },
3578        messages_consensus::ConsensusTransaction,
3579        object::Object,
3580        transaction::{
3581            CertifiedTransaction, TransactionData, TransactionDataAPI, VerifiedCertificate,
3582        },
3583    };
3584
3585    use super::*;
3586    use crate::{
3587        authority::{
3588            authority_per_epoch_store::ConsensusStatsAPI,
3589            test_authority_builder::TestAuthorityBuilder,
3590        },
3591        checkpoints::CheckpointServiceNoop,
3592        consensus_adapter::consensus_tests::test_user_transaction,
3593        consensus_test_utils::make_consensus_adapter_for_test,
3594        post_consensus_tx_reorder::PostConsensusTxReorder,
3595    };
3596
3597    #[tokio::test(flavor = "current_thread", start_paused = true)]
3598    async fn test_consensus_commit_handler() {
3599        telemetry_subscribers::init_for_testing();
3600
3601        // GIVEN
3602        // 1 account keypair
3603        let (sender, keypair) = deterministic_random_account_key();
3604        // 12 gas objects.
3605        let gas_objects: Vec<Object> = (0..12)
3606            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3607            .collect();
3608        // 4 owned objects.
3609        let owned_objects: Vec<Object> = (0..4)
3610            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3611            .collect();
3612        // 6 shared objects.
3613        let shared_objects: Vec<Object> = (0..6)
3614            .map(|_| Object::shared_for_testing())
3615            .collect::<Vec<_>>();
3616        let mut all_objects = gas_objects.clone();
3617        all_objects.extend(owned_objects.clone());
3618        all_objects.extend(shared_objects.clone());
3619
3620        let network_config =
3621            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
3622                .with_objects(all_objects.clone())
3623                .build();
3624
3625        let state = TestAuthorityBuilder::new()
3626            .with_network_config(&network_config, 0)
3627            .build()
3628            .await;
3629
3630        let epoch_store = state.epoch_store_for_testing().clone();
3631        let new_epoch_start_state = epoch_store.epoch_start_state();
3632        let consensus_committee = new_epoch_start_state.get_consensus_committee();
3633
3634        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3635
3636        let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
3637
3638        let backpressure_manager = BackpressureManager::new_for_tests();
3639        let consensus_adapter =
3640            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3641        let settlement_scheduler = SettlementScheduler::new(
3642            state.execution_scheduler().as_ref().clone(),
3643            state.get_transaction_cache_reader().clone(),
3644            state.metrics.clone(),
3645        );
3646        let mut consensus_handler = ConsensusHandler::new(
3647            epoch_store,
3648            Arc::new(CheckpointServiceNoop {}),
3649            settlement_scheduler,
3650            consensus_adapter,
3651            state.get_object_cache_reader().clone(),
3652            consensus_committee.clone(),
3653            metrics,
3654            Arc::new(throughput_calculator),
3655            backpressure_manager.subscribe(),
3656            state.traffic_controller.clone(),
3657            None,
3658            state.consensus_gasless_counter.clone(),
3659        );
3660
3661        // AND create test user transactions alternating between owned and shared input.
3662        let mut user_transactions = vec![];
3663        for (i, gas_object) in gas_objects[0..8].iter().enumerate() {
3664            let input_object = if i % 2 == 0 {
3665                owned_objects.get(i / 2).unwrap().clone()
3666            } else {
3667                shared_objects.get(i / 2).unwrap().clone()
3668            };
3669            let transaction = test_user_transaction(
3670                &state,
3671                sender,
3672                &keypair,
3673                gas_object.clone(),
3674                vec![input_object],
3675            )
3676            .await;
3677            user_transactions.push(transaction);
3678        }
3679
3680        // AND create 4 more user transactions with remaining gas objects and 2 shared objects.
3681        // Having more txns on the same shared object may get deferred.
3682        for (i, gas_object) in gas_objects[8..12].iter().enumerate() {
3683            let shared_object = if i < 2 {
3684                shared_objects[4].clone()
3685            } else {
3686                shared_objects[5].clone()
3687            };
3688            let transaction = test_user_transaction(
3689                &state,
3690                sender,
3691                &keypair,
3692                gas_object.clone(),
3693                vec![shared_object],
3694            )
3695            .await;
3696            user_transactions.push(transaction);
3697        }
3698
3699        // AND create block for each user transaction
3700        let mut blocks = Vec::new();
3701        for (i, consensus_transaction) in user_transactions
3702            .iter()
3703            .cloned()
3704            .map(|t| ConsensusTransaction::new_user_transaction_v2_message(&state.name, t.into()))
3705            .enumerate()
3706        {
3707            let transaction_bytes = bcs::to_bytes(&consensus_transaction).unwrap();
3708            let block = VerifiedBlock::new_for_test(
3709                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
3710                    .set_transactions(vec![Transaction::new(transaction_bytes)])
3711                    .build(),
3712            );
3713
3714            blocks.push(block);
3715        }
3716
3717        // AND create the consensus commit
3718        let leader_block = blocks[0].clone();
3719        let committed_sub_dag = CommittedSubDag::new(
3720            leader_block.reference(),
3721            blocks.clone(),
3722            leader_block.timestamp_ms(),
3723            CommitRef::new(10, CommitDigest::MIN),
3724        );
3725
3726        // Test that the consensus handler respects backpressure.
3727        backpressure_manager.set_backpressure(true);
3728        // Default watermarks are 0,0 which will suppress the backpressure.
3729        backpressure_manager.update_highest_certified_checkpoint(1);
3730
3731        // AND process the consensus commit once
3732        {
3733            let waiter = consensus_handler.handle_consensus_commit(committed_sub_dag.clone());
3734            pin_mut!(waiter);
3735
3736            // waiter should not complete within 5 seconds
3737            tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
3738                .await
3739                .unwrap_err();
3740
3741            // lift backpressure
3742            backpressure_manager.set_backpressure(false);
3743
3744            // waiter completes now.
3745            tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
3746                .await
3747                .unwrap();
3748        }
3749
3750        // THEN check the consensus stats
3751        let num_blocks = blocks.len();
3752        let num_transactions = user_transactions.len();
3753        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
3754        assert_eq!(
3755            last_consensus_stats_1.index.transaction_index,
3756            num_transactions as u64
3757        );
3758        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
3759        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
3760        assert_eq!(
3761            last_consensus_stats_1.stats.get_num_messages(0),
3762            num_blocks as u64
3763        );
3764        assert_eq!(
3765            last_consensus_stats_1.stats.get_num_user_transactions(0),
3766            num_transactions as u64
3767        );
3768
3769        // THEN check for execution status of user transactions.
3770        for (i, t) in user_transactions.iter().enumerate() {
3771            let digest = t.tx().digest();
3772            if tokio::time::timeout(
3773                std::time::Duration::from_secs(10),
3774                state.notify_read_effects_for_testing("", *digest),
3775            )
3776            .await
3777            .is_ok()
3778            {
3779                // Effects exist as expected.
3780            } else {
3781                panic!("User transaction {} {} did not execute", i, digest);
3782            }
3783        }
3784
3785        // THEN check for no inflight or suspended transactions.
3786        state.execution_scheduler().check_empty_for_testing().await;
3787    }
3788
3789    fn to_short_strings(txs: Vec<VerifiedExecutableTransactionWithAliases>) -> Vec<String> {
3790        txs.into_iter()
3791            .map(|tx| format!("transaction({})", tx.tx().transaction_data().gas_price()))
3792            .collect()
3793    }
3794
3795    #[test]
3796    fn test_order_by_gas_price() {
3797        let mut v = vec![user_txn(42), user_txn(100)];
3798        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3799        assert_eq!(
3800            to_short_strings(v),
3801            vec![
3802                "transaction(100)".to_string(),
3803                "transaction(42)".to_string(),
3804            ]
3805        );
3806
3807        let mut v = vec![
3808            user_txn(1200),
3809            user_txn(12),
3810            user_txn(1000),
3811            user_txn(42),
3812            user_txn(100),
3813            user_txn(1000),
3814        ];
3815        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3816        assert_eq!(
3817            to_short_strings(v),
3818            vec![
3819                "transaction(1200)".to_string(),
3820                "transaction(1000)".to_string(),
3821                "transaction(1000)".to_string(),
3822                "transaction(100)".to_string(),
3823                "transaction(42)".to_string(),
3824                "transaction(12)".to_string(),
3825            ]
3826        );
3827    }
3828
3829    #[tokio::test(flavor = "current_thread")]
3830    async fn test_checkpoint_signature_dedup() {
3831        telemetry_subscribers::init_for_testing();
3832
3833        let network_config =
3834            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3835        let state = TestAuthorityBuilder::new()
3836            .with_network_config(&network_config, 0)
3837            .build()
3838            .await;
3839
3840        let epoch_store = state.epoch_store_for_testing().clone();
3841        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3842
3843        let make_signed = || {
3844            let epoch = epoch_store.epoch();
3845            let contents =
3846                CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
3847            let summary = CheckpointSummary::new(
3848                &ProtocolConfig::get_for_max_version_UNSAFE(),
3849                epoch,
3850                42, // sequence number
3851                10, // network_total_transactions
3852                &contents,
3853                None, // previous_digest
3854                GasCostSummary::default(),
3855                None,       // end_of_epoch_data
3856                0,          // timestamp
3857                Vec::new(), // randomness_rounds
3858                Vec::new(), // checkpoint_artifact_digests
3859            );
3860            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name)
3861        };
3862
3863        // Prepare V2 pair: same (authority, seq), different digests => different keys
3864        let v2_s1 = make_signed();
3865        let v2_s1_clone = v2_s1.clone();
3866        let v2_digest_a = v2_s1.data().digest();
3867        let v2_a =
3868            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3869                summary: v2_s1,
3870            });
3871
3872        let v2_s2 = make_signed();
3873        let v2_digest_b = v2_s2.data().digest();
3874        let v2_b =
3875            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3876                summary: v2_s2,
3877            });
3878
3879        assert_ne!(v2_digest_a, v2_digest_b);
3880
3881        // Create an exact duplicate with same digest to exercise valid dedup
3882        assert_eq!(v2_s1_clone.data().digest(), v2_digest_a);
3883        let v2_dup =
3884            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3885                summary: v2_s1_clone,
3886            });
3887
3888        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
3889        let block = VerifiedBlock::new_for_test(
3890            TestBlock::new(100, 0)
3891                .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)])
3892                .build(),
3893        );
3894        let commit = CommittedSubDag::new(
3895            block.reference(),
3896            vec![block.clone()],
3897            block.timestamp_ms(),
3898            CommitRef::new(10, CommitDigest::MIN),
3899        );
3900
3901        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3902        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
3903        let backpressure = BackpressureManager::new_for_tests();
3904        let consensus_adapter =
3905            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3906        let settlement_scheduler = SettlementScheduler::new(
3907            state.execution_scheduler().as_ref().clone(),
3908            state.get_transaction_cache_reader().clone(),
3909            state.metrics.clone(),
3910        );
3911        let mut handler = ConsensusHandler::new(
3912            epoch_store.clone(),
3913            Arc::new(CheckpointServiceNoop {}),
3914            settlement_scheduler,
3915            consensus_adapter,
3916            state.get_object_cache_reader().clone(),
3917            consensus_committee.clone(),
3918            metrics,
3919            Arc::new(throughput),
3920            backpressure.subscribe(),
3921            state.traffic_controller.clone(),
3922            None,
3923            state.consensus_gasless_counter.clone(),
3924        );
3925
3926        handler.handle_consensus_commit(commit).await;
3927
3928        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
3929        use sui_types::messages_consensus::ConsensusTransactionKey as CK;
3930
3931        // V2 distinct digests: both must be processed. If these were collapsed to one CheckpointSeq num, only one would process.
3932        let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a));
3933        let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b));
3934        assert!(
3935            epoch_store
3936                .is_consensus_message_processed(&v2_key_a)
3937                .unwrap()
3938        );
3939        assert!(
3940            epoch_store
3941                .is_consensus_message_processed(&v2_key_b)
3942                .unwrap()
3943        );
3944    }
3945
3946    #[tokio::test(flavor = "current_thread")]
3947    async fn test_verify_consensus_transaction_filters_mismatched_authorities() {
3948        telemetry_subscribers::init_for_testing();
3949
3950        let network_config =
3951            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3952        let state = TestAuthorityBuilder::new()
3953            .with_network_config(&network_config, 0)
3954            .build()
3955            .await;
3956
3957        let epoch_store = state.epoch_store_for_testing().clone();
3958        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3959
3960        // Create a different authority than our test authority
3961        use fastcrypto::traits::KeyPair;
3962        let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair();
3963        let wrong_authority: AuthorityName = wrong_keypair.public().into();
3964
3965        // Create EndOfPublish transaction with mismatched authority
3966        let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority);
3967
3968        // Create valid EndOfPublish transaction with correct authority
3969        let valid_eop = ConsensusTransaction::new_end_of_publish(state.name);
3970
3971        // Create CheckpointSignature with mismatched authority
3972        let epoch = epoch_store.epoch();
3973        let contents =
3974            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
3975        let summary = CheckpointSummary::new(
3976            &ProtocolConfig::get_for_max_version_UNSAFE(),
3977            epoch,
3978            42, // sequence number
3979            10, // network_total_transactions
3980            &contents,
3981            None, // previous_digest
3982            GasCostSummary::default(),
3983            None,       // end_of_epoch_data
3984            0,          // timestamp
3985            Vec::new(), // randomness_rounds
3986            Vec::new(), // checkpoint commitments
3987        );
3988
3989        // Create a signed checkpoint with the wrong authority
3990        let mismatched_checkpoint_signed =
3991            SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority);
3992        let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest();
3993        let mismatched_checkpoint =
3994            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3995                summary: mismatched_checkpoint_signed,
3996            });
3997
3998        // Create a valid checkpoint signature with correct authority
3999        let valid_checkpoint_signed =
4000            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name);
4001        let valid_checkpoint_digest = valid_checkpoint_signed.data().digest();
4002        let valid_checkpoint =
4003            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
4004                summary: valid_checkpoint_signed,
4005            });
4006
4007        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
4008
4009        // Create a block with both valid and invalid transactions
4010        let block = VerifiedBlock::new_for_test(
4011            TestBlock::new(100, 0)
4012                .set_transactions(vec![
4013                    to_tx(&mismatched_eop),
4014                    to_tx(&valid_eop),
4015                    to_tx(&mismatched_checkpoint),
4016                    to_tx(&valid_checkpoint),
4017                ])
4018                .build(),
4019        );
4020        let commit = CommittedSubDag::new(
4021            block.reference(),
4022            vec![block.clone()],
4023            block.timestamp_ms(),
4024            CommitRef::new(10, CommitDigest::MIN),
4025        );
4026
4027        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
4028        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
4029        let backpressure = BackpressureManager::new_for_tests();
4030        let consensus_adapter =
4031            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
4032        let settlement_scheduler = SettlementScheduler::new(
4033            state.execution_scheduler().as_ref().clone(),
4034            state.get_transaction_cache_reader().clone(),
4035            state.metrics.clone(),
4036        );
4037        let mut handler = ConsensusHandler::new(
4038            epoch_store.clone(),
4039            Arc::new(CheckpointServiceNoop {}),
4040            settlement_scheduler,
4041            consensus_adapter,
4042            state.get_object_cache_reader().clone(),
4043            consensus_committee.clone(),
4044            metrics,
4045            Arc::new(throughput),
4046            backpressure.subscribe(),
4047            state.traffic_controller.clone(),
4048            None,
4049            state.consensus_gasless_counter.clone(),
4050        );
4051
4052        handler.handle_consensus_commit(commit).await;
4053
4054        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
4055        use sui_types::messages_consensus::ConsensusTransactionKey as CK;
4056
4057        // Check that valid transactions were processed
4058        let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
4059        assert!(
4060            epoch_store
4061                .is_consensus_message_processed(&valid_eop_key)
4062                .unwrap(),
4063            "Valid EndOfPublish should have been processed"
4064        );
4065
4066        let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
4067            state.name,
4068            42,
4069            valid_checkpoint_digest,
4070        ));
4071        assert!(
4072            epoch_store
4073                .is_consensus_message_processed(&valid_checkpoint_key)
4074                .unwrap(),
4075            "Valid CheckpointSignature should have been processed"
4076        );
4077
4078        // Check that mismatched authority transactions were NOT processed (filtered out by verify_consensus_transaction)
4079        let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
4080        assert!(
4081            !epoch_store
4082                .is_consensus_message_processed(&mismatched_eop_key)
4083                .unwrap(),
4084            "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
4085        );
4086
4087        let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
4088            wrong_authority,
4089            42,
4090            mismatched_checkpoint_digest,
4091        ));
4092        assert!(
4093            !epoch_store
4094                .is_consensus_message_processed(&mismatched_checkpoint_key)
4095                .unwrap(),
4096            "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
4097        );
4098    }
4099
4100    fn user_txn(gas_price: u64) -> VerifiedExecutableTransactionWithAliases {
4101        let (committee, keypairs) = Committee::new_simple_test_committee();
4102        let (sender, sender_keypair) = deterministic_random_account_key();
4103        let tx = sui_types::transaction::Transaction::from_data_and_signer(
4104            TransactionData::new_transfer(
4105                SuiAddress::default(),
4106                FullObjectRef::from_fastpath_ref(random_object_ref()),
4107                sender,
4108                random_object_ref(),
4109                1000 * gas_price,
4110                gas_price,
4111            ),
4112            vec![&sender_keypair],
4113        );
4114        let tx = VerifiedExecutableTransaction::new_from_certificate(
4115            VerifiedCertificate::new_unchecked(
4116                CertifiedTransaction::new_from_keypairs_for_testing(
4117                    tx.into_data(),
4118                    &keypairs,
4119                    &committee,
4120                ),
4121            ),
4122        );
4123        VerifiedExecutableTransactionWithAliases::no_aliases(tx)
4124    }
4125
4126    mod checkpoint_queue_tests {
4127        use super::*;
4128        use consensus_core::CommitRef;
4129        use sui_types::digests::Digest;
4130
4131        fn make_chunk(tx_count: usize, height: u64) -> Chunk {
4132            Chunk {
4133                schedulables: (0..tx_count)
4134                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4135                    .collect(),
4136                settlement: None,
4137                height,
4138            }
4139        }
4140
4141        fn make_commit_ref(index: u32) -> CommitRef {
4142            CommitRef {
4143                index,
4144                digest: CommitDigest::MIN,
4145            }
4146        }
4147
4148        fn default_versions() -> HashMap<TransactionKey, AssignedVersions> {
4149            HashMap::new()
4150        }
4151
4152        #[test]
4153        fn test_flush_all_checkpoint_roots() {
4154            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4155            let versions = default_versions();
4156
4157            queue.push_chunk(
4158                make_chunk(5, 1),
4159                &versions,
4160                1000,
4161                make_commit_ref(1),
4162                Digest::default(),
4163            );
4164            queue.push_chunk(
4165                make_chunk(3, 2),
4166                &versions,
4167                1000,
4168                make_commit_ref(1),
4169                Digest::default(),
4170            );
4171
4172            let pending = queue.flush(1000, true);
4173
4174            assert!(pending.is_some());
4175            assert!(queue.pending_roots.is_empty());
4176        }
4177
4178        #[test]
4179        fn test_flush_respects_min_checkpoint_interval() {
4180            let min_interval = 200;
4181            let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, 1000, min_interval);
4182            let versions = default_versions();
4183
4184            queue.push_chunk(
4185                make_chunk(5, 1),
4186                &versions,
4187                1000,
4188                make_commit_ref(1),
4189                Digest::default(),
4190            );
4191
4192            let pending = queue.flush(1000 + min_interval - 1, false);
4193            assert!(pending.is_none());
4194            assert_eq!(queue.pending_roots.len(), 1);
4195
4196            let pending = queue.flush(1000 + min_interval, false);
4197            assert!(pending.is_some());
4198            assert!(queue.pending_roots.is_empty());
4199        }
4200
4201        #[test]
4202        fn test_push_chunk_flushes_when_exceeds_max() {
4203            let max_tx = 10;
4204            let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, max_tx, 0);
4205            let versions = default_versions();
4206
4207            queue.push_chunk(
4208                make_chunk(max_tx / 2 + 1, 1),
4209                &versions,
4210                1000,
4211                make_commit_ref(1),
4212                Digest::default(),
4213            );
4214
4215            let flushed = queue.push_chunk(
4216                make_chunk(max_tx / 2 + 1, 2),
4217                &versions,
4218                1000,
4219                make_commit_ref(2),
4220                Digest::default(),
4221            );
4222
4223            assert_eq!(flushed.len(), 1);
4224            assert_eq!(queue.pending_roots.len(), 1);
4225        }
4226
4227        #[test]
4228        fn test_multiple_chunks_merged_into_one_checkpoint() {
4229            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 200);
4230            let versions = default_versions();
4231
4232            queue.push_chunk(
4233                make_chunk(10, 1),
4234                &versions,
4235                1000,
4236                make_commit_ref(1),
4237                Digest::default(),
4238            );
4239            queue.push_chunk(
4240                make_chunk(10, 2),
4241                &versions,
4242                1000,
4243                make_commit_ref(2),
4244                Digest::default(),
4245            );
4246            queue.push_chunk(
4247                make_chunk(10, 3),
4248                &versions,
4249                1000,
4250                make_commit_ref(3),
4251                Digest::default(),
4252            );
4253
4254            let pending = queue.flush(1000, true).unwrap();
4255
4256            assert_eq!(pending.roots.len(), 3);
4257        }
4258
4259        #[test]
4260        fn test_push_chunk_handles_overflow() {
4261            let max_tx = 10;
4262            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4263            let versions = default_versions();
4264
4265            let flushed1 = queue.push_chunk(
4266                make_chunk(max_tx / 2, 1),
4267                &versions,
4268                1000,
4269                make_commit_ref(1),
4270                Digest::default(),
4271            );
4272            assert!(flushed1.is_empty());
4273
4274            let flushed2 = queue.push_chunk(
4275                make_chunk(max_tx / 2, 2),
4276                &versions,
4277                1000,
4278                make_commit_ref(2),
4279                Digest::default(),
4280            );
4281            assert!(flushed2.is_empty());
4282
4283            let flushed3 = queue.push_chunk(
4284                make_chunk(max_tx / 2, 3),
4285                &versions,
4286                1000,
4287                make_commit_ref(3),
4288                Digest::default(),
4289            );
4290            assert_eq!(flushed3.len(), 1);
4291
4292            let pending = queue.flush(1000, true);
4293
4294            for p in pending.iter().chain(flushed3.iter()) {
4295                let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4296                assert!(tx_count <= max_tx);
4297            }
4298        }
4299
4300        #[test]
4301        fn test_checkpoint_uses_last_chunk_height() {
4302            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4303            let versions = default_versions();
4304
4305            queue.push_chunk(
4306                make_chunk(10, 100),
4307                &versions,
4308                1000,
4309                make_commit_ref(1),
4310                Digest::default(),
4311            );
4312            queue.push_chunk(
4313                make_chunk(10, 200),
4314                &versions,
4315                1000,
4316                make_commit_ref(2),
4317                Digest::default(),
4318            );
4319
4320            let pending = queue.flush(1000, true).unwrap();
4321
4322            assert_eq!(pending.details.checkpoint_height, 200);
4323        }
4324
4325        #[test]
4326        fn test_last_built_timestamp_updated_on_flush() {
4327            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4328            let versions = default_versions();
4329
4330            queue.push_chunk(
4331                make_chunk(10, 1),
4332                &versions,
4333                5000,
4334                make_commit_ref(1),
4335                Digest::default(),
4336            );
4337
4338            assert_eq!(queue.last_built_timestamp, 0);
4339
4340            let _ = queue.flush(5000, true);
4341
4342            assert_eq!(queue.last_built_timestamp, 5000);
4343        }
4344
4345        #[test]
4346        fn test_settlement_info_sent_through_channel() {
4347            let mut queue = CheckpointQueue::new_for_testing(0, 0, 5, 1000, 0);
4348            let versions = default_versions();
4349
4350            let chunk1 = Chunk {
4351                schedulables: vec![
4352                    Schedulable::ConsensusCommitPrologue(0, 1, 0),
4353                    Schedulable::ConsensusCommitPrologue(0, 2, 0),
4354                    Schedulable::ConsensusCommitPrologue(0, 3, 0),
4355                ],
4356                settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
4357                height: 1,
4358            };
4359
4360            let chunk2 = Chunk {
4361                schedulables: vec![
4362                    Schedulable::ConsensusCommitPrologue(0, 4, 0),
4363                    Schedulable::ConsensusCommitPrologue(0, 5, 0),
4364                ],
4365                settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
4366                height: 2,
4367            };
4368
4369            queue.push_chunk(
4370                chunk1,
4371                &versions,
4372                1000,
4373                make_commit_ref(1),
4374                Digest::default(),
4375            );
4376            queue.push_chunk(
4377                chunk2,
4378                &versions,
4379                1000,
4380                make_commit_ref(1),
4381                Digest::default(),
4382            );
4383        }
4384
4385        #[test]
4386        fn test_settlement_checkpoint_seq_correct_after_flush() {
4387            let max_tx = 10;
4388            let initial_seq = 5;
4389            let (sender, mut receiver) = monitored_mpsc::unbounded_channel("test_settlement_seq");
4390            let mut queue =
4391                CheckpointQueue::new_for_testing_with_sender(0, 0, initial_seq, max_tx, 0, sender);
4392            let versions = default_versions();
4393
4394            // Push a chunk that partially fills the queue (no flush).
4395            let chunk1 = Chunk {
4396                schedulables: (0..max_tx / 2 + 1)
4397                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4398                    .collect(),
4399                settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
4400                height: 1,
4401            };
4402            queue.push_chunk(
4403                chunk1,
4404                &versions,
4405                1000,
4406                make_commit_ref(1),
4407                Digest::default(),
4408            );
4409
4410            // Drain the first message from the channel.
4411            let msg1 = receiver.try_recv().unwrap();
4412            let settlement1 = msg1.1.unwrap();
4413            assert_eq!(settlement1.checkpoint_seq, initial_seq);
4414
4415            // Push a second chunk that triggers a flush of chunk1's roots.
4416            let chunk2 = Chunk {
4417                schedulables: (0..max_tx / 2 + 1)
4418                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4419                    .collect(),
4420                settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
4421                height: 2,
4422            };
4423            let flushed = queue.push_chunk(
4424                chunk2,
4425                &versions,
4426                1000,
4427                make_commit_ref(2),
4428                Digest::default(),
4429            );
4430            assert_eq!(flushed.len(), 1);
4431            assert_eq!(flushed[0].details.checkpoint_seq, Some(initial_seq));
4432
4433            // The second settlement must have checkpoint_seq = initial_seq + 1,
4434            // because the flush incremented current_checkpoint_seq.
4435            let msg2 = receiver.try_recv().unwrap();
4436            let settlement2 = msg2.1.unwrap();
4437            assert_eq!(settlement2.checkpoint_seq, initial_seq + 1);
4438
4439            // Flush the remaining roots and verify the PendingCheckpointV2's seq
4440            // matches the settlement's seq.
4441            let pending = queue.flush_forced().unwrap();
4442            assert_eq!(
4443                pending.details.checkpoint_seq,
4444                Some(settlement2.checkpoint_seq)
4445            );
4446        }
4447
4448        #[test]
4449        fn test_checkpoint_seq_increments_on_flush() {
4450            let mut queue = CheckpointQueue::new_for_testing(0, 0, 10, 1000, 0);
4451            let versions = default_versions();
4452
4453            queue.push_chunk(
4454                make_chunk(5, 1),
4455                &versions,
4456                1000,
4457                make_commit_ref(1),
4458                Digest::default(),
4459            );
4460
4461            let pending = queue.flush(1000, true).unwrap();
4462
4463            assert_eq!(pending.details.checkpoint_seq, Some(10));
4464            assert_eq!(queue.current_checkpoint_seq, 11);
4465        }
4466
4467        #[test]
4468        fn test_multiple_chunks_with_overflow() {
4469            let max_tx = 10;
4470            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4471            let versions = default_versions();
4472
4473            let flushed1 = queue.push_chunk(
4474                make_chunk(max_tx / 2 + 1, 1),
4475                &versions,
4476                1000,
4477                make_commit_ref(1),
4478                Digest::default(),
4479            );
4480            let flushed2 = queue.push_chunk(
4481                make_chunk(max_tx / 2 + 1, 2),
4482                &versions,
4483                1000,
4484                make_commit_ref(1),
4485                Digest::default(),
4486            );
4487            let flushed3 = queue.push_chunk(
4488                make_chunk(max_tx / 2 + 1, 3),
4489                &versions,
4490                1000,
4491                make_commit_ref(1),
4492                Digest::default(),
4493            );
4494
4495            let all_flushed: Vec<_> = flushed1
4496                .into_iter()
4497                .chain(flushed2)
4498                .chain(flushed3)
4499                .collect();
4500            assert_eq!(all_flushed.len(), 2);
4501            assert_eq!(queue.pending_roots.len(), 1);
4502
4503            for p in &all_flushed {
4504                let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4505                assert!(tx_count <= max_tx);
4506            }
4507        }
4508    }
4509}