sui_core/
consensus_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet, VecDeque},
6    hash::Hash,
7    num::NonZeroUsize,
8    sync::{Arc, Mutex},
9    time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use consensus_config::Committee as ConsensusCommittee;
13use consensus_core::{CommitConsumerMonitor, CommitIndex, CommitRef};
14use consensus_types::block::TransactionIndex;
15use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
16use lru::LruCache;
17use mysten_common::{
18    assert_reachable, assert_sometimes, debug_fatal, random_util::randomize_cache_capacity_in_tests,
19};
20use mysten_metrics::{
21    monitored_future,
22    monitored_mpsc::{self, UnboundedReceiver},
23    monitored_scope, spawn_monitored_task,
24};
25use nonempty::NonEmpty;
26use parking_lot::RwLockWriteGuard;
27use serde::{Deserialize, Serialize};
28use sui_config::node::CongestionLogConfig;
29use sui_macros::{fail_point, fail_point_arg, fail_point_if};
30use sui_protocol_config::{PerObjectCongestionControlMode, ProtocolConfig};
31use sui_types::{
32    SUI_RANDOMNESS_STATE_OBJECT_ID,
33    authenticator_state::ActiveJwk,
34    base_types::{
35        AuthorityName, ConciseableName, ConsensusObjectSequenceKey, ObjectID, ObjectRef,
36        SequenceNumber, TransactionDigest,
37    },
38    crypto::RandomnessRound,
39    digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest, Digest},
40    executable_transaction::{
41        TrustedExecutableTransaction, VerifiedExecutableTransaction,
42        VerifiedExecutableTransactionWithAliases,
43    },
44    messages_checkpoint::{
45        CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointTimestamp,
46    },
47    messages_consensus::{
48        AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments,
49        ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
50        ExecutionTimeObservation,
51    },
52    sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
53    transaction::{
54        InputObjectKind, SenderSignedData, TransactionDataAPI, TransactionKey, VerifiedTransaction,
55        WithAliases,
56    },
57};
58use tokio::task::JoinSet;
59use tracing::{debug, error, info, instrument, trace, warn};
60
61use crate::{
62    authority::{
63        AuthorityMetrics, AuthorityState, ExecutionEnv,
64        authority_per_epoch_store::{
65            AuthorityPerEpochStore, CancelConsensusCertificateReason, ConsensusStats,
66            ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStatsV2,
67            consensus_quarantine::ConsensusCommitOutput,
68        },
69        backpressure::{BackpressureManager, BackpressureSubscriber},
70        congestion_log::CongestionCommitLogger,
71        consensus_tx_status_cache::ConsensusTxStatus,
72        epoch_start_configuration::EpochStartConfigTrait,
73        execution_time_estimator::ExecutionTimeEstimator,
74        shared_object_congestion_tracker::SharedObjectCongestionTracker,
75        shared_object_version_manager::{AssignedTxAndVersions, AssignedVersions, Schedulable},
76        transaction_deferral::{DeferralKey, DeferralReason, transaction_deferral_within_limit},
77    },
78    checkpoints::{
79        CheckpointHeight, CheckpointRoots, CheckpointService, CheckpointServiceNotify,
80        PendingCheckpoint, PendingCheckpointInfo, PendingCheckpointV2,
81    },
82    consensus_adapter::ConsensusAdapter,
83    consensus_throughput_calculator::ConsensusThroughputCalculator,
84    consensus_types::consensus_output_api::ConsensusCommitAPI,
85    epoch::{
86        randomness::{DkgStatus, RandomnessManager},
87        reconfiguration::ReconfigState,
88    },
89    execution_cache::ObjectCacheRead,
90    execution_scheduler::{SettlementBatchInfo, SettlementScheduler},
91    gasless_rate_limiter::ConsensusGaslessCounter,
92    post_consensus_tx_reorder::PostConsensusTxReorder,
93    traffic_controller::{TrafficController, policies::TrafficTally},
94};
95
96/// Output from filtering consensus transactions.
97/// Contains the filtered transactions and any owned object locks acquired post-consensus.
98struct FilteredConsensusOutput {
99    transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
100    owned_object_locks: HashMap<ObjectRef, TransactionDigest>,
101}
102
103pub struct ConsensusHandlerInitializer {
104    state: Arc<AuthorityState>,
105    checkpoint_service: Arc<CheckpointService>,
106    epoch_store: Arc<AuthorityPerEpochStore>,
107    consensus_adapter: Arc<ConsensusAdapter>,
108    throughput_calculator: Arc<ConsensusThroughputCalculator>,
109    backpressure_manager: Arc<BackpressureManager>,
110    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
111    consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
112}
113
114impl ConsensusHandlerInitializer {
115    pub fn new(
116        state: Arc<AuthorityState>,
117        checkpoint_service: Arc<CheckpointService>,
118        epoch_store: Arc<AuthorityPerEpochStore>,
119        consensus_adapter: Arc<ConsensusAdapter>,
120        throughput_calculator: Arc<ConsensusThroughputCalculator>,
121        backpressure_manager: Arc<BackpressureManager>,
122        congestion_log_config: Option<CongestionLogConfig>,
123    ) -> Self {
124        let congestion_logger =
125            congestion_log_config.and_then(|config| match CongestionCommitLogger::new(&config) {
126                Ok(logger) => Some(Arc::new(Mutex::new(logger))),
127                Err(e) => {
128                    debug_fatal!("Failed to create congestion logger: {e}");
129                    None
130                }
131            });
132        let consensus_gasless_counter = state.consensus_gasless_counter.clone();
133        Self {
134            state,
135            checkpoint_service,
136            epoch_store,
137            consensus_adapter,
138            throughput_calculator,
139            backpressure_manager,
140            congestion_logger,
141            consensus_gasless_counter,
142        }
143    }
144
145    #[cfg(test)]
146    pub(crate) fn new_for_testing(
147        state: Arc<AuthorityState>,
148        checkpoint_service: Arc<CheckpointService>,
149    ) -> Self {
150        use crate::consensus_test_utils::make_consensus_adapter_for_test;
151        use std::collections::HashSet;
152
153        let backpressure_manager = BackpressureManager::new_for_tests();
154        let consensus_adapter =
155            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
156        let consensus_gasless_counter = state.consensus_gasless_counter.clone();
157        Self {
158            state: state.clone(),
159            checkpoint_service,
160            epoch_store: state.epoch_store_for_testing().clone(),
161            consensus_adapter,
162            throughput_calculator: Arc::new(ConsensusThroughputCalculator::new(
163                None,
164                state.metrics.clone(),
165            )),
166            backpressure_manager,
167            congestion_logger: None,
168            consensus_gasless_counter,
169        }
170    }
171
172    pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
173        let new_epoch_start_state = self.epoch_store.epoch_start_state();
174        let consensus_committee = new_epoch_start_state.get_consensus_committee();
175
176        let settlement_scheduler = SettlementScheduler::new(
177            self.state.execution_scheduler().as_ref().clone(),
178            self.state.get_transaction_cache_reader().clone(),
179            self.state.metrics.clone(),
180        );
181        ConsensusHandler::new(
182            self.epoch_store.clone(),
183            self.checkpoint_service.clone(),
184            settlement_scheduler,
185            self.consensus_adapter.clone(),
186            self.state.get_object_cache_reader().clone(),
187            consensus_committee,
188            self.state.metrics.clone(),
189            self.throughput_calculator.clone(),
190            self.backpressure_manager.subscribe(),
191            self.state.traffic_controller.clone(),
192            self.congestion_logger.clone(),
193            self.consensus_gasless_counter.clone(),
194        )
195    }
196}
197
198mod additional_consensus_state {
199    use std::marker::PhantomData;
200
201    use consensus_core::CommitRef;
202    use fastcrypto::hash::HashFunction as _;
203    use sui_types::{crypto::DefaultHash, digests::Digest};
204
205    use super::*;
206    /// AdditionalConsensusState tracks any in-memory state that is retained by ConsensusHandler
207    /// between consensus commits. Because of crash recovery, using such data is inherently risky.
208    /// In order to do this safely, we must store data from a fixed number of previous commits.
209    /// Then, at start-up, that same fixed number of already processed commits is replayed to
210    /// reconstruct the state.
211    ///
212    /// To make sure that bugs in this process appear immediately, we record the digest of this
213    /// state in ConsensusCommitPrologue, so that any deviation causes an immediate fork.
214    #[derive(Serialize, Deserialize)]
215    pub(super) struct AdditionalConsensusState {
216        commit_interval_observer: CommitIntervalObserver,
217    }
218
219    impl AdditionalConsensusState {
220        pub fn new(additional_consensus_state_window_size: u32) -> Self {
221            Self {
222                commit_interval_observer: CommitIntervalObserver::new(
223                    additional_consensus_state_window_size,
224                ),
225            }
226        }
227
228        /// Update all internal state based on the new commit
229        pub(crate) fn observe_commit(
230            &mut self,
231            protocol_config: &ProtocolConfig,
232            epoch_start_time: u64,
233            consensus_commit: &impl ConsensusCommitAPI,
234        ) -> ConsensusCommitInfo {
235            self.commit_interval_observer
236                .observe_commit_time(consensus_commit);
237
238            let estimated_commit_period = self
239                .commit_interval_observer
240                .commit_interval_estimate()
241                .unwrap_or(Duration::from_millis(
242                    protocol_config.min_checkpoint_interval_ms(),
243                ));
244
245            info!("estimated commit rate: {:?}", estimated_commit_period);
246
247            self.commit_info_impl(
248                epoch_start_time,
249                consensus_commit,
250                Some(estimated_commit_period),
251            )
252        }
253
254        fn commit_info_impl(
255            &self,
256            epoch_start_time: u64,
257            consensus_commit: &impl ConsensusCommitAPI,
258            estimated_commit_period: Option<Duration>,
259        ) -> ConsensusCommitInfo {
260            let leader_author = consensus_commit.leader_author_index();
261            let timestamp = consensus_commit.commit_timestamp_ms();
262
263            let timestamp = if timestamp < epoch_start_time {
264                error!(
265                    "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start_time}, author {leader_author:?}"
266                );
267                epoch_start_time
268            } else {
269                timestamp
270            };
271
272            ConsensusCommitInfo {
273                _phantom: PhantomData,
274                round: consensus_commit.leader_round(),
275                timestamp,
276                leader_author,
277                consensus_commit_ref: consensus_commit.commit_ref(),
278                rejected_transactions_digest: consensus_commit.rejected_transactions_digest(),
279                additional_state_digest: Some(self.digest()),
280                estimated_commit_period,
281                skip_consensus_commit_prologue_in_test: false,
282            }
283        }
284
285        /// Get the digest of the current state.
286        fn digest(&self) -> AdditionalConsensusStateDigest {
287            let mut hash = DefaultHash::new();
288            bcs::serialize_into(&mut hash, self).unwrap();
289            AdditionalConsensusStateDigest::new(hash.finalize().into())
290        }
291    }
292
293    pub struct ConsensusCommitInfo {
294        // prevent public construction
295        _phantom: PhantomData<()>,
296
297        pub round: u64,
298        pub timestamp: u64,
299        pub leader_author: AuthorityIndex,
300        pub consensus_commit_ref: CommitRef,
301        pub rejected_transactions_digest: Digest,
302
303        additional_state_digest: Option<AdditionalConsensusStateDigest>,
304        estimated_commit_period: Option<Duration>,
305
306        pub skip_consensus_commit_prologue_in_test: bool,
307    }
308
309    impl ConsensusCommitInfo {
310        pub fn new_for_test(
311            commit_round: u64,
312            commit_timestamp: u64,
313            estimated_commit_period: Option<Duration>,
314            skip_consensus_commit_prologue_in_test: bool,
315        ) -> Self {
316            Self {
317                _phantom: PhantomData,
318                round: commit_round,
319                timestamp: commit_timestamp,
320                leader_author: 0,
321                consensus_commit_ref: CommitRef::default(),
322                rejected_transactions_digest: Digest::default(),
323                additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
324                estimated_commit_period,
325                skip_consensus_commit_prologue_in_test,
326            }
327        }
328
329        pub fn new_for_congestion_test(
330            commit_round: u64,
331            commit_timestamp: u64,
332            estimated_commit_period: Duration,
333        ) -> Self {
334            Self::new_for_test(
335                commit_round,
336                commit_timestamp,
337                Some(estimated_commit_period),
338                true,
339            )
340        }
341
342        pub fn additional_state_digest(&self) -> AdditionalConsensusStateDigest {
343            // this method cannot be called if stateless_commit_info is used
344            self.additional_state_digest
345                .expect("additional_state_digest is not available")
346        }
347
348        pub fn estimated_commit_period(&self) -> Duration {
349            // this method cannot be called if stateless_commit_info is used
350            self.estimated_commit_period
351                .expect("estimated commit period is not available")
352        }
353
354        fn consensus_commit_digest(&self) -> ConsensusCommitDigest {
355            ConsensusCommitDigest::new(self.consensus_commit_ref.digest.into_inner())
356        }
357
358        fn consensus_commit_prologue_transaction(
359            &self,
360            epoch: u64,
361        ) -> VerifiedExecutableTransaction {
362            let transaction = VerifiedTransaction::new_consensus_commit_prologue(
363                epoch,
364                self.round,
365                self.timestamp,
366            );
367            VerifiedExecutableTransaction::new_system(transaction, epoch)
368        }
369
370        fn consensus_commit_prologue_v2_transaction(
371            &self,
372            epoch: u64,
373        ) -> VerifiedExecutableTransaction {
374            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v2(
375                epoch,
376                self.round,
377                self.timestamp,
378                self.consensus_commit_digest(),
379            );
380            VerifiedExecutableTransaction::new_system(transaction, epoch)
381        }
382
383        fn consensus_commit_prologue_v3_transaction(
384            &self,
385            epoch: u64,
386            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
387        ) -> VerifiedExecutableTransaction {
388            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v3(
389                epoch,
390                self.round,
391                self.timestamp,
392                self.consensus_commit_digest(),
393                consensus_determined_version_assignments,
394            );
395            VerifiedExecutableTransaction::new_system(transaction, epoch)
396        }
397
398        fn consensus_commit_prologue_v4_transaction(
399            &self,
400            epoch: u64,
401            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
402            additional_state_digest: AdditionalConsensusStateDigest,
403        ) -> VerifiedExecutableTransaction {
404            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v4(
405                epoch,
406                self.round,
407                self.timestamp,
408                self.consensus_commit_digest(),
409                consensus_determined_version_assignments,
410                additional_state_digest,
411            );
412            VerifiedExecutableTransaction::new_system(transaction, epoch)
413        }
414
415        pub fn create_consensus_commit_prologue_transaction(
416            &self,
417            epoch: u64,
418            protocol_config: &ProtocolConfig,
419            cancelled_txn_version_assignment: Vec<(
420                TransactionDigest,
421                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
422            )>,
423            commit_info: &ConsensusCommitInfo,
424            indirect_state_observer: IndirectStateObserver,
425        ) -> VerifiedExecutableTransaction {
426            let version_assignments = if protocol_config
427                .record_consensus_determined_version_assignments_in_prologue_v2()
428            {
429                Some(
430                    ConsensusDeterminedVersionAssignments::CancelledTransactionsV2(
431                        cancelled_txn_version_assignment,
432                    ),
433                )
434            } else if protocol_config.record_consensus_determined_version_assignments_in_prologue()
435            {
436                Some(
437                    ConsensusDeterminedVersionAssignments::CancelledTransactions(
438                        cancelled_txn_version_assignment
439                            .into_iter()
440                            .map(|(tx_digest, versions)| {
441                                (
442                                    tx_digest,
443                                    versions.into_iter().map(|(id, v)| (id.0, v)).collect(),
444                                )
445                            })
446                            .collect(),
447                    ),
448                )
449            } else {
450                None
451            };
452
453            if protocol_config.record_additional_state_digest_in_prologue() {
454                let additional_state_digest =
455                    if protocol_config.additional_consensus_digest_indirect_state() {
456                        let d1 = commit_info.additional_state_digest();
457                        indirect_state_observer.fold_with(d1)
458                    } else {
459                        commit_info.additional_state_digest()
460                    };
461
462                self.consensus_commit_prologue_v4_transaction(
463                    epoch,
464                    version_assignments.unwrap(),
465                    additional_state_digest,
466                )
467            } else if let Some(version_assignments) = version_assignments {
468                self.consensus_commit_prologue_v3_transaction(epoch, version_assignments)
469            } else if protocol_config.include_consensus_digest_in_prologue() {
470                self.consensus_commit_prologue_v2_transaction(epoch)
471            } else {
472                self.consensus_commit_prologue_transaction(epoch)
473            }
474        }
475    }
476
477    #[derive(Default)]
478    pub struct IndirectStateObserver {
479        hash: DefaultHash,
480    }
481
482    impl IndirectStateObserver {
483        pub fn new() -> Self {
484            Self::default()
485        }
486
487        pub fn observe_indirect_state<T: Serialize>(&mut self, state: &T) {
488            bcs::serialize_into(&mut self.hash, state).unwrap();
489        }
490
491        pub fn fold_with(
492            self,
493            d1: AdditionalConsensusStateDigest,
494        ) -> AdditionalConsensusStateDigest {
495            let hash = self.hash.finalize();
496            let d2 = AdditionalConsensusStateDigest::new(hash.into());
497
498            let mut hasher = DefaultHash::new();
499            bcs::serialize_into(&mut hasher, &d1).unwrap();
500            bcs::serialize_into(&mut hasher, &d2).unwrap();
501            AdditionalConsensusStateDigest::new(hasher.finalize().into())
502        }
503    }
504
505    #[test]
506    fn test_additional_consensus_state() {
507        use crate::consensus_test_utils::TestConsensusCommit;
508
509        fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
510            let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
511            state.observe_commit(
512                &protocol_config,
513                100,
514                &TestConsensusCommit::empty(round, timestamp, 0),
515            );
516        }
517
518        let mut s1 = AdditionalConsensusState::new(3);
519        observe(&mut s1, 1, 1000);
520        observe(&mut s1, 2, 2000);
521        observe(&mut s1, 3, 3000);
522        observe(&mut s1, 4, 4000);
523
524        let mut s2 = AdditionalConsensusState::new(3);
525        // Because state uses a ring buffer, we should get the same digest
526        // even though we only added the 3 latest observations.
527        observe(&mut s2, 2, 2000);
528        observe(&mut s2, 3, 3000);
529        observe(&mut s2, 4, 4000);
530
531        assert_eq!(s1.digest(), s2.digest());
532
533        observe(&mut s1, 5, 5000);
534        observe(&mut s2, 5, 5000);
535
536        assert_eq!(s1.digest(), s2.digest());
537    }
538}
539use additional_consensus_state::AdditionalConsensusState;
540pub(crate) use additional_consensus_state::{ConsensusCommitInfo, IndirectStateObserver};
541
542struct QueuedCheckpointRoots {
543    roots: CheckpointRoots,
544    timestamp: CheckpointTimestamp,
545    consensus_commit_ref: CommitRef,
546    rejected_transactions_digest: Digest,
547}
548
549struct Chunk<
550    T: crate::authority::shared_object_version_manager::AsTx = VerifiedExecutableTransaction,
551> {
552    schedulables: Vec<Schedulable<T>>,
553    settlement: Option<Schedulable<T>>,
554    height: CheckpointHeight,
555}
556
557impl<T: crate::authority::shared_object_version_manager::AsTx + Clone> Chunk<T> {
558    fn all_schedulables(&self) -> impl Iterator<Item = &Schedulable<T>> + Clone {
559        self.schedulables.iter().chain(self.settlement.iter())
560    }
561
562    fn all_schedulables_from(chunks: &[Self]) -> impl Iterator<Item = &Schedulable<T>> + Clone {
563        chunks.iter().flat_map(|c| c.all_schedulables())
564    }
565
566    fn to_checkpoint_roots(&self) -> CheckpointRoots {
567        let tx_roots: Vec<_> = self.schedulables.iter().map(|s| s.key()).collect();
568        let settlement_root = self.settlement.as_ref().map(|s| s.key());
569        CheckpointRoots {
570            tx_roots,
571            settlement_root,
572            height: self.height,
573        }
574    }
575}
576
577impl From<Chunk<VerifiedExecutableTransactionWithAliases>> for Chunk {
578    fn from(chunk: Chunk<VerifiedExecutableTransactionWithAliases>) -> Self {
579        Chunk {
580            schedulables: chunk.schedulables.into_iter().map(|s| s.into()).collect(),
581            settlement: chunk.settlement.map(|s| s.into()),
582            height: chunk.height,
583        }
584    }
585}
586
587// Accumulates checkpoint roots from consensus and flushes them into PendingCheckpointV2s.
588// Roots are buffered in `pending_roots` until a flush is triggered (by max_tx overflow or
589// time interval). A flush always drains all buffered roots into a single PendingCheckpointV2,
590// so the queue only ever holds roots for one pending checkpoint at a time.
591//
592// Owns an ExecutionSchedulerSender so push_chunk can send schedulables and settlement info
593// for execution in one shot.
594pub(crate) struct CheckpointQueue {
595    last_built_timestamp: CheckpointTimestamp,
596    pending_roots: VecDeque<QueuedCheckpointRoots>,
597    height: u64,
598    pending_tx_count: usize,
599    current_checkpoint_seq: CheckpointSequenceNumber,
600    max_tx: usize,
601    min_checkpoint_interval_ms: u64,
602    execution_scheduler_sender: ExecutionSchedulerSender,
603}
604
605impl CheckpointQueue {
606    pub(crate) fn new(
607        last_built_timestamp: CheckpointTimestamp,
608        checkpoint_height: u64,
609        next_checkpoint_seq: CheckpointSequenceNumber,
610        max_tx: usize,
611        min_checkpoint_interval_ms: u64,
612        execution_scheduler_sender: ExecutionSchedulerSender,
613    ) -> Self {
614        Self {
615            last_built_timestamp,
616            pending_roots: VecDeque::new(),
617            height: checkpoint_height,
618            pending_tx_count: 0,
619            current_checkpoint_seq: next_checkpoint_seq,
620            max_tx,
621            min_checkpoint_interval_ms,
622            execution_scheduler_sender,
623        }
624    }
625
626    #[cfg(test)]
627    fn new_for_testing(
628        last_built_timestamp: CheckpointTimestamp,
629        checkpoint_height: u64,
630        next_checkpoint_seq: CheckpointSequenceNumber,
631        max_tx: usize,
632        min_checkpoint_interval_ms: u64,
633    ) -> Self {
634        let (sender, _receiver) = monitored_mpsc::unbounded_channel("test_checkpoint_queue_sender");
635        Self {
636            last_built_timestamp,
637            pending_roots: VecDeque::new(),
638            height: checkpoint_height,
639            pending_tx_count: 0,
640            current_checkpoint_seq: next_checkpoint_seq,
641            max_tx,
642            min_checkpoint_interval_ms,
643            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
644        }
645    }
646
647    #[cfg(test)]
648    fn new_for_testing_with_sender(
649        last_built_timestamp: CheckpointTimestamp,
650        checkpoint_height: u64,
651        next_checkpoint_seq: CheckpointSequenceNumber,
652        max_tx: usize,
653        min_checkpoint_interval_ms: u64,
654        sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
655    ) -> Self {
656        Self {
657            last_built_timestamp,
658            pending_roots: VecDeque::new(),
659            height: checkpoint_height,
660            pending_tx_count: 0,
661            current_checkpoint_seq: next_checkpoint_seq,
662            max_tx,
663            min_checkpoint_interval_ms,
664            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
665        }
666    }
667
668    pub(crate) fn last_built_timestamp(&self) -> CheckpointTimestamp {
669        self.last_built_timestamp
670    }
671
672    pub(crate) fn is_empty(&self) -> bool {
673        self.pending_roots.is_empty()
674    }
675
676    fn next_height(&mut self) -> u64 {
677        self.height += 1;
678        self.height
679    }
680
681    fn push_chunk(
682        &mut self,
683        chunk: Chunk,
684        assigned_versions: &HashMap<TransactionKey, AssignedVersions>,
685        timestamp: CheckpointTimestamp,
686        consensus_commit_ref: CommitRef,
687        rejected_transactions_digest: Digest,
688    ) -> Vec<PendingCheckpointV2> {
689        let max_tx = self.max_tx;
690        let user_tx_count = chunk.schedulables.len();
691
692        let roots = chunk.to_checkpoint_roots();
693
694        let schedulables: Vec<_> = chunk
695            .schedulables
696            .into_iter()
697            .map(|s| {
698                let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
699                (s, versions)
700            })
701            .collect();
702
703        let mut flushed_checkpoints = Vec::new();
704
705        if self.pending_tx_count > 0
706            && self.pending_tx_count + user_tx_count > max_tx
707            && let Some(checkpoint) = self.flush_forced()
708        {
709            flushed_checkpoints.push(checkpoint);
710        }
711
712        let settlement_info = chunk.settlement.as_ref().map(|s| {
713            let settlement_key = s.key();
714            let tx_keys: Vec<_> = schedulables.iter().map(|(s, _)| s.key()).collect();
715            SettlementBatchInfo {
716                settlement_key,
717                tx_keys,
718                checkpoint_height: chunk.height,
719                checkpoint_seq: self.current_checkpoint_seq,
720                assigned_versions: assigned_versions
721                    .get(&settlement_key)
722                    .cloned()
723                    .unwrap_or_default(),
724            }
725        });
726
727        self.execution_scheduler_sender
728            .send(schedulables, settlement_info);
729
730        self.pending_tx_count += user_tx_count;
731        self.pending_roots.push_back(QueuedCheckpointRoots {
732            roots,
733            timestamp,
734            consensus_commit_ref,
735            rejected_transactions_digest,
736        });
737
738        flushed_checkpoints
739    }
740
741    pub(crate) fn flush(
742        &mut self,
743        current_timestamp: CheckpointTimestamp,
744        force: bool,
745    ) -> Option<PendingCheckpointV2> {
746        if !force && current_timestamp < self.last_built_timestamp + self.min_checkpoint_interval_ms
747        {
748            return None;
749        }
750        self.flush_forced()
751    }
752
753    fn flush_forced(&mut self) -> Option<PendingCheckpointV2> {
754        if self.pending_roots.is_empty() {
755            return None;
756        }
757
758        let to_flush: Vec<_> = self.pending_roots.drain(..).collect();
759        let last_root = to_flush.last().unwrap();
760
761        let checkpoint = PendingCheckpointV2 {
762            roots: to_flush.iter().map(|q| q.roots.clone()).collect(),
763            details: PendingCheckpointInfo {
764                timestamp_ms: last_root.timestamp,
765                last_of_epoch: false,
766                checkpoint_height: last_root.roots.height,
767                consensus_commit_ref: last_root.consensus_commit_ref,
768                rejected_transactions_digest: last_root.rejected_transactions_digest,
769                checkpoint_seq: Some(self.current_checkpoint_seq),
770            },
771        };
772
773        self.last_built_timestamp = last_root.timestamp;
774        self.pending_tx_count = 0;
775        self.current_checkpoint_seq += 1;
776
777        Some(checkpoint)
778    }
779
780    pub(crate) fn checkpoint_seq(&self) -> CheckpointSequenceNumber {
781        self.current_checkpoint_seq
782            .checked_sub(1)
783            .expect("checkpoint_seq called before any checkpoint was assigned")
784    }
785}
786
/// Per-epoch handler that consumes consensus commits and turns them into scheduled
/// transactions and pending checkpoints.
///
/// `C` is the checkpoint service handle. The commit-processing methods are only available
/// when `C: CheckpointServiceNotify + Send + Sync`.
pub struct ConsensusHandler<C> {
    /// A store created for each epoch. ConsensusHandler is recreated each epoch, with the
    /// corresponding store. This store is also used to get the current epoch ID.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Holds the indices, hash and stats after the last consensus commit
    /// It is used for avoiding replaying already processed transactions,
    /// checking chain consistency, and accumulating per-epoch consensus output stats.
    last_consensus_stats: ExecutionIndicesWithStatsV2,
    /// Checkpoint service handle; notified after each commit's output has been pushed to
    /// the consensus quarantine.
    checkpoint_service: Arc<C>,
    /// cache reader is needed when determining the next version to assign for shared objects.
    cache_reader: Arc<dyn ObjectCacheRead>,
    /// The consensus committee used to do stake computations for deciding set of low scoring authorities
    committee: ConsensusCommittee,
    // TODO: ConsensusHandler doesn't really share metrics with AuthorityState. We could define
    // a new metrics type here if we want to.
    metrics: Arc<AuthorityMetrics>,
    /// Lru cache to quickly discard transactions processed by consensus
    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
    /// Enqueues transactions to the execution scheduler via a separate task.
    execution_scheduler_sender: ExecutionSchedulerSender,
    /// Consensus adapter for submitting transactions to consensus
    consensus_adapter: Arc<ConsensusAdapter>,

    /// Using the throughput calculator to record the current consensus throughput
    throughput_calculator: Arc<ConsensusThroughputCalculator>,

    /// State updated by observing every commit (`observe_commit`), both live commits and
    /// previously processed commits replayed at startup. Constructed from the protocol
    /// config's commit-rate estimation window size.
    additional_consensus_state: AdditionalConsensusState,

    /// Awaited at the start of every commit so that processing pauses while backpressure
    /// is signalled.
    backpressure_subscriber: BackpressureSubscriber,

    // NOTE(review): not used in the portion of the file reviewed here; presumably feeds
    // commit-derived signals to traffic control when configured — confirm at the use site.
    traffic_controller: Option<Arc<TrafficController>>,

    // NOTE(review): optional and always `None` in `new_for_testing`; presumably logs
    // per-commit congestion decisions — confirm at the use site.
    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,

    // NOTE(review): shared counter, presumably updated while processing gasless
    // transactions in each commit — confirm at the use site.
    consensus_gasless_counter: Arc<ConsensusGaslessCounter>,

    /// Queue of checkpoint roots awaiting a flush. When checkpoints are split in the
    /// consensus handler, its last-built timestamp and checkpoint sequence number are
    /// copied into the stats recorded for each commit.
    checkpoint_queue: Mutex<CheckpointQueue>,
}
825
/// Entry capacity (2^20) of `ConsensusHandler::processed_cache`. The constructors pass it
/// through `randomize_cache_capacity_in_tests` before creating the LRU cache.
const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
827
828impl<C> ConsensusHandler<C> {
829    pub(crate) fn new(
830        epoch_store: Arc<AuthorityPerEpochStore>,
831        checkpoint_service: Arc<C>,
832        settlement_scheduler: SettlementScheduler,
833        consensus_adapter: Arc<ConsensusAdapter>,
834        cache_reader: Arc<dyn ObjectCacheRead>,
835        committee: ConsensusCommittee,
836        metrics: Arc<AuthorityMetrics>,
837        throughput_calculator: Arc<ConsensusThroughputCalculator>,
838        backpressure_subscriber: BackpressureSubscriber,
839        traffic_controller: Option<Arc<TrafficController>>,
840        congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
841        consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
842    ) -> Self {
843        assert!(
844            matches!(
845                epoch_store
846                    .protocol_config()
847                    .per_object_congestion_control_mode(),
848                PerObjectCongestionControlMode::ExecutionTimeEstimate(_)
849            ),
850            "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
851        );
852
853        // Recover last_consensus_stats so it is consistent across validators.
854        let mut last_consensus_stats = epoch_store
855            .get_last_consensus_stats()
856            .expect("Should be able to read last consensus index");
857        // stats is empty at the beginning of epoch.
858        if !last_consensus_stats.stats.is_initialized() {
859            last_consensus_stats.stats = ConsensusStats::new(committee.size());
860            if epoch_store
861                .protocol_config()
862                .split_checkpoints_in_consensus_handler()
863            {
864                last_consensus_stats.checkpoint_seq = epoch_store.previous_epoch_last_checkpoint();
865            }
866        }
867        let max_tx = epoch_store
868            .protocol_config()
869            .max_transactions_per_checkpoint() as usize;
870        let min_checkpoint_interval_ms = epoch_store
871            .protocol_config()
872            .min_checkpoint_interval_ms_as_option()
873            .unwrap_or_default();
874        let execution_scheduler_sender =
875            ExecutionSchedulerSender::start(settlement_scheduler, epoch_store.clone());
876        let commit_rate_estimate_window_size = epoch_store
877            .protocol_config()
878            .get_consensus_commit_rate_estimation_window_size();
879        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
880        let checkpoint_height = last_consensus_stats.height;
881        let next_checkpoint_seq = if epoch_store
882            .protocol_config()
883            .split_checkpoints_in_consensus_handler()
884        {
885            last_consensus_stats.checkpoint_seq + 1
886        } else {
887            0
888        };
889        Self {
890            epoch_store,
891            last_consensus_stats,
892            checkpoint_service,
893            cache_reader,
894            committee,
895            metrics,
896            processed_cache: LruCache::new(
897                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
898            ),
899            execution_scheduler_sender: execution_scheduler_sender.clone(),
900            consensus_adapter,
901            throughput_calculator,
902            additional_consensus_state: AdditionalConsensusState::new(
903                commit_rate_estimate_window_size,
904            ),
905            backpressure_subscriber,
906            traffic_controller,
907            congestion_logger,
908            consensus_gasless_counter,
909            checkpoint_queue: Mutex::new(CheckpointQueue::new(
910                last_built_timestamp,
911                checkpoint_height,
912                next_checkpoint_seq,
913                max_tx,
914                min_checkpoint_interval_ms,
915                execution_scheduler_sender,
916            )),
917        }
918    }
919
920    /// Returns the last subdag index processed by the handler.
921    pub(crate) fn last_processed_subdag_index(&self) -> u64 {
922        self.last_consensus_stats.index.sub_dag_index
923    }
924
925    pub(crate) fn new_for_testing(
926        epoch_store: Arc<AuthorityPerEpochStore>,
927        checkpoint_service: Arc<C>,
928        execution_scheduler_sender: ExecutionSchedulerSender,
929        consensus_adapter: Arc<ConsensusAdapter>,
930        cache_reader: Arc<dyn ObjectCacheRead>,
931        committee: ConsensusCommittee,
932        metrics: Arc<AuthorityMetrics>,
933        throughput_calculator: Arc<ConsensusThroughputCalculator>,
934        backpressure_subscriber: BackpressureSubscriber,
935        traffic_controller: Option<Arc<TrafficController>>,
936        last_consensus_stats: ExecutionIndicesWithStatsV2,
937    ) -> Self {
938        let commit_rate_estimate_window_size = epoch_store
939            .protocol_config()
940            .get_consensus_commit_rate_estimation_window_size();
941        let max_tx = epoch_store
942            .protocol_config()
943            .max_transactions_per_checkpoint() as usize;
944        let min_checkpoint_interval_ms = epoch_store
945            .protocol_config()
946            .min_checkpoint_interval_ms_as_option()
947            .unwrap_or_default();
948        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
949        let checkpoint_height = last_consensus_stats.height;
950        Self {
951            epoch_store,
952            last_consensus_stats,
953            checkpoint_service,
954            cache_reader,
955            committee,
956            metrics,
957            processed_cache: LruCache::new(
958                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
959            ),
960            execution_scheduler_sender: execution_scheduler_sender.clone(),
961            consensus_adapter,
962            throughput_calculator,
963            additional_consensus_state: AdditionalConsensusState::new(
964                commit_rate_estimate_window_size,
965            ),
966            backpressure_subscriber,
967            traffic_controller,
968            congestion_logger: None,
969            consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
970            checkpoint_queue: Mutex::new(CheckpointQueue::new(
971                last_built_timestamp,
972                checkpoint_height,
973                0,
974                max_tx,
975                min_checkpoint_interval_ms,
976                execution_scheduler_sender,
977            )),
978        }
979    }
980}
981
/// The transactions of one consensus commit, bucketed by kind.
///
/// Produced by `build_commit_handler_input` and destructured in `handle_consensus_commit`,
/// which hands each bucket to its dedicated processing step.
#[derive(Default)]
struct CommitHandlerInput {
    // User transactions; scheduled for execution and counted for gasless processing.
    user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    // Passed to `process_capability_notifications`.
    capability_notifications: Vec<AuthorityCapabilitiesV2>,
    // Passed to `process_execution_time_observations`.
    execution_time_observations: Vec<ExecutionTimeObservation>,
    // Passed to `process_checkpoint_signature_messages`.
    checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
    // DKG messages and confirmations, both passed to `process_dkg_updates`.
    // NOTE(review): the `Vec<u8>` is presumably the serialized payload — confirm.
    randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
    randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
    // Authorities whose end-of-publish message is in this commit; consumed by
    // `handle_close_epoch`.
    end_of_publish_transactions: Vec<AuthorityName>,
    // Newly reported JWKs with the reporting authority; passed to `process_jwks`.
    new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
}
993
/// Scratch state for processing a single consensus commit; created at the start of
/// `handle_consensus_commit` and consumed when its `output` is pushed to the consensus
/// quarantine.
struct CommitHandlerState {
    // Set by `init_randomness` when the DKG status is `Failed`.
    dkg_failed: bool,
    // Randomness round reserved for this commit by `init_randomness`, if any.
    randomness_round: Option<RandomnessRound>,
    // Accumulates everything this commit wants to persist.
    output: ConsensusCommitOutput,
    // Starts as `Some(IndirectStateObserver::new())` for every commit.
    // NOTE(review): presumably taken (left `None`) once its digest is finalized — confirm.
    indirect_state_observer: Option<IndirectStateObserver>,
    // Clone of the reconfig state taken under the read lock when commit handling began.
    initial_reconfig_state: ReconfigState,
    // Occurrence counts for user transactions, used for unpaid amplification detection.
    occurrence_counts: HashMap<TransactionDigest, u32>,
}
1003
1004impl CommitHandlerState {
1005    fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
1006        self.output
1007            .get_consensus_messages_processed()
1008            .cloned()
1009            .collect()
1010    }
1011
1012    fn init_randomness<'a, 'epoch>(
1013        &'a mut self,
1014        epoch_store: &'epoch AuthorityPerEpochStore,
1015        commit_info: &'a ConsensusCommitInfo,
1016    ) -> Option<tokio::sync::MutexGuard<'epoch, RandomnessManager>> {
1017        let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
1018            rm.try_lock()
1019                .expect("should only ever be called from the commit handler thread")
1020        });
1021
1022        let mut dkg_failed = false;
1023        let randomness_round = if epoch_store.randomness_state_enabled() {
1024            let randomness_manager = randomness_manager
1025                .as_mut()
1026                .expect("randomness manager should exist if randomness is enabled");
1027            match randomness_manager.dkg_status() {
1028                DkgStatus::Pending => None,
1029                DkgStatus::Failed => {
1030                    dkg_failed = true;
1031                    None
1032                }
1033                DkgStatus::Successful => {
1034                    // Generate randomness for this commit if DKG is successful and we are still
1035                    // accepting certs.
1036                    if self.initial_reconfig_state.should_accept_tx() {
1037                        randomness_manager
1038                            // TODO: make infallible
1039                            .reserve_next_randomness(commit_info.timestamp, &mut self.output)
1040                            .expect("epoch ended")
1041                    } else {
1042                        None
1043                    }
1044                }
1045            }
1046        } else {
1047            None
1048        };
1049
1050        if randomness_round.is_some() {
1051            assert!(!dkg_failed); // invariant check
1052        }
1053
1054        self.randomness_round = randomness_round;
1055        self.dkg_failed = dkg_failed;
1056
1057        randomness_manager
1058    }
1059}
1060
1061impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
1062    /// Called during startup to allow us to observe commits we previously processed, for crash recovery.
1063    /// Any state computed here must be a pure function of the commits observed, it cannot depend on any
1064    /// state recorded in the epoch db.
1065    fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
1066        assert!(
1067            self.epoch_store
1068                .protocol_config()
1069                .record_additional_state_digest_in_prologue()
1070        );
1071        let protocol_config = self.epoch_store.protocol_config();
1072        let epoch_start_time = self
1073            .epoch_store
1074            .epoch_start_config()
1075            .epoch_start_timestamp_ms();
1076
1077        self.additional_consensus_state.observe_commit(
1078            protocol_config,
1079            epoch_start_time,
1080            &consensus_commit,
1081        );
1082    }
1083
1084    #[cfg(test)]
1085    pub(crate) async fn handle_consensus_commit_for_test(
1086        &mut self,
1087        consensus_commit: impl ConsensusCommitAPI,
1088    ) {
1089        self.handle_consensus_commit(consensus_commit).await;
1090    }
1091
    /// Processes one consensus commit from start to finish.
    ///
    /// In order, this:
    /// 1. asserts the protocol-config flags whose legacy behavior is no longer supported;
    /// 2. waits until backpressure clears;
    /// 3. publishes the previously committed leader round to the transaction status and
    ///    reject-reason caches;
    /// 4. observes the commit and asserts that its round is greater than the last
    ///    committed round;
    /// 5. filters and de-duplicates the commit's transactions, then processes the
    ///    non-user messages (JWKs, capabilities, execution time observations, checkpoint
    ///    signatures, DKG updates);
    /// 6. schedules transactions and builds pending checkpoint(s), using either the legacy
    ///    path or the split-checkpoint path depending on
    ///    `split_checkpoints_in_consensus_handler`;
    /// 7. records the commit stats, pushes the accumulated output to the consensus
    ///    quarantine, notifies the checkpoint service, triggers randomness generation if a
    ///    round was reserved, and sends end-of-publish if needed.
    ///
    /// Returns early, without pushing any output, when close-epoch handling reports that
    /// the epoch no longer accepts transactions and this is not the final round.
    ///
    /// # Panics
    ///
    /// Panics if any asserted protocol flag is disabled, if the commit round does not
    /// advance, if the randomness manager or execution time estimator is already locked,
    /// or if pushing the output or notifying the checkpoint service fails.
    #[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))]
    pub(crate) async fn handle_consensus_commit(
        &mut self,
        consensus_commit: impl ConsensusCommitAPI,
    ) {
        {
            let protocol_config = self.epoch_store.protocol_config();

            // Assert all protocol config settings for which we don't support old behavior.
            assert!(protocol_config.ignore_execution_time_observations_after_certs_closed());
            assert!(protocol_config.record_time_estimate_processed());
            assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints());
            assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest());
            assert!(protocol_config.authority_capabilities_v2());
            assert!(protocol_config.cancel_for_failed_dkg_early());
        }

        // This may block until one of two conditions happens:
        // - Number of uncommitted transactions in the writeback cache goes below the
        //   backpressure threshold.
        // - The highest executed checkpoint catches up to the highest certified checkpoint.
        self.backpressure_subscriber.await_no_backpressure().await;

        let epoch = self.epoch_store.epoch();

        let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit");

        // Round of the previous commit; read before `last_consensus_stats.index` is
        // overwritten further down.
        let last_committed_round = self.last_consensus_stats.index.last_committed_round;

        // Both caches are told about the previously committed round, not the one being
        // handled now.
        // NOTE(review): `as u32` truncates silently if the round type is wider than u32 —
        // confirm the round can never exceed u32::MAX.
        self.epoch_store
            .consensus_tx_status_cache
            .update_last_committed_leader_round(last_committed_round as u32);
        self.epoch_store
            .tx_reject_reason_cache
            .set_last_committed_leader_round(last_committed_round as u32);

        let commit_info = self.additional_consensus_state.observe_commit(
            self.epoch_store.protocol_config(),
            self.epoch_store
                .epoch_start_config()
                .epoch_start_timestamp_ms(),
            &consensus_commit,
        );
        // Commits must arrive with strictly increasing rounds.
        assert!(commit_info.round > last_committed_round);

        let (timestamp, leader_author, commit_sub_dag_index) =
            self.gather_commit_metadata(&consensus_commit);

        info!(
            %consensus_commit,
            "Received consensus output {}. Rejected transactions {}",
            consensus_commit.commit_ref(),
            consensus_commit.rejected_transactions_debug_string(),
        );

        // Advance the in-memory indices to this commit; the transaction index restarts at 0.
        self.last_consensus_stats.index = ExecutionIndices {
            last_committed_round: commit_info.round,
            sub_dag_index: commit_sub_dag_index,
            transaction_index: 0_u64,
        };

        self.metrics
            .consensus_committed_subdags
            .with_label_values(&[&leader_author.to_string()])
            .inc();

        // Per-commit scratch state. `initial_reconfig_state` is a clone taken under the
        // read lock, so later steps see the reconfig state as it was at this point.
        let mut state = CommitHandlerState {
            output: ConsensusCommitOutput::new(commit_info.round),
            dkg_failed: false,
            randomness_round: None,
            indirect_state_observer: Some(IndirectStateObserver::new()),
            initial_reconfig_state: self
                .epoch_store
                .get_reconfig_state_read_lock_guard()
                .clone(),
            occurrence_counts: HashMap::new(),
        };

        let FilteredConsensusOutput {
            transactions,
            owned_object_locks,
        } = self.filter_consensus_txns(
            state.initial_reconfig_state.clone(),
            &commit_info,
            &consensus_commit,
        );
        // Buffer owned object locks for batch write.
        if !owned_object_locks.is_empty() {
            state.output.set_owned_object_locks(owned_object_locks);
        }
        let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions);

        // Fills in `state.randomness_round` / `state.dkg_failed` and returns the locked
        // randomness manager, if one exists, for use by the steps below.
        let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info);

        let CommitHandlerInput {
            user_transactions,
            capability_notifications,
            execution_time_observations,
            checkpoint_signature_messages,
            randomness_dkg_messages,
            randomness_dkg_confirmations,
            end_of_publish_transactions,
            new_jwks,
        } = self.build_commit_handler_input(transactions);

        self.process_gasless_transactions(&commit_info, &user_transactions);
        self.process_jwks(&mut state, &commit_info, new_jwks);
        self.process_capability_notifications(capability_notifications);
        self.process_execution_time_observations(&mut state, execution_time_observations);
        self.process_checkpoint_signature_messages(checkpoint_signature_messages);

        self.process_dkg_updates(
            &mut state,
            &commit_info,
            randomness_manager.as_deref_mut(),
            randomness_dkg_messages,
            randomness_dkg_confirmations,
        )
        .await;

        // `try_lock` never waits: an estimator that is already locked is treated as a bug.
        let mut execution_time_estimator = self
            .epoch_store
            .execution_time_estimator
            .try_lock()
            .expect("should only ever be called from the commit handler thread");

        let authenticator_state_update_transaction =
            self.create_authenticator_state_update(last_committed_round, &commit_info);

        let split_checkpoints = self
            .epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler();

        // Legacy path: this function sends the schedulables to the execution scheduler
        // itself and writes per-commit pending checkpoints.
        // Split path: the schedulables are handed to `create_pending_checkpoints_v2`,
        // which returns the checkpoint height.
        let (lock, final_round, num_schedulables, checkpoint_height) = if !split_checkpoints {
            let (schedulables, randomness_schedulables, assigned_versions) = self
                .process_transactions(
                    &mut state,
                    &mut execution_time_estimator,
                    &commit_info,
                    authenticator_state_update_transaction,
                    user_transactions,
                );

            let (should_accept_tx, lock, final_round) =
                self.handle_close_epoch(&mut state, &commit_info, end_of_publish_transactions);

            let make_checkpoint = should_accept_tx || final_round;
            if !make_checkpoint {
                // No need for any further processing
                // Returning here drops `state.output` without pushing it to the quarantine.
                return;
            }

            // If this is the final round, record execution time observations for storage in the
            // end-of-epoch tx.
            if final_round {
                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
            }

            let checkpoint_height = self
                .epoch_store
                .calculate_pending_checkpoint_height(commit_info.round);

            self.create_pending_checkpoints(
                &mut state,
                &commit_info,
                &schedulables,
                &randomness_schedulables,
                final_round,
            );

            // Counted before the randomness schedulables are appended, so the throughput
            // figure below excludes them.
            let num_schedulables = schedulables.len();
            let mut all_schedulables = schedulables;
            all_schedulables.extend(randomness_schedulables);
            let assigned_versions = assigned_versions.into_map();
            // Pair each schedulable with its assigned versions, falling back to the
            // default value when the map has no entry for its key.
            let paired: Vec<_> = all_schedulables
                .into_iter()
                .map(|s| {
                    let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
                    (s, versions)
                })
                .collect();
            self.execution_scheduler_sender.send(paired, None);

            (lock, final_round, num_schedulables, checkpoint_height)
        } else {
            let (
                transactions_to_schedule,
                randomness_transactions_to_schedule,
                cancelled_txns,
                randomness_state_update_transaction,
            ) = self.collect_transactions_to_schedule(
                &mut state,
                &mut execution_time_estimator,
                &commit_info,
                user_transactions,
            );

            let (should_accept_tx, lock, final_round) =
                self.handle_close_epoch(&mut state, &commit_info, end_of_publish_transactions);

            let make_checkpoint = should_accept_tx || final_round;
            if !make_checkpoint {
                // No need for any further processing
                // Returning here drops `state.output` without pushing it to the quarantine.
                return;
            }

            // If this is the final round, record execution time observations for storage in the
            // end-of-epoch tx.
            if final_round {
                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
            }

            // Shadows the `epoch` bound near the top of the function with the same call.
            let epoch = self.epoch_store.epoch();
            let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
                .then_some(Schedulable::ConsensusCommitPrologue(
                    epoch,
                    commit_info.round,
                    commit_info.consensus_commit_ref.index,
                ));

            // Order: commit prologue (if any), authenticator state update (if any), then
            // the transactions selected for scheduling.
            let schedulables: Vec<_> = itertools::chain!(
                consensus_commit_prologue.into_iter(),
                authenticator_state_update_transaction
                    .into_iter()
                    .map(Schedulable::Transaction),
                transactions_to_schedule
                    .into_iter()
                    .map(Schedulable::Transaction),
            )
            .collect();

            // The randomness state update (if any) goes ahead of the randomness-using
            // transactions.
            let randomness_schedulables: Vec<_> = randomness_state_update_transaction
                .into_iter()
                .chain(
                    randomness_transactions_to_schedule
                        .into_iter()
                        .map(Schedulable::Transaction),
                )
                .collect();

            let num_schedulables = schedulables.len();
            let checkpoint_height = self.create_pending_checkpoints_v2(
                &mut state,
                &commit_info,
                schedulables,
                randomness_schedulables,
                &cancelled_txns,
                final_round,
            );

            (lock, final_round, num_schedulables, checkpoint_height)
        };

        let notifications = state.get_notifications();

        // Record a copy of the stats; only the copy gets the height and, in split mode,
        // the checkpoint queue's progress.
        let mut stats_to_record = self.last_consensus_stats.clone();
        stats_to_record.height = checkpoint_height;
        if split_checkpoints {
            // std `Mutex`: `unwrap` panics if the lock is poisoned.
            let queue = self.checkpoint_queue.lock().unwrap();
            stats_to_record.last_checkpoint_flush_timestamp = queue.last_built_timestamp();
            stats_to_record.checkpoint_seq = queue.checkpoint_seq();
        }

        state.output.record_consensus_commit_stats(stats_to_record);

        self.record_deferral_deletion(&mut state);

        self.epoch_store
            .consensus_quarantine
            .write()
            .push_consensus_output(state.output, &self.epoch_store)
            .expect("push_consensus_output should not fail");

        debug!(
            ?commit_info.round,
            "Notifying checkpoint service about new pending checkpoint(s)",
        );
        self.checkpoint_service
            .notify_checkpoint()
            .expect("failed to notify checkpoint service");

        if let Some(randomness_round) = state.randomness_round {
            randomness_manager
                .as_ref()
                .expect("randomness manager should exist if randomness round is provided")
                .generate_randomness(epoch, randomness_round);
        }

        self.epoch_store.process_notifications(notifications.iter());

        // pass lock by value to ensure that it is held until this point
        self.log_final_round(lock, final_round);

        // update the calculated throughput
        self.throughput_calculator
            .add_transactions(timestamp, num_schedulables as u64);

        // Fault-injection hook keyed on (sub-dag index, epoch).
        // NOTE(review): presumably kills the node with probability 0.01 in simulation —
        // confirm against `sui_simulator`.
        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
            let key = [commit_sub_dag_index, epoch];
            if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
                sui_simulator::task::kill_current_node(None);
            }
        });

        fail_point!("crash");

        self.send_end_of_publish_if_needed().await;
    }
1401
1402    fn handle_close_epoch(
1403        &self,
1404        state: &mut CommitHandlerState,
1405        commit_info: &ConsensusCommitInfo,
1406        end_of_publish_transactions: Vec<AuthorityName>,
1407    ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
1408        let timestamp_based_epoch_close = self
1409            .epoch_store
1410            .protocol_config()
1411            .timestamp_based_epoch_close();
1412        let timestamp_triggered = timestamp_based_epoch_close
1413            && commit_info.timestamp >= self.epoch_store.next_reconfiguration_timestamp_ms();
1414        if timestamp_triggered {
1415            // close_user_certs() is only needed here when timestamp_based_epoch_close is enabled.
1416            let reconfig_guard = self.epoch_store.get_reconfig_state_write_lock_guard();
1417            if reconfig_guard.should_accept_user_certs() {
1418                self.epoch_store.close_user_certs(reconfig_guard);
1419            }
1420        }
1421        let collected_eop_quorum =
1422            self.process_end_of_publish_transactions(state, end_of_publish_transactions);
1423        if timestamp_triggered || collected_eop_quorum {
1424            let (lock, final_round) = self.advance_eop_state_machine(state);
1425            (lock.should_accept_tx(), Some(lock), final_round)
1426        } else {
1427            (true, None, false)
1428        }
1429    }
1430
1431    fn record_end_of_epoch_execution_time_observations(
1432        &self,
1433        estimator: &mut ExecutionTimeEstimator,
1434    ) {
1435        self.epoch_store
1436            .end_of_epoch_execution_time_observations
1437            .set(estimator.take_observations())
1438            .expect("`stored_execution_time_observations` should only be set once at end of epoch");
1439    }
1440
1441    fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
1442        let mut deferred_transactions = self
1443            .epoch_store
1444            .consensus_output_cache
1445            .deferred_transactions
1446            .lock();
1447        for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
1448            deferred_transactions.remove(&deleted_deferred_key);
1449        }
1450    }
1451
1452    fn log_final_round(&self, lock: Option<RwLockWriteGuard<ReconfigState>>, final_round: bool) {
1453        if final_round {
1454            let epoch = self.epoch_store.epoch();
1455            info!(
1456                ?epoch,
1457                lock=?lock.as_ref(),
1458                final_round=?final_round,
1459                "Notified last checkpoint"
1460            );
1461            self.epoch_store.record_end_of_message_quorum_time_metric();
1462        }
1463    }
1464
1465    fn create_pending_checkpoints(
1466        &self,
1467        state: &mut CommitHandlerState,
1468        commit_info: &ConsensusCommitInfo,
1469        schedulables: &[Schedulable],
1470        randomness_schedulables: &[Schedulable],
1471        final_round: bool,
1472    ) {
1473        assert!(
1474            !self
1475                .epoch_store
1476                .protocol_config()
1477                .split_checkpoints_in_consensus_handler()
1478        );
1479
1480        let checkpoint_height = self
1481            .epoch_store
1482            .calculate_pending_checkpoint_height(commit_info.round);
1483
1484        // Determine whether to write pending checkpoint for user tx with randomness.
1485        // - If randomness is not generated for this commit, we will skip the
1486        //   checkpoint with the associated height. Therefore checkpoint heights may
1487        //   not be contiguous.
1488        // - Exception: if DKG fails, we always need to write out a PendingCheckpoint
1489        //   for randomness tx that are canceled.
1490        let should_write_random_checkpoint = state.randomness_round.is_some()
1491            || (state.dkg_failed && !randomness_schedulables.is_empty());
1492
1493        let pending_checkpoint = PendingCheckpoint {
1494            roots: schedulables.iter().map(|s| s.key()).collect(),
1495            details: PendingCheckpointInfo {
1496                timestamp_ms: commit_info.timestamp,
1497                last_of_epoch: final_round && !should_write_random_checkpoint,
1498                checkpoint_height,
1499                consensus_commit_ref: commit_info.consensus_commit_ref,
1500                rejected_transactions_digest: commit_info.rejected_transactions_digest,
1501                checkpoint_seq: None,
1502            },
1503        };
1504        self.epoch_store
1505            .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1506            .expect("failed to write pending checkpoint");
1507
1508        info!(
1509            "Written pending checkpoint: {:?}",
1510            pending_checkpoint.details,
1511        );
1512
1513        if should_write_random_checkpoint {
1514            let pending_checkpoint = PendingCheckpoint {
1515                roots: randomness_schedulables.iter().map(|s| s.key()).collect(),
1516                details: PendingCheckpointInfo {
1517                    timestamp_ms: commit_info.timestamp,
1518                    last_of_epoch: final_round,
1519                    checkpoint_height: checkpoint_height + 1,
1520                    consensus_commit_ref: commit_info.consensus_commit_ref,
1521                    rejected_transactions_digest: commit_info.rejected_transactions_digest,
1522                    checkpoint_seq: None,
1523                },
1524            };
1525            self.epoch_store
1526                .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1527                .expect("failed to write pending checkpoint");
1528        }
1529    }
1530
1531    #[allow(clippy::type_complexity)]
1532    fn collect_transactions_to_schedule(
1533        &self,
1534        state: &mut CommitHandlerState,
1535        execution_time_estimator: &mut ExecutionTimeEstimator,
1536        commit_info: &ConsensusCommitInfo,
1537        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1538    ) -> (
1539        Vec<VerifiedExecutableTransactionWithAliases>,
1540        Vec<VerifiedExecutableTransactionWithAliases>,
1541        BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
1542        Option<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1543    ) {
1544        let protocol_config = self.epoch_store.protocol_config();
1545        let epoch = self.epoch_store.epoch();
1546
1547        let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) =
1548            self.merge_and_reorder_transactions(state, commit_info, user_transactions);
1549
1550        let mut shared_object_congestion_tracker =
1551            self.init_congestion_tracker(commit_info, false, &ordered_txns);
1552        let mut shared_object_using_randomness_congestion_tracker =
1553            self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns);
1554
1555        let randomness_state_update_transaction = state
1556            .randomness_round
1557            .map(|round| Schedulable::RandomnessStateUpdate(epoch, round));
1558        debug!(
1559            "Randomness state update transaction: {:?}",
1560            randomness_state_update_transaction
1561                .as_ref()
1562                .map(|t| t.key())
1563        );
1564
1565        let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len());
1566        let mut randomness_transactions_to_schedule =
1567            Vec::with_capacity(ordered_randomness_txns.len());
1568        let mut deferred_txns = BTreeMap::new();
1569        let mut cancelled_txns = BTreeMap::new();
1570
1571        for transaction in ordered_txns {
1572            self.handle_deferral_and_cancellation(
1573                state,
1574                &mut cancelled_txns,
1575                &mut deferred_txns,
1576                &mut transactions_to_schedule,
1577                protocol_config,
1578                commit_info,
1579                transaction,
1580                &mut shared_object_congestion_tracker,
1581                &previously_deferred_tx_digests,
1582                execution_time_estimator,
1583            );
1584        }
1585
1586        for transaction in ordered_randomness_txns {
1587            if state.dkg_failed {
1588                debug!(
1589                    "Canceling randomness-using transaction {:?} because DKG failed",
1590                    transaction.tx().digest(),
1591                );
1592                cancelled_txns.insert(
1593                    *transaction.tx().digest(),
1594                    CancelConsensusCertificateReason::DkgFailed,
1595                );
1596                randomness_transactions_to_schedule.push(transaction);
1597                continue;
1598            }
1599            self.handle_deferral_and_cancellation(
1600                state,
1601                &mut cancelled_txns,
1602                &mut deferred_txns,
1603                &mut randomness_transactions_to_schedule,
1604                protocol_config,
1605                commit_info,
1606                transaction,
1607                &mut shared_object_using_randomness_congestion_tracker,
1608                &previously_deferred_tx_digests,
1609                execution_time_estimator,
1610            );
1611        }
1612
1613        let mut total_deferred_txns = 0;
1614        {
1615            let mut deferred_transactions = self
1616                .epoch_store
1617                .consensus_output_cache
1618                .deferred_transactions
1619                .lock();
1620            for (key, txns) in deferred_txns.into_iter() {
1621                total_deferred_txns += txns.len();
1622                deferred_transactions.insert(key, txns.clone());
1623                state.output.defer_transactions(key, txns);
1624            }
1625        }
1626
1627        self.metrics
1628            .consensus_handler_deferred_transactions
1629            .inc_by(total_deferred_txns as u64);
1630        self.metrics
1631            .consensus_handler_cancelled_transactions
1632            .inc_by(cancelled_txns.len() as u64);
1633        self.metrics
1634            .consensus_handler_max_object_costs
1635            .with_label_values(&["regular_commit"])
1636            .set(shared_object_congestion_tracker.max_cost() as i64);
1637        self.metrics
1638            .consensus_handler_max_object_costs
1639            .with_label_values(&["randomness_commit"])
1640            .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);
1641
1642        let congestion_commit_data = shared_object_congestion_tracker.finish_commit(commit_info);
1643        let randomness_congestion_commit_data =
1644            shared_object_using_randomness_congestion_tracker.finish_commit(commit_info);
1645
1646        if let Some(logger) = &self.congestion_logger {
1647            let epoch = self.epoch_store.epoch();
1648            let mut logger = logger.lock().unwrap();
1649            logger.write_commit_log(epoch, commit_info, false, &congestion_commit_data);
1650            logger.write_commit_log(epoch, commit_info, true, &randomness_congestion_commit_data);
1651        }
1652
1653        if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get()
1654            && let Err(e) = tx_object_debts.try_send(
1655                congestion_commit_data
1656                    .accumulated_debts
1657                    .iter()
1658                    .chain(randomness_congestion_commit_data.accumulated_debts.iter())
1659                    .map(|(id, _)| *id)
1660                    .collect(),
1661            )
1662        {
1663            info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
1664        }
1665
1666        state
1667            .output
1668            .set_congestion_control_object_debts(congestion_commit_data.accumulated_debts);
1669        state.output.set_congestion_control_randomness_object_debts(
1670            randomness_congestion_commit_data.accumulated_debts,
1671        );
1672
1673        (
1674            transactions_to_schedule,
1675            randomness_transactions_to_schedule,
1676            cancelled_txns,
1677            randomness_state_update_transaction,
1678        )
1679    }
1680
1681    fn process_transactions(
1682        &self,
1683        state: &mut CommitHandlerState,
1684        execution_time_estimator: &mut ExecutionTimeEstimator,
1685        commit_info: &ConsensusCommitInfo,
1686        authenticator_state_update_transaction: Option<VerifiedExecutableTransactionWithAliases>,
1687        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1688    ) -> (Vec<Schedulable>, Vec<Schedulable>, AssignedTxAndVersions) {
1689        let protocol_config = self.epoch_store.protocol_config();
1690        assert!(!protocol_config.split_checkpoints_in_consensus_handler());
1691        let epoch = self.epoch_store.epoch();
1692
1693        let (
1694            transactions_to_schedule,
1695            randomness_transactions_to_schedule,
1696            cancelled_txns,
1697            randomness_state_update_transaction,
1698        ) = self.collect_transactions_to_schedule(
1699            state,
1700            execution_time_estimator,
1701            commit_info,
1702            user_transactions,
1703        );
1704
1705        let mut settlement = None;
1706        let mut randomness_settlement = None;
1707        if self.epoch_store.accumulators_enabled() {
1708            let checkpoint_height = self
1709                .epoch_store
1710                .calculate_pending_checkpoint_height(commit_info.round);
1711
1712            settlement = Some(Schedulable::AccumulatorSettlement(epoch, checkpoint_height));
1713
1714            if state.randomness_round.is_some() || !randomness_transactions_to_schedule.is_empty() {
1715                randomness_settlement = Some(Schedulable::AccumulatorSettlement(
1716                    epoch,
1717                    checkpoint_height + 1,
1718                ));
1719            }
1720        }
1721
1722        let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
1723            .then_some(Schedulable::ConsensusCommitPrologue(
1724                epoch,
1725                commit_info.round,
1726                commit_info.consensus_commit_ref.index,
1727            ));
1728
1729        let schedulables: Vec<_> = itertools::chain!(
1730            consensus_commit_prologue.into_iter(),
1731            authenticator_state_update_transaction
1732                .into_iter()
1733                .map(Schedulable::Transaction),
1734            transactions_to_schedule
1735                .into_iter()
1736                .map(Schedulable::Transaction),
1737            settlement,
1738        )
1739        .collect();
1740
1741        let randomness_schedulables: Vec<_> = randomness_state_update_transaction
1742            .into_iter()
1743            .chain(
1744                randomness_transactions_to_schedule
1745                    .into_iter()
1746                    .map(Schedulable::Transaction),
1747            )
1748            .chain(randomness_settlement)
1749            .collect();
1750
1751        let assigned_versions = self
1752            .epoch_store
1753            .process_consensus_transaction_shared_object_versions(
1754                self.cache_reader.as_ref(),
1755                schedulables.iter(),
1756                randomness_schedulables.iter(),
1757                &cancelled_txns,
1758                &mut state.output,
1759            )
1760            .expect("failed to assign shared object versions");
1761
1762        let consensus_commit_prologue =
1763            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);
1764
1765        let mut schedulables = schedulables;
1766        let mut assigned_versions = assigned_versions;
1767        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
1768            assert!(matches!(
1769                schedulables[0],
1770                Schedulable::ConsensusCommitPrologue(..)
1771            ));
1772            assert!(matches!(
1773                assigned_versions.0[0].0,
1774                TransactionKey::ConsensusCommitPrologue(..)
1775            ));
1776            assigned_versions.0[0].0 =
1777                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
1778            schedulables[0] = Schedulable::Transaction(consensus_commit_prologue);
1779        }
1780
1781        self.epoch_store
1782            .process_user_signatures(schedulables.iter().chain(randomness_schedulables.iter()));
1783
1784        // After this point we can throw away alias version information.
1785        let schedulables: Vec<Schedulable> = schedulables.into_iter().map(|s| s.into()).collect();
1786        let randomness_schedulables: Vec<Schedulable> = randomness_schedulables
1787            .into_iter()
1788            .map(|s| s.into())
1789            .collect();
1790
1791        (schedulables, randomness_schedulables, assigned_versions)
1792    }
1793
1794    #[allow(clippy::type_complexity)]
1795    fn create_pending_checkpoints_v2(
1796        &self,
1797        state: &mut CommitHandlerState,
1798        commit_info: &ConsensusCommitInfo,
1799        schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1800        randomness_schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1801        cancelled_txns: &BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
1802        final_round: bool,
1803    ) -> CheckpointHeight {
1804        let protocol_config = self.epoch_store.protocol_config();
1805        assert!(protocol_config.split_checkpoints_in_consensus_handler());
1806
1807        let epoch = self.epoch_store.epoch();
1808        let accumulators_enabled = self.epoch_store.accumulators_enabled();
1809        let max_transactions_per_checkpoint =
1810            protocol_config.max_transactions_per_checkpoint() as usize;
1811
1812        let should_write_random_checkpoint = state.randomness_round.is_some()
1813            || (state.dkg_failed && !randomness_schedulables.is_empty());
1814
1815        let mut checkpoint_queue = self.checkpoint_queue.lock().unwrap();
1816
1817        let build_chunks =
1818            |schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1819             queue: &mut CheckpointQueue|
1820             -> Vec<Chunk<VerifiedExecutableTransactionWithAliases>> {
1821                schedulables
1822                    .chunks(max_transactions_per_checkpoint)
1823                    .map(|chunk| {
1824                        let height = queue.next_height();
1825                        let schedulables: Vec<_> = chunk.to_vec();
1826                        let settlement = if accumulators_enabled {
1827                            Some(Schedulable::AccumulatorSettlement(epoch, height))
1828                        } else {
1829                            None
1830                        };
1831                        Chunk {
1832                            schedulables,
1833                            settlement,
1834                            height,
1835                        }
1836                    })
1837                    .collect()
1838            };
1839
1840        let num_schedulables = schedulables.len();
1841        let chunked_schedulables = build_chunks(schedulables, &mut checkpoint_queue);
1842        if chunked_schedulables.len() > 1 {
1843            info!(
1844                "Splitting transactions into {} checkpoint chunks (num_schedulables={}, max_tx={})",
1845                chunked_schedulables.len(),
1846                num_schedulables,
1847                max_transactions_per_checkpoint
1848            );
1849            assert_reachable!("checkpoint split due to transaction limit");
1850        }
1851        let chunked_randomness_schedulables = if should_write_random_checkpoint {
1852            build_chunks(randomness_schedulables, &mut checkpoint_queue)
1853        } else {
1854            vec![]
1855        };
1856
1857        let schedulables_for_version_assignment =
1858            Chunk::all_schedulables_from(&chunked_schedulables);
1859        let randomness_schedulables_for_version_assignment =
1860            Chunk::all_schedulables_from(&chunked_randomness_schedulables);
1861
1862        let assigned_versions = self
1863            .epoch_store
1864            .process_consensus_transaction_shared_object_versions(
1865                self.cache_reader.as_ref(),
1866                schedulables_for_version_assignment,
1867                randomness_schedulables_for_version_assignment,
1868                cancelled_txns,
1869                &mut state.output,
1870            )
1871            .expect("failed to assign shared object versions");
1872
1873        let consensus_commit_prologue =
1874            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);
1875
1876        let mut chunked_schedulables = chunked_schedulables;
1877        let mut assigned_versions = assigned_versions;
1878        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
1879            assert!(matches!(
1880                chunked_schedulables[0].schedulables[0],
1881                Schedulable::ConsensusCommitPrologue(..)
1882            ));
1883            assert!(matches!(
1884                assigned_versions.0[0].0,
1885                TransactionKey::ConsensusCommitPrologue(..)
1886            ));
1887            assigned_versions.0[0].0 =
1888                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
1889            chunked_schedulables[0].schedulables[0] =
1890                Schedulable::Transaction(consensus_commit_prologue);
1891        }
1892
1893        let assigned_versions = assigned_versions.into_map();
1894
1895        self.epoch_store.process_user_signatures(
1896            chunked_schedulables
1897                .iter()
1898                .flat_map(|c| c.all_schedulables())
1899                .chain(
1900                    chunked_randomness_schedulables
1901                        .iter()
1902                        .flat_map(|c| c.all_schedulables()),
1903                ),
1904        );
1905
1906        let commit_height = chunked_randomness_schedulables
1907            .last()
1908            .or(chunked_schedulables.last())
1909            .map(|c| c.height)
1910            .expect("at least one checkpoint root must be created per commit");
1911
1912        let mut pending_checkpoints = Vec::new();
1913        for chunk in chunked_schedulables {
1914            pending_checkpoints.extend(checkpoint_queue.push_chunk(
1915                chunk.into(),
1916                &assigned_versions,
1917                commit_info.timestamp,
1918                commit_info.consensus_commit_ref,
1919                commit_info.rejected_transactions_digest,
1920            ));
1921        }
1922
1923        if protocol_config.merge_randomness_into_checkpoint() {
1924            // We don't want to block checkpoint formation for non-randomness schedulables
1925            // on randomness state update. Therefore, we include randomness chunks in the
1926            // subsequent checkpoint. First we flush the queue, then enqueue randomness
1927            // for merging into the subsequent commit.
1928            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, final_round));
1929
1930            if should_write_random_checkpoint {
1931                for chunk in chunked_randomness_schedulables {
1932                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
1933                        chunk.into(),
1934                        &assigned_versions,
1935                        commit_info.timestamp,
1936                        commit_info.consensus_commit_ref,
1937                        commit_info.rejected_transactions_digest,
1938                    ));
1939                }
1940                if final_round {
1941                    pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
1942                }
1943            }
1944        } else {
1945            let force = final_round || should_write_random_checkpoint;
1946            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, force));
1947
1948            if should_write_random_checkpoint {
1949                for chunk in chunked_randomness_schedulables {
1950                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
1951                        chunk.into(),
1952                        &assigned_versions,
1953                        commit_info.timestamp,
1954                        commit_info.consensus_commit_ref,
1955                        commit_info.rejected_transactions_digest,
1956                    ));
1957                }
1958                pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
1959            }
1960        }
1961
1962        if final_round && let Some(last) = pending_checkpoints.last_mut() {
1963            last.details.last_of_epoch = true;
1964        }
1965
1966        let queue_drained = checkpoint_queue.is_empty();
1967        drop(checkpoint_queue);
1968
1969        for pending_checkpoint in pending_checkpoints {
1970            debug!(
1971                checkpoint_height = pending_checkpoint.details.checkpoint_height,
1972                roots_count = pending_checkpoint.num_roots(),
1973                "Writing pending checkpoint",
1974            );
1975            self.epoch_store
1976                .write_pending_checkpoint_v2(&mut state.output, &pending_checkpoint)
1977                .expect("failed to write pending checkpoint");
1978        }
1979
1980        state.output.set_checkpoint_queue_drained(queue_drained);
1981
1982        commit_height
1983    }
1984
1985    // Adds the consensus commit prologue transaction to the beginning of input `transactions` to update
1986    // the system clock used in all transactions in the current consensus commit.
1987    // Returns the root of the consensus commit prologue transaction if it was added to the input.
1988    fn add_consensus_commit_prologue_transaction<'a>(
1989        &'a self,
1990        state: &'a mut CommitHandlerState,
1991        commit_info: &'a ConsensusCommitInfo,
1992        assigned_versions: &AssignedTxAndVersions,
1993    ) -> Option<VerifiedExecutableTransactionWithAliases> {
1994        {
1995            if commit_info.skip_consensus_commit_prologue_in_test {
1996                return None;
1997            }
1998        }
1999
2000        let mut cancelled_txn_version_assignment = Vec::new();
2001
2002        let protocol_config = self.epoch_store.protocol_config();
2003
2004        for (txn_key, assigned_versions) in assigned_versions.0.iter() {
2005            let Some(d) = txn_key.as_digest() else {
2006                continue;
2007            };
2008
2009            if !protocol_config.include_cancelled_randomness_txns_in_prologue()
2010                && assigned_versions
2011                    .shared_object_versions
2012                    .iter()
2013                    .any(|((id, _), _)| *id == SUI_RANDOMNESS_STATE_OBJECT_ID)
2014            {
2015                continue;
2016            }
2017
2018            if assigned_versions
2019                .shared_object_versions
2020                .iter()
2021                .any(|(_, version)| version.is_cancelled())
2022            {
2023                assert_reachable!("cancelled transactions");
2024                cancelled_txn_version_assignment
2025                    .push((*d, assigned_versions.shared_object_versions.clone()));
2026            }
2027        }
2028
2029        fail_point_arg!(
2030            "additional_cancelled_txns_for_tests",
2031            |additional_cancelled_txns: Vec<(
2032                TransactionDigest,
2033                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
2034            )>| {
2035                cancelled_txn_version_assignment.extend(additional_cancelled_txns);
2036            }
2037        );
2038
2039        let transaction = commit_info.create_consensus_commit_prologue_transaction(
2040            self.epoch_store.epoch(),
2041            self.epoch_store.protocol_config(),
2042            cancelled_txn_version_assignment,
2043            commit_info,
2044            state.indirect_state_observer.take().unwrap(),
2045        );
2046        Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2047            transaction,
2048        ))
2049    }
2050
2051    fn handle_deferral_and_cancellation(
2052        &self,
2053        state: &mut CommitHandlerState,
2054        cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
2055        deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>,
2056        scheduled_txns: &mut Vec<VerifiedExecutableTransactionWithAliases>,
2057        protocol_config: &ProtocolConfig,
2058        commit_info: &ConsensusCommitInfo,
2059        transaction: VerifiedExecutableTransactionWithAliases,
2060        shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
2061        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
2062        execution_time_estimator: &ExecutionTimeEstimator,
2063    ) {
2064        // Check for unpaid amplification before other deferral checks.
2065        // SIP-45: Paid amplification allows (gas_price / RGP + 1) submissions.
2066        // Transactions with more duplicates than paid for are deferred.
2067        if protocol_config.defer_unpaid_amplification() {
2068            let occurrence_count = state
2069                .occurrence_counts
2070                .get(transaction.tx().digest())
2071                .copied()
2072                .unwrap_or(0);
2073
2074            let rgp = self.epoch_store.reference_gas_price();
2075            let gas_price = transaction.tx().transaction_data().gas_price();
2076            let allowed_count = (gas_price / rgp.max(1)) + 1;
2077
2078            if occurrence_count as u64 > allowed_count {
2079                self.metrics
2080                    .consensus_handler_unpaid_amplification_deferrals
2081                    .inc();
2082
2083                let deferred_from_round = previously_deferred_tx_digests
2084                    .get(transaction.tx().digest())
2085                    .map(|k| k.deferred_from_round())
2086                    .unwrap_or(commit_info.round);
2087
2088                let deferral_key = DeferralKey::new_for_consensus_round(
2089                    commit_info.round + 1,
2090                    deferred_from_round,
2091                );
2092
2093                if transaction_deferral_within_limit(
2094                    &deferral_key,
2095                    protocol_config.max_deferral_rounds_for_congestion_control(),
2096                ) {
2097                    assert_reachable!("unpaid amplification deferral");
2098                    debug!(
2099                        "Deferring transaction {:?} due to unpaid amplification (count={}, allowed={})",
2100                        transaction.tx().digest(),
2101                        occurrence_count,
2102                        allowed_count
2103                    );
2104                    deferred_txns
2105                        .entry(deferral_key)
2106                        .or_default()
2107                        .push(transaction);
2108                    return;
2109                }
2110            }
2111        }
2112
2113        let tx_cost = shared_object_congestion_tracker.get_tx_cost(
2114            execution_time_estimator,
2115            transaction.tx(),
2116            state.indirect_state_observer.as_mut().unwrap(),
2117        );
2118
2119        let deferral_info = self.epoch_store.should_defer(
2120            transaction.tx(),
2121            commit_info,
2122            state.dkg_failed,
2123            state.randomness_round.is_some(),
2124            previously_deferred_tx_digests,
2125            shared_object_congestion_tracker,
2126        );
2127
2128        if let Some((deferral_key, deferral_reason)) = deferral_info {
2129            debug!(
2130                "Deferring consensus certificate for transaction {:?} until {:?}",
2131                transaction.tx().digest(),
2132                deferral_key
2133            );
2134
2135            match deferral_reason {
2136                DeferralReason::RandomnessNotReady => {
2137                    deferred_txns
2138                        .entry(deferral_key)
2139                        .or_default()
2140                        .push(transaction);
2141                }
2142                DeferralReason::SharedObjectCongestion(congested_objects) => {
2143                    self.metrics.consensus_handler_congested_transactions.inc();
2144                    if transaction_deferral_within_limit(
2145                        &deferral_key,
2146                        protocol_config.max_deferral_rounds_for_congestion_control(),
2147                    ) {
2148                        deferred_txns
2149                            .entry(deferral_key)
2150                            .or_default()
2151                            .push(transaction);
2152                    } else {
2153                        assert_sometimes!(
2154                            transaction.tx().data().transaction_data().uses_randomness(),
2155                            "cancelled randomness-using transaction"
2156                        );
2157                        assert_sometimes!(
2158                            !transaction.tx().data().transaction_data().uses_randomness(),
2159                            "cancelled non-randomness-using transaction"
2160                        );
2161
2162                        // Cancel the transaction that has been deferred for too long.
2163                        debug!(
2164                            "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
2165                            transaction.tx().digest(),
2166                            deferral_key,
2167                            congested_objects
2168                        );
2169                        cancelled_txns.insert(
2170                            *transaction.tx().digest(),
2171                            CancelConsensusCertificateReason::CongestionOnObjects(
2172                                congested_objects,
2173                            ),
2174                        );
2175                        scheduled_txns.push(transaction);
2176                    }
2177                }
2178            }
2179        } else {
2180            // Update object execution cost for all scheduled transactions
2181            shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, transaction.tx());
2182            scheduled_txns.push(transaction);
2183        }
2184    }
2185
2186    fn merge_and_reorder_transactions(
2187        &self,
2188        state: &mut CommitHandlerState,
2189        commit_info: &ConsensusCommitInfo,
2190        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
2191    ) -> (
2192        Vec<VerifiedExecutableTransactionWithAliases>,
2193        Vec<VerifiedExecutableTransactionWithAliases>,
2194        HashMap<TransactionDigest, DeferralKey>,
2195    ) {
2196        let protocol_config = self.epoch_store.protocol_config();
2197
2198        let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
2199            self.load_deferred_transactions(state, commit_info);
2200
2201        txns.reserve(user_transactions.len());
2202        randomness_txns.reserve(user_transactions.len());
2203
2204        // There may be randomness transactions in `txns`, which were deferred due to congestion.
2205        // They must be placed back into `randomness_txns`.
2206        let mut txns: Vec<_> = txns
2207            .into_iter()
2208            .filter_map(|tx| {
2209                if tx.tx().transaction_data().uses_randomness() {
2210                    randomness_txns.push(tx);
2211                    None
2212                } else {
2213                    Some(tx)
2214                }
2215            })
2216            .collect();
2217
2218        for txn in user_transactions {
2219            if txn.tx().transaction_data().uses_randomness() {
2220                randomness_txns.push(txn);
2221            } else {
2222                txns.push(txn);
2223            }
2224        }
2225
2226        PostConsensusTxReorder::reorder(
2227            &mut txns,
2228            protocol_config.consensus_transaction_ordering(),
2229        );
2230        PostConsensusTxReorder::reorder(
2231            &mut randomness_txns,
2232            protocol_config.consensus_transaction_ordering(),
2233        );
2234
2235        (txns, randomness_txns, previously_deferred_tx_digests)
2236    }
2237
2238    fn load_deferred_transactions(
2239        &self,
2240        state: &mut CommitHandlerState,
2241        commit_info: &ConsensusCommitInfo,
2242    ) -> (
2243        Vec<VerifiedExecutableTransactionWithAliases>,
2244        Vec<VerifiedExecutableTransactionWithAliases>,
2245        HashMap<TransactionDigest, DeferralKey>,
2246    ) {
2247        let mut previously_deferred_tx_digests = HashMap::new();
2248
2249        let deferred_txs: Vec<_> = self
2250            .epoch_store
2251            .load_deferred_transactions_for_up_to_consensus_round_v2(
2252                &mut state.output,
2253                commit_info.round,
2254            )
2255            .expect("db error")
2256            .into_iter()
2257            .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2258            .map(|(key, tx)| {
2259                previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2260                tx
2261            })
2262            .collect();
2263        trace!(
2264            "loading deferred transactions: {:?}",
2265            deferred_txs.iter().map(|tx| tx.tx().digest())
2266        );
2267
2268        let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
2269            let txns: Vec<_> = self
2270                .epoch_store
2271                .load_deferred_transactions_for_randomness_v2(&mut state.output)
2272                .expect("db error")
2273                .into_iter()
2274                .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2275                .map(|(key, tx)| {
2276                    previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2277                    tx
2278                })
2279                .collect();
2280            trace!(
2281                "loading deferred randomness transactions: {:?}",
2282                txns.iter().map(|tx| tx.tx().digest())
2283            );
2284            txns
2285        } else {
2286            vec![]
2287        };
2288
2289        (
2290            deferred_txs,
2291            deferred_randomness_txs,
2292            previously_deferred_tx_digests,
2293        )
2294    }
2295
2296    fn init_congestion_tracker(
2297        &self,
2298        commit_info: &ConsensusCommitInfo,
2299        for_randomness: bool,
2300        txns: &[VerifiedExecutableTransactionWithAliases],
2301    ) -> SharedObjectCongestionTracker {
2302        #[allow(unused_mut)]
2303        let mut ret = SharedObjectCongestionTracker::from_protocol_config(
2304            self.epoch_store
2305                .consensus_quarantine
2306                .read()
2307                .load_initial_object_debts(
2308                    &self.epoch_store,
2309                    commit_info.round,
2310                    for_randomness,
2311                    txns,
2312                )
2313                .expect("db error"),
2314            self.epoch_store.protocol_config(),
2315            for_randomness,
2316            self.congestion_logger.is_some(),
2317        );
2318
2319        fail_point_arg!(
2320            "initial_congestion_tracker",
2321            |tracker: SharedObjectCongestionTracker| {
2322                info!(
2323                    "Initialize shared_object_congestion_tracker to  {:?}",
2324                    tracker
2325                );
2326                ret = tracker;
2327            }
2328        );
2329
2330        ret
2331    }
2332
2333    fn process_gasless_transactions(
2334        &self,
2335        commit_info: &ConsensusCommitInfo,
2336        user_transactions: &[VerifiedExecutableTransactionWithAliases],
2337    ) {
2338        let gasless_count = user_transactions
2339            .iter()
2340            .filter(|txn| txn.tx().transaction_data().is_gasless_transaction())
2341            .count() as u64;
2342        self.consensus_gasless_counter
2343            .record_commit(commit_info.timestamp, gasless_count);
2344    }
2345
2346    fn process_jwks(
2347        &self,
2348        state: &mut CommitHandlerState,
2349        commit_info: &ConsensusCommitInfo,
2350        new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
2351    ) {
2352        for (authority_name, jwk_id, jwk) in new_jwks {
2353            self.epoch_store.record_jwk_vote(
2354                &mut state.output,
2355                commit_info.round,
2356                authority_name,
2357                &jwk_id,
2358                &jwk,
2359            );
2360        }
2361    }
2362
2363    fn process_capability_notifications(
2364        &self,
2365        capability_notifications: Vec<AuthorityCapabilitiesV2>,
2366    ) {
2367        for capabilities in capability_notifications {
2368            self.epoch_store
2369                .record_capabilities_v2(&capabilities)
2370                .expect("db error");
2371        }
2372    }
2373
2374    fn process_execution_time_observations(
2375        &self,
2376        state: &mut CommitHandlerState,
2377        execution_time_observations: Vec<ExecutionTimeObservation>,
2378    ) {
2379        let mut execution_time_estimator = self
2380            .epoch_store
2381            .execution_time_estimator
2382            .try_lock()
2383            .expect("should only ever be called from the commit handler thread");
2384
2385        for ExecutionTimeObservation {
2386            authority,
2387            generation,
2388            estimates,
2389        } in execution_time_observations
2390        {
2391            let authority_index = self
2392                .epoch_store
2393                .committee()
2394                .authority_index(&authority)
2395                .unwrap();
2396            execution_time_estimator.process_observations_from_consensus(
2397                authority_index,
2398                Some(generation),
2399                &estimates,
2400            );
2401            state
2402                .output
2403                .insert_execution_time_observation(authority_index, generation, estimates);
2404        }
2405    }
2406
2407    fn process_checkpoint_signature_messages(
2408        &self,
2409        checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
2410    ) {
2411        for checkpoint_signature_message in checkpoint_signature_messages {
2412            self.checkpoint_service
2413                .notify_checkpoint_signature(&checkpoint_signature_message)
2414                .expect("db error");
2415        }
2416    }
2417
2418    async fn process_dkg_updates(
2419        &self,
2420        state: &mut CommitHandlerState,
2421        commit_info: &ConsensusCommitInfo,
2422        randomness_manager: Option<&mut RandomnessManager>,
2423        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2424        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2425    ) {
2426        if !self.epoch_store.randomness_state_enabled() {
2427            let num_dkg_messages = randomness_dkg_messages.len();
2428            let num_dkg_confirmations = randomness_dkg_confirmations.len();
2429            if num_dkg_messages + num_dkg_confirmations > 0 {
2430                debug_fatal!(
2431                    "received {} RandomnessDkgMessage and {} RandomnessDkgConfirmation messages when randomness is not enabled",
2432                    num_dkg_messages,
2433                    num_dkg_confirmations
2434                );
2435            }
2436            return;
2437        }
2438
2439        let randomness_manager =
2440            randomness_manager.expect("randomness manager should exist if randomness is enabled");
2441
2442        let randomness_dkg_updates =
2443            self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);
2444
2445        let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
2446            state,
2447            randomness_manager,
2448            randomness_dkg_confirmations,
2449        );
2450
2451        if randomness_dkg_updates || randomness_dkg_confirmation_updates {
2452            randomness_manager
2453                .advance_dkg(&mut state.output, commit_info.round)
2454                .await
2455                .expect("epoch ended");
2456        }
2457    }
2458
2459    fn process_randomness_dkg_messages(
2460        &self,
2461        randomness_manager: &mut RandomnessManager,
2462        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2463    ) -> bool /* randomness state updated */ {
2464        if randomness_dkg_messages.is_empty() {
2465            return false;
2466        }
2467
2468        let mut randomness_state_updated = false;
2469        for (authority, bytes) in randomness_dkg_messages {
2470            match bcs::from_bytes(&bytes) {
2471                Ok(message) => {
2472                    randomness_manager
2473                        .add_message(&authority, message)
2474                        // TODO: make infallible
2475                        .expect("epoch ended");
2476                    randomness_state_updated = true;
2477                }
2478
2479                Err(e) => {
2480                    warn!(
2481                        "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
2482                        authority.concise(),
2483                    );
2484                }
2485            }
2486        }
2487
2488        randomness_state_updated
2489    }
2490
2491    fn process_randomness_dkg_confirmations(
2492        &self,
2493        state: &mut CommitHandlerState,
2494        randomness_manager: &mut RandomnessManager,
2495        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2496    ) -> bool /* randomness state updated */ {
2497        if randomness_dkg_confirmations.is_empty() {
2498            return false;
2499        }
2500
2501        let mut randomness_state_updated = false;
2502        for (authority, bytes) in randomness_dkg_confirmations {
2503            match bcs::from_bytes(&bytes) {
2504                Ok(message) => {
2505                    randomness_manager
2506                        .add_confirmation(&mut state.output, &authority, message)
2507                        // TODO: make infallible
2508                        .expect("epoch ended");
2509                    randomness_state_updated = true;
2510                }
2511                Err(e) => {
2512                    warn!(
2513                        "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
2514                        authority.concise(),
2515                    );
2516                }
2517            }
2518        }
2519
2520        randomness_state_updated
2521    }
2522
2523    /// Returns true if we have collected a quorum of end of publish messages (either in this round or a previous round).
2524    fn process_end_of_publish_transactions(
2525        &self,
2526        state: &mut CommitHandlerState,
2527        end_of_publish_transactions: Vec<AuthorityName>,
2528    ) -> bool {
2529        let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
2530            "No contention on end_of_publish as it is only accessed from consensus handler",
2531        );
2532
2533        if eop_aggregator.has_quorum() {
2534            return true;
2535        }
2536
2537        if end_of_publish_transactions.is_empty() {
2538            return false;
2539        }
2540
2541        for authority in end_of_publish_transactions {
2542            info!("Received EndOfPublish from {:?}", authority.concise());
2543
2544            // It is ok to just release lock here as this function is the only place that transition into RejectAllCerts state
2545            // And this function itself is always executed from consensus task
2546            state.output.insert_end_of_publish(authority);
2547            if eop_aggregator
2548                .insert_generic(authority, ())
2549                .is_quorum_reached()
2550            {
2551                debug!(
2552                    "Collected enough end_of_publish messages with last message from validator {:?}",
2553                    authority.concise(),
2554                );
2555                return true;
2556            }
2557        }
2558
2559        false
2560    }
2561
2562    /// After we have collected 2f+1 EndOfPublish messages, we call this function every round until the epoch
2563    /// ends.
2564    fn advance_eop_state_machine(
2565        &self,
2566        state: &mut CommitHandlerState,
2567    ) -> (
2568        RwLockWriteGuard<'_, ReconfigState>,
2569        bool, // true if final round
2570    ) {
2571        let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
2572        let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
2573
2574        reconfig_state.close_all_certs();
2575
2576        let commit_has_deferred_txns = state.output.has_deferred_transactions();
2577        let previous_commits_have_deferred_txns = !self.epoch_store.deferred_transactions_empty();
2578
2579        if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
2580            if !start_state_is_reject_all_tx {
2581                info!("Transitioning to RejectAllTx");
2582            }
2583            reconfig_state.close_all_tx();
2584        } else {
2585            debug!(
2586                "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
2587                previous_commits_have_deferred_txns, commit_has_deferred_txns,
2588            );
2589        }
2590
2591        state.output.store_reconfig_state(reconfig_state.clone());
2592
2593        if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
2594            (reconfig_state, true)
2595        } else {
2596            (reconfig_state, false)
2597        }
2598    }
2599
2600    fn gather_commit_metadata(
2601        &self,
2602        consensus_commit: &impl ConsensusCommitAPI,
2603    ) -> (u64, AuthorityIndex, u64) {
2604        let timestamp = consensus_commit.commit_timestamp_ms();
2605        let leader_author = consensus_commit.leader_author_index();
2606        let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
2607
2608        let system_time_ms = SystemTime::now()
2609            .duration_since(UNIX_EPOCH)
2610            .unwrap()
2611            .as_millis() as i64;
2612
2613        let consensus_timestamp_bias_ms = system_time_ms - (timestamp as i64);
2614        let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
2615        self.metrics
2616            .consensus_timestamp_bias
2617            .observe(consensus_timestamp_bias_seconds);
2618
2619        let epoch_start = self
2620            .epoch_store
2621            .epoch_start_config()
2622            .epoch_start_timestamp_ms();
2623        let timestamp = if timestamp < epoch_start {
2624            error!(
2625                "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}"
2626            );
2627            epoch_start
2628        } else {
2629            timestamp
2630        };
2631
2632        (timestamp, leader_author, commit_sub_dag_index)
2633    }
2634
2635    fn create_authenticator_state_update(
2636        &self,
2637        last_committed_round: u64,
2638        commit_info: &ConsensusCommitInfo,
2639    ) -> Option<VerifiedExecutableTransactionWithAliases> {
2640        // Load all jwks that became active in the previous round, and commit them in this round.
2641        // We want to delay one round because none of the transactions in the previous round could
2642        // have been authenticated with the jwks that became active in that round.
2643        //
2644        // Because of this delay, jwks that become active in the last round of the epoch will
2645        // never be committed. That is ok, because in the new epoch, the validators should
2646        // immediately re-submit these jwks, and they can become active then.
2647        let new_jwks = self
2648            .epoch_store
2649            .get_new_jwks(last_committed_round)
2650            .expect("Unrecoverable error in consensus handler");
2651
2652        if !new_jwks.is_empty() {
2653            let authenticator_state_update_transaction = authenticator_state_update_transaction(
2654                &self.epoch_store,
2655                commit_info.round,
2656                new_jwks,
2657            );
2658            debug!(
2659                "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
2660                authenticator_state_update_transaction.digest(),
2661                authenticator_state_update_transaction,
2662            );
2663
2664            Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2665                authenticator_state_update_transaction,
2666            ))
2667        } else {
2668            None
2669        }
2670    }
2671
2672    // Filters out rejected or deprecated transactions.
2673    // Returns FilteredConsensusOutput containing transactions and owned_object_locks.
2674    #[instrument(level = "trace", skip_all)]
2675    fn filter_consensus_txns(
2676        &mut self,
2677        initial_reconfig_state: ReconfigState,
2678        commit_info: &ConsensusCommitInfo,
2679        consensus_commit: &impl ConsensusCommitAPI,
2680    ) -> FilteredConsensusOutput {
2681        let mut transactions = Vec::new();
2682        let mut owned_object_locks = HashMap::new();
2683        let epoch = self.epoch_store.epoch();
2684        let mut num_finalized_user_transactions = vec![0; self.committee.size()];
2685        let mut num_rejected_user_transactions = vec![0; self.committee.size()];
2686        for (block, parsed_transactions) in consensus_commit.transactions() {
2687            let author = block.author.value();
2688            // TODO: consider only messages within 1~3 rounds of the leader?
2689            self.last_consensus_stats.stats.inc_num_messages(author);
2690
2691            // Set the "ping" transaction status for this block. This is necessary as there might be some ping requests waiting for the ping transaction to be certified.
2692            self.epoch_store.set_consensus_tx_status(
2693                ConsensusPosition::ping(epoch, block),
2694                ConsensusTxStatus::Finalized,
2695            );
2696
2697            for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
2698                let position = ConsensusPosition {
2699                    epoch,
2700                    block,
2701                    index: tx_index as TransactionIndex,
2702                };
2703
2704                // Transaction has appeared in consensus output, we can increment the submission count
2705                // for this tx for DoS protection.
2706                if let Some(tx) = parsed.transaction.kind.as_user_transaction() {
2707                    let digest = tx.digest();
2708                    if let Some((spam_weight, submitter_client_addrs)) = self
2709                        .epoch_store
2710                        .submitted_transaction_cache
2711                        .increment_submission_count(digest)
2712                    {
2713                        if let Some(ref traffic_controller) = self.traffic_controller {
2714                            debug!(
2715                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
2716                                submitter_client_addrs.len()
2717                            );
2718
2719                            // Apply spam weight to all client addresses that submitted this transaction
2720                            for addr in submitter_client_addrs {
2721                                traffic_controller.tally(TrafficTally::new(
2722                                    Some(addr),
2723                                    None,
2724                                    None,
2725                                    spam_weight.clone(),
2726                                ));
2727                            }
2728                        } else {
2729                            warn!(
2730                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
2731                                submitter_client_addrs.len()
2732                            );
2733                        }
2734                    }
2735                }
2736
2737                if parsed.rejected {
2738                    // TODO(fastpath): Add metrics for rejected transactions.
2739                    if parsed.transaction.is_user_transaction() {
2740                        self.epoch_store
2741                            .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
2742                        num_rejected_user_transactions[author] += 1;
2743                    }
2744                    // Skip processing rejected transactions.
2745                    continue;
2746                }
2747
2748                let kind = classify(&parsed.transaction);
2749                self.metrics
2750                    .consensus_handler_processed
2751                    .with_label_values(&[kind])
2752                    .inc();
2753                self.metrics
2754                    .consensus_handler_transaction_sizes
2755                    .with_label_values(&[kind])
2756                    .observe(parsed.serialized_len as f64);
2757                if parsed.transaction.is_user_transaction() {
2758                    self.last_consensus_stats
2759                        .stats
2760                        .inc_num_user_transactions(author);
2761                }
2762
2763                if !initial_reconfig_state.should_accept_consensus_certs() {
2764                    // (Note: we no longer need to worry about the previously deferred condition, since we are only
2765                    // processing newly-received transactions at this time).
2766                    match &parsed.transaction.kind {
2767                        ConsensusTransactionKind::UserTransactionV2(_)
2768                        // deprecated and ignore later, but added for exhaustive match
2769                        | ConsensusTransactionKind::UserTransaction(_)
2770                        | ConsensusTransactionKind::CertifiedTransaction(_)
2771                        | ConsensusTransactionKind::CapabilityNotification(_)
2772                        | ConsensusTransactionKind::CapabilityNotificationV2(_)
2773                        | ConsensusTransactionKind::EndOfPublish(_)
2774                        // Note: we no longer have to check protocol_config.ignore_execution_time_observations_after_certs_closed()
2775                        | ConsensusTransactionKind::ExecutionTimeObservation(_)
2776                        | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
2777                            debug!(
2778                                "Ignoring consensus transaction {:?} because of end of epoch",
2779                                parsed.transaction.key()
2780                            );
2781                            continue;
2782                        }
2783
2784                        // These are the message types that are still processed even if !should_accept_consensus_certs()
2785                        ConsensusTransactionKind::CheckpointSignature(_)
2786                        | ConsensusTransactionKind::CheckpointSignatureV2(_)
2787                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2788                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
2789                        | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
2790                    }
2791                }
2792
2793                if !initial_reconfig_state.should_accept_tx() {
2794                    match &parsed.transaction.kind {
2795                        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
2796                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
2797                        _ => {}
2798                    }
2799                }
2800
2801                // Handle deprecated messages
2802                match &parsed.transaction.kind {
2803                    ConsensusTransactionKind::CapabilityNotification(_)
2804                    | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2805                    | ConsensusTransactionKind::CheckpointSignature(_) => {
2806                        debug_fatal!(
2807                            "BUG: saw deprecated tx {:?}for commit round {}",
2808                            parsed.transaction.key(),
2809                            commit_info.round
2810                        );
2811                        continue;
2812                    }
2813                    _ => {}
2814                }
2815
2816                if parsed.transaction.is_user_transaction() {
2817                    let author_name = self
2818                        .epoch_store
2819                        .committee()
2820                        .authority_by_index(author as u32)
2821                        .unwrap();
2822                    if self
2823                        .epoch_store
2824                        .has_received_end_of_publish_from(author_name)
2825                    {
2826                        // In some edge cases, consensus might resend previously seen certificate after EndOfPublish
2827                        // An honest validator should not send a new transaction after EndOfPublish. Whether the
2828                        // transaction is duplicate or not, we filter it out here.
2829                        warn!(
2830                            "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
2831                            author_name.concise(),
2832                            parsed.transaction.key(),
2833                        );
2834                        continue;
2835                    }
2836                }
2837
2838                // Perform post-consensus owned object conflict detection. If lock acquisition
2839                // fails, the transaction has invalid/conflicting owned inputs and should be dropped.
2840                // This must happen AFTER all filtering checks above to avoid acquiring locks
2841                // for transactions that will be dropped (e.g., during epoch change).
2842                // Only applies to UserTransactionV2 - other transaction types don't need lock acquisition.
2843                if let ConsensusTransactionKind::UserTransactionV2(tx_with_claims) =
2844                    &parsed.transaction.kind
2845                {
2846                    let immutable_object_ids: HashSet<ObjectID> =
2847                        tx_with_claims.get_immutable_objects().into_iter().collect();
2848                    let tx = tx_with_claims.tx();
2849
2850                    let Ok(input_objects) = tx.transaction_data().input_objects() else {
2851                        debug_fatal!("Invalid input objects for transaction {}", tx.digest());
2852                        continue;
2853                    };
2854
2855                    // Filter ImmOrOwnedMoveObject inputs, excluding those claimed to be immutable.
2856                    // Immutable objects don't need lock acquisition as they can be used concurrently.
2857                    let owned_object_refs: Vec<_> = input_objects
2858                        .iter()
2859                        .filter_map(|obj| match obj {
2860                            InputObjectKind::ImmOrOwnedMoveObject(obj_ref)
2861                                if !immutable_object_ids.contains(&obj_ref.0) =>
2862                            {
2863                                Some(*obj_ref)
2864                            }
2865                            _ => None,
2866                        })
2867                        .collect();
2868
2869                    match self
2870                        .epoch_store
2871                        .try_acquire_owned_object_locks_post_consensus(
2872                            &owned_object_refs,
2873                            *tx.digest(),
2874                            &owned_object_locks,
2875                        ) {
2876                        Ok(new_locks) => {
2877                            owned_object_locks.extend(new_locks.into_iter());
2878                            // Lock acquisition succeeded - now set Finalized status
2879                            self.epoch_store
2880                                .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
2881                            num_finalized_user_transactions[author] += 1;
2882                        }
2883                        Err(e) => {
2884                            debug!("Dropping transaction {}: {}", tx.digest(), e);
2885                            self.epoch_store
2886                                .set_consensus_tx_status(position, ConsensusTxStatus::Dropped);
2887                            self.epoch_store.set_rejection_vote_reason(position, &e);
2888                            continue;
2889                        }
2890                    }
2891                }
2892
2893                let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
2894                transactions.push((transaction, author as u32));
2895            }
2896        }
2897
2898        for (i, authority) in self.committee.authorities() {
2899            let hostname = &authority.hostname;
2900            self.metrics
2901                .consensus_committed_messages
2902                .with_label_values(&[hostname])
2903                .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
2904            self.metrics
2905                .consensus_committed_user_transactions
2906                .with_label_values(&[hostname])
2907                .set(
2908                    self.last_consensus_stats
2909                        .stats
2910                        .get_num_user_transactions(i.value()) as i64,
2911                );
2912            self.metrics
2913                .consensus_finalized_user_transactions
2914                .with_label_values(&[hostname])
2915                .add(num_finalized_user_transactions[i.value()] as i64);
2916            self.metrics
2917                .consensus_rejected_user_transactions
2918                .with_label_values(&[hostname])
2919                .add(num_rejected_user_transactions[i.value()] as i64);
2920        }
2921
2922        FilteredConsensusOutput {
2923            transactions,
2924            owned_object_locks,
2925        }
2926    }
2927
2928    fn deduplicate_consensus_txns(
2929        &mut self,
2930        state: &mut CommitHandlerState,
2931        commit_info: &ConsensusCommitInfo,
2932        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
2933    ) -> Vec<VerifiedSequencedConsensusTransaction> {
2934        let mut all_transactions = Vec::new();
2935
2936        // Track occurrence counts for each transaction key within this commit.
2937        // Also serves as the deduplication set (count > 1 means duplicate within commit).
2938        let mut occurrence_counts: HashMap<SequencedConsensusTransactionKey, u32> = HashMap::new();
2939        // Keys being seen for the first time (not duplicates from previous commits).
2940        let mut first_commit_keys: HashSet<SequencedConsensusTransactionKey> = HashSet::new();
2941
2942        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
2943            // In process_consensus_transactions_and_commit_boundary(), we will add a system consensus commit
2944            // prologue transaction, which will be the first transaction in this consensus commit batch.
2945            // Therefore, the transaction sequence number starts from 1 here.
2946            let current_tx_index = ExecutionIndices {
2947                last_committed_round: commit_info.round,
2948                sub_dag_index: commit_info.consensus_commit_ref.index.into(),
2949                transaction_index: (seq + 1) as u64,
2950            };
2951
2952            self.last_consensus_stats.index = current_tx_index;
2953
2954            let certificate_author = *self
2955                .epoch_store
2956                .committee()
2957                .authority_by_index(cert_origin)
2958                .unwrap();
2959
2960            let sequenced_transaction = SequencedConsensusTransaction {
2961                certificate_author_index: cert_origin,
2962                certificate_author,
2963                consensus_index: current_tx_index,
2964                transaction,
2965            };
2966
2967            let Some(verified_transaction) = self
2968                .epoch_store
2969                .verify_consensus_transaction(sequenced_transaction)
2970            else {
2971                continue;
2972            };
2973
2974            let key = verified_transaction.0.key();
2975
2976            if let Some(tx_digest) = key.user_transaction_digest() {
2977                self.epoch_store
2978                    .cache_recently_finalized_transaction(tx_digest);
2979            }
2980
2981            // Increment count and check if this is a duplicate within this commit.
2982            // This replaces the separate processed_set HashSet.
2983            let count = occurrence_counts.entry(key.clone()).or_insert(0);
2984            *count += 1;
2985            let in_commit = *count > 1;
2986
2987            let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
2988            if in_commit || in_cache {
2989                self.metrics.skipped_consensus_txns_cache_hit.inc();
2990                continue;
2991            }
2992            if self
2993                .epoch_store
2994                .is_consensus_message_processed(&key)
2995                .expect("db error")
2996            {
2997                self.metrics.skipped_consensus_txns.inc();
2998                continue;
2999            }
3000
3001            first_commit_keys.insert(key.clone());
3002
3003            state.output.record_consensus_message_processed(key);
3004
3005            all_transactions.push(verified_transaction);
3006        }
3007
3008        for key in first_commit_keys {
3009            if let Some(&count) = occurrence_counts.get(&key)
3010                && count > 1
3011            {
3012                self.metrics
3013                    .consensus_handler_duplicate_tx_count
3014                    .observe(count as f64);
3015            }
3016        }
3017
3018        // Copy user transaction occurrence counts to state for unpaid amplification detection.
3019        assert!(
3020            state.occurrence_counts.is_empty(),
3021            "occurrence_counts should be empty before populating"
3022        );
3023        state.occurrence_counts.reserve(occurrence_counts.len());
3024        state.occurrence_counts.extend(
3025            occurrence_counts
3026                .into_iter()
3027                .filter_map(|(key, count)| key.user_transaction_digest().map(|d| (d, count))),
3028        );
3029
3030        all_transactions
3031    }
3032
3033    fn build_commit_handler_input(
3034        &self,
3035        transactions: Vec<VerifiedSequencedConsensusTransaction>,
3036    ) -> CommitHandlerInput {
3037        let epoch = self.epoch_store.epoch();
3038        let mut commit_handler_input = CommitHandlerInput::default();
3039
3040        for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
3041            match transaction.transaction {
3042                SequencedConsensusTransactionKind::External(consensus_transaction) => {
3043                    match consensus_transaction.kind {
3044                        // === User transactions ===
3045                        ConsensusTransactionKind::UserTransactionV2(tx) => {
3046                            // Extract the aliases claim (required) from the claims
3047                            let used_alias_versions = if self
3048                                .epoch_store
3049                                .protocol_config()
3050                                .fix_checkpoint_signature_mapping()
3051                            {
3052                                tx.aliases()
3053                            } else {
3054                                // Convert V1 to V2 format using dummy signature indices
3055                                // which will be ignored with `fix_checkpoint_signature_mapping`
3056                                // disabled.
3057                                tx.aliases_v1().map(|a| {
3058                                    NonEmpty::from_vec(
3059                                        a.into_iter()
3060                                            .enumerate()
3061                                            .map(|(idx, (_, seq))| (idx as u8, seq))
3062                                            .collect(),
3063                                    )
3064                                    .unwrap()
3065                                })
3066                            };
3067                            let inner_tx = tx.into_tx();
3068                            // Safe because transactions are certified by consensus.
3069                            let tx = VerifiedTransaction::new_unchecked(inner_tx);
3070                            // TODO(fastpath): accept position in consensus, after plumbing consensus round, authority index, and transaction index here.
3071                            let transaction =
3072                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
3073                            if let Some(used_alias_versions) = used_alias_versions {
3074                                commit_handler_input
3075                                    .user_transactions
3076                                    .push(WithAliases::new(transaction, used_alias_versions));
3077                            } else {
3078                                commit_handler_input.user_transactions.push(
3079                                    VerifiedExecutableTransactionWithAliases::no_aliases(
3080                                        transaction,
3081                                    ),
3082                                );
3083                            }
3084                        }
3085
3086                        // === State machines ===
3087                        ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => {
3088                            commit_handler_input
3089                                .end_of_publish_transactions
3090                                .push(authority_public_key_bytes);
3091                        }
3092                        ConsensusTransactionKind::NewJWKFetched(
3093                            authority_public_key_bytes,
3094                            jwk_id,
3095                            jwk,
3096                        ) => {
3097                            commit_handler_input.new_jwks.push((
3098                                authority_public_key_bytes,
3099                                jwk_id,
3100                                jwk,
3101                            ));
3102                        }
3103                        ConsensusTransactionKind::RandomnessDkgMessage(
3104                            authority_public_key_bytes,
3105                            items,
3106                        ) => {
3107                            commit_handler_input
3108                                .randomness_dkg_messages
3109                                .push((authority_public_key_bytes, items));
3110                        }
3111                        ConsensusTransactionKind::RandomnessDkgConfirmation(
3112                            authority_public_key_bytes,
3113                            items,
3114                        ) => {
3115                            commit_handler_input
3116                                .randomness_dkg_confirmations
3117                                .push((authority_public_key_bytes, items));
3118                        }
3119                        ConsensusTransactionKind::CapabilityNotificationV2(
3120                            authority_capabilities_v2,
3121                        ) => {
3122                            commit_handler_input
3123                                .capability_notifications
3124                                .push(authority_capabilities_v2);
3125                        }
3126                        ConsensusTransactionKind::ExecutionTimeObservation(
3127                            execution_time_observation,
3128                        ) => {
3129                            commit_handler_input
3130                                .execution_time_observations
3131                                .push(execution_time_observation);
3132                        }
3133                        ConsensusTransactionKind::CheckpointSignatureV2(
3134                            checkpoint_signature_message,
3135                        ) => {
3136                            commit_handler_input
3137                                .checkpoint_signature_messages
3138                                .push(*checkpoint_signature_message);
3139                        }
3140
3141                        // Deprecated messages, filtered earlier by filter_consensus_txns()
3142                        // or rejected by SuiTxValidator. Kept for exhaustiveness.
3143                        ConsensusTransactionKind::CheckpointSignature(_)
3144                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
3145                        | ConsensusTransactionKind::CapabilityNotification(_)
3146                        | ConsensusTransactionKind::CertifiedTransaction(_)
3147                        | ConsensusTransactionKind::UserTransaction(_) => {
3148                            unreachable!("filtered earlier")
3149                        }
3150                    }
3151                }
3152                // TODO: I think we can delete this, it was only used to inject randomness state update into the tx stream.
3153                SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
3154            }
3155        }
3156
3157        commit_handler_input
3158    }
3159
3160    async fn send_end_of_publish_if_needed(&self) {
3161        if self
3162            .epoch_store
3163            .protocol_config()
3164            .timestamp_based_epoch_close()
3165        {
3166            return;
3167        }
3168        if !self.epoch_store.should_send_end_of_publish() {
3169            return;
3170        }
3171
3172        let end_of_publish = ConsensusTransaction::new_end_of_publish(self.epoch_store.name);
3173        if let Err(err) =
3174            self.consensus_adapter
3175                .submit(end_of_publish, None, &self.epoch_store, None, None)
3176        {
3177            warn!(
3178                "Error when sending EndOfPublish message from ConsensusHandler: {:?}",
3179                err
3180            );
3181        } else {
3182            info!(epoch=?self.epoch_store.epoch(), "Sending EndOfPublish message to consensus");
3183        }
3184    }
3185}
3186
/// Message passed through the `ExecutionSchedulerSender` channel: a batch of schedulable
/// items paired with their assigned versions, plus optional settlement batch info.
/// The batch is forwarded to the execution scheduler from a separate task,
/// to avoid blocking consensus handler.
pub(crate) type SchedulerMessage = (
    Vec<(Schedulable, AssignedVersions)>,
    Option<SettlementBatchInfo>,
);
3193
3194#[derive(Clone)]
3195pub(crate) struct ExecutionSchedulerSender {
3196    sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
3197}
3198
3199impl ExecutionSchedulerSender {
3200    fn start(
3201        settlement_scheduler: SettlementScheduler,
3202        epoch_store: Arc<AuthorityPerEpochStore>,
3203    ) -> Self {
3204        let (sender, recv) = monitored_mpsc::unbounded_channel("execution_scheduler_sender");
3205        spawn_monitored_task!(Self::run(recv, settlement_scheduler, epoch_store));
3206        Self { sender }
3207    }
3208
3209    pub(crate) fn new_for_testing(
3210        sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
3211    ) -> Self {
3212        Self { sender }
3213    }
3214
3215    fn send(
3216        &self,
3217        transactions: Vec<(Schedulable, AssignedVersions)>,
3218        settlement: Option<SettlementBatchInfo>,
3219    ) {
3220        let _ = self.sender.send((transactions, settlement));
3221    }
3222
3223    async fn run(
3224        mut recv: monitored_mpsc::UnboundedReceiver<SchedulerMessage>,
3225        settlement_scheduler: SettlementScheduler,
3226        epoch_store: Arc<AuthorityPerEpochStore>,
3227    ) {
3228        while let Some((transactions, settlement)) = recv.recv().await {
3229            let _guard = monitored_scope("ConsensusHandler::enqueue");
3230            let txns = transactions
3231                .into_iter()
3232                .map(|(txn, versions)| (txn, ExecutionEnv::new().with_assigned_versions(versions)))
3233                .collect();
3234            if let Some(settlement) = settlement {
3235                settlement_scheduler.enqueue_v2(txns, settlement, &epoch_store);
3236            } else {
3237                settlement_scheduler.enqueue(txns, &epoch_store);
3238            }
3239        }
3240    }
3241}
3242
3243/// Manages the lifetime of tasks handling the commits and transactions output by consensus.
3244pub(crate) struct MysticetiConsensusHandler {
3245    tasks: JoinSet<()>,
3246}
3247
3248impl MysticetiConsensusHandler {
3249    pub(crate) fn new(
3250        last_processed_commit_at_startup: CommitIndex,
3251        mut consensus_handler: ConsensusHandler<CheckpointService>,
3252        mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
3253        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
3254        process_consensus_commits: bool,
3255    ) -> Self {
3256        debug!(
3257            last_processed_commit_at_startup,
3258            process_consensus_commits, "Starting consensus replay"
3259        );
3260        let mut tasks = JoinSet::new();
3261        tasks.spawn(monitored_future!(async move {
3262            // TODO: pause when execution is overloaded, so consensus can detect the backpressure.
3263            while let Some(consensus_commit) = commit_receiver.recv().await {
3264                let commit_index = consensus_commit.commit_ref.index;
3265                if !process_consensus_commits {
3266                    debug!(
3267                        commit_index,
3268                        "Observer skipping consensus commit processing"
3269                    );
3270                } else if commit_index <= last_processed_commit_at_startup {
3271                    consensus_handler.handle_prior_consensus_commit(consensus_commit);
3272                } else {
3273                    consensus_handler
3274                        .handle_consensus_commit(consensus_commit)
3275                        .await;
3276                }
3277                commit_consumer_monitor.set_highest_handled_commit(commit_index);
3278            }
3279        }));
3280        Self { tasks }
3281    }
3282
3283    pub(crate) async fn abort(&mut self) {
3284        self.tasks.shutdown().await;
3285    }
3286}
3287
3288fn authenticator_state_update_transaction(
3289    epoch_store: &AuthorityPerEpochStore,
3290    round: u64,
3291    mut new_active_jwks: Vec<ActiveJwk>,
3292) -> VerifiedExecutableTransaction {
3293    let epoch = epoch_store.epoch();
3294    new_active_jwks.sort();
3295
3296    info!("creating authenticator state update transaction");
3297    assert!(epoch_store.authenticator_state_enabled());
3298    let transaction = VerifiedTransaction::new_authenticator_state_update(
3299        epoch,
3300        round,
3301        new_active_jwks,
3302        epoch_store
3303            .epoch_start_config()
3304            .authenticator_obj_initial_shared_version()
3305            .expect("authenticator state obj must exist"),
3306    );
3307    VerifiedExecutableTransaction::new_system(transaction, epoch)
3308}
3309
3310pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
3311    match &transaction.kind {
3312        // Deprecated and rejected by SuiTxValidator; never classified in practice.
3313        ConsensusTransactionKind::CertifiedTransaction(_) => "_deprecated_certificate",
3314        ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
3315        ConsensusTransactionKind::CheckpointSignatureV2(_) => "checkpoint_signature",
3316        ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
3317        ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
3318        ConsensusTransactionKind::CapabilityNotificationV2(_) => "capability_notification_v2",
3319        ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
3320        ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
3321        ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
3322        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
3323        ConsensusTransactionKind::UserTransaction(_) => "_deprecated_user_transaction",
3324        ConsensusTransactionKind::UserTransactionV2(tx) => {
3325            if tx.tx().is_consensus_tx() {
3326                "shared_user_transaction_v2"
3327            } else {
3328                "owned_user_transaction_v2"
3329            }
3330        }
3331        ConsensusTransactionKind::ExecutionTimeObservation(_) => "execution_time_observation",
3332    }
3333}
3334
/// A consensus transaction together with its author and its position in the consensus
/// output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequencedConsensusTransaction {
    /// Index of the authoring authority.
    pub certificate_author_index: AuthorityIndex,
    /// Name of the authoring authority; returned by `sender_authority()`.
    pub certificate_author: AuthorityName,
    /// Position of the transaction in the consensus output.
    pub consensus_index: ExecutionIndices,
    /// The payload: either an external consensus transaction or a system transaction.
    pub transaction: SequencedConsensusTransactionKind,
}
3342
3343#[derive(Debug, Clone)]
3344#[allow(clippy::large_enum_variant)]
3345pub enum SequencedConsensusTransactionKind {
3346    External(ConsensusTransaction),
3347    System(VerifiedExecutableTransaction),
3348}
3349
3350impl Serialize for SequencedConsensusTransactionKind {
3351    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
3352        let serializable = SerializableSequencedConsensusTransactionKind::from(self);
3353        serializable.serialize(serializer)
3354    }
3355}
3356
3357impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
3358    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
3359        let serializable =
3360            SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
3361        Ok(serializable.into())
3362    }
3363}
3364
3365// We can't serialize SequencedConsensusTransactionKind directly because it contains a
3366// VerifiedExecutableTransaction, which is not serializable (by design). This wrapper allows us to
3367// convert to a serializable format easily.
3368#[derive(Debug, Clone, Serialize, Deserialize)]
3369#[allow(clippy::large_enum_variant)]
3370enum SerializableSequencedConsensusTransactionKind {
3371    External(ConsensusTransaction),
3372    System(TrustedExecutableTransaction),
3373}
3374
3375impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
3376    fn from(kind: &SequencedConsensusTransactionKind) -> Self {
3377        match kind {
3378            SequencedConsensusTransactionKind::External(ext) => {
3379                SerializableSequencedConsensusTransactionKind::External(ext.clone())
3380            }
3381            SequencedConsensusTransactionKind::System(txn) => {
3382                SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
3383            }
3384        }
3385    }
3386}
3387
3388impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
3389    fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
3390        match kind {
3391            SerializableSequencedConsensusTransactionKind::External(ext) => {
3392                SequencedConsensusTransactionKind::External(ext)
3393            }
3394            SerializableSequencedConsensusTransactionKind::System(txn) => {
3395                SequencedConsensusTransactionKind::System(txn.into())
3396            }
3397        }
3398    }
3399}
3400
3401#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
3402pub enum SequencedConsensusTransactionKey {
3403    External(ConsensusTransactionKey),
3404    System(TransactionDigest),
3405}
3406
3407impl SequencedConsensusTransactionKey {
3408    pub fn user_transaction_digest(&self) -> Option<TransactionDigest> {
3409        match self {
3410            SequencedConsensusTransactionKey::External(key) => match key {
3411                ConsensusTransactionKey::Certificate(digest) => Some(*digest),
3412                _ => None,
3413            },
3414            SequencedConsensusTransactionKey::System(_) => None,
3415        }
3416    }
3417}
3418
3419impl SequencedConsensusTransactionKind {
3420    pub fn key(&self) -> SequencedConsensusTransactionKey {
3421        match self {
3422            SequencedConsensusTransactionKind::External(ext) => {
3423                SequencedConsensusTransactionKey::External(ext.key())
3424            }
3425            SequencedConsensusTransactionKind::System(txn) => {
3426                SequencedConsensusTransactionKey::System(*txn.digest())
3427            }
3428        }
3429    }
3430
3431    pub fn get_tracking_id(&self) -> u64 {
3432        match self {
3433            SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
3434            SequencedConsensusTransactionKind::System(_txn) => 0,
3435        }
3436    }
3437
3438    pub fn is_executable_transaction(&self) -> bool {
3439        match self {
3440            SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
3441            SequencedConsensusTransactionKind::System(_) => true,
3442        }
3443    }
3444
3445    pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
3446        match self {
3447            SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
3448                ConsensusTransactionKind::UserTransactionV2(txn) => Some(*txn.tx().digest()),
3449                _ => None,
3450            },
3451            SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
3452        }
3453    }
3454
3455    pub fn is_end_of_publish(&self) -> bool {
3456        match self {
3457            SequencedConsensusTransactionKind::External(ext) => {
3458                matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
3459            }
3460            SequencedConsensusTransactionKind::System(_) => false,
3461        }
3462    }
3463}
3464
3465impl SequencedConsensusTransaction {
3466    pub fn sender_authority(&self) -> AuthorityName {
3467        self.certificate_author
3468    }
3469
3470    pub fn key(&self) -> SequencedConsensusTransactionKey {
3471        self.transaction.key()
3472    }
3473
3474    pub fn is_end_of_publish(&self) -> bool {
3475        if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
3476            matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
3477        } else {
3478            false
3479        }
3480    }
3481
3482    pub fn try_take_execution_time_observation(&mut self) -> Option<ExecutionTimeObservation> {
3483        if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
3484            kind: ConsensusTransactionKind::ExecutionTimeObservation(observation),
3485            ..
3486        }) = &mut self.transaction
3487        {
3488            Some(std::mem::take(observation))
3489        } else {
3490            None
3491        }
3492    }
3493
3494    pub fn is_system(&self) -> bool {
3495        matches!(
3496            self.transaction,
3497            SequencedConsensusTransactionKind::System(_)
3498        )
3499    }
3500
3501    pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
3502        if !randomness_state_enabled {
3503            // If randomness is disabled, these should be processed same as a tx without randomness,
3504            // which will eventually fail when the randomness state object is not found.
3505            return false;
3506        }
3507        match &self.transaction {
3508            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3509                kind: ConsensusTransactionKind::UserTransactionV2(txn),
3510                ..
3511            }) => txn.tx().transaction_data().uses_randomness(),
3512            _ => false,
3513        }
3514    }
3515
3516    pub fn as_consensus_txn(&self) -> Option<&SenderSignedData> {
3517        match &self.transaction {
3518            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3519                kind: ConsensusTransactionKind::UserTransactionV2(txn),
3520                ..
3521            }) if txn.tx().is_consensus_tx() => Some(txn.tx().data()),
3522            SequencedConsensusTransactionKind::System(txn) if txn.is_consensus_tx() => {
3523                Some(txn.data())
3524            }
3525            _ => None,
3526        }
3527    }
3528}
3529
3530#[derive(Debug, Clone, Serialize, Deserialize)]
3531pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
3532
3533#[cfg(test)]
3534impl VerifiedSequencedConsensusTransaction {
3535    pub fn new_test(transaction: ConsensusTransaction) -> Self {
3536        Self(SequencedConsensusTransaction::new_test(transaction))
3537    }
3538}
3539
3540impl SequencedConsensusTransaction {
3541    pub fn new_test(transaction: ConsensusTransaction) -> Self {
3542        Self {
3543            certificate_author_index: 0,
3544            certificate_author: AuthorityName::ZERO,
3545            consensus_index: Default::default(),
3546            transaction: SequencedConsensusTransactionKind::External(transaction),
3547        }
3548    }
3549}
3550
3551#[derive(Serialize, Deserialize)]
3552pub(crate) struct CommitIntervalObserver {
3553    ring_buffer: VecDeque<u64>,
3554}
3555
3556impl CommitIntervalObserver {
3557    pub fn new(window_size: u32) -> Self {
3558        Self {
3559            ring_buffer: VecDeque::with_capacity(window_size as usize),
3560        }
3561    }
3562
3563    pub fn observe_commit_time(&mut self, consensus_commit: &impl ConsensusCommitAPI) {
3564        let commit_time = consensus_commit.commit_timestamp_ms();
3565        if self.ring_buffer.len() == self.ring_buffer.capacity() {
3566            self.ring_buffer.pop_front();
3567        }
3568        self.ring_buffer.push_back(commit_time);
3569    }
3570
3571    pub fn commit_interval_estimate(&self) -> Option<Duration> {
3572        if self.ring_buffer.len() <= 1 {
3573            None
3574        } else {
3575            let first = self.ring_buffer.front().unwrap();
3576            let last = self.ring_buffer.back().unwrap();
3577            let duration = last.saturating_sub(*first);
3578            let num_commits = self.ring_buffer.len() as u64;
3579            Some(Duration::from_millis(duration.div_ceil(num_commits)))
3580        }
3581    }
3582}
3583
3584#[cfg(test)]
3585mod tests {
3586    use std::collections::HashSet;
3587
3588    use consensus_core::{
3589        BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock,
3590    };
3591    use futures::pin_mut;
3592    use prometheus::Registry;
3593    use sui_protocol_config::{ConsensusTransactionOrdering, ProtocolConfig};
3594    use sui_types::{
3595        base_types::ExecutionDigests,
3596        base_types::{AuthorityName, FullObjectRef, ObjectID, SuiAddress, random_object_ref},
3597        committee::Committee,
3598        crypto::deterministic_random_account_key,
3599        gas::GasCostSummary,
3600        message_envelope::Message,
3601        messages_checkpoint::{
3602            CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
3603            SignedCheckpointSummary,
3604        },
3605        messages_consensus::ConsensusTransaction,
3606        object::Object,
3607        transaction::{
3608            CertifiedTransaction, TransactionData, TransactionDataAPI, VerifiedCertificate,
3609        },
3610    };
3611
3612    use super::*;
3613    use crate::{
3614        authority::{
3615            authority_per_epoch_store::ConsensusStatsAPI,
3616            test_authority_builder::TestAuthorityBuilder,
3617        },
3618        checkpoints::CheckpointServiceNoop,
3619        consensus_adapter::consensus_tests::test_user_transaction,
3620        consensus_test_utils::make_consensus_adapter_for_test,
3621        post_consensus_tx_reorder::PostConsensusTxReorder,
3622    };
3623
3624    #[tokio::test(flavor = "current_thread", start_paused = true)]
3625    async fn test_consensus_commit_handler() {
3626        telemetry_subscribers::init_for_testing();
3627
3628        // GIVEN
3629        // 1 account keypair
3630        let (sender, keypair) = deterministic_random_account_key();
3631        // 12 gas objects.
3632        let gas_objects: Vec<Object> = (0..12)
3633            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3634            .collect();
3635        // 4 owned objects.
3636        let owned_objects: Vec<Object> = (0..4)
3637            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3638            .collect();
3639        // 6 shared objects.
3640        let shared_objects: Vec<Object> = (0..6)
3641            .map(|_| Object::shared_for_testing())
3642            .collect::<Vec<_>>();
3643        let mut all_objects = gas_objects.clone();
3644        all_objects.extend(owned_objects.clone());
3645        all_objects.extend(shared_objects.clone());
3646
3647        let network_config =
3648            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
3649                .with_objects(all_objects.clone())
3650                .build();
3651
3652        let state = TestAuthorityBuilder::new()
3653            .with_network_config(&network_config, 0)
3654            .build()
3655            .await;
3656
3657        let epoch_store = state.epoch_store_for_testing().clone();
3658        let new_epoch_start_state = epoch_store.epoch_start_state();
3659        let consensus_committee = new_epoch_start_state.get_consensus_committee();
3660
3661        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3662
3663        let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
3664
3665        let backpressure_manager = BackpressureManager::new_for_tests();
3666        let consensus_adapter =
3667            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3668        let settlement_scheduler = SettlementScheduler::new(
3669            state.execution_scheduler().as_ref().clone(),
3670            state.get_transaction_cache_reader().clone(),
3671            state.metrics.clone(),
3672        );
3673        let mut consensus_handler = ConsensusHandler::new(
3674            epoch_store,
3675            Arc::new(CheckpointServiceNoop {}),
3676            settlement_scheduler,
3677            consensus_adapter,
3678            state.get_object_cache_reader().clone(),
3679            consensus_committee.clone(),
3680            metrics,
3681            Arc::new(throughput_calculator),
3682            backpressure_manager.subscribe(),
3683            state.traffic_controller.clone(),
3684            None,
3685            state.consensus_gasless_counter.clone(),
3686        );
3687
3688        // AND create test user transactions alternating between owned and shared input.
3689        let mut user_transactions = vec![];
3690        for (i, gas_object) in gas_objects[0..8].iter().enumerate() {
3691            let input_object = if i % 2 == 0 {
3692                owned_objects.get(i / 2).unwrap().clone()
3693            } else {
3694                shared_objects.get(i / 2).unwrap().clone()
3695            };
3696            let transaction = test_user_transaction(
3697                &state,
3698                sender,
3699                &keypair,
3700                gas_object.clone(),
3701                vec![input_object],
3702            )
3703            .await;
3704            user_transactions.push(transaction);
3705        }
3706
3707        // AND create 4 more user transactions with remaining gas objects and 2 shared objects.
3708        // Having more txns on the same shared object may get deferred.
3709        for (i, gas_object) in gas_objects[8..12].iter().enumerate() {
3710            let shared_object = if i < 2 {
3711                shared_objects[4].clone()
3712            } else {
3713                shared_objects[5].clone()
3714            };
3715            let transaction = test_user_transaction(
3716                &state,
3717                sender,
3718                &keypair,
3719                gas_object.clone(),
3720                vec![shared_object],
3721            )
3722            .await;
3723            user_transactions.push(transaction);
3724        }
3725
3726        // AND create block for each user transaction
3727        let mut blocks = Vec::new();
3728        for (i, consensus_transaction) in user_transactions
3729            .iter()
3730            .cloned()
3731            .map(|t| ConsensusTransaction::new_user_transaction_v2_message(&state.name, t.into()))
3732            .enumerate()
3733        {
3734            let transaction_bytes = bcs::to_bytes(&consensus_transaction).unwrap();
3735            let block = VerifiedBlock::new_for_test(
3736                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
3737                    .set_transactions(vec![Transaction::new(transaction_bytes)])
3738                    .build(),
3739            );
3740
3741            blocks.push(block);
3742        }
3743
3744        // AND create the consensus commit
3745        let leader_block = blocks[0].clone();
3746        let committed_sub_dag = CommittedSubDag::new(
3747            leader_block.reference(),
3748            blocks.clone(),
3749            leader_block.timestamp_ms(),
3750            CommitRef::new(10, CommitDigest::MIN),
3751        );
3752
3753        // Test that the consensus handler respects backpressure.
3754        backpressure_manager.set_backpressure(true);
3755        // Default watermarks are 0,0 which will suppress the backpressure.
3756        backpressure_manager.update_highest_certified_checkpoint(1);
3757
3758        // AND process the consensus commit once
3759        {
3760            let waiter = consensus_handler.handle_consensus_commit(committed_sub_dag.clone());
3761            pin_mut!(waiter);
3762
3763            // waiter should not complete within 5 seconds
3764            tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
3765                .await
3766                .unwrap_err();
3767
3768            // lift backpressure
3769            backpressure_manager.set_backpressure(false);
3770
3771            // waiter completes now.
3772            tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
3773                .await
3774                .unwrap();
3775        }
3776
3777        // THEN check the consensus stats
3778        let num_blocks = blocks.len();
3779        let num_transactions = user_transactions.len();
3780        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
3781        assert_eq!(
3782            last_consensus_stats_1.index.transaction_index,
3783            num_transactions as u64
3784        );
3785        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
3786        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
3787        assert_eq!(
3788            last_consensus_stats_1.stats.get_num_messages(0),
3789            num_blocks as u64
3790        );
3791        assert_eq!(
3792            last_consensus_stats_1.stats.get_num_user_transactions(0),
3793            num_transactions as u64
3794        );
3795
3796        // THEN check for execution status of user transactions.
3797        for (i, t) in user_transactions.iter().enumerate() {
3798            let digest = t.tx().digest();
3799            if tokio::time::timeout(
3800                std::time::Duration::from_secs(10),
3801                state.notify_read_effects_for_testing("", *digest),
3802            )
3803            .await
3804            .is_ok()
3805            {
3806                // Effects exist as expected.
3807            } else {
3808                panic!("User transaction {} {} did not execute", i, digest);
3809            }
3810        }
3811
3812        // THEN check for no inflight or suspended transactions.
3813        state.execution_scheduler().check_empty_for_testing().await;
3814    }
3815
3816    fn to_short_strings(txs: Vec<VerifiedExecutableTransactionWithAliases>) -> Vec<String> {
3817        txs.into_iter()
3818            .map(|tx| format!("transaction({})", tx.tx().transaction_data().gas_price()))
3819            .collect()
3820    }
3821
3822    #[test]
3823    fn test_order_by_gas_price() {
3824        let mut v = vec![user_txn(42), user_txn(100)];
3825        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3826        assert_eq!(
3827            to_short_strings(v),
3828            vec![
3829                "transaction(100)".to_string(),
3830                "transaction(42)".to_string(),
3831            ]
3832        );
3833
3834        let mut v = vec![
3835            user_txn(1200),
3836            user_txn(12),
3837            user_txn(1000),
3838            user_txn(42),
3839            user_txn(100),
3840            user_txn(1000),
3841        ];
3842        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3843        assert_eq!(
3844            to_short_strings(v),
3845            vec![
3846                "transaction(1200)".to_string(),
3847                "transaction(1000)".to_string(),
3848                "transaction(1000)".to_string(),
3849                "transaction(100)".to_string(),
3850                "transaction(42)".to_string(),
3851                "transaction(12)".to_string(),
3852            ]
3853        );
3854    }
3855
3856    #[tokio::test(flavor = "current_thread")]
3857    async fn test_checkpoint_signature_dedup() {
3858        telemetry_subscribers::init_for_testing();
3859
3860        let network_config =
3861            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3862        let state = TestAuthorityBuilder::new()
3863            .with_network_config(&network_config, 0)
3864            .build()
3865            .await;
3866
3867        let epoch_store = state.epoch_store_for_testing().clone();
3868        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3869
3870        let make_signed = || {
3871            let epoch = epoch_store.epoch();
3872            let contents =
3873                CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
3874            let summary = CheckpointSummary::new(
3875                &ProtocolConfig::get_for_max_version_UNSAFE(),
3876                epoch,
3877                42, // sequence number
3878                10, // network_total_transactions
3879                &contents,
3880                None, // previous_digest
3881                GasCostSummary::default(),
3882                None,       // end_of_epoch_data
3883                0,          // timestamp
3884                Vec::new(), // randomness_rounds
3885                Vec::new(), // checkpoint_artifact_digests
3886            );
3887            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name)
3888        };
3889
3890        // Prepare V2 pair: same (authority, seq), different digests => different keys
3891        let v2_s1 = make_signed();
3892        let v2_s1_clone = v2_s1.clone();
3893        let v2_digest_a = v2_s1.data().digest();
3894        let v2_a =
3895            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3896                summary: v2_s1,
3897            });
3898
3899        let v2_s2 = make_signed();
3900        let v2_digest_b = v2_s2.data().digest();
3901        let v2_b =
3902            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3903                summary: v2_s2,
3904            });
3905
3906        assert_ne!(v2_digest_a, v2_digest_b);
3907
3908        // Create an exact duplicate with same digest to exercise valid dedup
3909        assert_eq!(v2_s1_clone.data().digest(), v2_digest_a);
3910        let v2_dup =
3911            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3912                summary: v2_s1_clone,
3913            });
3914
3915        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
3916        let block = VerifiedBlock::new_for_test(
3917            TestBlock::new(100, 0)
3918                .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)])
3919                .build(),
3920        );
3921        let commit = CommittedSubDag::new(
3922            block.reference(),
3923            vec![block.clone()],
3924            block.timestamp_ms(),
3925            CommitRef::new(10, CommitDigest::MIN),
3926        );
3927
3928        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3929        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
3930        let backpressure = BackpressureManager::new_for_tests();
3931        let consensus_adapter =
3932            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3933        let settlement_scheduler = SettlementScheduler::new(
3934            state.execution_scheduler().as_ref().clone(),
3935            state.get_transaction_cache_reader().clone(),
3936            state.metrics.clone(),
3937        );
3938        let mut handler = ConsensusHandler::new(
3939            epoch_store.clone(),
3940            Arc::new(CheckpointServiceNoop {}),
3941            settlement_scheduler,
3942            consensus_adapter,
3943            state.get_object_cache_reader().clone(),
3944            consensus_committee.clone(),
3945            metrics,
3946            Arc::new(throughput),
3947            backpressure.subscribe(),
3948            state.traffic_controller.clone(),
3949            None,
3950            state.consensus_gasless_counter.clone(),
3951        );
3952
3953        handler.handle_consensus_commit(commit).await;
3954
3955        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
3956        use sui_types::messages_consensus::ConsensusTransactionKey as CK;
3957
3958        // V2 distinct digests: both must be processed. If these were collapsed to one CheckpointSeq num, only one would process.
3959        let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a));
3960        let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b));
3961        assert!(
3962            epoch_store
3963                .is_consensus_message_processed(&v2_key_a)
3964                .unwrap()
3965        );
3966        assert!(
3967            epoch_store
3968                .is_consensus_message_processed(&v2_key_b)
3969                .unwrap()
3970        );
3971    }
3972
3973    #[tokio::test(flavor = "current_thread")]
3974    async fn test_verify_consensus_transaction_filters_mismatched_authorities() {
3975        telemetry_subscribers::init_for_testing();
3976
3977        let network_config =
3978            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3979        let state = TestAuthorityBuilder::new()
3980            .with_network_config(&network_config, 0)
3981            .build()
3982            .await;
3983
3984        let epoch_store = state.epoch_store_for_testing().clone();
3985        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3986
3987        // Create a different authority than our test authority
3988        use fastcrypto::traits::KeyPair;
3989        let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair();
3990        let wrong_authority: AuthorityName = wrong_keypair.public().into();
3991
3992        // Create EndOfPublish transaction with mismatched authority
3993        let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority);
3994
3995        // Create valid EndOfPublish transaction with correct authority
3996        let valid_eop = ConsensusTransaction::new_end_of_publish(state.name);
3997
3998        // Create CheckpointSignature with mismatched authority
3999        let epoch = epoch_store.epoch();
4000        let contents =
4001            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
4002        let summary = CheckpointSummary::new(
4003            &ProtocolConfig::get_for_max_version_UNSAFE(),
4004            epoch,
4005            42, // sequence number
4006            10, // network_total_transactions
4007            &contents,
4008            None, // previous_digest
4009            GasCostSummary::default(),
4010            None,       // end_of_epoch_data
4011            0,          // timestamp
4012            Vec::new(), // randomness_rounds
4013            Vec::new(), // checkpoint commitments
4014        );
4015
4016        // Create a signed checkpoint with the wrong authority
4017        let mismatched_checkpoint_signed =
4018            SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority);
4019        let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest();
4020        let mismatched_checkpoint =
4021            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
4022                summary: mismatched_checkpoint_signed,
4023            });
4024
4025        // Create a valid checkpoint signature with correct authority
4026        let valid_checkpoint_signed =
4027            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name);
4028        let valid_checkpoint_digest = valid_checkpoint_signed.data().digest();
4029        let valid_checkpoint =
4030            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
4031                summary: valid_checkpoint_signed,
4032            });
4033
4034        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
4035
4036        // Create a block with both valid and invalid transactions
4037        let block = VerifiedBlock::new_for_test(
4038            TestBlock::new(100, 0)
4039                .set_transactions(vec![
4040                    to_tx(&mismatched_eop),
4041                    to_tx(&valid_eop),
4042                    to_tx(&mismatched_checkpoint),
4043                    to_tx(&valid_checkpoint),
4044                ])
4045                .build(),
4046        );
4047        let commit = CommittedSubDag::new(
4048            block.reference(),
4049            vec![block.clone()],
4050            block.timestamp_ms(),
4051            CommitRef::new(10, CommitDigest::MIN),
4052        );
4053
4054        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
4055        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
4056        let backpressure = BackpressureManager::new_for_tests();
4057        let consensus_adapter =
4058            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
4059        let settlement_scheduler = SettlementScheduler::new(
4060            state.execution_scheduler().as_ref().clone(),
4061            state.get_transaction_cache_reader().clone(),
4062            state.metrics.clone(),
4063        );
4064        let mut handler = ConsensusHandler::new(
4065            epoch_store.clone(),
4066            Arc::new(CheckpointServiceNoop {}),
4067            settlement_scheduler,
4068            consensus_adapter,
4069            state.get_object_cache_reader().clone(),
4070            consensus_committee.clone(),
4071            metrics,
4072            Arc::new(throughput),
4073            backpressure.subscribe(),
4074            state.traffic_controller.clone(),
4075            None,
4076            state.consensus_gasless_counter.clone(),
4077        );
4078
4079        handler.handle_consensus_commit(commit).await;
4080
4081        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
4082        use sui_types::messages_consensus::ConsensusTransactionKey as CK;
4083
4084        // Check that valid transactions were processed
4085        let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
4086        assert!(
4087            epoch_store
4088                .is_consensus_message_processed(&valid_eop_key)
4089                .unwrap(),
4090            "Valid EndOfPublish should have been processed"
4091        );
4092
4093        let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
4094            state.name,
4095            42,
4096            valid_checkpoint_digest,
4097        ));
4098        assert!(
4099            epoch_store
4100                .is_consensus_message_processed(&valid_checkpoint_key)
4101                .unwrap(),
4102            "Valid CheckpointSignature should have been processed"
4103        );
4104
4105        // Check that mismatched authority transactions were NOT processed (filtered out by verify_consensus_transaction)
4106        let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
4107        assert!(
4108            !epoch_store
4109                .is_consensus_message_processed(&mismatched_eop_key)
4110                .unwrap(),
4111            "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
4112        );
4113
4114        let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
4115            wrong_authority,
4116            42,
4117            mismatched_checkpoint_digest,
4118        ));
4119        assert!(
4120            !epoch_store
4121                .is_consensus_message_processed(&mismatched_checkpoint_key)
4122                .unwrap(),
4123            "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
4124        );
4125    }
4126
4127    fn user_txn(gas_price: u64) -> VerifiedExecutableTransactionWithAliases {
4128        let (committee, keypairs) = Committee::new_simple_test_committee();
4129        let (sender, sender_keypair) = deterministic_random_account_key();
4130        let tx = sui_types::transaction::Transaction::from_data_and_signer(
4131            TransactionData::new_transfer(
4132                SuiAddress::default(),
4133                FullObjectRef::from_fastpath_ref(random_object_ref()),
4134                sender,
4135                random_object_ref(),
4136                1000 * gas_price,
4137                gas_price,
4138            ),
4139            vec![&sender_keypair],
4140        );
4141        let tx = VerifiedExecutableTransaction::new_from_certificate(
4142            VerifiedCertificate::new_unchecked(
4143                CertifiedTransaction::new_from_keypairs_for_testing(
4144                    tx.into_data(),
4145                    &keypairs,
4146                    &committee,
4147                ),
4148            ),
4149        );
4150        VerifiedExecutableTransactionWithAliases::no_aliases(tx)
4151    }
4152
4153    mod checkpoint_queue_tests {
4154        use super::*;
4155        use consensus_core::CommitRef;
4156        use sui_types::digests::Digest;
4157
4158        fn make_chunk(tx_count: usize, height: u64) -> Chunk {
4159            Chunk {
4160                schedulables: (0..tx_count)
4161                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4162                    .collect(),
4163                settlement: None,
4164                height,
4165            }
4166        }
4167
4168        fn make_commit_ref(index: u32) -> CommitRef {
4169            CommitRef {
4170                index,
4171                digest: CommitDigest::MIN,
4172            }
4173        }
4174
4175        fn default_versions() -> HashMap<TransactionKey, AssignedVersions> {
4176            HashMap::new()
4177        }
4178
4179        #[test]
4180        fn test_flush_all_checkpoint_roots() {
4181            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4182            let versions = default_versions();
4183
4184            queue.push_chunk(
4185                make_chunk(5, 1),
4186                &versions,
4187                1000,
4188                make_commit_ref(1),
4189                Digest::default(),
4190            );
4191            queue.push_chunk(
4192                make_chunk(3, 2),
4193                &versions,
4194                1000,
4195                make_commit_ref(1),
4196                Digest::default(),
4197            );
4198
4199            let pending = queue.flush(1000, true);
4200
4201            assert!(pending.is_some());
4202            assert!(queue.pending_roots.is_empty());
4203        }
4204
4205        #[test]
4206        fn test_flush_respects_min_checkpoint_interval() {
4207            let min_interval = 200;
4208            let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, 1000, min_interval);
4209            let versions = default_versions();
4210
4211            queue.push_chunk(
4212                make_chunk(5, 1),
4213                &versions,
4214                1000,
4215                make_commit_ref(1),
4216                Digest::default(),
4217            );
4218
4219            let pending = queue.flush(1000 + min_interval - 1, false);
4220            assert!(pending.is_none());
4221            assert_eq!(queue.pending_roots.len(), 1);
4222
4223            let pending = queue.flush(1000 + min_interval, false);
4224            assert!(pending.is_some());
4225            assert!(queue.pending_roots.is_empty());
4226        }
4227
4228        #[test]
4229        fn test_push_chunk_flushes_when_exceeds_max() {
4230            let max_tx = 10;
4231            let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, max_tx, 0);
4232            let versions = default_versions();
4233
4234            queue.push_chunk(
4235                make_chunk(max_tx / 2 + 1, 1),
4236                &versions,
4237                1000,
4238                make_commit_ref(1),
4239                Digest::default(),
4240            );
4241
4242            let flushed = queue.push_chunk(
4243                make_chunk(max_tx / 2 + 1, 2),
4244                &versions,
4245                1000,
4246                make_commit_ref(2),
4247                Digest::default(),
4248            );
4249
4250            assert_eq!(flushed.len(), 1);
4251            assert_eq!(queue.pending_roots.len(), 1);
4252        }
4253
4254        #[test]
4255        fn test_multiple_chunks_merged_into_one_checkpoint() {
4256            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 200);
4257            let versions = default_versions();
4258
4259            queue.push_chunk(
4260                make_chunk(10, 1),
4261                &versions,
4262                1000,
4263                make_commit_ref(1),
4264                Digest::default(),
4265            );
4266            queue.push_chunk(
4267                make_chunk(10, 2),
4268                &versions,
4269                1000,
4270                make_commit_ref(2),
4271                Digest::default(),
4272            );
4273            queue.push_chunk(
4274                make_chunk(10, 3),
4275                &versions,
4276                1000,
4277                make_commit_ref(3),
4278                Digest::default(),
4279            );
4280
4281            let pending = queue.flush(1000, true).unwrap();
4282
4283            assert_eq!(pending.roots.len(), 3);
4284        }
4285
4286        #[test]
4287        fn test_push_chunk_handles_overflow() {
4288            let max_tx = 10;
4289            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4290            let versions = default_versions();
4291
4292            let flushed1 = queue.push_chunk(
4293                make_chunk(max_tx / 2, 1),
4294                &versions,
4295                1000,
4296                make_commit_ref(1),
4297                Digest::default(),
4298            );
4299            assert!(flushed1.is_empty());
4300
4301            let flushed2 = queue.push_chunk(
4302                make_chunk(max_tx / 2, 2),
4303                &versions,
4304                1000,
4305                make_commit_ref(2),
4306                Digest::default(),
4307            );
4308            assert!(flushed2.is_empty());
4309
4310            let flushed3 = queue.push_chunk(
4311                make_chunk(max_tx / 2, 3),
4312                &versions,
4313                1000,
4314                make_commit_ref(3),
4315                Digest::default(),
4316            );
4317            assert_eq!(flushed3.len(), 1);
4318
4319            let pending = queue.flush(1000, true);
4320
4321            for p in pending.iter().chain(flushed3.iter()) {
4322                let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4323                assert!(tx_count <= max_tx);
4324            }
4325        }
4326
4327        #[test]
4328        fn test_checkpoint_uses_last_chunk_height() {
4329            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4330            let versions = default_versions();
4331
4332            queue.push_chunk(
4333                make_chunk(10, 100),
4334                &versions,
4335                1000,
4336                make_commit_ref(1),
4337                Digest::default(),
4338            );
4339            queue.push_chunk(
4340                make_chunk(10, 200),
4341                &versions,
4342                1000,
4343                make_commit_ref(2),
4344                Digest::default(),
4345            );
4346
4347            let pending = queue.flush(1000, true).unwrap();
4348
4349            assert_eq!(pending.details.checkpoint_height, 200);
4350        }
4351
4352        #[test]
4353        fn test_last_built_timestamp_updated_on_flush() {
4354            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4355            let versions = default_versions();
4356
4357            queue.push_chunk(
4358                make_chunk(10, 1),
4359                &versions,
4360                5000,
4361                make_commit_ref(1),
4362                Digest::default(),
4363            );
4364
4365            assert_eq!(queue.last_built_timestamp, 0);
4366
4367            let _ = queue.flush(5000, true);
4368
4369            assert_eq!(queue.last_built_timestamp, 5000);
4370        }
4371
4372        #[test]
4373        fn test_settlement_info_sent_through_channel() {
4374            let mut queue = CheckpointQueue::new_for_testing(0, 0, 5, 1000, 0);
4375            let versions = default_versions();
4376
4377            let chunk1 = Chunk {
4378                schedulables: vec![
4379                    Schedulable::ConsensusCommitPrologue(0, 1, 0),
4380                    Schedulable::ConsensusCommitPrologue(0, 2, 0),
4381                    Schedulable::ConsensusCommitPrologue(0, 3, 0),
4382                ],
4383                settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
4384                height: 1,
4385            };
4386
4387            let chunk2 = Chunk {
4388                schedulables: vec![
4389                    Schedulable::ConsensusCommitPrologue(0, 4, 0),
4390                    Schedulable::ConsensusCommitPrologue(0, 5, 0),
4391                ],
4392                settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
4393                height: 2,
4394            };
4395
4396            queue.push_chunk(
4397                chunk1,
4398                &versions,
4399                1000,
4400                make_commit_ref(1),
4401                Digest::default(),
4402            );
4403            queue.push_chunk(
4404                chunk2,
4405                &versions,
4406                1000,
4407                make_commit_ref(1),
4408                Digest::default(),
4409            );
4410        }
4411
4412        #[test]
4413        fn test_settlement_checkpoint_seq_correct_after_flush() {
4414            let max_tx = 10;
4415            let initial_seq = 5;
4416            let (sender, mut receiver) = monitored_mpsc::unbounded_channel("test_settlement_seq");
4417            let mut queue =
4418                CheckpointQueue::new_for_testing_with_sender(0, 0, initial_seq, max_tx, 0, sender);
4419            let versions = default_versions();
4420
4421            // Push a chunk that partially fills the queue (no flush).
4422            let chunk1 = Chunk {
4423                schedulables: (0..max_tx / 2 + 1)
4424                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4425                    .collect(),
4426                settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
4427                height: 1,
4428            };
4429            queue.push_chunk(
4430                chunk1,
4431                &versions,
4432                1000,
4433                make_commit_ref(1),
4434                Digest::default(),
4435            );
4436
4437            // Drain the first message from the channel.
4438            let msg1 = receiver.try_recv().unwrap();
4439            let settlement1 = msg1.1.unwrap();
4440            assert_eq!(settlement1.checkpoint_seq, initial_seq);
4441
4442            // Push a second chunk that triggers a flush of chunk1's roots.
4443            let chunk2 = Chunk {
4444                schedulables: (0..max_tx / 2 + 1)
4445                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4446                    .collect(),
4447                settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
4448                height: 2,
4449            };
4450            let flushed = queue.push_chunk(
4451                chunk2,
4452                &versions,
4453                1000,
4454                make_commit_ref(2),
4455                Digest::default(),
4456            );
4457            assert_eq!(flushed.len(), 1);
4458            assert_eq!(flushed[0].details.checkpoint_seq, Some(initial_seq));
4459
4460            // The second settlement must have checkpoint_seq = initial_seq + 1,
4461            // because the flush incremented current_checkpoint_seq.
4462            let msg2 = receiver.try_recv().unwrap();
4463            let settlement2 = msg2.1.unwrap();
4464            assert_eq!(settlement2.checkpoint_seq, initial_seq + 1);
4465
4466            // Flush the remaining roots and verify the PendingCheckpointV2's seq
4467            // matches the settlement's seq.
4468            let pending = queue.flush_forced().unwrap();
4469            assert_eq!(
4470                pending.details.checkpoint_seq,
4471                Some(settlement2.checkpoint_seq)
4472            );
4473        }
4474
4475        #[test]
4476        fn test_checkpoint_seq_increments_on_flush() {
4477            let mut queue = CheckpointQueue::new_for_testing(0, 0, 10, 1000, 0);
4478            let versions = default_versions();
4479
4480            queue.push_chunk(
4481                make_chunk(5, 1),
4482                &versions,
4483                1000,
4484                make_commit_ref(1),
4485                Digest::default(),
4486            );
4487
4488            let pending = queue.flush(1000, true).unwrap();
4489
4490            assert_eq!(pending.details.checkpoint_seq, Some(10));
4491            assert_eq!(queue.current_checkpoint_seq, 11);
4492        }
4493
4494        #[test]
4495        fn test_multiple_chunks_with_overflow() {
4496            let max_tx = 10;
4497            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4498            let versions = default_versions();
4499
4500            let flushed1 = queue.push_chunk(
4501                make_chunk(max_tx / 2 + 1, 1),
4502                &versions,
4503                1000,
4504                make_commit_ref(1),
4505                Digest::default(),
4506            );
4507            let flushed2 = queue.push_chunk(
4508                make_chunk(max_tx / 2 + 1, 2),
4509                &versions,
4510                1000,
4511                make_commit_ref(1),
4512                Digest::default(),
4513            );
4514            let flushed3 = queue.push_chunk(
4515                make_chunk(max_tx / 2 + 1, 3),
4516                &versions,
4517                1000,
4518                make_commit_ref(1),
4519                Digest::default(),
4520            );
4521
4522            let all_flushed: Vec<_> = flushed1
4523                .into_iter()
4524                .chain(flushed2)
4525                .chain(flushed3)
4526                .collect();
4527            assert_eq!(all_flushed.len(), 2);
4528            assert_eq!(queue.pending_roots.len(), 1);
4529
4530            for p in &all_flushed {
4531                let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4532                assert!(tx_count <= max_tx);
4533            }
4534        }
4535    }
4536}