sui_core/
consensus_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet, VecDeque},
6    hash::Hash,
7    num::NonZeroUsize,
8    sync::{Arc, Mutex},
9    time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use arc_swap::ArcSwap;
13use consensus_config::Committee as ConsensusCommittee;
14use consensus_core::{CommitConsumerMonitor, CommitIndex, CommitRef};
15use consensus_types::block::TransactionIndex;
16use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
17use lru::LruCache;
18use mysten_common::{
19    assert_reachable, assert_sometimes, debug_fatal, random_util::randomize_cache_capacity_in_tests,
20};
21use mysten_metrics::{
22    monitored_future,
23    monitored_mpsc::{self, UnboundedReceiver},
24    monitored_scope, spawn_monitored_task,
25};
26use nonempty::NonEmpty;
27use parking_lot::RwLockWriteGuard;
28use serde::{Deserialize, Serialize};
29use sui_config::node::CongestionLogConfig;
30use sui_macros::{fail_point, fail_point_arg, fail_point_if};
31use sui_protocol_config::{PerObjectCongestionControlMode, ProtocolConfig};
32use sui_types::{
33    SUI_RANDOMNESS_STATE_OBJECT_ID,
34    authenticator_state::ActiveJwk,
35    base_types::{
36        AuthorityName, ConciseableName, ConsensusObjectSequenceKey, ObjectID, ObjectRef,
37        SequenceNumber, TransactionDigest,
38    },
39    crypto::RandomnessRound,
40    digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest, Digest},
41    executable_transaction::{
42        TrustedExecutableTransaction, VerifiedExecutableTransaction,
43        VerifiedExecutableTransactionWithAliases,
44    },
45    messages_checkpoint::{
46        CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointTimestamp,
47    },
48    messages_consensus::{
49        AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments,
50        ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
51        ExecutionTimeObservation,
52    },
53    sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
54    transaction::{
55        InputObjectKind, SenderSignedData, TransactionDataAPI, TransactionKey, VerifiedCertificate,
56        VerifiedTransaction, WithAliases,
57    },
58};
59use tokio::task::JoinSet;
60use tracing::{debug, error, info, instrument, trace, warn};
61
62use crate::{
63    authority::{
64        AuthorityMetrics, AuthorityState, ExecutionEnv,
65        authority_per_epoch_store::{
66            AuthorityPerEpochStore, CancelConsensusCertificateReason, ConsensusStats,
67            ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStatsV2,
68            consensus_quarantine::ConsensusCommitOutput,
69        },
70        backpressure::{BackpressureManager, BackpressureSubscriber},
71        congestion_log::CongestionCommitLogger,
72        consensus_tx_status_cache::ConsensusTxStatus,
73        epoch_start_configuration::EpochStartConfigTrait,
74        execution_time_estimator::ExecutionTimeEstimator,
75        shared_object_congestion_tracker::SharedObjectCongestionTracker,
76        shared_object_version_manager::{AssignedTxAndVersions, AssignedVersions, Schedulable},
77        transaction_deferral::{DeferralKey, DeferralReason, transaction_deferral_within_limit},
78    },
79    checkpoints::{
80        CheckpointHeight, CheckpointRoots, CheckpointService, CheckpointServiceNotify,
81        PendingCheckpoint, PendingCheckpointInfo, PendingCheckpointV2,
82    },
83    consensus_adapter::ConsensusAdapter,
84    consensus_throughput_calculator::ConsensusThroughputCalculator,
85    consensus_types::consensus_output_api::ConsensusCommitAPI,
86    epoch::{
87        randomness::{DkgStatus, RandomnessManager},
88        reconfiguration::ReconfigState,
89    },
90    execution_cache::ObjectCacheRead,
91    execution_scheduler::{SettlementBatchInfo, SettlementScheduler},
92    post_consensus_tx_reorder::PostConsensusTxReorder,
93    scoring_decision::update_low_scoring_authorities,
94    traffic_controller::{TrafficController, policies::TrafficTally},
95};
96
/// Output from filtering consensus transactions.
/// Contains the filtered transactions and any owned object locks acquired post-consensus.
struct FilteredConsensusOutput {
    // Surviving transactions, each paired with a u32 (presumably an index or
    // weight assigned during filtering — confirm against the producer).
    transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
    // Owned-object locks taken after consensus: maps each locked object ref to
    // the digest of the transaction that holds the lock.
    owned_object_locks: HashMap<ObjectRef, TransactionDigest>,
}
103
/// Bundles the per-epoch components required to construct a [`ConsensusHandler`].
/// Built once per epoch and consumed via `new_consensus_handler`.
pub struct ConsensusHandlerInitializer {
    // Global authority state; source of the execution scheduler, cache readers and metrics.
    state: Arc<AuthorityState>,
    // Service notified of pending checkpoints.
    checkpoint_service: Arc<CheckpointService>,
    // Per-epoch store for the epoch this initializer serves.
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Adapter for submitting transactions back into consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    // Shared, swappable map of low-scoring authorities (updated from consensus reputation scores).
    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    // Records consensus throughput observations.
    throughput_calculator: Arc<ConsensusThroughputCalculator>,
    // Source of backpressure subscriptions handed to the handler.
    backpressure_manager: Arc<BackpressureManager>,
    // Optional commit-congestion logger; `None` when logging is disabled or
    // logger construction failed (see `new`).
    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
}
114
115impl ConsensusHandlerInitializer {
116    pub fn new(
117        state: Arc<AuthorityState>,
118        checkpoint_service: Arc<CheckpointService>,
119        epoch_store: Arc<AuthorityPerEpochStore>,
120        consensus_adapter: Arc<ConsensusAdapter>,
121        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
122        throughput_calculator: Arc<ConsensusThroughputCalculator>,
123        backpressure_manager: Arc<BackpressureManager>,
124        congestion_log_config: Option<CongestionLogConfig>,
125    ) -> Self {
126        let congestion_logger =
127            congestion_log_config.and_then(|config| match CongestionCommitLogger::new(&config) {
128                Ok(logger) => Some(Arc::new(Mutex::new(logger))),
129                Err(e) => {
130                    debug_fatal!("Failed to create congestion logger: {e}");
131                    None
132                }
133            });
134        Self {
135            state,
136            checkpoint_service,
137            epoch_store,
138            consensus_adapter,
139            low_scoring_authorities,
140            throughput_calculator,
141            backpressure_manager,
142            congestion_logger,
143        }
144    }
145
146    #[cfg(test)]
147    pub(crate) fn new_for_testing(
148        state: Arc<AuthorityState>,
149        checkpoint_service: Arc<CheckpointService>,
150    ) -> Self {
151        use crate::consensus_test_utils::make_consensus_adapter_for_test;
152        use std::collections::HashSet;
153
154        let backpressure_manager = BackpressureManager::new_for_tests();
155        let consensus_adapter =
156            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
157        Self {
158            state: state.clone(),
159            checkpoint_service,
160            epoch_store: state.epoch_store_for_testing().clone(),
161            consensus_adapter,
162            low_scoring_authorities: Arc::new(Default::default()),
163            throughput_calculator: Arc::new(ConsensusThroughputCalculator::new(
164                None,
165                state.metrics.clone(),
166            )),
167            backpressure_manager,
168            congestion_logger: None,
169        }
170    }
171
172    pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
173        let new_epoch_start_state = self.epoch_store.epoch_start_state();
174        let consensus_committee = new_epoch_start_state.get_consensus_committee();
175
176        let settlement_scheduler = SettlementScheduler::new(
177            self.state.execution_scheduler().as_ref().clone(),
178            self.state.get_transaction_cache_reader().clone(),
179        );
180        ConsensusHandler::new(
181            self.epoch_store.clone(),
182            self.checkpoint_service.clone(),
183            settlement_scheduler,
184            self.consensus_adapter.clone(),
185            self.state.get_object_cache_reader().clone(),
186            self.low_scoring_authorities.clone(),
187            consensus_committee,
188            self.state.metrics.clone(),
189            self.throughput_calculator.clone(),
190            self.backpressure_manager.subscribe(),
191            self.state.traffic_controller.clone(),
192            self.congestion_logger.clone(),
193        )
194    }
195}
196
mod additional_consensus_state {
    use std::marker::PhantomData;

    use consensus_core::CommitRef;
    use fastcrypto::hash::HashFunction as _;
    use sui_types::{crypto::DefaultHash, digests::Digest};

    use super::*;
    /// AdditionalConsensusState tracks any in-memory state that is retained by ConsensusHandler
    /// between consensus commits. Because of crash recovery, using such data is inherently risky.
    /// In order to do this safely, we must store data from a fixed number of previous commits.
    /// Then, at start-up, that same fixed number of already processed commits is replayed to
    /// reconstruct the state.
    ///
    /// To make sure that bugs in this process appear immediately, we record the digest of this
    /// state in ConsensusCommitPrologue, so that any deviation causes an immediate fork.
    ///
    /// NOTE: the digest is computed by BCS-serializing this struct (see `digest`), so any
    /// change to the fields or their order is consensus-affecting.
    #[derive(Serialize, Deserialize)]
    pub(super) struct AdditionalConsensusState {
        // Fixed-size window of recent commit times used to estimate the commit
        // interval (the test below relies on its ring-buffer behavior).
        commit_interval_observer: CommitIntervalObserver,
    }

    impl AdditionalConsensusState {
        /// Creates an empty state whose commit-interval window holds
        /// `additional_consensus_state_window_size` observations.
        pub fn new(additional_consensus_state_window_size: u32) -> Self {
            Self {
                commit_interval_observer: CommitIntervalObserver::new(
                    additional_consensus_state_window_size,
                ),
            }
        }

        /// Update all internal state based on the new commit
        ///
        /// Records the commit's time into the interval observer, then returns a
        /// `ConsensusCommitInfo` carrying the digest of the updated state and an
        /// estimated commit period (falling back to the protocol's minimum
        /// checkpoint interval when no estimate is available yet).
        pub(crate) fn observe_commit(
            &mut self,
            protocol_config: &ProtocolConfig,
            epoch_start_time: u64,
            consensus_commit: &impl ConsensusCommitAPI,
        ) -> ConsensusCommitInfo {
            self.commit_interval_observer
                .observe_commit_time(consensus_commit);

            let estimated_commit_period = self
                .commit_interval_observer
                .commit_interval_estimate()
                .unwrap_or(Duration::from_millis(
                    protocol_config.min_checkpoint_interval_ms(),
                ));

            // NOTE(review): the message says "rate" but the logged value is a
            // period (Duration between commits) — consider rewording.
            info!("estimated commit rate: {:?}", estimated_commit_period);

            self.commit_info_impl(
                epoch_start_time,
                consensus_commit,
                Some(estimated_commit_period),
            )
        }

        /// Builds a `ConsensusCommitInfo` for the given commit. The commit
        /// timestamp is clamped up to `epoch_start_time` if it is earlier
        /// (logged as an error), so timestamps never go below epoch start.
        fn commit_info_impl(
            &self,
            epoch_start_time: u64,
            consensus_commit: &impl ConsensusCommitAPI,
            estimated_commit_period: Option<Duration>,
        ) -> ConsensusCommitInfo {
            let leader_author = consensus_commit.leader_author_index();
            let timestamp = consensus_commit.commit_timestamp_ms();

            let timestamp = if timestamp < epoch_start_time {
                error!(
                    "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start_time}, author {leader_author:?}"
                );
                epoch_start_time
            } else {
                timestamp
            };

            ConsensusCommitInfo {
                _phantom: PhantomData,
                round: consensus_commit.leader_round(),
                timestamp,
                leader_author,
                consensus_commit_ref: consensus_commit.commit_ref(),
                rejected_transactions_digest: consensus_commit.rejected_transactions_digest(),
                additional_state_digest: Some(self.digest()),
                estimated_commit_period,
                skip_consensus_commit_prologue_in_test: false,
            }
        }

        /// Get the digest of the current state.
        /// Computed by BCS-serializing `self` into the default hash function;
        /// two states with identical windows produce identical digests.
        fn digest(&self) -> AdditionalConsensusStateDigest {
            let mut hash = DefaultHash::new();
            bcs::serialize_into(&mut hash, self).unwrap();
            AdditionalConsensusStateDigest::new(hash.finalize().into())
        }
    }

    /// Per-commit metadata derived from a consensus commit, used to build the
    /// consensus commit prologue transaction. Only constructible via
    /// `AdditionalConsensusState::observe_commit` or the test constructors.
    pub struct ConsensusCommitInfo {
        // prevent public construction
        _phantom: PhantomData<()>,

        pub round: u64,
        pub timestamp: u64,
        pub leader_author: AuthorityIndex,
        pub consensus_commit_ref: CommitRef,
        pub rejected_transactions_digest: Digest,

        // `None` when built without AdditionalConsensusState ("stateless");
        // the accessors below panic in that case.
        additional_state_digest: Option<AdditionalConsensusStateDigest>,
        estimated_commit_period: Option<Duration>,

        pub skip_consensus_commit_prologue_in_test: bool,
    }

    impl ConsensusCommitInfo {
        /// Test-only constructor with a fixed leader (index 0), default commit
        /// ref and a zero additional-state digest.
        pub fn new_for_test(
            commit_round: u64,
            commit_timestamp: u64,
            estimated_commit_period: Option<Duration>,
            skip_consensus_commit_prologue_in_test: bool,
        ) -> Self {
            Self {
                _phantom: PhantomData,
                round: commit_round,
                timestamp: commit_timestamp,
                leader_author: 0,
                consensus_commit_ref: CommitRef::default(),
                rejected_transactions_digest: Digest::default(),
                additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
                estimated_commit_period,
                skip_consensus_commit_prologue_in_test,
            }
        }

        /// Test-only constructor for congestion tests: always skips the
        /// consensus commit prologue.
        pub fn new_for_congestion_test(
            commit_round: u64,
            commit_timestamp: u64,
            estimated_commit_period: Duration,
        ) -> Self {
            Self::new_for_test(
                commit_round,
                commit_timestamp,
                Some(estimated_commit_period),
                true,
            )
        }

        pub fn additional_state_digest(&self) -> AdditionalConsensusStateDigest {
            // this method cannot be called if stateless_commit_info is used
            self.additional_state_digest
                .expect("additional_state_digest is not available")
        }

        pub fn estimated_commit_period(&self) -> Duration {
            // this method cannot be called if stateless_commit_info is used
            self.estimated_commit_period
                .expect("estimated commit period is not available")
        }

        /// Converts the consensus commit ref's digest into the prologue's
        /// `ConsensusCommitDigest` type.
        fn consensus_commit_digest(&self) -> ConsensusCommitDigest {
            ConsensusCommitDigest::new(self.consensus_commit_ref.digest.into_inner())
        }

        /// V1 prologue: round + timestamp only.
        fn consensus_commit_prologue_transaction(
            &self,
            epoch: u64,
        ) -> VerifiedExecutableTransaction {
            let transaction = VerifiedTransaction::new_consensus_commit_prologue(
                epoch,
                self.round,
                self.timestamp,
            );
            VerifiedExecutableTransaction::new_system(transaction, epoch)
        }

        /// V2 prologue: adds the consensus commit digest.
        fn consensus_commit_prologue_v2_transaction(
            &self,
            epoch: u64,
        ) -> VerifiedExecutableTransaction {
            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v2(
                epoch,
                self.round,
                self.timestamp,
                self.consensus_commit_digest(),
            );
            VerifiedExecutableTransaction::new_system(transaction, epoch)
        }

        /// V3 prologue: adds consensus-determined version assignments.
        fn consensus_commit_prologue_v3_transaction(
            &self,
            epoch: u64,
            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
        ) -> VerifiedExecutableTransaction {
            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v3(
                epoch,
                self.round,
                self.timestamp,
                self.consensus_commit_digest(),
                consensus_determined_version_assignments,
            );
            VerifiedExecutableTransaction::new_system(transaction, epoch)
        }

        /// V4 prologue: additionally records the additional-consensus-state
        /// digest so divergence forks immediately.
        fn consensus_commit_prologue_v4_transaction(
            &self,
            epoch: u64,
            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
            additional_state_digest: AdditionalConsensusStateDigest,
        ) -> VerifiedExecutableTransaction {
            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v4(
                epoch,
                self.round,
                self.timestamp,
                self.consensus_commit_digest(),
                consensus_determined_version_assignments,
                additional_state_digest,
            );
            VerifiedExecutableTransaction::new_system(transaction, epoch)
        }

        /// Builds the prologue transaction for this commit, selecting the
        /// prologue version from protocol feature flags (newest first):
        /// V4 when the additional-state digest is recorded (optionally folded
        /// with indirect state), else V3 when version assignments are recorded,
        /// else V2 when the commit digest is included, else V1.
        pub fn create_consensus_commit_prologue_transaction(
            &self,
            epoch: u64,
            protocol_config: &ProtocolConfig,
            cancelled_txn_version_assignment: Vec<(
                TransactionDigest,
                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
            )>,
            commit_info: &ConsensusCommitInfo,
            indirect_state_observer: IndirectStateObserver,
        ) -> VerifiedExecutableTransaction {
            let version_assignments = if protocol_config
                .record_consensus_determined_version_assignments_in_prologue_v2()
            {
                Some(
                    ConsensusDeterminedVersionAssignments::CancelledTransactionsV2(
                        cancelled_txn_version_assignment,
                    ),
                )
            } else if protocol_config.record_consensus_determined_version_assignments_in_prologue()
            {
                // Legacy (V1) assignment format: drop the initial-shared-version
                // half of each ConsensusObjectSequenceKey, keeping only the ObjectID.
                Some(
                    ConsensusDeterminedVersionAssignments::CancelledTransactions(
                        cancelled_txn_version_assignment
                            .into_iter()
                            .map(|(tx_digest, versions)| {
                                (
                                    tx_digest,
                                    versions.into_iter().map(|(id, v)| (id.0, v)).collect(),
                                )
                            })
                            .collect(),
                    ),
                )
            } else {
                None
            };

            if protocol_config.record_additional_state_digest_in_prologue() {
                let additional_state_digest =
                    if protocol_config.additional_consensus_digest_indirect_state() {
                        // Fold the indirect-state hash into the state digest.
                        let d1 = commit_info.additional_state_digest();
                        indirect_state_observer.fold_with(d1)
                    } else {
                        commit_info.additional_state_digest()
                    };

                // V4 implies version assignments are also enabled, hence the unwrap.
                self.consensus_commit_prologue_v4_transaction(
                    epoch,
                    version_assignments.unwrap(),
                    additional_state_digest,
                )
            } else if let Some(version_assignments) = version_assignments {
                self.consensus_commit_prologue_v3_transaction(epoch, version_assignments)
            } else if protocol_config.include_consensus_digest_in_prologue() {
                self.consensus_commit_prologue_v2_transaction(epoch)
            } else {
                self.consensus_commit_prologue_transaction(epoch)
            }
        }
    }

    /// Accumulates hashes of "indirect" state observed while processing a
    /// commit; its final hash can be folded with the additional-state digest
    /// (see `fold_with`) for inclusion in the V4 prologue.
    #[derive(Default)]
    pub struct IndirectStateObserver {
        hash: DefaultHash,
    }

    impl IndirectStateObserver {
        pub fn new() -> Self {
            Self::default()
        }

        /// BCS-serializes `state` into the running hash.
        pub fn observe_indirect_state<T: Serialize>(&mut self, state: &T) {
            bcs::serialize_into(&mut self.hash, state).unwrap();
        }

        /// Finalizes the observed-state hash and combines it with `d1` by
        /// hashing the BCS encoding of both digests. Order matters: d1 then d2.
        pub fn fold_with(
            self,
            d1: AdditionalConsensusStateDigest,
        ) -> AdditionalConsensusStateDigest {
            let hash = self.hash.finalize();
            let d2 = AdditionalConsensusStateDigest::new(hash.into());

            let mut hasher = DefaultHash::new();
            bcs::serialize_into(&mut hasher, &d1).unwrap();
            bcs::serialize_into(&mut hasher, &d2).unwrap();
            AdditionalConsensusStateDigest::new(hasher.finalize().into())
        }
    }

    #[test]
    fn test_additional_consensus_state() {
        use crate::consensus_test_utils::TestConsensusCommit;

        fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
            let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
            state.observe_commit(
                &protocol_config,
                100,
                &TestConsensusCommit::empty(round, timestamp, 0),
            );
        }

        let mut s1 = AdditionalConsensusState::new(3);
        observe(&mut s1, 1, 1000);
        observe(&mut s1, 2, 2000);
        observe(&mut s1, 3, 3000);
        observe(&mut s1, 4, 4000);

        let mut s2 = AdditionalConsensusState::new(3);
        // Because state uses a ring buffer, we should get the same digest
        // even though we only added the 3 latest observations.
        observe(&mut s2, 2, 2000);
        observe(&mut s2, 3, 3000);
        observe(&mut s2, 4, 4000);

        assert_eq!(s1.digest(), s2.digest());

        observe(&mut s1, 5, 5000);
        observe(&mut s2, 5, 5000);

        assert_eq!(s1.digest(), s2.digest());
    }
}
538use additional_consensus_state::AdditionalConsensusState;
539pub(crate) use additional_consensus_state::{ConsensusCommitInfo, IndirectStateObserver};
540
/// A single consensus commit's checkpoint roots, buffered in `CheckpointQueue`
/// together with the commit metadata needed when it is flushed into a
/// `PendingCheckpointV2`.
struct QueuedCheckpointRoots {
    // Roots (tx keys, optional settlement key, height) produced from one chunk.
    roots: CheckpointRoots,
    // Commit timestamp; the last buffered root's timestamp becomes the
    // pending checkpoint's timestamp on flush.
    timestamp: CheckpointTimestamp,
    // Reference to the consensus commit that produced these roots.
    consensus_commit_ref: CommitRef,
    // Digest of the transactions rejected in that commit.
    rejected_transactions_digest: Digest,
}
547
/// A batch of schedulable transactions destined for one checkpoint height,
/// plus an optional trailing settlement schedulable. Generic over the
/// transaction type (defaults to `VerifiedExecutableTransaction`).
struct Chunk<
    T: crate::authority::shared_object_version_manager::AsTx = VerifiedExecutableTransaction,
> {
    // User transactions in this chunk.
    schedulables: Vec<Schedulable<T>>,
    // Optional settlement transaction, scheduled after the user transactions.
    settlement: Option<Schedulable<T>>,
    // Checkpoint height this chunk belongs to.
    height: CheckpointHeight,
}
555
556impl<T: crate::authority::shared_object_version_manager::AsTx + Clone> Chunk<T> {
557    fn all_schedulables(&self) -> impl Iterator<Item = &Schedulable<T>> + Clone {
558        self.schedulables.iter().chain(self.settlement.iter())
559    }
560
561    fn all_schedulables_from(chunks: &[Self]) -> impl Iterator<Item = &Schedulable<T>> + Clone {
562        chunks.iter().flat_map(|c| c.all_schedulables())
563    }
564
565    fn to_checkpoint_roots(&self) -> CheckpointRoots {
566        let tx_roots: Vec<_> = self.schedulables.iter().map(|s| s.key()).collect();
567        let settlement_root = self.settlement.as_ref().map(|s| s.key());
568        CheckpointRoots {
569            tx_roots,
570            settlement_root,
571            height: self.height,
572        }
573    }
574}
575
576impl From<Chunk<VerifiedExecutableTransactionWithAliases>> for Chunk {
577    fn from(chunk: Chunk<VerifiedExecutableTransactionWithAliases>) -> Self {
578        Chunk {
579            schedulables: chunk.schedulables.into_iter().map(|s| s.into()).collect(),
580            settlement: chunk.settlement.map(|s| s.into()),
581            height: chunk.height,
582        }
583    }
584}
585
// Accumulates checkpoint roots from consensus and flushes them into PendingCheckpointV2s.
// Roots are buffered in `pending_roots` until a flush is triggered (by max_tx overflow or
// time interval). A flush always drains all buffered roots into a single PendingCheckpointV2,
// so the queue only ever holds roots for one pending checkpoint at a time.
//
// Owns an ExecutionSchedulerSender so push_chunk can send schedulables and settlement info
// for execution in one shot.
pub(crate) struct CheckpointQueue {
    // Timestamp of the most recently flushed checkpoint; gates time-based flushes.
    last_built_timestamp: CheckpointTimestamp,
    // Roots buffered since the last flush, in commit order.
    pending_roots: VecDeque<QueuedCheckpointRoots>,
    // Last assigned checkpoint height; incremented by `next_height`.
    height: u64,
    // Number of user transactions buffered since the last flush.
    pending_tx_count: usize,
    // Sequence number the next flushed checkpoint will receive.
    current_checkpoint_seq: CheckpointSequenceNumber,
    // Max buffered user transactions before a push forces a flush.
    max_tx: usize,
    // Minimum interval between time-triggered (non-forced) flushes.
    min_checkpoint_interval_ms: u64,
    // Channel to the execution scheduler for schedulables + settlement info.
    execution_scheduler_sender: ExecutionSchedulerSender,
}
603
604impl CheckpointQueue {
605    pub(crate) fn new(
606        last_built_timestamp: CheckpointTimestamp,
607        checkpoint_height: u64,
608        next_checkpoint_seq: CheckpointSequenceNumber,
609        max_tx: usize,
610        min_checkpoint_interval_ms: u64,
611        execution_scheduler_sender: ExecutionSchedulerSender,
612    ) -> Self {
613        Self {
614            last_built_timestamp,
615            pending_roots: VecDeque::new(),
616            height: checkpoint_height,
617            pending_tx_count: 0,
618            current_checkpoint_seq: next_checkpoint_seq,
619            max_tx,
620            min_checkpoint_interval_ms,
621            execution_scheduler_sender,
622        }
623    }
624
625    #[cfg(test)]
626    fn new_for_testing(
627        last_built_timestamp: CheckpointTimestamp,
628        checkpoint_height: u64,
629        next_checkpoint_seq: CheckpointSequenceNumber,
630        max_tx: usize,
631        min_checkpoint_interval_ms: u64,
632    ) -> Self {
633        let (sender, _receiver) = monitored_mpsc::unbounded_channel("test_checkpoint_queue_sender");
634        Self {
635            last_built_timestamp,
636            pending_roots: VecDeque::new(),
637            height: checkpoint_height,
638            pending_tx_count: 0,
639            current_checkpoint_seq: next_checkpoint_seq,
640            max_tx,
641            min_checkpoint_interval_ms,
642            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
643        }
644    }
645
646    #[cfg(test)]
647    fn new_for_testing_with_sender(
648        last_built_timestamp: CheckpointTimestamp,
649        checkpoint_height: u64,
650        next_checkpoint_seq: CheckpointSequenceNumber,
651        max_tx: usize,
652        min_checkpoint_interval_ms: u64,
653        sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
654    ) -> Self {
655        Self {
656            last_built_timestamp,
657            pending_roots: VecDeque::new(),
658            height: checkpoint_height,
659            pending_tx_count: 0,
660            current_checkpoint_seq: next_checkpoint_seq,
661            max_tx,
662            min_checkpoint_interval_ms,
663            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
664        }
665    }
666
667    pub(crate) fn last_built_timestamp(&self) -> CheckpointTimestamp {
668        self.last_built_timestamp
669    }
670
671    pub(crate) fn is_empty(&self) -> bool {
672        self.pending_roots.is_empty()
673    }
674
675    fn next_height(&mut self) -> u64 {
676        self.height += 1;
677        self.height
678    }
679
680    fn push_chunk(
681        &mut self,
682        chunk: Chunk,
683        assigned_versions: &HashMap<TransactionKey, AssignedVersions>,
684        timestamp: CheckpointTimestamp,
685        consensus_commit_ref: CommitRef,
686        rejected_transactions_digest: Digest,
687    ) -> Vec<PendingCheckpointV2> {
688        let max_tx = self.max_tx;
689        let user_tx_count = chunk.schedulables.len();
690
691        let roots = chunk.to_checkpoint_roots();
692
693        let schedulables: Vec<_> = chunk
694            .schedulables
695            .into_iter()
696            .map(|s| {
697                let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
698                (s, versions)
699            })
700            .collect();
701
702        let mut flushed_checkpoints = Vec::new();
703
704        if self.pending_tx_count > 0
705            && self.pending_tx_count + user_tx_count > max_tx
706            && let Some(checkpoint) = self.flush_forced()
707        {
708            flushed_checkpoints.push(checkpoint);
709        }
710
711        let settlement_info = chunk.settlement.as_ref().map(|s| {
712            let settlement_key = s.key();
713            let tx_keys: Vec<_> = schedulables.iter().map(|(s, _)| s.key()).collect();
714            SettlementBatchInfo {
715                settlement_key,
716                tx_keys,
717                checkpoint_height: chunk.height,
718                checkpoint_seq: self.current_checkpoint_seq,
719                assigned_versions: assigned_versions
720                    .get(&settlement_key)
721                    .cloned()
722                    .unwrap_or_default(),
723            }
724        });
725
726        self.execution_scheduler_sender
727            .send(schedulables, settlement_info);
728
729        self.pending_tx_count += user_tx_count;
730        self.pending_roots.push_back(QueuedCheckpointRoots {
731            roots,
732            timestamp,
733            consensus_commit_ref,
734            rejected_transactions_digest,
735        });
736
737        flushed_checkpoints
738    }
739
740    pub(crate) fn flush(
741        &mut self,
742        current_timestamp: CheckpointTimestamp,
743        force: bool,
744    ) -> Option<PendingCheckpointV2> {
745        if !force && current_timestamp < self.last_built_timestamp + self.min_checkpoint_interval_ms
746        {
747            return None;
748        }
749        self.flush_forced()
750    }
751
752    fn flush_forced(&mut self) -> Option<PendingCheckpointV2> {
753        if self.pending_roots.is_empty() {
754            return None;
755        }
756
757        let to_flush: Vec<_> = self.pending_roots.drain(..).collect();
758        let last_root = to_flush.last().unwrap();
759
760        let checkpoint = PendingCheckpointV2 {
761            roots: to_flush.iter().map(|q| q.roots.clone()).collect(),
762            details: PendingCheckpointInfo {
763                timestamp_ms: last_root.timestamp,
764                last_of_epoch: false,
765                checkpoint_height: last_root.roots.height,
766                consensus_commit_ref: last_root.consensus_commit_ref,
767                rejected_transactions_digest: last_root.rejected_transactions_digest,
768                checkpoint_seq: Some(self.current_checkpoint_seq),
769            },
770        };
771
772        self.last_built_timestamp = last_root.timestamp;
773        self.pending_tx_count = 0;
774        self.current_checkpoint_seq += 1;
775
776        Some(checkpoint)
777    }
778
779    pub(crate) fn checkpoint_seq(&self) -> CheckpointSequenceNumber {
780        self.current_checkpoint_seq
781            .checked_sub(1)
782            .expect("checkpoint_seq called before any checkpoint was assigned")
783    }
784}
785
pub struct ConsensusHandler<C> {
    /// A store created for each epoch. ConsensusHandler is recreated each epoch, with the
    /// corresponding store. This store is also used to get the current epoch ID.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Holds the indices, hash and stats after the last consensus commit
    /// It is used for avoiding replaying already processed transactions,
    /// checking chain consistency, and accumulating per-epoch consensus output stats.
    last_consensus_stats: ExecutionIndicesWithStatsV2,
    /// Service notified of pending checkpoints (generic to allow test doubles).
    checkpoint_service: Arc<C>,
    /// cache reader is needed when determining the next version to assign for shared objects.
    cache_reader: Arc<dyn ObjectCacheRead>,
    /// Reputation scores used by consensus adapter that we update, forwarded from consensus
    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    /// The consensus committee used to do stake computations for deciding set of low scoring authorities
    committee: ConsensusCommittee,
    // TODO: ConsensusHandler doesn't really share metrics with AuthorityState. We could define
    // a new metrics type here if we want to.
    metrics: Arc<AuthorityMetrics>,
    /// Lru cache to quickly discard transactions processed by consensus
    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
    /// Enqueues transactions to the execution scheduler via a separate task.
    execution_scheduler_sender: ExecutionSchedulerSender,
    /// Consensus adapter for submitting transactions to consensus
    consensus_adapter: Arc<ConsensusAdapter>,

    /// Using the throughput calculator to record the current consensus throughput
    throughput_calculator: Arc<ConsensusThroughputCalculator>,

    /// Cross-commit in-memory state whose digest is recorded in the commit
    /// prologue (see the `additional_consensus_state` module).
    additional_consensus_state: AdditionalConsensusState,

    /// Subscription to backpressure signals from the BackpressureManager.
    backpressure_subscriber: BackpressureSubscriber,

    /// Optional traffic controller; presumably used for tallying per-source
    /// traffic — confirm usage in the handler body.
    traffic_controller: Option<Arc<TrafficController>>,

    /// Optional per-commit congestion logger; `None` when disabled.
    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,

    /// Buffers checkpoint roots and flushes them into pending checkpoints
    /// (see `CheckpointQueue`).
    checkpoint_queue: Mutex<CheckpointQueue>,
}
824
/// Capacity of the LRU cache used to quickly discard transactions already
/// processed by consensus (see `ConsensusHandler::processed_cache`).
const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
826
impl<C> ConsensusHandler<C> {
    /// Creates the per-epoch consensus handler.
    ///
    /// Recovers `last_consensus_stats` from the epoch store so that replay after a
    /// restart is consistent across validators, and seeds the checkpoint queue from
    /// the recovered stats.
    ///
    /// # Panics
    /// Panics if the protocol's congestion control mode is anything other than
    /// `PerObjectCongestionControlMode::ExecutionTimeEstimate`, which is the only
    /// supported mode.
    pub(crate) fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_service: Arc<C>,
        settlement_scheduler: SettlementScheduler,
        consensus_adapter: Arc<ConsensusAdapter>,
        cache_reader: Arc<dyn ObjectCacheRead>,
        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
        committee: ConsensusCommittee,
        metrics: Arc<AuthorityMetrics>,
        throughput_calculator: Arc<ConsensusThroughputCalculator>,
        backpressure_subscriber: BackpressureSubscriber,
        traffic_controller: Option<Arc<TrafficController>>,
        congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
    ) -> Self {
        assert!(
            matches!(
                epoch_store
                    .protocol_config()
                    .per_object_congestion_control_mode(),
                PerObjectCongestionControlMode::ExecutionTimeEstimate(_)
            ),
            "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
        );

        // Recover last_consensus_stats so it is consistent across validators.
        let mut last_consensus_stats = epoch_store
            .get_last_consensus_stats()
            .expect("Should be able to read last consensus index");
        // stats is empty at the beginning of epoch.
        if !last_consensus_stats.stats.is_initialized() {
            last_consensus_stats.stats = ConsensusStats::new(committee.size());
            if epoch_store
                .protocol_config()
                .split_checkpoints_in_consensus_handler()
            {
                // Start checkpoint numbering from where the previous epoch left off.
                last_consensus_stats.checkpoint_seq = epoch_store.previous_epoch_last_checkpoint();
            }
        }
        let max_tx = epoch_store
            .protocol_config()
            .max_transactions_per_checkpoint() as usize;
        // Defaults to 0 (no minimum interval) when the option is unset.
        let min_checkpoint_interval_ms = epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let execution_scheduler_sender =
            ExecutionSchedulerSender::start(settlement_scheduler, epoch_store.clone());
        let commit_rate_estimate_window_size = epoch_store
            .protocol_config()
            .get_consensus_commit_rate_estimation_window_size();
        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
        let checkpoint_height = last_consensus_stats.height;
        // The next checkpoint sequence number only matters when checkpoints are split
        // inside the consensus handler; otherwise it is unused and left at 0.
        let next_checkpoint_seq = if epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler()
        {
            last_consensus_stats.checkpoint_seq + 1
        } else {
            0
        };
        Self {
            epoch_store,
            last_consensus_stats,
            checkpoint_service,
            cache_reader,
            low_scoring_authorities,
            committee,
            metrics,
            processed_cache: LruCache::new(
                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
            ),
            execution_scheduler_sender: execution_scheduler_sender.clone(),
            consensus_adapter,
            throughput_calculator,
            additional_consensus_state: AdditionalConsensusState::new(
                commit_rate_estimate_window_size,
            ),
            backpressure_subscriber,
            traffic_controller,
            congestion_logger,
            checkpoint_queue: Mutex::new(CheckpointQueue::new(
                last_built_timestamp,
                checkpoint_height,
                next_checkpoint_seq,
                max_tx,
                min_checkpoint_interval_ms,
                execution_scheduler_sender,
            )),
        }
    }

    /// Returns the last subdag index processed by the handler.
    pub(crate) fn last_processed_subdag_index(&self) -> u64 {
        self.last_consensus_stats.index.sub_dag_index
    }

    /// Test-only constructor.
    ///
    /// Differs from [`Self::new`]: takes `last_consensus_stats` and the
    /// `execution_scheduler_sender` directly, skips the congestion-control-mode
    /// assertion, never configures a congestion logger, and always starts the
    /// checkpoint queue at sequence number 0.
    pub(crate) fn new_for_testing(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_service: Arc<C>,
        execution_scheduler_sender: ExecutionSchedulerSender,
        consensus_adapter: Arc<ConsensusAdapter>,
        cache_reader: Arc<dyn ObjectCacheRead>,
        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
        committee: ConsensusCommittee,
        metrics: Arc<AuthorityMetrics>,
        throughput_calculator: Arc<ConsensusThroughputCalculator>,
        backpressure_subscriber: BackpressureSubscriber,
        traffic_controller: Option<Arc<TrafficController>>,
        last_consensus_stats: ExecutionIndicesWithStatsV2,
    ) -> Self {
        let commit_rate_estimate_window_size = epoch_store
            .protocol_config()
            .get_consensus_commit_rate_estimation_window_size();
        let max_tx = epoch_store
            .protocol_config()
            .max_transactions_per_checkpoint() as usize;
        let min_checkpoint_interval_ms = epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
        let checkpoint_height = last_consensus_stats.height;
        Self {
            epoch_store,
            last_consensus_stats,
            checkpoint_service,
            cache_reader,
            low_scoring_authorities,
            committee,
            metrics,
            processed_cache: LruCache::new(
                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
            ),
            execution_scheduler_sender: execution_scheduler_sender.clone(),
            consensus_adapter,
            throughput_calculator,
            additional_consensus_state: AdditionalConsensusState::new(
                commit_rate_estimate_window_size,
            ),
            backpressure_subscriber,
            traffic_controller,
            congestion_logger: None,
            checkpoint_queue: Mutex::new(CheckpointQueue::new(
                last_built_timestamp,
                checkpoint_height,
                0,
                max_tx,
                min_checkpoint_interval_ms,
                execution_scheduler_sender,
            )),
        }
    }
}
981
/// Consensus transactions from a single commit, bucketed by kind so each
/// category can be processed by its dedicated handler.
#[derive(Default)]
struct CommitHandlerInput {
    /// User transactions (with any address aliases) to be scheduled for execution.
    user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    /// Authority capability announcements observed in this commit.
    capability_notifications: Vec<AuthorityCapabilitiesV2>,
    /// Execution time observations shared by validators.
    execution_time_observations: Vec<ExecutionTimeObservation>,
    /// Checkpoint signatures to forward to the checkpoint service.
    checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
    /// Randomness DKG protocol messages, keyed by sending authority.
    randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
    /// Randomness DKG confirmations, keyed by sending authority.
    randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
    /// Authorities that announced end-of-publish in this commit.
    end_of_publish_transactions: Vec<AuthorityName>,
    /// Newly observed JWKs for authenticator state updates.
    new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
}
993
/// Mutable state threaded through the handling of a single consensus commit.
struct CommitHandlerState {
    /// Set when the randomness DKG protocol has failed (see `init_randomness`);
    /// in that case pending randomness transactions get cancelled.
    dkg_failed: bool,
    /// Randomness round reserved for this commit, if one was generated.
    randomness_round: Option<RandomnessRound>,
    /// Accumulated writes for this commit, pushed to the consensus quarantine
    /// at the end of commit handling.
    output: ConsensusCommitOutput,
    indirect_state_observer: Option<IndirectStateObserver>,
    /// Reconfig state captured at the start of commit handling, used to decide
    /// whether new transactions/randomness are still being accepted.
    initial_reconfig_state: ReconfigState,
    // Occurrence counts for user transactions, used for unpaid amplification detection.
    occurrence_counts: HashMap<TransactionDigest, u32>,
}
1003
impl CommitHandlerState {
    /// Returns the keys of all consensus messages processed so far in this
    /// commit, for post-commit notification.
    fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
        self.output
            .get_consensus_messages_processed()
            .cloned()
            .collect()
    }

    /// Initializes randomness state for this commit.
    ///
    /// Sets `self.randomness_round` (reserving the next round when DKG has
    /// completed and transactions are still being accepted) and `self.dkg_failed`.
    /// Returns the locked `RandomnessManager` guard, if randomness is configured,
    /// so the caller can keep using it for the rest of commit handling.
    ///
    /// # Panics
    /// Panics if the manager lock is contended — this must only be called from
    /// the commit handler thread.
    fn init_randomness<'a, 'epoch>(
        &'a mut self,
        epoch_store: &'epoch AuthorityPerEpochStore,
        commit_info: &'a ConsensusCommitInfo,
    ) -> Option<tokio::sync::MutexGuard<'epoch, RandomnessManager>> {
        let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
            rm.try_lock()
                .expect("should only ever be called from the commit handler thread")
        });

        let mut dkg_failed = false;
        let randomness_round = if epoch_store.randomness_state_enabled() {
            let randomness_manager = randomness_manager
                .as_mut()
                .expect("randomness manager should exist if randomness is enabled");
            match randomness_manager.dkg_status() {
                DkgStatus::Pending => None,
                DkgStatus::Failed => {
                    dkg_failed = true;
                    None
                }
                DkgStatus::Successful => {
                    // Generate randomness for this commit if DKG is successful and we are still
                    // accepting certs.
                    if self.initial_reconfig_state.should_accept_tx() {
                        randomness_manager
                            // TODO: make infallible
                            .reserve_next_randomness(commit_info.timestamp, &mut self.output)
                            .expect("epoch ended")
                    } else {
                        None
                    }
                }
            }
        } else {
            None
        };

        if randomness_round.is_some() {
            assert!(!dkg_failed); // invariant check
        }

        self.randomness_round = randomness_round;
        self.dkg_failed = dkg_failed;

        randomness_manager
    }
}
1060
1061impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
1062    /// Called during startup to allow us to observe commits we previously processed, for crash recovery.
1063    /// Any state computed here must be a pure function of the commits observed, it cannot depend on any
1064    /// state recorded in the epoch db.
1065    fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
1066        assert!(
1067            self.epoch_store
1068                .protocol_config()
1069                .record_additional_state_digest_in_prologue()
1070        );
1071        let protocol_config = self.epoch_store.protocol_config();
1072        let epoch_start_time = self
1073            .epoch_store
1074            .epoch_start_config()
1075            .epoch_start_timestamp_ms();
1076
1077        self.additional_consensus_state.observe_commit(
1078            protocol_config,
1079            epoch_start_time,
1080            &consensus_commit,
1081        );
1082    }
1083
    /// Test-only entry point that forwards directly to `handle_consensus_commit`.
    #[cfg(test)]
    pub(crate) async fn handle_consensus_commit_for_test(
        &mut self,
        consensus_commit: impl ConsensusCommitAPI,
    ) {
        self.handle_consensus_commit(consensus_commit).await;
    }
1091
    /// Processes one consensus commit end-to-end: waits out backpressure, filters
    /// and deduplicates the commit's transactions, dispatches each transaction
    /// category, schedules executable transactions, writes pending checkpoint(s),
    /// pushes the accumulated output into the consensus quarantine, and notifies
    /// downstream services. Must be called with monotonically increasing commits.
    #[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))]
    pub(crate) async fn handle_consensus_commit(
        &mut self,
        consensus_commit: impl ConsensusCommitAPI,
    ) {
        {
            let protocol_config = self.epoch_store.protocol_config();

            // Assert all protocol config settings for which we don't support old behavior.
            assert!(protocol_config.ignore_execution_time_observations_after_certs_closed());
            assert!(protocol_config.record_time_estimate_processed());
            assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints());
            assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest());
            assert!(protocol_config.authority_capabilities_v2());
            assert!(protocol_config.cancel_for_failed_dkg_early());
        }

        // This may block until one of two conditions happens:
        // - Number of uncommitted transactions in the writeback cache goes below the
        //   backpressure threshold.
        // - The highest executed checkpoint catches up to the highest certified checkpoint.
        self.backpressure_subscriber.await_no_backpressure().await;

        let epoch = self.epoch_store.epoch();

        let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit");

        let last_committed_round = self.last_consensus_stats.index.last_committed_round;

        // Propagate the last committed leader round to the per-transaction status
        // caches so stale entries can be expired.
        if let Some(consensus_tx_status_cache) = self.epoch_store.consensus_tx_status_cache.as_ref()
        {
            consensus_tx_status_cache
                .update_last_committed_leader_round(last_committed_round as u32)
                .await;
        }
        if let Some(tx_reject_reason_cache) = self.epoch_store.tx_reject_reason_cache.as_ref() {
            tx_reject_reason_cache.set_last_committed_leader_round(last_committed_round as u32);
        }

        let commit_info = self.additional_consensus_state.observe_commit(
            self.epoch_store.protocol_config(),
            self.epoch_store
                .epoch_start_config()
                .epoch_start_timestamp_ms(),
            &consensus_commit,
        );
        // Commits must arrive in strictly increasing round order.
        assert!(commit_info.round > last_committed_round);

        let (timestamp, leader_author, commit_sub_dag_index) =
            self.gather_commit_metadata(&consensus_commit);

        info!(
            %consensus_commit,
            "Received consensus output {}. Rejected transactions {}",
            consensus_commit.commit_ref(),
            consensus_commit.rejected_transactions_debug_string(),
        );

        self.last_consensus_stats.index = ExecutionIndices {
            last_committed_round: commit_info.round,
            sub_dag_index: commit_sub_dag_index,
            transaction_index: 0_u64,
        };

        update_low_scoring_authorities(
            self.low_scoring_authorities.clone(),
            self.epoch_store.committee(),
            &self.committee,
            consensus_commit.reputation_score_sorted_desc(),
            &self.metrics,
            self.epoch_store
                .protocol_config()
                .consensus_bad_nodes_stake_threshold(),
        );

        self.metrics
            .consensus_committed_subdags
            .with_label_values(&[&leader_author.to_string()])
            .inc();

        // Per-commit mutable state; `initial_reconfig_state` is snapshotted here and
        // used for all accept-transaction decisions during this commit.
        let mut state = CommitHandlerState {
            output: ConsensusCommitOutput::new(commit_info.round),
            dkg_failed: false,
            randomness_round: None,
            indirect_state_observer: Some(IndirectStateObserver::new()),
            initial_reconfig_state: self
                .epoch_store
                .get_reconfig_state_read_lock_guard()
                .clone(),
            occurrence_counts: HashMap::new(),
        };

        let FilteredConsensusOutput {
            transactions,
            owned_object_locks,
        } = self.filter_consensus_txns(
            state.initial_reconfig_state.clone(),
            &commit_info,
            &consensus_commit,
        );
        // Buffer owned object locks for batch write when preconsensus locking is disabled
        if !owned_object_locks.is_empty() {
            state.output.set_owned_object_locks(owned_object_locks);
        }
        let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions);

        let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info);

        // Bucket the surviving transactions by kind and dispatch each category.
        let CommitHandlerInput {
            user_transactions,
            capability_notifications,
            execution_time_observations,
            checkpoint_signature_messages,
            randomness_dkg_messages,
            randomness_dkg_confirmations,
            end_of_publish_transactions,
            new_jwks,
        } = self.build_commit_handler_input(transactions);

        self.process_jwks(&mut state, &commit_info, new_jwks);
        self.process_capability_notifications(capability_notifications);
        self.process_execution_time_observations(&mut state, execution_time_observations);
        self.process_checkpoint_signature_messages(checkpoint_signature_messages);

        self.process_dkg_updates(
            &mut state,
            &commit_info,
            randomness_manager.as_deref_mut(),
            randomness_dkg_messages,
            randomness_dkg_confirmations,
        )
        .await;

        let mut execution_time_estimator = self
            .epoch_store
            .execution_time_estimator
            .try_lock()
            .expect("should only ever be called from the commit handler thread");

        let authenticator_state_update_transaction =
            self.create_authenticator_state_update(last_committed_round, &commit_info);

        let split_checkpoints = self
            .epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler();

        // Two code paths depending on whether checkpoints are split inside the
        // consensus handler. Both produce: the optional reconfig lock, whether this
        // was the final round, the number of scheduled items, and the checkpoint height.
        let (lock, final_round, num_schedulables, checkpoint_height) = if !split_checkpoints {
            let (schedulables, randomness_schedulables, assigned_versions) = self
                .process_transactions(
                    &mut state,
                    &mut execution_time_estimator,
                    &commit_info,
                    authenticator_state_update_transaction,
                    user_transactions,
                );

            let (should_accept_tx, lock, final_round) =
                self.handle_eop(&mut state, end_of_publish_transactions);

            let make_checkpoint = should_accept_tx || final_round;
            if !make_checkpoint {
                // No need for any further processing
                return;
            }

            // If this is the final round, record execution time observations for storage in the
            // end-of-epoch tx.
            if final_round {
                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
            }

            let checkpoint_height = self
                .epoch_store
                .calculate_pending_checkpoint_height(commit_info.round);

            self.create_pending_checkpoints(
                &mut state,
                &commit_info,
                &schedulables,
                &randomness_schedulables,
                final_round,
            );

            // Pair every schedulable with its assigned shared-object versions and
            // hand them to the execution scheduler.
            let num_schedulables = schedulables.len();
            let mut all_schedulables = schedulables;
            all_schedulables.extend(randomness_schedulables);
            let assigned_versions = assigned_versions.into_map();
            let paired: Vec<_> = all_schedulables
                .into_iter()
                .map(|s| {
                    let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
                    (s, versions)
                })
                .collect();
            self.execution_scheduler_sender.send(paired, None);

            (lock, final_round, num_schedulables, checkpoint_height)
        } else {
            let (
                transactions_to_schedule,
                randomness_transactions_to_schedule,
                cancelled_txns,
                randomness_state_update_transaction,
            ) = self.collect_transactions_to_schedule(
                &mut state,
                &mut execution_time_estimator,
                &commit_info,
                user_transactions,
            );

            let (should_accept_tx, lock, final_round) =
                self.handle_eop(&mut state, end_of_publish_transactions);

            let make_checkpoint = should_accept_tx || final_round;
            if !make_checkpoint {
                // No need for any further processing
                return;
            }

            // If this is the final round, record execution time observations for storage in the
            // end-of-epoch tx.
            if final_round {
                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
            }

            let epoch = self.epoch_store.epoch();
            let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
                .then_some(Schedulable::ConsensusCommitPrologue(
                    epoch,
                    commit_info.round,
                    commit_info.consensus_commit_ref.index,
                ));

            // Order matters: prologue first, then authenticator state update, then
            // user transactions.
            let schedulables: Vec<_> = itertools::chain!(
                consensus_commit_prologue.into_iter(),
                authenticator_state_update_transaction
                    .into_iter()
                    .map(Schedulable::Transaction),
                transactions_to_schedule
                    .into_iter()
                    .map(Schedulable::Transaction),
            )
            .collect();

            let randomness_schedulables: Vec<_> = randomness_state_update_transaction
                .into_iter()
                .chain(
                    randomness_transactions_to_schedule
                        .into_iter()
                        .map(Schedulable::Transaction),
                )
                .collect();

            let num_schedulables = schedulables.len();
            let checkpoint_height = self.create_pending_checkpoints_v2(
                &mut state,
                &commit_info,
                schedulables,
                randomness_schedulables,
                &cancelled_txns,
                final_round,
            );

            (lock, final_round, num_schedulables, checkpoint_height)
        };

        let notifications = state.get_notifications();

        // Persist commit stats; when checkpoints are split here, also record the
        // checkpoint queue's flush timestamp and sequence so recovery can resume.
        let mut stats_to_record = self.last_consensus_stats.clone();
        stats_to_record.height = checkpoint_height;
        if split_checkpoints {
            let queue = self.checkpoint_queue.lock().unwrap();
            stats_to_record.last_checkpoint_flush_timestamp = queue.last_built_timestamp();
            stats_to_record.checkpoint_seq = queue.checkpoint_seq();
        }

        state.output.record_consensus_commit_stats(stats_to_record);

        self.record_deferral_deletion(&mut state);

        self.epoch_store
            .consensus_quarantine
            .write()
            .push_consensus_output(state.output, &self.epoch_store)
            .expect("push_consensus_output should not fail");

        debug!(
            ?commit_info.round,
            "Notifying checkpoint service about new pending checkpoint(s)",
        );
        self.checkpoint_service
            .notify_checkpoint()
            .expect("failed to notify checkpoint service");

        if let Some(randomness_round) = state.randomness_round {
            randomness_manager
                .as_ref()
                .expect("randomness manager should exist if randomness round is provided")
                .generate_randomness(epoch, randomness_round);
        }

        self.epoch_store.process_notifications(notifications.iter());

        // pass lock by value to ensure that it is held until this point
        self.log_final_round(lock, final_round);

        // update the calculated throughput
        self.throughput_calculator
            .add_transactions(timestamp, num_schedulables as u64);

        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
            let key = [commit_sub_dag_index, epoch];
            if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
                sui_simulator::task::kill_current_node(None);
            }
        });

        fail_point!("crash");

        self.send_end_of_publish_if_needed().await;
    }
1414
1415    fn handle_eop(
1416        &self,
1417        state: &mut CommitHandlerState,
1418        end_of_publish_transactions: Vec<AuthorityName>,
1419    ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
1420        let collected_eop =
1421            self.process_end_of_publish_transactions(state, end_of_publish_transactions);
1422        if collected_eop {
1423            let (lock, final_round) = self.advance_eop_state_machine(state);
1424            (lock.should_accept_tx(), Some(lock), final_round)
1425        } else {
1426            (true, None, false)
1427        }
1428    }
1429
1430    fn record_end_of_epoch_execution_time_observations(
1431        &self,
1432        estimator: &mut ExecutionTimeEstimator,
1433    ) {
1434        self.epoch_store
1435            .end_of_epoch_execution_time_observations
1436            .set(estimator.take_observations())
1437            .expect("`stored_execution_time_observations` should only be set once at end of epoch");
1438    }
1439
1440    fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
1441        let mut deferred_transactions = self
1442            .epoch_store
1443            .consensus_output_cache
1444            .deferred_transactions
1445            .lock();
1446        for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
1447            deferred_transactions.remove(&deleted_deferred_key);
1448        }
1449    }
1450
1451    fn log_final_round(&self, lock: Option<RwLockWriteGuard<ReconfigState>>, final_round: bool) {
1452        if final_round {
1453            let epoch = self.epoch_store.epoch();
1454            info!(
1455                ?epoch,
1456                lock=?lock.as_ref(),
1457                final_round=?final_round,
1458                "Notified last checkpoint"
1459            );
1460            self.epoch_store.record_end_of_message_quorum_time_metric();
1461        }
1462    }
1463
    /// Writes the pending checkpoint(s) for this commit into the commit output
    /// (legacy path — only used when checkpoint splitting in the consensus
    /// handler is disabled).
    ///
    /// Always writes one checkpoint for regular schedulables; additionally writes
    /// a second checkpoint (at height + 1) for randomness schedulables when
    /// randomness was generated, or when DKG failed and cancelled randomness
    /// transactions must still be checkpointed.
    fn create_pending_checkpoints(
        &self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        schedulables: &[Schedulable],
        randomness_schedulables: &[Schedulable],
        final_round: bool,
    ) {
        assert!(
            !self
                .epoch_store
                .protocol_config()
                .split_checkpoints_in_consensus_handler()
        );

        let checkpoint_height = self
            .epoch_store
            .calculate_pending_checkpoint_height(commit_info.round);

        // Determine whether to write pending checkpoint for user tx with randomness.
        // - If randomness is not generated for this commit, we will skip the
        //   checkpoint with the associated height. Therefore checkpoint heights may
        //   not be contiguous.
        // - Exception: if DKG fails, we always need to write out a PendingCheckpoint
        //   for randomness tx that are canceled.
        let should_write_random_checkpoint = state.randomness_round.is_some()
            || (state.dkg_failed && !randomness_schedulables.is_empty());

        let pending_checkpoint = PendingCheckpoint {
            roots: schedulables.iter().map(|s| s.key()).collect(),
            details: PendingCheckpointInfo {
                timestamp_ms: commit_info.timestamp,
                // Only mark this one last-of-epoch if no randomness checkpoint
                // follows it; otherwise the randomness checkpoint is the last.
                last_of_epoch: final_round && !should_write_random_checkpoint,
                checkpoint_height,
                consensus_commit_ref: commit_info.consensus_commit_ref,
                rejected_transactions_digest: commit_info.rejected_transactions_digest,
                checkpoint_seq: None,
            },
        };
        self.epoch_store
            .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
            .expect("failed to write pending checkpoint");

        info!(
            "Written pending checkpoint: {:?}",
            pending_checkpoint.details,
        );

        if should_write_random_checkpoint {
            // Randomness checkpoint always occupies the next height slot.
            let pending_checkpoint = PendingCheckpoint {
                roots: randomness_schedulables.iter().map(|s| s.key()).collect(),
                details: PendingCheckpointInfo {
                    timestamp_ms: commit_info.timestamp,
                    last_of_epoch: final_round,
                    checkpoint_height: checkpoint_height + 1,
                    consensus_commit_ref: commit_info.consensus_commit_ref,
                    rejected_transactions_digest: commit_info.rejected_transactions_digest,
                    checkpoint_seq: None,
                },
            };
            self.epoch_store
                .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
                .expect("failed to write pending checkpoint");
        }
    }
1529
    // Merges previously deferred transactions with this commit's user
    // transactions, applies congestion control (deferring or cancelling
    // transactions as needed), records the results in the consensus output,
    // and returns the transactions to schedule.
    //
    // Returns, in order:
    // - transactions to schedule in the regular lane,
    // - transactions to schedule in the randomness lane (transactions
    //   cancelled here remain in these lists; their reasons are returned
    //   separately),
    // - cancellation reasons keyed by transaction digest,
    // - the randomness state update schedulable, present only when this
    //   commit has a randomness round.
    #[allow(clippy::type_complexity)]
    fn collect_transactions_to_schedule(
        &self,
        state: &mut CommitHandlerState,
        execution_time_estimator: &mut ExecutionTimeEstimator,
        commit_info: &ConsensusCommitInfo,
        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    ) -> (
        Vec<VerifiedExecutableTransactionWithAliases>,
        Vec<VerifiedExecutableTransactionWithAliases>,
        BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        Option<Schedulable<VerifiedExecutableTransactionWithAliases>>,
    ) {
        let protocol_config = self.epoch_store.protocol_config();
        let epoch = self.epoch_store.epoch();

        let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) =
            self.merge_and_reorder_transactions(state, commit_info, user_transactions);

        // The regular and randomness lanes each get their own congestion
        // tracker.
        let mut shared_object_congestion_tracker =
            self.init_congestion_tracker(commit_info, false, &ordered_txns);
        let mut shared_object_using_randomness_congestion_tracker =
            self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns);

        let randomness_state_update_transaction = state
            .randomness_round
            .map(|round| Schedulable::RandomnessStateUpdate(epoch, round));
        debug!(
            "Randomness state update transaction: {:?}",
            randomness_state_update_transaction
                .as_ref()
                .map(|t| t.key())
        );

        let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len());
        let mut randomness_transactions_to_schedule =
            Vec::with_capacity(ordered_randomness_txns.len());
        let mut deferred_txns = BTreeMap::new();
        let mut cancelled_txns = BTreeMap::new();

        for transaction in ordered_txns {
            self.handle_deferral_and_cancellation(
                state,
                &mut cancelled_txns,
                &mut deferred_txns,
                &mut transactions_to_schedule,
                protocol_config,
                commit_info,
                transaction,
                &mut shared_object_congestion_tracker,
                &previously_deferred_tx_digests,
                execution_time_estimator,
            );
        }

        for transaction in ordered_randomness_txns {
            // On DKG failure, randomness-using transactions are cancelled
            // rather than deferred; they stay in the schedule list with a
            // DkgFailed cancellation reason.
            if state.dkg_failed {
                debug!(
                    "Canceling randomness-using transaction {:?} because DKG failed",
                    transaction.tx().digest(),
                );
                cancelled_txns.insert(
                    *transaction.tx().digest(),
                    CancelConsensusCertificateReason::DkgFailed,
                );
                randomness_transactions_to_schedule.push(transaction);
                continue;
            }
            self.handle_deferral_and_cancellation(
                state,
                &mut cancelled_txns,
                &mut deferred_txns,
                &mut randomness_transactions_to_schedule,
                protocol_config,
                commit_info,
                transaction,
                &mut shared_object_using_randomness_congestion_tracker,
                &previously_deferred_tx_digests,
                execution_time_estimator,
            );
        }

        // Record newly deferred transactions both in the in-memory cache and
        // in the consensus output. The inner block scopes the cache lock.
        let mut total_deferred_txns = 0;
        {
            let mut deferred_transactions = self
                .epoch_store
                .consensus_output_cache
                .deferred_transactions
                .lock();
            for (key, txns) in deferred_txns.into_iter() {
                total_deferred_txns += txns.len();
                deferred_transactions.insert(key, txns.clone());
                state.output.defer_transactions(key, txns);
            }
        }

        self.metrics
            .consensus_handler_deferred_transactions
            .inc_by(total_deferred_txns as u64);
        self.metrics
            .consensus_handler_cancelled_transactions
            .inc_by(cancelled_txns.len() as u64);
        self.metrics
            .consensus_handler_max_object_costs
            .with_label_values(&["regular_commit"])
            .set(shared_object_congestion_tracker.max_cost() as i64);
        self.metrics
            .consensus_handler_max_object_costs
            .with_label_values(&["randomness_commit"])
            .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);

        let congestion_commit_data = shared_object_congestion_tracker.finish_commit(commit_info);
        let randomness_congestion_commit_data =
            shared_object_using_randomness_congestion_tracker.finish_commit(commit_info);

        // When a congestion logger is configured, record both lanes'
        // per-commit congestion data.
        if let Some(logger) = &self.congestion_logger {
            let epoch = self.epoch_store.epoch();
            let mut logger = logger.lock().unwrap();
            logger.write_commit_log(epoch, commit_info, false, &congestion_commit_data);
            logger.write_commit_log(epoch, commit_info, true, &randomness_congestion_commit_data);
        }

        // Best-effort notification of accumulated object debts to the
        // ExecutionTimeObserver; a failed send is only logged.
        if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get()
            && let Err(e) = tx_object_debts.try_send(
                congestion_commit_data
                    .accumulated_debts
                    .iter()
                    .chain(randomness_congestion_commit_data.accumulated_debts.iter())
                    .map(|(id, _)| *id)
                    .collect(),
            )
        {
            info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
        }

        state
            .output
            .set_congestion_control_object_debts(congestion_commit_data.accumulated_debts);
        state.output.set_congestion_control_randomness_object_debts(
            randomness_congestion_commit_data.accumulated_debts,
        );

        (
            transactions_to_schedule,
            randomness_transactions_to_schedule,
            cancelled_txns,
            randomness_state_update_transaction,
        )
    }
1679
    // Converts this commit's transactions into the final schedulable lists
    // (regular and randomness lanes) with shared-object versions assigned.
    // Only used when checkpoints are NOT split in the consensus handler
    // (asserted below).
    fn process_transactions(
        &self,
        state: &mut CommitHandlerState,
        execution_time_estimator: &mut ExecutionTimeEstimator,
        commit_info: &ConsensusCommitInfo,
        authenticator_state_update_transaction: Option<VerifiedExecutableTransactionWithAliases>,
        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    ) -> (Vec<Schedulable>, Vec<Schedulable>, AssignedTxAndVersions) {
        let protocol_config = self.epoch_store.protocol_config();
        assert!(!protocol_config.split_checkpoints_in_consensus_handler());
        let epoch = self.epoch_store.epoch();

        let (
            transactions_to_schedule,
            randomness_transactions_to_schedule,
            cancelled_txns,
            randomness_state_update_transaction,
        ) = self.collect_transactions_to_schedule(
            state,
            execution_time_estimator,
            commit_info,
            user_transactions,
        );

        // When accumulators are enabled, each lane gets a settlement
        // schedulable: the regular lane at this commit's checkpoint height,
        // the randomness lane (only if it has content) at height + 1.
        let mut settlement = None;
        let mut randomness_settlement = None;
        if self.epoch_store.accumulators_enabled() {
            let checkpoint_height = self
                .epoch_store
                .calculate_pending_checkpoint_height(commit_info.round);

            settlement = Some(Schedulable::AccumulatorSettlement(epoch, checkpoint_height));

            if state.randomness_round.is_some() || !randomness_transactions_to_schedule.is_empty() {
                randomness_settlement = Some(Schedulable::AccumulatorSettlement(
                    epoch,
                    checkpoint_height + 1,
                ));
            }
        }

        // Placeholder for the consensus commit prologue; it is swapped for
        // the real prologue transaction further down, once version
        // assignments are known.
        let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
            .then_some(Schedulable::ConsensusCommitPrologue(
                epoch,
                commit_info.round,
                commit_info.consensus_commit_ref.index,
            ));

        let schedulables: Vec<_> = itertools::chain!(
            consensus_commit_prologue.into_iter(),
            authenticator_state_update_transaction
                .into_iter()
                .map(Schedulable::Transaction),
            transactions_to_schedule
                .into_iter()
                .map(Schedulable::Transaction),
            settlement,
        )
        .collect();

        let randomness_schedulables: Vec<_> = randomness_state_update_transaction
            .into_iter()
            .chain(
                randomness_transactions_to_schedule
                    .into_iter()
                    .map(Schedulable::Transaction),
            )
            .chain(randomness_settlement)
            .collect();

        let assigned_versions = self
            .epoch_store
            .process_consensus_transaction_shared_object_versions(
                self.cache_reader.as_ref(),
                schedulables.iter(),
                randomness_schedulables.iter(),
                &cancelled_txns,
                &mut state.output,
            )
            .expect("failed to assign shared object versions");

        let consensus_commit_prologue =
            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);

        // Replace the placeholder prologue (always at index 0 when present)
        // with the real prologue transaction, rewriting its
        // version-assignment key to the concrete transaction digest.
        let mut schedulables = schedulables;
        let mut assigned_versions = assigned_versions;
        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
            assert!(matches!(
                schedulables[0],
                Schedulable::ConsensusCommitPrologue(..)
            ));
            assert!(matches!(
                assigned_versions.0[0].0,
                TransactionKey::ConsensusCommitPrologue(..)
            ));
            assigned_versions.0[0].0 =
                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
            schedulables[0] = Schedulable::Transaction(consensus_commit_prologue);
        }

        self.epoch_store
            .process_user_signatures(schedulables.iter().chain(randomness_schedulables.iter()));

        // After this point we can throw away alias version information.
        let schedulables: Vec<Schedulable> = schedulables.into_iter().map(|s| s.into()).collect();
        let randomness_schedulables: Vec<Schedulable> = randomness_schedulables
            .into_iter()
            .map(|s| s.into())
            .collect();

        (schedulables, randomness_schedulables, assigned_versions)
    }
1792
    // Splits this commit's schedulables into pending checkpoints of at most
    // `max_transactions_per_checkpoint` transactions each, assigns
    // shared-object versions, inserts the consensus commit prologue, and
    // writes the resulting pending checkpoints. Only used when checkpoints
    // ARE split in the consensus handler (asserted below).
    // Returns the checkpoint height of the last chunk created for this
    // commit.
    #[allow(clippy::type_complexity)]
    fn create_pending_checkpoints_v2(
        &self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
        randomness_schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
        cancelled_txns: &BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        final_round: bool,
    ) -> CheckpointHeight {
        let protocol_config = self.epoch_store.protocol_config();
        assert!(protocol_config.split_checkpoints_in_consensus_handler());

        let epoch = self.epoch_store.epoch();
        let accumulators_enabled = self.epoch_store.accumulators_enabled();
        let max_transactions_per_checkpoint =
            protocol_config.max_transactions_per_checkpoint() as usize;

        // A randomness checkpoint is produced when this commit has a
        // randomness round, or when DKG failed but randomness schedulables
        // exist.
        let should_write_random_checkpoint = state.randomness_round.is_some()
            || (state.dkg_failed && !randomness_schedulables.is_empty());

        let mut checkpoint_queue = self.checkpoint_queue.lock().unwrap();

        // Splits `schedulables` into chunks of at most
        // `max_transactions_per_checkpoint`, assigning each chunk the next
        // height from the queue and, when accumulators are enabled, its own
        // settlement at that height.
        let build_chunks =
            |schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
             queue: &mut CheckpointQueue|
             -> Vec<Chunk<VerifiedExecutableTransactionWithAliases>> {
                schedulables
                    .chunks(max_transactions_per_checkpoint)
                    .map(|chunk| {
                        let height = queue.next_height();
                        let schedulables: Vec<_> = chunk.to_vec();
                        let settlement = if accumulators_enabled {
                            Some(Schedulable::AccumulatorSettlement(epoch, height))
                        } else {
                            None
                        };
                        Chunk {
                            schedulables,
                            settlement,
                            height,
                        }
                    })
                    .collect()
            };

        let num_schedulables = schedulables.len();
        let chunked_schedulables = build_chunks(schedulables, &mut checkpoint_queue);
        if chunked_schedulables.len() > 1 {
            info!(
                "Splitting transactions into {} checkpoint chunks (num_schedulables={}, max_tx={})",
                chunked_schedulables.len(),
                num_schedulables,
                max_transactions_per_checkpoint
            );
            assert_reachable!("checkpoint split due to transaction limit");
        }
        let chunked_randomness_schedulables = if should_write_random_checkpoint {
            build_chunks(randomness_schedulables, &mut checkpoint_queue)
        } else {
            vec![]
        };

        // Assign shared-object versions across all chunks of both lanes at
        // once.
        let schedulables_for_version_assignment =
            Chunk::all_schedulables_from(&chunked_schedulables);
        let randomness_schedulables_for_version_assignment =
            Chunk::all_schedulables_from(&chunked_randomness_schedulables);

        let assigned_versions = self
            .epoch_store
            .process_consensus_transaction_shared_object_versions(
                self.cache_reader.as_ref(),
                schedulables_for_version_assignment,
                randomness_schedulables_for_version_assignment,
                cancelled_txns,
                &mut state.output,
            )
            .expect("failed to assign shared object versions");

        let consensus_commit_prologue =
            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);

        // Replace the placeholder prologue (first element of the first chunk
        // when present) with the real prologue transaction, rewriting its
        // version-assignment key to the concrete transaction digest.
        let mut chunked_schedulables = chunked_schedulables;
        let mut assigned_versions = assigned_versions;
        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
            assert!(matches!(
                chunked_schedulables[0].schedulables[0],
                Schedulable::ConsensusCommitPrologue(..)
            ));
            assert!(matches!(
                assigned_versions.0[0].0,
                TransactionKey::ConsensusCommitPrologue(..)
            ));
            assigned_versions.0[0].0 =
                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
            chunked_schedulables[0].schedulables[0] =
                Schedulable::Transaction(consensus_commit_prologue);
        }

        let assigned_versions = assigned_versions.into_map();

        self.epoch_store.process_user_signatures(
            chunked_schedulables
                .iter()
                .flat_map(|c| c.all_schedulables())
                .chain(
                    chunked_randomness_schedulables
                        .iter()
                        .flat_map(|c| c.all_schedulables()),
                ),
        );

        // Height of the last chunk created for this commit; randomness
        // chunks, when present, were assigned heights after the regular
        // chunks.
        let commit_height = chunked_randomness_schedulables
            .last()
            .or(chunked_schedulables.last())
            .map(|c| c.height)
            .expect("at least one checkpoint root must be created per commit");

        let mut pending_checkpoints = Vec::new();
        for chunk in chunked_schedulables {
            pending_checkpoints.extend(checkpoint_queue.push_chunk(
                chunk.into(),
                &assigned_versions,
                commit_info.timestamp,
                commit_info.consensus_commit_ref,
                commit_info.rejected_transactions_digest,
            ));
        }

        if protocol_config.merge_randomness_into_checkpoint() {
            // We don't want to block checkpoint formation for non-randomness schedulables
            // on randomness state update. Therefore, we include randomness chunks in the
            // subsequent checkpoint. First we flush the queue, then enqueue randomness
            // for merging into the subsequent commit.
            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, final_round));

            if should_write_random_checkpoint {
                for chunk in chunked_randomness_schedulables {
                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
                        chunk.into(),
                        &assigned_versions,
                        commit_info.timestamp,
                        commit_info.consensus_commit_ref,
                        commit_info.rejected_transactions_digest,
                    ));
                }
                if final_round {
                    pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
                }
            }
        } else {
            // Randomness checkpoints are kept separate: flush the regular
            // chunks first (forced if this is the final round or a randomness
            // checkpoint follows), then push and flush the randomness chunks.
            let force = final_round || should_write_random_checkpoint;
            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, force));

            if should_write_random_checkpoint {
                for chunk in chunked_randomness_schedulables {
                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
                        chunk.into(),
                        &assigned_versions,
                        commit_info.timestamp,
                        commit_info.consensus_commit_ref,
                        commit_info.rejected_transactions_digest,
                    ));
                }
                pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
            }
        }

        // Mark the final pending checkpoint of the final round as the last of
        // the epoch.
        if final_round && let Some(last) = pending_checkpoints.last_mut() {
            last.details.last_of_epoch = true;
        }

        // Record whether the checkpoint queue was fully drained by this
        // commit; the lock is released before writing the checkpoints.
        let queue_drained = checkpoint_queue.is_empty();
        drop(checkpoint_queue);

        for pending_checkpoint in pending_checkpoints {
            debug!(
                checkpoint_height = pending_checkpoint.details.checkpoint_height,
                roots_count = pending_checkpoint.num_roots(),
                "Writing pending checkpoint",
            );
            self.epoch_store
                .write_pending_checkpoint_v2(&mut state.output, &pending_checkpoint)
                .expect("failed to write pending checkpoint");
        }

        state.output.set_checkpoint_queue_drained(queue_drained);

        commit_height
    }
1983
1984    // Adds the consensus commit prologue transaction to the beginning of input `transactions` to update
1985    // the system clock used in all transactions in the current consensus commit.
1986    // Returns the root of the consensus commit prologue transaction if it was added to the input.
1987    fn add_consensus_commit_prologue_transaction<'a>(
1988        &'a self,
1989        state: &'a mut CommitHandlerState,
1990        commit_info: &'a ConsensusCommitInfo,
1991        assigned_versions: &AssignedTxAndVersions,
1992    ) -> Option<VerifiedExecutableTransactionWithAliases> {
1993        {
1994            if commit_info.skip_consensus_commit_prologue_in_test {
1995                return None;
1996            }
1997        }
1998
1999        let mut cancelled_txn_version_assignment = Vec::new();
2000
2001        let protocol_config = self.epoch_store.protocol_config();
2002
2003        for (txn_key, assigned_versions) in assigned_versions.0.iter() {
2004            let Some(d) = txn_key.as_digest() else {
2005                continue;
2006            };
2007
2008            if !protocol_config.include_cancelled_randomness_txns_in_prologue()
2009                && assigned_versions
2010                    .shared_object_versions
2011                    .iter()
2012                    .any(|((id, _), _)| *id == SUI_RANDOMNESS_STATE_OBJECT_ID)
2013            {
2014                continue;
2015            }
2016
2017            if assigned_versions
2018                .shared_object_versions
2019                .iter()
2020                .any(|(_, version)| version.is_cancelled())
2021            {
2022                assert_reachable!("cancelled transactions");
2023                cancelled_txn_version_assignment
2024                    .push((*d, assigned_versions.shared_object_versions.clone()));
2025            }
2026        }
2027
2028        fail_point_arg!(
2029            "additional_cancelled_txns_for_tests",
2030            |additional_cancelled_txns: Vec<(
2031                TransactionDigest,
2032                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
2033            )>| {
2034                cancelled_txn_version_assignment.extend(additional_cancelled_txns);
2035            }
2036        );
2037
2038        let transaction = commit_info.create_consensus_commit_prologue_transaction(
2039            self.epoch_store.epoch(),
2040            self.epoch_store.protocol_config(),
2041            cancelled_txn_version_assignment,
2042            commit_info,
2043            state.indirect_state_observer.take().unwrap(),
2044        );
2045        Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2046            transaction,
2047        ))
2048    }
2049
    // Decides the fate of a single transaction from this commit:
    // - scheduled: pushed to `scheduled_txns` and charged to the congestion
    //   tracker;
    // - deferred: recorded in `deferred_txns` under its deferral key;
    // - cancelled: reason recorded in `cancelled_txns`, and the transaction
    //   is still pushed to `scheduled_txns`.
    fn handle_deferral_and_cancellation(
        &self,
        state: &mut CommitHandlerState,
        cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>,
        scheduled_txns: &mut Vec<VerifiedExecutableTransactionWithAliases>,
        protocol_config: &ProtocolConfig,
        commit_info: &ConsensusCommitInfo,
        transaction: VerifiedExecutableTransactionWithAliases,
        shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
        execution_time_estimator: &ExecutionTimeEstimator,
    ) {
        // Check for unpaid amplification before other deferral checks.
        // SIP-45: Paid amplification allows (gas_price / RGP + 1) submissions.
        // Transactions with more duplicates than paid for are deferred.
        if protocol_config.defer_unpaid_amplification() {
            let occurrence_count = state
                .occurrence_counts
                .get(transaction.tx().digest())
                .copied()
                .unwrap_or(0);

            // `rgp.max(1)` guards against division by zero.
            let rgp = self.epoch_store.reference_gas_price();
            let gas_price = transaction.tx().transaction_data().gas_price();
            let allowed_count = (gas_price / rgp.max(1)) + 1;

            if occurrence_count as u64 > allowed_count {
                self.metrics
                    .consensus_handler_unpaid_amplification_deferrals
                    .inc();

                // Preserve the original deferred-from round across repeated
                // deferrals so the deferral-round limit is measured from the
                // first deferral.
                let deferred_from_round = previously_deferred_tx_digests
                    .get(transaction.tx().digest())
                    .map(|k| k.deferred_from_round())
                    .unwrap_or(commit_info.round);

                let deferral_key = DeferralKey::new_for_consensus_round(
                    commit_info.round + 1,
                    deferred_from_round,
                );

                // Only defer while within the deferral-round limit; past the
                // limit, fall through to the normal deferral/cancellation
                // checks below.
                if transaction_deferral_within_limit(
                    &deferral_key,
                    protocol_config.max_deferral_rounds_for_congestion_control(),
                ) {
                    assert_reachable!("unpaid amplification deferral");
                    debug!(
                        "Deferring transaction {:?} due to unpaid amplification (count={}, allowed={})",
                        transaction.tx().digest(),
                        occurrence_count,
                        allowed_count
                    );
                    deferred_txns
                        .entry(deferral_key)
                        .or_default()
                        .push(transaction);
                    return;
                }
            }
        }

        // Estimate execution cost up front; the congestion tracker is only
        // charged (bumped) below when the transaction is scheduled without
        // deferral or cancellation.
        let tx_cost = shared_object_congestion_tracker.get_tx_cost(
            execution_time_estimator,
            transaction.tx(),
            state.indirect_state_observer.as_mut().unwrap(),
        );

        let deferral_info = self.epoch_store.should_defer(
            transaction.tx(),
            commit_info,
            state.dkg_failed,
            state.randomness_round.is_some(),
            previously_deferred_tx_digests,
            shared_object_congestion_tracker,
        );

        if let Some((deferral_key, deferral_reason)) = deferral_info {
            debug!(
                "Deferring consensus certificate for transaction {:?} until {:?}",
                transaction.tx().digest(),
                deferral_key
            );

            match deferral_reason {
                // Deferred until randomness becomes available; unlike
                // congestion deferrals, no deferral-round limit is checked
                // here.
                DeferralReason::RandomnessNotReady => {
                    deferred_txns
                        .entry(deferral_key)
                        .or_default()
                        .push(transaction);
                }
                DeferralReason::SharedObjectCongestion(congested_objects) => {
                    self.metrics.consensus_handler_congested_transactions.inc();
                    if transaction_deferral_within_limit(
                        &deferral_key,
                        protocol_config.max_deferral_rounds_for_congestion_control(),
                    ) {
                        deferred_txns
                            .entry(deferral_key)
                            .or_default()
                            .push(transaction);
                    } else {
                        assert_sometimes!(
                            transaction.tx().data().transaction_data().uses_randomness(),
                            "cancelled randomness-using transaction"
                        );
                        assert_sometimes!(
                            !transaction.tx().data().transaction_data().uses_randomness(),
                            "cancelled non-randomness-using transaction"
                        );

                        // Cancel the transaction that has been deferred for too long.
                        debug!(
                            "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
                            transaction.tx().digest(),
                            deferral_key,
                            congested_objects
                        );
                        cancelled_txns.insert(
                            *transaction.tx().digest(),
                            CancelConsensusCertificateReason::CongestionOnObjects(
                                congested_objects,
                            ),
                        );
                        scheduled_txns.push(transaction);
                    }
                }
            }
        } else {
            // Update object execution cost for all scheduled transactions
            shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, transaction.tx());
            scheduled_txns.push(transaction);
        }
    }
2184
2185    fn merge_and_reorder_transactions(
2186        &self,
2187        state: &mut CommitHandlerState,
2188        commit_info: &ConsensusCommitInfo,
2189        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
2190    ) -> (
2191        Vec<VerifiedExecutableTransactionWithAliases>,
2192        Vec<VerifiedExecutableTransactionWithAliases>,
2193        HashMap<TransactionDigest, DeferralKey>,
2194    ) {
2195        let protocol_config = self.epoch_store.protocol_config();
2196
2197        let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
2198            self.load_deferred_transactions(state, commit_info);
2199
2200        txns.reserve(user_transactions.len());
2201        randomness_txns.reserve(user_transactions.len());
2202
2203        // There may be randomness transactions in `txns`, which were deferred due to congestion.
2204        // They must be placed back into `randomness_txns`.
2205        let mut txns: Vec<_> = txns
2206            .into_iter()
2207            .filter_map(|tx| {
2208                if tx.tx().transaction_data().uses_randomness() {
2209                    randomness_txns.push(tx);
2210                    None
2211                } else {
2212                    Some(tx)
2213                }
2214            })
2215            .collect();
2216
2217        for txn in user_transactions {
2218            if txn.tx().transaction_data().uses_randomness() {
2219                randomness_txns.push(txn);
2220            } else {
2221                txns.push(txn);
2222            }
2223        }
2224
2225        PostConsensusTxReorder::reorder(
2226            &mut txns,
2227            protocol_config.consensus_transaction_ordering(),
2228        );
2229        PostConsensusTxReorder::reorder(
2230            &mut randomness_txns,
2231            protocol_config.consensus_transaction_ordering(),
2232        );
2233
2234        (txns, randomness_txns, previously_deferred_tx_digests)
2235    }
2236
2237    fn load_deferred_transactions(
2238        &self,
2239        state: &mut CommitHandlerState,
2240        commit_info: &ConsensusCommitInfo,
2241    ) -> (
2242        Vec<VerifiedExecutableTransactionWithAliases>,
2243        Vec<VerifiedExecutableTransactionWithAliases>,
2244        HashMap<TransactionDigest, DeferralKey>,
2245    ) {
2246        let mut previously_deferred_tx_digests = HashMap::new();
2247
2248        let deferred_txs: Vec<_> = self
2249            .epoch_store
2250            .load_deferred_transactions_for_up_to_consensus_round_v2(
2251                &mut state.output,
2252                commit_info.round,
2253            )
2254            .expect("db error")
2255            .into_iter()
2256            .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2257            .map(|(key, tx)| {
2258                previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2259                tx
2260            })
2261            .collect();
2262        trace!(
2263            "loading deferred transactions: {:?}",
2264            deferred_txs.iter().map(|tx| tx.tx().digest())
2265        );
2266
2267        let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
2268            let txns: Vec<_> = self
2269                .epoch_store
2270                .load_deferred_transactions_for_randomness_v2(&mut state.output)
2271                .expect("db error")
2272                .into_iter()
2273                .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2274                .map(|(key, tx)| {
2275                    previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2276                    tx
2277                })
2278                .collect();
2279            trace!(
2280                "loading deferred randomness transactions: {:?}",
2281                txns.iter().map(|tx| tx.tx().digest())
2282            );
2283            txns
2284        } else {
2285            vec![]
2286        };
2287
2288        (
2289            deferred_txs,
2290            deferred_randomness_txs,
2291            previously_deferred_tx_digests,
2292        )
2293    }
2294
    /// Builds the congestion tracker for this commit, seeded from the per-object debts
    /// accumulated in previous rounds (read from the consensus quarantine).
    /// `for_randomness` selects the randomness-phase debt ledger vs. the regular one.
    fn init_congestion_tracker(
        &self,
        commit_info: &ConsensusCommitInfo,
        for_randomness: bool,
        txns: &[VerifiedExecutableTransactionWithAliases],
    ) -> SharedObjectCongestionTracker {
        // `mut` is only needed when the fail point below fires (tests/simtests),
        // hence the allow.
        #[allow(unused_mut)]
        let mut ret = SharedObjectCongestionTracker::from_protocol_config(
            self.epoch_store
                .consensus_quarantine
                .read()
                .load_initial_object_debts(
                    &self.epoch_store,
                    commit_info.round,
                    for_randomness,
                    txns,
                )
                .expect("db error"),
            self.epoch_store.protocol_config(),
            for_randomness,
            self.congestion_logger.is_some(),
        );

        // Test hook: allows a test to replace the freshly-built tracker with an
        // injected one. No-op in production builds.
        fail_point_arg!(
            "initial_congestion_tracker",
            |tracker: SharedObjectCongestionTracker| {
                info!(
                    "Initialize shared_object_congestion_tracker to  {:?}",
                    tracker
                );
                ret = tracker;
            }
        );

        ret
    }
2331
2332    fn process_jwks(
2333        &self,
2334        state: &mut CommitHandlerState,
2335        commit_info: &ConsensusCommitInfo,
2336        new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
2337    ) {
2338        for (authority_name, jwk_id, jwk) in new_jwks {
2339            self.epoch_store.record_jwk_vote(
2340                &mut state.output,
2341                commit_info.round,
2342                authority_name,
2343                &jwk_id,
2344                &jwk,
2345            );
2346        }
2347    }
2348
2349    fn process_capability_notifications(
2350        &self,
2351        capability_notifications: Vec<AuthorityCapabilitiesV2>,
2352    ) {
2353        for capabilities in capability_notifications {
2354            self.epoch_store
2355                .record_capabilities_v2(&capabilities)
2356                .expect("db error");
2357        }
2358    }
2359
2360    fn process_execution_time_observations(
2361        &self,
2362        state: &mut CommitHandlerState,
2363        execution_time_observations: Vec<ExecutionTimeObservation>,
2364    ) {
2365        let mut execution_time_estimator = self
2366            .epoch_store
2367            .execution_time_estimator
2368            .try_lock()
2369            .expect("should only ever be called from the commit handler thread");
2370
2371        for ExecutionTimeObservation {
2372            authority,
2373            generation,
2374            estimates,
2375        } in execution_time_observations
2376        {
2377            let authority_index = self
2378                .epoch_store
2379                .committee()
2380                .authority_index(&authority)
2381                .unwrap();
2382            execution_time_estimator.process_observations_from_consensus(
2383                authority_index,
2384                Some(generation),
2385                &estimates,
2386            );
2387            state
2388                .output
2389                .insert_execution_time_observation(authority_index, generation, estimates);
2390        }
2391    }
2392
2393    fn process_checkpoint_signature_messages(
2394        &self,
2395        checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
2396    ) {
2397        for checkpoint_signature_message in checkpoint_signature_messages {
2398            self.checkpoint_service
2399                .notify_checkpoint_signature(&checkpoint_signature_message)
2400                .expect("db error");
2401        }
2402    }
2403
2404    async fn process_dkg_updates(
2405        &self,
2406        state: &mut CommitHandlerState,
2407        commit_info: &ConsensusCommitInfo,
2408        randomness_manager: Option<&mut RandomnessManager>,
2409        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2410        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2411    ) {
2412        if !self.epoch_store.randomness_state_enabled() {
2413            let num_dkg_messages = randomness_dkg_messages.len();
2414            let num_dkg_confirmations = randomness_dkg_confirmations.len();
2415            if num_dkg_messages + num_dkg_confirmations > 0 {
2416                debug_fatal!(
2417                    "received {} RandomnessDkgMessage and {} RandomnessDkgConfirmation messages when randomness is not enabled",
2418                    num_dkg_messages,
2419                    num_dkg_confirmations
2420                );
2421            }
2422            return;
2423        }
2424
2425        let randomness_manager =
2426            randomness_manager.expect("randomness manager should exist if randomness is enabled");
2427
2428        let randomness_dkg_updates =
2429            self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);
2430
2431        let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
2432            state,
2433            randomness_manager,
2434            randomness_dkg_confirmations,
2435        );
2436
2437        if randomness_dkg_updates || randomness_dkg_confirmation_updates {
2438            randomness_manager
2439                .advance_dkg(&mut state.output, commit_info.round)
2440                .await
2441                .expect("epoch ended");
2442        }
2443    }
2444
2445    fn process_randomness_dkg_messages(
2446        &self,
2447        randomness_manager: &mut RandomnessManager,
2448        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2449    ) -> bool /* randomness state updated */ {
2450        if randomness_dkg_messages.is_empty() {
2451            return false;
2452        }
2453
2454        let mut randomness_state_updated = false;
2455        for (authority, bytes) in randomness_dkg_messages {
2456            match bcs::from_bytes(&bytes) {
2457                Ok(message) => {
2458                    randomness_manager
2459                        .add_message(&authority, message)
2460                        // TODO: make infallible
2461                        .expect("epoch ended");
2462                    randomness_state_updated = true;
2463                }
2464
2465                Err(e) => {
2466                    warn!(
2467                        "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
2468                        authority.concise(),
2469                    );
2470                }
2471            }
2472        }
2473
2474        randomness_state_updated
2475    }
2476
2477    fn process_randomness_dkg_confirmations(
2478        &self,
2479        state: &mut CommitHandlerState,
2480        randomness_manager: &mut RandomnessManager,
2481        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2482    ) -> bool /* randomness state updated */ {
2483        if randomness_dkg_confirmations.is_empty() {
2484            return false;
2485        }
2486
2487        let mut randomness_state_updated = false;
2488        for (authority, bytes) in randomness_dkg_confirmations {
2489            match bcs::from_bytes(&bytes) {
2490                Ok(message) => {
2491                    randomness_manager
2492                        .add_confirmation(&mut state.output, &authority, message)
2493                        // TODO: make infallible
2494                        .expect("epoch ended");
2495                    randomness_state_updated = true;
2496                }
2497                Err(e) => {
2498                    warn!(
2499                        "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
2500                        authority.concise(),
2501                    );
2502                }
2503            }
2504        }
2505
2506        randomness_state_updated
2507    }
2508
2509    /// Returns true if we have collected a quorum of end of publish messages (either in this round or a previous round).
2510    fn process_end_of_publish_transactions(
2511        &self,
2512        state: &mut CommitHandlerState,
2513        end_of_publish_transactions: Vec<AuthorityName>,
2514    ) -> bool {
2515        let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
2516            "No contention on end_of_publish as it is only accessed from consensus handler",
2517        );
2518
2519        if eop_aggregator.has_quorum() {
2520            return true;
2521        }
2522
2523        if end_of_publish_transactions.is_empty() {
2524            return false;
2525        }
2526
2527        for authority in end_of_publish_transactions {
2528            info!("Received EndOfPublish from {:?}", authority.concise());
2529
2530            // It is ok to just release lock here as this function is the only place that transition into RejectAllCerts state
2531            // And this function itself is always executed from consensus task
2532            state.output.insert_end_of_publish(authority);
2533            if eop_aggregator
2534                .insert_generic(authority, ())
2535                .is_quorum_reached()
2536            {
2537                debug!(
2538                    "Collected enough end_of_publish messages with last message from validator {:?}",
2539                    authority.concise(),
2540                );
2541                return true;
2542            }
2543        }
2544
2545        false
2546    }
2547
2548    /// After we have collected 2f+1 EndOfPublish messages, we call this function every round until the epoch
2549    /// ends.
2550    fn advance_eop_state_machine(
2551        &self,
2552        state: &mut CommitHandlerState,
2553    ) -> (
2554        RwLockWriteGuard<'_, ReconfigState>,
2555        bool, // true if final round
2556    ) {
2557        let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
2558        let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
2559
2560        reconfig_state.close_all_certs();
2561
2562        let commit_has_deferred_txns = state.output.has_deferred_transactions();
2563        let previous_commits_have_deferred_txns = !self.epoch_store.deferred_transactions_empty();
2564
2565        if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
2566            if !start_state_is_reject_all_tx {
2567                info!("Transitioning to RejectAllTx");
2568            }
2569            reconfig_state.close_all_tx();
2570        } else {
2571            debug!(
2572                "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
2573                previous_commits_have_deferred_txns, commit_has_deferred_txns,
2574            );
2575        }
2576
2577        state.output.store_reconfig_state(reconfig_state.clone());
2578
2579        if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
2580            (reconfig_state, true)
2581        } else {
2582            (reconfig_state, false)
2583        }
2584    }
2585
2586    fn gather_commit_metadata(
2587        &self,
2588        consensus_commit: &impl ConsensusCommitAPI,
2589    ) -> (u64, AuthorityIndex, u64) {
2590        let timestamp = consensus_commit.commit_timestamp_ms();
2591        let leader_author = consensus_commit.leader_author_index();
2592        let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
2593
2594        let system_time_ms = SystemTime::now()
2595            .duration_since(UNIX_EPOCH)
2596            .unwrap()
2597            .as_millis() as i64;
2598
2599        let consensus_timestamp_bias_ms = system_time_ms - (timestamp as i64);
2600        let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
2601        self.metrics
2602            .consensus_timestamp_bias
2603            .observe(consensus_timestamp_bias_seconds);
2604
2605        let epoch_start = self
2606            .epoch_store
2607            .epoch_start_config()
2608            .epoch_start_timestamp_ms();
2609        let timestamp = if timestamp < epoch_start {
2610            error!(
2611                "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}"
2612            );
2613            epoch_start
2614        } else {
2615            timestamp
2616        };
2617
2618        (timestamp, leader_author, commit_sub_dag_index)
2619    }
2620
2621    fn create_authenticator_state_update(
2622        &self,
2623        last_committed_round: u64,
2624        commit_info: &ConsensusCommitInfo,
2625    ) -> Option<VerifiedExecutableTransactionWithAliases> {
2626        // Load all jwks that became active in the previous round, and commit them in this round.
2627        // We want to delay one round because none of the transactions in the previous round could
2628        // have been authenticated with the jwks that became active in that round.
2629        //
2630        // Because of this delay, jwks that become active in the last round of the epoch will
2631        // never be committed. That is ok, because in the new epoch, the validators should
2632        // immediately re-submit these jwks, and they can become active then.
2633        let new_jwks = self
2634            .epoch_store
2635            .get_new_jwks(last_committed_round)
2636            .expect("Unrecoverable error in consensus handler");
2637
2638        if !new_jwks.is_empty() {
2639            let authenticator_state_update_transaction = authenticator_state_update_transaction(
2640                &self.epoch_store,
2641                commit_info.round,
2642                new_jwks,
2643            );
2644            debug!(
2645                "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
2646                authenticator_state_update_transaction.digest(),
2647                authenticator_state_update_transaction,
2648            );
2649
2650            Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2651                authenticator_state_update_transaction,
2652            ))
2653        } else {
2654            None
2655        }
2656    }
2657
2658    // Filters out rejected or deprecated transactions.
2659    // Returns FilteredConsensusOutput containing transactions and owned_object_locks
2660    // (collected when preconsensus locking is disabled).
2661    #[instrument(level = "trace", skip_all)]
2662    fn filter_consensus_txns(
2663        &mut self,
2664        initial_reconfig_state: ReconfigState,
2665        commit_info: &ConsensusCommitInfo,
2666        consensus_commit: &impl ConsensusCommitAPI,
2667    ) -> FilteredConsensusOutput {
2668        let mut transactions = Vec::new();
2669        let mut owned_object_locks = HashMap::new();
2670        let epoch = self.epoch_store.epoch();
2671        let mut num_finalized_user_transactions = vec![0; self.committee.size()];
2672        let mut num_rejected_user_transactions = vec![0; self.committee.size()];
2673        for (block, parsed_transactions) in consensus_commit.transactions() {
2674            let author = block.author.value();
2675            // TODO: consider only messages within 1~3 rounds of the leader?
2676            self.last_consensus_stats.stats.inc_num_messages(author);
2677
2678            // Set the "ping" transaction status for this block. This is necessary as there might be some ping requests waiting for the ping transaction to be certified.
2679            self.epoch_store.set_consensus_tx_status(
2680                ConsensusPosition::ping(epoch, block),
2681                ConsensusTxStatus::Finalized,
2682            );
2683
2684            for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
2685                let position = ConsensusPosition {
2686                    epoch,
2687                    block,
2688                    index: tx_index as TransactionIndex,
2689                };
2690
2691                // Transaction has appeared in consensus output, we can increment the submission count
2692                // for this tx for DoS protection.
2693                if self.epoch_store.protocol_config().mysticeti_fastpath()
2694                    && let Some(tx) = parsed.transaction.kind.as_user_transaction()
2695                {
2696                    let digest = tx.digest();
2697                    if let Some((spam_weight, submitter_client_addrs)) = self
2698                        .epoch_store
2699                        .submitted_transaction_cache
2700                        .increment_submission_count(digest)
2701                    {
2702                        if let Some(ref traffic_controller) = self.traffic_controller {
2703                            debug!(
2704                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
2705                                submitter_client_addrs.len()
2706                            );
2707
2708                            // Apply spam weight to all client addresses that submitted this transaction
2709                            for addr in submitter_client_addrs {
2710                                traffic_controller.tally(TrafficTally::new(
2711                                    Some(addr),
2712                                    None,
2713                                    None,
2714                                    spam_weight.clone(),
2715                                ));
2716                            }
2717                        } else {
2718                            warn!(
2719                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
2720                                submitter_client_addrs.len()
2721                            );
2722                        }
2723                    }
2724                }
2725
2726                if parsed.rejected {
2727                    // TODO(fastpath): Add metrics for rejected transactions.
2728                    if matches!(
2729                        parsed.transaction.kind,
2730                        ConsensusTransactionKind::UserTransaction(_)
2731                            | ConsensusTransactionKind::UserTransactionV2(_)
2732                    ) {
2733                        self.epoch_store
2734                            .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
2735                        num_rejected_user_transactions[author] += 1;
2736                    }
2737                    // Skip processing rejected transactions.
2738                    continue;
2739                }
2740
2741                let kind = classify(&parsed.transaction);
2742                self.metrics
2743                    .consensus_handler_processed
2744                    .with_label_values(&[kind])
2745                    .inc();
2746                self.metrics
2747                    .consensus_handler_transaction_sizes
2748                    .with_label_values(&[kind])
2749                    .observe(parsed.serialized_len as f64);
2750                // UserTransaction exists only when mysticeti_fastpath is enabled in protocol config.
2751                if matches!(
2752                    &parsed.transaction.kind,
2753                    ConsensusTransactionKind::CertifiedTransaction(_)
2754                        | ConsensusTransactionKind::UserTransaction(_)
2755                        | ConsensusTransactionKind::UserTransactionV2(_)
2756                ) {
2757                    self.last_consensus_stats
2758                        .stats
2759                        .inc_num_user_transactions(author);
2760                }
2761
2762                if !initial_reconfig_state.should_accept_consensus_certs() {
2763                    // (Note: we no longer need to worry about the previously deferred condition, since we are only
2764                    // processing newly-received transactions at this time).
2765                    match &parsed.transaction.kind {
2766                        ConsensusTransactionKind::UserTransaction(_)
2767                        | ConsensusTransactionKind::UserTransactionV2(_)
2768                        | ConsensusTransactionKind::CertifiedTransaction(_)
2769                        // deprecated and ignore later, but added for exhaustive match
2770                        | ConsensusTransactionKind::CapabilityNotification(_)
2771                        | ConsensusTransactionKind::CapabilityNotificationV2(_)
2772                        | ConsensusTransactionKind::EndOfPublish(_)
2773                        // Note: we no longer have to check protocol_config.ignore_execution_time_observations_after_certs_closed()
2774                        | ConsensusTransactionKind::ExecutionTimeObservation(_)
2775                        | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
2776                            debug!(
2777                                "Ignoring consensus transaction {:?} because of end of epoch",
2778                                parsed.transaction.key()
2779                            );
2780                            continue;
2781                        }
2782
2783                        // These are the message types that are still processed even if !should_accept_consensus_certs()
2784                        ConsensusTransactionKind::CheckpointSignature(_)
2785                        | ConsensusTransactionKind::CheckpointSignatureV2(_)
2786                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2787                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
2788                        | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
2789                    }
2790                }
2791
2792                if !initial_reconfig_state.should_accept_tx() {
2793                    match &parsed.transaction.kind {
2794                        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
2795                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
2796                        _ => {}
2797                    }
2798                }
2799
2800                if parsed.transaction.is_user_transaction()
2801                    && !self.epoch_store.protocol_config().mysticeti_fastpath()
2802                {
2803                    debug!(
2804                        "Ignoring MFP transaction {:?} because MFP is disabled",
2805                        parsed.transaction.key()
2806                    );
2807                    continue;
2808                }
2809
2810                if let ConsensusTransactionKind::CertifiedTransaction(certificate) =
2811                    &parsed.transaction.kind
2812                    && certificate.epoch() != epoch
2813                {
2814                    debug!(
2815                        "Certificate epoch ({:?}) doesn't match the current epoch ({:?})",
2816                        certificate.epoch(),
2817                        epoch
2818                    );
2819                    continue;
2820                }
2821
2822                // Handle deprecated messages
2823                match &parsed.transaction.kind {
2824                    ConsensusTransactionKind::CapabilityNotification(_)
2825                    | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2826                    | ConsensusTransactionKind::CheckpointSignature(_) => {
2827                        debug_fatal!(
2828                            "BUG: saw deprecated tx {:?}for commit round {}",
2829                            parsed.transaction.key(),
2830                            commit_info.round
2831                        );
2832                        continue;
2833                    }
2834                    _ => {}
2835                }
2836
2837                if matches!(
2838                    &parsed.transaction.kind,
2839                    ConsensusTransactionKind::UserTransaction(_)
2840                        | ConsensusTransactionKind::UserTransactionV2(_)
2841                        | ConsensusTransactionKind::CertifiedTransaction(_)
2842                ) {
2843                    let author_name = self
2844                        .epoch_store
2845                        .committee()
2846                        .authority_by_index(author as u32)
2847                        .unwrap();
2848                    if self
2849                        .epoch_store
2850                        .has_received_end_of_publish_from(author_name)
2851                    {
2852                        // In some edge cases, consensus might resend previously seen certificate after EndOfPublish
2853                        // An honest validator should not send a new transaction after EndOfPublish. Whether the
2854                        // transaction is duplicate or not, we filter it out here.
2855                        warn!(
2856                            "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
2857                            author_name.concise(),
2858                            parsed.transaction.key(),
2859                        );
2860                        continue;
2861                    }
2862                }
2863
2864                // When preconsensus locking is disabled, perform post-consensus owned object
2865                // conflict detection. If lock acquisition fails, the transaction has
2866                // invalid/conflicting owned inputs and should be dropped.
2867                // This must happen AFTER all filtering checks above to avoid acquiring locks
2868                // for transactions that will be dropped (e.g., during epoch change).
2869                // Only applies to UserTransactionV2 - other transaction types don't need lock acquisition.
2870                if let ConsensusTransactionKind::UserTransactionV2(tx_with_claims) =
2871                    &parsed.transaction.kind
2872                {
2873                    let immutable_object_ids: HashSet<ObjectID> =
2874                        tx_with_claims.get_immutable_objects().into_iter().collect();
2875                    let tx = tx_with_claims.tx();
2876
2877                    let Ok(input_objects) = tx.transaction_data().input_objects() else {
2878                        debug_fatal!("Invalid input objects for transaction {}", tx.digest());
2879                        continue;
2880                    };
2881
2882                    // Filter ImmOrOwnedMoveObject inputs, excluding those claimed to be immutable.
2883                    // Immutable objects don't need lock acquisition as they can be used concurrently.
2884                    let owned_object_refs: Vec<_> = input_objects
2885                        .iter()
2886                        .filter_map(|obj| match obj {
2887                            InputObjectKind::ImmOrOwnedMoveObject(obj_ref)
2888                                if !immutable_object_ids.contains(&obj_ref.0) =>
2889                            {
2890                                Some(*obj_ref)
2891                            }
2892                            _ => None,
2893                        })
2894                        .collect();
2895
2896                    match self
2897                        .epoch_store
2898                        .try_acquire_owned_object_locks_post_consensus(
2899                            &owned_object_refs,
2900                            *tx.digest(),
2901                            &owned_object_locks,
2902                        ) {
2903                        Ok(new_locks) => {
2904                            owned_object_locks.extend(new_locks.into_iter());
2905                            // Lock acquisition succeeded - now set Finalized status
2906                            self.epoch_store
2907                                .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
2908                            num_finalized_user_transactions[author] += 1;
2909                        }
2910                        Err(e) => {
2911                            debug!("Dropping transaction {}: {}", tx.digest(), e);
2912                            self.epoch_store
2913                                .set_consensus_tx_status(position, ConsensusTxStatus::Dropped);
2914                            self.epoch_store.set_rejection_vote_reason(position, &e);
2915                            continue;
2916                        }
2917                    }
2918                }
2919
2920                let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
2921                transactions.push((transaction, author as u32));
2922            }
2923        }
2924
2925        for (i, authority) in self.committee.authorities() {
2926            let hostname = &authority.hostname;
2927            self.metrics
2928                .consensus_committed_messages
2929                .with_label_values(&[hostname])
2930                .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
2931            self.metrics
2932                .consensus_committed_user_transactions
2933                .with_label_values(&[hostname])
2934                .set(
2935                    self.last_consensus_stats
2936                        .stats
2937                        .get_num_user_transactions(i.value()) as i64,
2938                );
2939            self.metrics
2940                .consensus_finalized_user_transactions
2941                .with_label_values(&[hostname])
2942                .add(num_finalized_user_transactions[i.value()] as i64);
2943            self.metrics
2944                .consensus_rejected_user_transactions
2945                .with_label_values(&[hostname])
2946                .add(num_rejected_user_transactions[i.value()] as i64);
2947        }
2948
2949        FilteredConsensusOutput {
2950            transactions,
2951            owned_object_locks,
2952        }
2953    }
2954
    /// Removes transactions that have already been processed and returns the
    /// survivors, in commit order, as verified transactions.
    ///
    /// A transaction is skipped if it:
    /// - fails `verify_consensus_transaction`,
    /// - appeared earlier in this same commit (occurrence count > 1),
    /// - is present in the recent-commit LRU `processed_cache`, or
    /// - is durably recorded as processed in the epoch store.
    ///
    /// Side effects: advances `self.last_consensus_stats.index` for every
    /// transaction scanned, records processed keys into `state.output`, and
    /// copies per-digest occurrence counts into `state.occurrence_counts`.
    fn deduplicate_consensus_txns(
        &mut self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
    ) -> Vec<VerifiedSequencedConsensusTransaction> {
        let mut all_transactions = Vec::new();

        // Track occurrence counts for each transaction key within this commit.
        // Also serves as the deduplication set (count > 1 means duplicate within commit).
        let mut occurrence_counts: HashMap<SequencedConsensusTransactionKey, u32> = HashMap::new();
        // Keys being seen for the first time (not duplicates from previous commits).
        let mut first_commit_keys: HashSet<SequencedConsensusTransactionKey> = HashSet::new();

        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
            // In process_consensus_transactions_and_commit_boundary(), we will add a system consensus commit
            // prologue transaction, which will be the first transaction in this consensus commit batch.
            // Therefore, the transaction sequence number starts from 1 here.
            let current_tx_index = ExecutionIndices {
                last_committed_round: commit_info.round,
                sub_dag_index: commit_info.consensus_commit_ref.index.into(),
                transaction_index: (seq + 1) as u64,
            };

            self.last_consensus_stats.index = current_tx_index;

            // `cert_origin` is the committee index of the authority whose block
            // carried this transaction; resolve it to the authority's name.
            let certificate_author = *self
                .epoch_store
                .committee()
                .authority_by_index(cert_origin)
                .unwrap();

            let sequenced_transaction = SequencedConsensusTransaction {
                certificate_author_index: cert_origin,
                certificate_author,
                consensus_index: current_tx_index,
                transaction,
            };

            let Some(verified_transaction) = self
                .epoch_store
                .verify_consensus_transaction(sequenced_transaction)
            else {
                continue;
            };

            let key = verified_transaction.0.key();

            // Cache the digest even for entries deduplicated below.
            if let Some(tx_digest) = key.user_transaction_digest() {
                self.epoch_store
                    .cache_recently_finalized_transaction(tx_digest);
            }

            // Increment count and check if this is a duplicate within this commit.
            // This replaces the separate processed_set HashSet.
            let count = occurrence_counts.entry(key.clone()).or_insert(0);
            *count += 1;
            let in_commit = *count > 1;

            // `put` returns the prior value, so `is_some()` means this key was
            // already seen in a recent commit.
            let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
            if in_commit || in_cache {
                self.metrics.skipped_consensus_txns_cache_hit.inc();
                continue;
            }
            // Fall back to the durable record for keys that aged out of the LRU cache.
            if self
                .epoch_store
                .is_consensus_message_processed(&key)
                .expect("db error")
            {
                self.metrics.skipped_consensus_txns.inc();
                continue;
            }

            first_commit_keys.insert(key.clone());

            state.output.record_consensus_message_processed(key);

            all_transactions.push(verified_transaction);
        }

        // Report duplicate counts only for keys whose first occurrence was in this
        // commit; duplicates of keys from earlier commits are excluded.
        for key in first_commit_keys {
            if let Some(&count) = occurrence_counts.get(&key)
                && count > 1
            {
                self.metrics
                    .consensus_handler_duplicate_tx_count
                    .observe(count as f64);
            }
        }

        // Copy user transaction occurrence counts to state for unpaid amplification detection.
        assert!(
            state.occurrence_counts.is_empty(),
            "occurrence_counts should be empty before populating"
        );
        state.occurrence_counts.reserve(occurrence_counts.len());
        state.occurrence_counts.extend(
            occurrence_counts
                .into_iter()
                .filter_map(|(key, count)| key.user_transaction_digest().map(|d| (d, count))),
        );

        all_transactions
    }
3059
    /// Partitions verified consensus transactions into a `CommitHandlerInput`:
    /// user transactions are converted into `VerifiedExecutableTransaction`s
    /// (carrying alias versions where claimed), and system messages (end-of-publish,
    /// JWKs, DKG, capabilities, execution-time observations, checkpoint signatures)
    /// are routed into their dedicated buckets.
    fn build_commit_handler_input(
        &self,
        transactions: Vec<VerifiedSequencedConsensusTransaction>,
    ) -> CommitHandlerInput {
        let epoch = self.epoch_store.epoch();
        let mut commit_handler_input = CommitHandlerInput::default();

        for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
            match transaction.transaction {
                SequencedConsensusTransactionKind::External(consensus_transaction) => {
                    match consensus_transaction.kind {
                        // === User transactions ===
                        ConsensusTransactionKind::CertifiedTransaction(cert) => {
                            // Safe because signatures are verified when consensus called into SuiTxValidator::validate_batch.
                            let cert = VerifiedCertificate::new_unchecked(*cert);
                            let transaction =
                                VerifiedExecutableTransaction::new_from_certificate(cert);
                            commit_handler_input.user_transactions.push(
                                VerifiedExecutableTransactionWithAliases::no_aliases(transaction),
                            );
                        }
                        ConsensusTransactionKind::UserTransaction(tx) => {
                            // Safe because transactions are certified by consensus.
                            let tx = VerifiedTransaction::new_unchecked(*tx);
                            // TODO(fastpath): accept position in consensus, after plumbing consensus round, authority index, and transaction index here.
                            let transaction =
                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
                            commit_handler_input
                                .user_transactions
                                // Use of v1 UserTransaction implies commitment to no aliases.
                                .push(VerifiedExecutableTransactionWithAliases::no_aliases(
                                    transaction,
                                ));
                        }
                        ConsensusTransactionKind::UserTransactionV2(tx) => {
                            // Extract the aliases claim (required) from the claims
                            let used_alias_versions = if self
                                .epoch_store
                                .protocol_config()
                                .fix_checkpoint_signature_mapping()
                            {
                                tx.aliases()
                            } else {
                                // Convert V1 to V2 format using dummy signature indices
                                // which will be ignored with `fix_checkpoint_signature_mapping`
                                // disabled.
                                tx.aliases_v1().map(|a| {
                                    NonEmpty::from_vec(
                                        a.into_iter()
                                            .enumerate()
                                            .map(|(idx, (_, seq))| (idx as u8, seq))
                                            .collect(),
                                    )
                                    .unwrap()
                                })
                            };
                            let inner_tx = tx.into_tx();
                            // Safe because transactions are certified by consensus.
                            let tx = VerifiedTransaction::new_unchecked(inner_tx);
                            // TODO(fastpath): accept position in consensus, after plumbing consensus round, authority index, and transaction index here.
                            let transaction =
                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
                            // Attach alias versions only when the transaction claimed any.
                            if let Some(used_alias_versions) = used_alias_versions {
                                commit_handler_input
                                    .user_transactions
                                    .push(WithAliases::new(transaction, used_alias_versions));
                            } else {
                                commit_handler_input.user_transactions.push(
                                    VerifiedExecutableTransactionWithAliases::no_aliases(
                                        transaction,
                                    ),
                                );
                            }
                        }

                        // === State machines ===
                        ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => {
                            commit_handler_input
                                .end_of_publish_transactions
                                .push(authority_public_key_bytes);
                        }
                        ConsensusTransactionKind::NewJWKFetched(
                            authority_public_key_bytes,
                            jwk_id,
                            jwk,
                        ) => {
                            commit_handler_input.new_jwks.push((
                                authority_public_key_bytes,
                                jwk_id,
                                jwk,
                            ));
                        }
                        ConsensusTransactionKind::RandomnessDkgMessage(
                            authority_public_key_bytes,
                            items,
                        ) => {
                            commit_handler_input
                                .randomness_dkg_messages
                                .push((authority_public_key_bytes, items));
                        }
                        ConsensusTransactionKind::RandomnessDkgConfirmation(
                            authority_public_key_bytes,
                            items,
                        ) => {
                            commit_handler_input
                                .randomness_dkg_confirmations
                                .push((authority_public_key_bytes, items));
                        }
                        ConsensusTransactionKind::CapabilityNotificationV2(
                            authority_capabilities_v2,
                        ) => {
                            commit_handler_input
                                .capability_notifications
                                .push(authority_capabilities_v2);
                        }
                        ConsensusTransactionKind::ExecutionTimeObservation(
                            execution_time_observation,
                        ) => {
                            commit_handler_input
                                .execution_time_observations
                                .push(execution_time_observation);
                        }
                        ConsensusTransactionKind::CheckpointSignatureV2(
                            checkpoint_signature_message,
                        ) => {
                            commit_handler_input
                                .checkpoint_signature_messages
                                .push(*checkpoint_signature_message);
                        }

                        // Deprecated messages, filtered earlier by filter_consensus_txns()
                        ConsensusTransactionKind::CheckpointSignature(_)
                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
                        | ConsensusTransactionKind::CapabilityNotification(_) => {
                            unreachable!("filtered earlier")
                        }
                    }
                }
                // TODO: I think we can delete this, it was only used to inject randomness state update into the tx stream.
                SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
            }
        }

        commit_handler_input
    }
3205
3206    async fn send_end_of_publish_if_needed(&self) {
3207        if !self.epoch_store.should_send_end_of_publish() {
3208            return;
3209        }
3210
3211        let end_of_publish = ConsensusTransaction::new_end_of_publish(self.epoch_store.name);
3212        if let Err(err) =
3213            self.consensus_adapter
3214                .submit(end_of_publish, None, &self.epoch_store, None, None)
3215        {
3216            warn!(
3217                "Error when sending EndOfPublish message from ConsensusHandler: {:?}",
3218                err
3219            );
3220        } else {
3221            info!(epoch=?self.epoch_store.epoch(), "Sending EndOfPublish message to consensus");
3222        }
3223    }
3224}
3225
/// Sends transactions to the execution scheduler in a separate task,
/// to avoid blocking consensus handler.
pub(crate) type SchedulerMessage = (
    // Schedulables paired with the object versions consensus assigned to them.
    Vec<(Schedulable, AssignedVersions)>,
    // When Some, the batch is enqueued via the settlement path (enqueue_v2).
    Option<SettlementBatchInfo>,
);

/// Cloneable handle used to forward scheduler messages to the background task
/// spawned by `start()`; all clones share the same channel.
#[derive(Clone)]
pub(crate) struct ExecutionSchedulerSender {
    // Unbounded so sending from the consensus handler never blocks.
    sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
}
3237
3238impl ExecutionSchedulerSender {
3239    fn start(
3240        settlement_scheduler: SettlementScheduler,
3241        epoch_store: Arc<AuthorityPerEpochStore>,
3242    ) -> Self {
3243        let (sender, recv) = monitored_mpsc::unbounded_channel("execution_scheduler_sender");
3244        spawn_monitored_task!(Self::run(recv, settlement_scheduler, epoch_store));
3245        Self { sender }
3246    }
3247
3248    pub(crate) fn new_for_testing(
3249        sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
3250    ) -> Self {
3251        Self { sender }
3252    }
3253
3254    fn send(
3255        &self,
3256        transactions: Vec<(Schedulable, AssignedVersions)>,
3257        settlement: Option<SettlementBatchInfo>,
3258    ) {
3259        let _ = self.sender.send((transactions, settlement));
3260    }
3261
3262    async fn run(
3263        mut recv: monitored_mpsc::UnboundedReceiver<SchedulerMessage>,
3264        settlement_scheduler: SettlementScheduler,
3265        epoch_store: Arc<AuthorityPerEpochStore>,
3266    ) {
3267        while let Some((transactions, settlement)) = recv.recv().await {
3268            let _guard = monitored_scope("ConsensusHandler::enqueue");
3269            let txns = transactions
3270                .into_iter()
3271                .map(|(txn, versions)| (txn, ExecutionEnv::new().with_assigned_versions(versions)))
3272                .collect();
3273            if let Some(settlement) = settlement {
3274                settlement_scheduler.enqueue_v2(txns, settlement, &epoch_store);
3275            } else {
3276                settlement_scheduler.enqueue(txns, &epoch_store);
3277            }
3278        }
3279    }
3280}
3281
/// Manages the lifetime of tasks handling the commits and transactions output by consensus.
pub(crate) struct MysticetiConsensusHandler {
    // All spawned tasks are shut down together via `abort()`.
    tasks: JoinSet<()>,
}
3286
3287impl MysticetiConsensusHandler {
3288    pub(crate) fn new(
3289        last_processed_commit_at_startup: CommitIndex,
3290        mut consensus_handler: ConsensusHandler<CheckpointService>,
3291        mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
3292        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
3293    ) -> Self {
3294        debug!(
3295            last_processed_commit_at_startup,
3296            "Starting consensus replay"
3297        );
3298        let mut tasks = JoinSet::new();
3299        tasks.spawn(monitored_future!(async move {
3300            // TODO: pause when execution is overloaded, so consensus can detect the backpressure.
3301            while let Some(consensus_commit) = commit_receiver.recv().await {
3302                let commit_index = consensus_commit.commit_ref.index;
3303                if commit_index <= last_processed_commit_at_startup {
3304                    consensus_handler.handle_prior_consensus_commit(consensus_commit);
3305                } else {
3306                    consensus_handler
3307                        .handle_consensus_commit(consensus_commit)
3308                        .await;
3309                }
3310                commit_consumer_monitor.set_highest_handled_commit(commit_index);
3311            }
3312        }));
3313        Self { tasks }
3314    }
3315
3316    pub(crate) async fn abort(&mut self) {
3317        self.tasks.shutdown().await;
3318    }
3319}
3320
3321fn authenticator_state_update_transaction(
3322    epoch_store: &AuthorityPerEpochStore,
3323    round: u64,
3324    mut new_active_jwks: Vec<ActiveJwk>,
3325) -> VerifiedExecutableTransaction {
3326    let epoch = epoch_store.epoch();
3327    new_active_jwks.sort();
3328
3329    info!("creating authenticator state update transaction");
3330    assert!(epoch_store.authenticator_state_enabled());
3331    let transaction = VerifiedTransaction::new_authenticator_state_update(
3332        epoch,
3333        round,
3334        new_active_jwks,
3335        epoch_store
3336            .epoch_start_config()
3337            .authenticator_obj_initial_shared_version()
3338            .expect("authenticator state obj must exist"),
3339    );
3340    VerifiedExecutableTransaction::new_system(transaction, epoch)
3341}
3342
3343pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
3344    match &transaction.kind {
3345        ConsensusTransactionKind::CertifiedTransaction(certificate) => {
3346            if certificate.is_consensus_tx() {
3347                "shared_certificate"
3348            } else {
3349                "owned_certificate"
3350            }
3351        }
3352        ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
3353        ConsensusTransactionKind::CheckpointSignatureV2(_) => "checkpoint_signature",
3354        ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
3355        ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
3356        ConsensusTransactionKind::CapabilityNotificationV2(_) => "capability_notification_v2",
3357        ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
3358        ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
3359        ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
3360        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
3361        ConsensusTransactionKind::UserTransaction(tx) => {
3362            if tx.is_consensus_tx() {
3363                "shared_user_transaction"
3364            } else {
3365                "owned_user_transaction"
3366            }
3367        }
3368        ConsensusTransactionKind::UserTransactionV2(tx) => {
3369            if tx.tx().is_consensus_tx() {
3370                "shared_user_transaction_v2"
3371            } else {
3372                "owned_user_transaction_v2"
3373            }
3374        }
3375        ConsensusTransactionKind::ExecutionTimeObservation(_) => "execution_time_observation",
3376    }
3377}
3378
/// A consensus transaction together with its position in the consensus output
/// stream and the authority whose block carried it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequencedConsensusTransaction {
    // Committee index of the authority that authored the carrying block.
    pub certificate_author_index: AuthorityIndex,
    // Authority name resolved from `certificate_author_index`.
    pub certificate_author: AuthorityName,
    // Round / sub-dag / transaction indices of this entry in consensus output.
    pub consensus_index: ExecutionIndices,
    pub transaction: SequencedConsensusTransactionKind,
}
3386
/// Either an externally submitted consensus transaction, or an internally
/// generated system transaction.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SequencedConsensusTransactionKind {
    External(ConsensusTransaction),
    System(VerifiedExecutableTransaction),
}
3393
3394impl Serialize for SequencedConsensusTransactionKind {
3395    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
3396        let serializable = SerializableSequencedConsensusTransactionKind::from(self);
3397        serializable.serialize(serializer)
3398    }
3399}
3400
3401impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
3402    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
3403        let serializable =
3404            SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
3405        Ok(serializable.into())
3406    }
3407}
3408
// We can't serialize SequencedConsensusTransactionKind directly because it contains a
// VerifiedExecutableTransaction, which is not serializable (by design). This wrapper allows us to
// convert to a serializable format easily.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
enum SerializableSequencedConsensusTransactionKind {
    // Mirrors SequencedConsensusTransactionKind::External.
    External(ConsensusTransaction),
    // Mirrors SequencedConsensusTransactionKind::System, with the transaction in
    // its serializable `TrustedExecutableTransaction` form.
    System(TrustedExecutableTransaction),
}
3418
3419impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
3420    fn from(kind: &SequencedConsensusTransactionKind) -> Self {
3421        match kind {
3422            SequencedConsensusTransactionKind::External(ext) => {
3423                SerializableSequencedConsensusTransactionKind::External(ext.clone())
3424            }
3425            SequencedConsensusTransactionKind::System(txn) => {
3426                SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
3427            }
3428        }
3429    }
3430}
3431
3432impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
3433    fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
3434        match kind {
3435            SerializableSequencedConsensusTransactionKind::External(ext) => {
3436                SequencedConsensusTransactionKind::External(ext)
3437            }
3438            SerializableSequencedConsensusTransactionKind::System(txn) => {
3439                SequencedConsensusTransactionKind::System(txn.into())
3440            }
3441        }
3442    }
3443}
3444
/// Key identifying a sequenced consensus transaction, used for deduplication
/// and processed-message tracking.
#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
pub enum SequencedConsensusTransactionKey {
    External(ConsensusTransactionKey),
    System(TransactionDigest),
}
3450
3451impl SequencedConsensusTransactionKey {
3452    pub fn user_transaction_digest(&self) -> Option<TransactionDigest> {
3453        match self {
3454            SequencedConsensusTransactionKey::External(key) => match key {
3455                ConsensusTransactionKey::Certificate(digest) => Some(*digest),
3456                _ => None,
3457            },
3458            SequencedConsensusTransactionKey::System(_) => None,
3459        }
3460    }
3461}
3462
3463impl SequencedConsensusTransactionKind {
3464    pub fn key(&self) -> SequencedConsensusTransactionKey {
3465        match self {
3466            SequencedConsensusTransactionKind::External(ext) => {
3467                SequencedConsensusTransactionKey::External(ext.key())
3468            }
3469            SequencedConsensusTransactionKind::System(txn) => {
3470                SequencedConsensusTransactionKey::System(*txn.digest())
3471            }
3472        }
3473    }
3474
3475    pub fn get_tracking_id(&self) -> u64 {
3476        match self {
3477            SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
3478            SequencedConsensusTransactionKind::System(_txn) => 0,
3479        }
3480    }
3481
3482    pub fn is_executable_transaction(&self) -> bool {
3483        match self {
3484            SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
3485            SequencedConsensusTransactionKind::System(_) => true,
3486        }
3487    }
3488
3489    pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
3490        match self {
3491            SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
3492                ConsensusTransactionKind::CertifiedTransaction(txn) => Some(*txn.digest()),
3493                ConsensusTransactionKind::UserTransaction(txn) => Some(*txn.digest()),
3494                ConsensusTransactionKind::UserTransactionV2(txn) => Some(*txn.tx().digest()),
3495                _ => None,
3496            },
3497            SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
3498        }
3499    }
3500
3501    pub fn is_end_of_publish(&self) -> bool {
3502        match self {
3503            SequencedConsensusTransactionKind::External(ext) => {
3504                matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
3505            }
3506            SequencedConsensusTransactionKind::System(_) => false,
3507        }
3508    }
3509}
3510
3511impl SequencedConsensusTransaction {
3512    pub fn sender_authority(&self) -> AuthorityName {
3513        self.certificate_author
3514    }
3515
3516    pub fn key(&self) -> SequencedConsensusTransactionKey {
3517        self.transaction.key()
3518    }
3519
3520    pub fn is_end_of_publish(&self) -> bool {
3521        if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
3522            matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
3523        } else {
3524            false
3525        }
3526    }
3527
3528    pub fn try_take_execution_time_observation(&mut self) -> Option<ExecutionTimeObservation> {
3529        if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
3530            kind: ConsensusTransactionKind::ExecutionTimeObservation(observation),
3531            ..
3532        }) = &mut self.transaction
3533        {
3534            Some(std::mem::take(observation))
3535        } else {
3536            None
3537        }
3538    }
3539
3540    pub fn is_system(&self) -> bool {
3541        matches!(
3542            self.transaction,
3543            SequencedConsensusTransactionKind::System(_)
3544        )
3545    }
3546
3547    pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
3548        if !randomness_state_enabled {
3549            // If randomness is disabled, these should be processed same as a tx without randomness,
3550            // which will eventually fail when the randomness state object is not found.
3551            return false;
3552        }
3553        match &self.transaction {
3554            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3555                kind: ConsensusTransactionKind::CertifiedTransaction(cert),
3556                ..
3557            }) => cert.transaction_data().uses_randomness(),
3558            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3559                kind: ConsensusTransactionKind::UserTransaction(txn),
3560                ..
3561            }) => txn.transaction_data().uses_randomness(),
3562            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3563                kind: ConsensusTransactionKind::UserTransactionV2(txn),
3564                ..
3565            }) => txn.tx().transaction_data().uses_randomness(),
3566            _ => false,
3567        }
3568    }
3569
3570    pub fn as_consensus_txn(&self) -> Option<&SenderSignedData> {
3571        match &self.transaction {
3572            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3573                kind: ConsensusTransactionKind::CertifiedTransaction(certificate),
3574                ..
3575            }) if certificate.is_consensus_tx() => Some(certificate.data()),
3576            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3577                kind: ConsensusTransactionKind::UserTransaction(txn),
3578                ..
3579            }) if txn.is_consensus_tx() => Some(txn.data()),
3580            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3581                kind: ConsensusTransactionKind::UserTransactionV2(txn),
3582                ..
3583            }) if txn.tx().is_consensus_tx() => Some(txn.tx().data()),
3584            SequencedConsensusTransactionKind::System(txn) if txn.is_consensus_tx() => {
3585                Some(txn.data())
3586            }
3587            _ => None,
3588        }
3589    }
3590}
3591
/// Wrapper marking a `SequencedConsensusTransaction` that has passed the epoch
/// store's `verify_consensus_transaction` check.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
3594
#[cfg(test)]
impl VerifiedSequencedConsensusTransaction {
    /// Test-only constructor that wraps a transaction without verification.
    pub fn new_test(transaction: ConsensusTransaction) -> Self {
        Self(SequencedConsensusTransaction::new_test(transaction))
    }
}
3601
3602impl SequencedConsensusTransaction {
3603    pub fn new_test(transaction: ConsensusTransaction) -> Self {
3604        Self {
3605            certificate_author_index: 0,
3606            certificate_author: AuthorityName::ZERO,
3607            consensus_index: Default::default(),
3608            transaction: SequencedConsensusTransactionKind::External(transaction),
3609        }
3610    }
3611}
3612
/// Observes consensus commit timestamps over a sliding window to estimate the
/// average interval between commits.
#[derive(Serialize, Deserialize)]
pub(crate) struct CommitIntervalObserver {
    // Sliding window of commit timestamps in milliseconds; bounded by the
    // VecDeque's capacity, which is set from the window size passed to `new`.
    ring_buffer: VecDeque<u64>,
}
3617
3618impl CommitIntervalObserver {
3619    pub fn new(window_size: u32) -> Self {
3620        Self {
3621            ring_buffer: VecDeque::with_capacity(window_size as usize),
3622        }
3623    }
3624
3625    pub fn observe_commit_time(&mut self, consensus_commit: &impl ConsensusCommitAPI) {
3626        let commit_time = consensus_commit.commit_timestamp_ms();
3627        if self.ring_buffer.len() == self.ring_buffer.capacity() {
3628            self.ring_buffer.pop_front();
3629        }
3630        self.ring_buffer.push_back(commit_time);
3631    }
3632
3633    pub fn commit_interval_estimate(&self) -> Option<Duration> {
3634        if self.ring_buffer.len() <= 1 {
3635            None
3636        } else {
3637            let first = self.ring_buffer.front().unwrap();
3638            let last = self.ring_buffer.back().unwrap();
3639            let duration = last.saturating_sub(*first);
3640            let num_commits = self.ring_buffer.len() as u64;
3641            Some(Duration::from_millis(duration.div_ceil(num_commits)))
3642        }
3643    }
3644}
3645
3646#[cfg(test)]
3647mod tests {
3648    use std::collections::HashSet;
3649
3650    use consensus_core::{
3651        BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock,
3652    };
3653    use futures::pin_mut;
3654    use prometheus::Registry;
3655    use sui_protocol_config::{ConsensusTransactionOrdering, ProtocolConfig};
3656    use sui_types::{
3657        base_types::ExecutionDigests,
3658        base_types::{AuthorityName, FullObjectRef, ObjectID, SuiAddress, random_object_ref},
3659        committee::Committee,
3660        crypto::deterministic_random_account_key,
3661        gas::GasCostSummary,
3662        message_envelope::Message,
3663        messages_checkpoint::{
3664            CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
3665            SignedCheckpointSummary,
3666        },
3667        messages_consensus::ConsensusTransaction,
3668        object::Object,
3669        transaction::{
3670            CertifiedTransaction, TransactionData, TransactionDataAPI, VerifiedCertificate,
3671        },
3672    };
3673
3674    use super::*;
3675    use crate::{
3676        authority::{
3677            authority_per_epoch_store::ConsensusStatsAPI,
3678            test_authority_builder::TestAuthorityBuilder,
3679        },
3680        checkpoints::CheckpointServiceNoop,
3681        consensus_adapter::consensus_tests::test_user_transaction,
3682        consensus_test_utils::make_consensus_adapter_for_test,
3683        post_consensus_tx_reorder::PostConsensusTxReorder,
3684    };
3685
    // End-to-end test of ConsensusHandler::handle_consensus_commit: builds a
    // real authority state, submits a commit of user transactions, and checks
    // backpressure behavior, consensus stats, and transaction execution.
    // Runs on a paused tokio clock so the timeouts below are virtual time.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_consensus_commit_handler() {
        telemetry_subscribers::init_for_testing();

        // GIVEN
        // 1 account keypair
        let (sender, keypair) = deterministic_random_account_key();
        // 12 gas objects.
        let gas_objects: Vec<Object> = (0..12)
            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
            .collect();
        // 4 owned objects.
        let owned_objects: Vec<Object> = (0..4)
            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
            .collect();
        // 6 shared objects.
        let shared_objects: Vec<Object> = (0..6)
            .map(|_| Object::shared_for_testing())
            .collect::<Vec<_>>();
        let mut all_objects = gas_objects.clone();
        all_objects.extend(owned_objects.clone());
        all_objects.extend(shared_objects.clone());

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .with_objects(all_objects.clone())
                .build();

        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let new_epoch_start_state = epoch_store.epoch_start_state();
        let consensus_committee = new_epoch_start_state.get_consensus_committee();

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));

        let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());

        // Wire up a ConsensusHandler with no-op checkpointing and a test
        // backpressure manager we can toggle below.
        let backpressure_manager = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let settlement_scheduler = SettlementScheduler::new(
            state.execution_scheduler().as_ref().clone(),
            state.get_transaction_cache_reader().clone(),
        );
        let mut consensus_handler = ConsensusHandler::new(
            epoch_store,
            Arc::new(CheckpointServiceNoop {}),
            settlement_scheduler,
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            Arc::new(ArcSwap::default()),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput_calculator),
            backpressure_manager.subscribe(),
            state.traffic_controller.clone(),
            None,
        );

        // AND create test user transactions alternating between owned and shared input.
        let mut user_transactions = vec![];
        for (i, gas_object) in gas_objects[0..8].iter().enumerate() {
            let input_object = if i % 2 == 0 {
                owned_objects.get(i / 2).unwrap().clone()
            } else {
                shared_objects.get(i / 2).unwrap().clone()
            };
            let transaction = test_user_transaction(
                &state,
                sender,
                &keypair,
                gas_object.clone(),
                vec![input_object],
            )
            .await;
            user_transactions.push(transaction);
        }

        // AND create 4 more user transactions with remaining gas objects and 2 shared objects.
        // Having more txns on the same shared object may get deferred.
        for (i, gas_object) in gas_objects[8..12].iter().enumerate() {
            let shared_object = if i < 2 {
                shared_objects[4].clone()
            } else {
                shared_objects[5].clone()
            };
            let transaction = test_user_transaction(
                &state,
                sender,
                &keypair,
                gas_object.clone(),
                vec![shared_object],
            )
            .await;
            user_transactions.push(transaction);
        }

        // AND create block for each user transaction
        // Blocks are spread across rounds 100.. and authors rotate through the
        // committee so stats below can be checked per-authority.
        let mut blocks = Vec::new();
        for (i, consensus_transaction) in user_transactions
            .iter()
            .cloned()
            .map(|t| ConsensusTransaction::new_user_transaction_v2_message(&state.name, t.into()))
            .enumerate()
        {
            let transaction_bytes = bcs::to_bytes(&consensus_transaction).unwrap();
            let block = VerifiedBlock::new_for_test(
                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
                    .set_transactions(vec![Transaction::new(transaction_bytes)])
                    .build(),
            );

            blocks.push(block);
        }

        // AND create the consensus commit
        let leader_block = blocks[0].clone();
        let committed_sub_dag = CommittedSubDag::new(
            leader_block.reference(),
            blocks.clone(),
            leader_block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
            true,
        );

        // Test that the consensus handler respects backpressure.
        backpressure_manager.set_backpressure(true);
        // Default watermarks are 0,0 which will suppress the backpressure.
        backpressure_manager.update_highest_certified_checkpoint(1);

        // AND process the consensus commit once
        {
            let waiter = consensus_handler.handle_consensus_commit(committed_sub_dag.clone());
            pin_mut!(waiter);

            // waiter should not complete within 5 seconds
            tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
                .await
                .unwrap_err();

            // lift backpressure
            backpressure_manager.set_backpressure(false);

            // waiter completes now.
            tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
                .await
                .unwrap();
        }

        // THEN check the consensus stats
        let num_blocks = blocks.len();
        let num_transactions = user_transactions.len();
        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
        assert_eq!(
            last_consensus_stats_1.index.transaction_index,
            num_transactions as u64
        );
        // Sub-dag index / round come from the CommitRef(10) and TestBlock
        // rounds built above.
        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
        assert_eq!(
            last_consensus_stats_1.stats.get_num_messages(0),
            num_blocks as u64
        );
        assert_eq!(
            last_consensus_stats_1.stats.get_num_user_transactions(0),
            num_transactions as u64
        );

        // THEN check for execution status of user transactions.
        for (i, t) in user_transactions.iter().enumerate() {
            let digest = t.tx().digest();
            if tokio::time::timeout(
                std::time::Duration::from_secs(10),
                state.notify_read_effects_for_testing("", *digest),
            )
            .await
            .is_ok()
            {
                // Effects exist as expected.
            } else {
                panic!("User transaction {} {} did not execute", i, digest);
            }
        }

        // THEN check for no inflight or suspended transactions.
        state.execution_scheduler().check_empty_for_testing().await;
    }
3877
3878    fn to_short_strings(txs: Vec<VerifiedExecutableTransactionWithAliases>) -> Vec<String> {
3879        txs.into_iter()
3880            .map(|tx| format!("transaction({})", tx.tx().transaction_data().gas_price()))
3881            .collect()
3882    }
3883
3884    #[test]
3885    fn test_order_by_gas_price() {
3886        let mut v = vec![user_txn(42), user_txn(100)];
3887        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3888        assert_eq!(
3889            to_short_strings(v),
3890            vec![
3891                "transaction(100)".to_string(),
3892                "transaction(42)".to_string(),
3893            ]
3894        );
3895
3896        let mut v = vec![
3897            user_txn(1200),
3898            user_txn(12),
3899            user_txn(1000),
3900            user_txn(42),
3901            user_txn(100),
3902            user_txn(1000),
3903        ];
3904        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3905        assert_eq!(
3906            to_short_strings(v),
3907            vec![
3908                "transaction(1200)".to_string(),
3909                "transaction(1000)".to_string(),
3910                "transaction(1000)".to_string(),
3911                "transaction(100)".to_string(),
3912                "transaction(42)".to_string(),
3913                "transaction(12)".to_string(),
3914            ]
3915        );
3916    }
3917
    // Verifies that checkpoint-signature V2 messages are deduplicated by
    // (authority, sequence, digest): two signatures for the same sequence but
    // different digests must BOTH be processed, while an exact duplicate is
    // collapsed.
    #[tokio::test(flavor = "current_thread")]
    async fn test_checkpoint_signature_dedup() {
        telemetry_subscribers::init_for_testing();

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();

        // Builds a signed checkpoint summary at sequence 42 whose contents
        // (and therefore digest) are random per call.
        let make_signed = || {
            let epoch = epoch_store.epoch();
            let contents =
                CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
            let summary = CheckpointSummary::new(
                &ProtocolConfig::get_for_max_version_UNSAFE(),
                epoch,
                42, // sequence number
                10, // network_total_transactions
                &contents,
                None, // previous_digest
                GasCostSummary::default(),
                None,       // end_of_epoch_data
                0,          // timestamp
                Vec::new(), // randomness_rounds
                Vec::new(), // checkpoint_artifact_digests
            );
            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name)
        };

        // Prepare V2 pair: same (authority, seq), different digests => different keys
        let v2_s1 = make_signed();
        let v2_s1_clone = v2_s1.clone();
        let v2_digest_a = v2_s1.data().digest();
        let v2_a =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: v2_s1,
            });

        let v2_s2 = make_signed();
        let v2_digest_b = v2_s2.data().digest();
        let v2_b =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: v2_s2,
            });

        assert_ne!(v2_digest_a, v2_digest_b);

        // Create an exact duplicate with same digest to exercise valid dedup
        assert_eq!(v2_s1_clone.data().digest(), v2_digest_a);
        let v2_dup =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: v2_s1_clone,
            });

        // Pack all three signature messages into a single block and commit it.
        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
        let block = VerifiedBlock::new_for_test(
            TestBlock::new(100, 0)
                .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)])
                .build(),
        );
        let commit = CommittedSubDag::new(
            block.reference(),
            vec![block.clone()],
            block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
            true,
        );

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
        let backpressure = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let settlement_scheduler = SettlementScheduler::new(
            state.execution_scheduler().as_ref().clone(),
            state.get_transaction_cache_reader().clone(),
        );
        let mut handler = ConsensusHandler::new(
            epoch_store.clone(),
            Arc::new(CheckpointServiceNoop {}),
            settlement_scheduler,
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            Arc::new(ArcSwap::default()),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput),
            backpressure.subscribe(),
            state.traffic_controller.clone(),
            None,
        );

        handler.handle_consensus_commit(commit).await;

        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
        use sui_types::messages_consensus::ConsensusTransactionKey as CK;

        // V2 distinct digests: both must be processed. If these were collapsed to one CheckpointSeq num, only one would process.
        let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a));
        let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b));
        assert!(
            epoch_store
                .is_consensus_message_processed(&v2_key_a)
                .unwrap()
        );
        assert!(
            epoch_store
                .is_consensus_message_processed(&v2_key_b)
                .unwrap()
        );
    }
4034
    // Verifies that verify_consensus_transaction drops EndOfPublish and
    // checkpoint-signature messages whose claimed authority does not match the
    // block author, while still processing the correctly-attributed ones.
    #[tokio::test(flavor = "current_thread")]
    async fn test_verify_consensus_transaction_filters_mismatched_authorities() {
        telemetry_subscribers::init_for_testing();

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();

        // Create a different authority than our test authority
        use fastcrypto::traits::KeyPair;
        let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair();
        let wrong_authority: AuthorityName = wrong_keypair.public().into();

        // Create EndOfPublish transaction with mismatched authority
        let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority);

        // Create valid EndOfPublish transaction with correct authority
        let valid_eop = ConsensusTransaction::new_end_of_publish(state.name);

        // Create CheckpointSignature with mismatched authority
        let epoch = epoch_store.epoch();
        let contents =
            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
        let summary = CheckpointSummary::new(
            &ProtocolConfig::get_for_max_version_UNSAFE(),
            epoch,
            42, // sequence number
            10, // network_total_transactions
            &contents,
            None, // previous_digest
            GasCostSummary::default(),
            None,       // end_of_epoch_data
            0,          // timestamp
            Vec::new(), // randomness_rounds
            Vec::new(), // checkpoint commitments
        );

        // Create a signed checkpoint with the wrong authority
        let mismatched_checkpoint_signed =
            SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority);
        let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest();
        let mismatched_checkpoint =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: mismatched_checkpoint_signed,
            });

        // Create a valid checkpoint signature with correct authority
        let valid_checkpoint_signed =
            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name);
        let valid_checkpoint_digest = valid_checkpoint_signed.data().digest();
        let valid_checkpoint =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: valid_checkpoint_signed,
            });

        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());

        // Create a block with both valid and invalid transactions
        // All four are authored by authority index 0 (the test authority), so
        // the ones claiming `wrong_authority` are mismatched with the author.
        let block = VerifiedBlock::new_for_test(
            TestBlock::new(100, 0)
                .set_transactions(vec![
                    to_tx(&mismatched_eop),
                    to_tx(&valid_eop),
                    to_tx(&mismatched_checkpoint),
                    to_tx(&valid_checkpoint),
                ])
                .build(),
        );
        let commit = CommittedSubDag::new(
            block.reference(),
            vec![block.clone()],
            block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
            true,
        );

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
        let backpressure = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let settlement_scheduler = SettlementScheduler::new(
            state.execution_scheduler().as_ref().clone(),
            state.get_transaction_cache_reader().clone(),
        );
        let mut handler = ConsensusHandler::new(
            epoch_store.clone(),
            Arc::new(CheckpointServiceNoop {}),
            settlement_scheduler,
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            Arc::new(ArcSwap::default()),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput),
            backpressure.subscribe(),
            state.traffic_controller.clone(),
            None,
        );

        handler.handle_consensus_commit(commit).await;

        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
        use sui_types::messages_consensus::ConsensusTransactionKey as CK;

        // Check that valid transactions were processed
        let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
        assert!(
            epoch_store
                .is_consensus_message_processed(&valid_eop_key)
                .unwrap(),
            "Valid EndOfPublish should have been processed"
        );

        let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
            state.name,
            42,
            valid_checkpoint_digest,
        ));
        assert!(
            epoch_store
                .is_consensus_message_processed(&valid_checkpoint_key)
                .unwrap(),
            "Valid CheckpointSignature should have been processed"
        );

        // Check that mismatched authority transactions were NOT processed (filtered out by verify_consensus_transaction)
        let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
        assert!(
            !epoch_store
                .is_consensus_message_processed(&mismatched_eop_key)
                .unwrap(),
            "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
        );

        let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
            wrong_authority,
            42,
            mismatched_checkpoint_digest,
        ));
        assert!(
            !epoch_store
                .is_consensus_message_processed(&mismatched_checkpoint_key)
                .unwrap(),
            "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
        );
    }
4188
4189    fn user_txn(gas_price: u64) -> VerifiedExecutableTransactionWithAliases {
4190        let (committee, keypairs) = Committee::new_simple_test_committee();
4191        let (sender, sender_keypair) = deterministic_random_account_key();
4192        let tx = sui_types::transaction::Transaction::from_data_and_signer(
4193            TransactionData::new_transfer(
4194                SuiAddress::default(),
4195                FullObjectRef::from_fastpath_ref(random_object_ref()),
4196                sender,
4197                random_object_ref(),
4198                1000 * gas_price,
4199                gas_price,
4200            ),
4201            vec![&sender_keypair],
4202        );
4203        let tx = VerifiedExecutableTransaction::new_from_certificate(
4204            VerifiedCertificate::new_unchecked(
4205                CertifiedTransaction::new_from_keypairs_for_testing(
4206                    tx.into_data(),
4207                    &keypairs,
4208                    &committee,
4209                ),
4210            ),
4211        );
4212        VerifiedExecutableTransactionWithAliases::no_aliases(tx)
4213    }
4214
4215    mod checkpoint_queue_tests {
4216        use super::*;
4217        use consensus_core::CommitRef;
4218        use sui_types::digests::Digest;
4219
4220        fn make_chunk(tx_count: usize, height: u64) -> Chunk {
4221            Chunk {
4222                schedulables: (0..tx_count)
4223                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4224                    .collect(),
4225                settlement: None,
4226                height,
4227            }
4228        }
4229
4230        fn make_commit_ref(index: u32) -> CommitRef {
4231            CommitRef {
4232                index,
4233                digest: CommitDigest::MIN,
4234            }
4235        }
4236
4237        fn default_versions() -> HashMap<TransactionKey, AssignedVersions> {
4238            HashMap::new()
4239        }
4240
4241        #[test]
4242        fn test_flush_all_checkpoint_roots() {
4243            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4244            let versions = default_versions();
4245
4246            queue.push_chunk(
4247                make_chunk(5, 1),
4248                &versions,
4249                1000,
4250                make_commit_ref(1),
4251                Digest::default(),
4252            );
4253            queue.push_chunk(
4254                make_chunk(3, 2),
4255                &versions,
4256                1000,
4257                make_commit_ref(1),
4258                Digest::default(),
4259            );
4260
4261            let pending = queue.flush(1000, true);
4262
4263            assert!(pending.is_some());
4264            assert!(queue.pending_roots.is_empty());
4265        }
4266
4267        #[test]
4268        fn test_flush_respects_min_checkpoint_interval() {
4269            let min_interval = 200;
4270            let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, 1000, min_interval);
4271            let versions = default_versions();
4272
4273            queue.push_chunk(
4274                make_chunk(5, 1),
4275                &versions,
4276                1000,
4277                make_commit_ref(1),
4278                Digest::default(),
4279            );
4280
4281            let pending = queue.flush(1000 + min_interval - 1, false);
4282            assert!(pending.is_none());
4283            assert_eq!(queue.pending_roots.len(), 1);
4284
4285            let pending = queue.flush(1000 + min_interval, false);
4286            assert!(pending.is_some());
4287            assert!(queue.pending_roots.is_empty());
4288        }
4289
4290        #[test]
4291        fn test_push_chunk_flushes_when_exceeds_max() {
4292            let max_tx = 10;
4293            let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, max_tx, 0);
4294            let versions = default_versions();
4295
4296            queue.push_chunk(
4297                make_chunk(max_tx / 2 + 1, 1),
4298                &versions,
4299                1000,
4300                make_commit_ref(1),
4301                Digest::default(),
4302            );
4303
4304            let flushed = queue.push_chunk(
4305                make_chunk(max_tx / 2 + 1, 2),
4306                &versions,
4307                1000,
4308                make_commit_ref(2),
4309                Digest::default(),
4310            );
4311
4312            assert_eq!(flushed.len(), 1);
4313            assert_eq!(queue.pending_roots.len(), 1);
4314        }
4315
4316        #[test]
4317        fn test_multiple_chunks_merged_into_one_checkpoint() {
4318            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 200);
4319            let versions = default_versions();
4320
4321            queue.push_chunk(
4322                make_chunk(10, 1),
4323                &versions,
4324                1000,
4325                make_commit_ref(1),
4326                Digest::default(),
4327            );
4328            queue.push_chunk(
4329                make_chunk(10, 2),
4330                &versions,
4331                1000,
4332                make_commit_ref(2),
4333                Digest::default(),
4334            );
4335            queue.push_chunk(
4336                make_chunk(10, 3),
4337                &versions,
4338                1000,
4339                make_commit_ref(3),
4340                Digest::default(),
4341            );
4342
4343            let pending = queue.flush(1000, true).unwrap();
4344
4345            assert_eq!(pending.roots.len(), 3);
4346        }
4347
4348        #[test]
4349        fn test_push_chunk_handles_overflow() {
4350            let max_tx = 10;
4351            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4352            let versions = default_versions();
4353
4354            let flushed1 = queue.push_chunk(
4355                make_chunk(max_tx / 2, 1),
4356                &versions,
4357                1000,
4358                make_commit_ref(1),
4359                Digest::default(),
4360            );
4361            assert!(flushed1.is_empty());
4362
4363            let flushed2 = queue.push_chunk(
4364                make_chunk(max_tx / 2, 2),
4365                &versions,
4366                1000,
4367                make_commit_ref(2),
4368                Digest::default(),
4369            );
4370            assert!(flushed2.is_empty());
4371
4372            let flushed3 = queue.push_chunk(
4373                make_chunk(max_tx / 2, 3),
4374                &versions,
4375                1000,
4376                make_commit_ref(3),
4377                Digest::default(),
4378            );
4379            assert_eq!(flushed3.len(), 1);
4380
4381            let pending = queue.flush(1000, true);
4382
4383            for p in pending.iter().chain(flushed3.iter()) {
4384                let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4385                assert!(tx_count <= max_tx);
4386            }
4387        }
4388
4389        #[test]
4390        fn test_checkpoint_uses_last_chunk_height() {
4391            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4392            let versions = default_versions();
4393
4394            queue.push_chunk(
4395                make_chunk(10, 100),
4396                &versions,
4397                1000,
4398                make_commit_ref(1),
4399                Digest::default(),
4400            );
4401            queue.push_chunk(
4402                make_chunk(10, 200),
4403                &versions,
4404                1000,
4405                make_commit_ref(2),
4406                Digest::default(),
4407            );
4408
4409            let pending = queue.flush(1000, true).unwrap();
4410
4411            assert_eq!(pending.details.checkpoint_height, 200);
4412        }
4413
4414        #[test]
4415        fn test_last_built_timestamp_updated_on_flush() {
4416            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4417            let versions = default_versions();
4418
4419            queue.push_chunk(
4420                make_chunk(10, 1),
4421                &versions,
4422                5000,
4423                make_commit_ref(1),
4424                Digest::default(),
4425            );
4426
4427            assert_eq!(queue.last_built_timestamp, 0);
4428
4429            let _ = queue.flush(5000, true);
4430
4431            assert_eq!(queue.last_built_timestamp, 5000);
4432        }
4433
        // NOTE(review): despite the name, this test creates no receiver and
        // makes no assertions — it only verifies that pushing chunks carrying
        // settlements does not panic. Presumably the settlement info is sent on
        // a channel internal to `CheckpointQueue::new_for_testing`; confirm
        // whether a receiver-side assertion was intended (compare
        // `test_settlement_checkpoint_seq_correct_after_flush`, which uses
        // `new_for_testing_with_sender`).
        #[test]
        fn test_settlement_info_sent_through_channel() {
            let mut queue = CheckpointQueue::new_for_testing(0, 0, 5, 1000, 0);
            let versions = default_versions();

            // First chunk: three prologues plus a settlement at height 1.
            let chunk1 = Chunk {
                schedulables: vec![
                    Schedulable::ConsensusCommitPrologue(0, 1, 0),
                    Schedulable::ConsensusCommitPrologue(0, 2, 0),
                    Schedulable::ConsensusCommitPrologue(0, 3, 0),
                ],
                settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
                height: 1,
            };

            // Second chunk: two prologues plus a settlement at height 2.
            let chunk2 = Chunk {
                schedulables: vec![
                    Schedulable::ConsensusCommitPrologue(0, 4, 0),
                    Schedulable::ConsensusCommitPrologue(0, 5, 0),
                ],
                settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
                height: 2,
            };

            queue.push_chunk(
                chunk1,
                &versions,
                1000,
                make_commit_ref(1),
                Digest::default(),
            );
            queue.push_chunk(
                chunk2,
                &versions,
                1000,
                make_commit_ref(1),
                Digest::default(),
            );
        }
4473
4474        #[test]
4475        fn test_settlement_checkpoint_seq_correct_after_flush() {
4476            let max_tx = 10;
4477            let initial_seq = 5;
4478            let (sender, mut receiver) = monitored_mpsc::unbounded_channel("test_settlement_seq");
4479            let mut queue =
4480                CheckpointQueue::new_for_testing_with_sender(0, 0, initial_seq, max_tx, 0, sender);
4481            let versions = default_versions();
4482
4483            // Push a chunk that partially fills the queue (no flush).
4484            let chunk1 = Chunk {
4485                schedulables: (0..max_tx / 2 + 1)
4486                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4487                    .collect(),
4488                settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
4489                height: 1,
4490            };
4491            queue.push_chunk(
4492                chunk1,
4493                &versions,
4494                1000,
4495                make_commit_ref(1),
4496                Digest::default(),
4497            );
4498
4499            // Drain the first message from the channel.
4500            let msg1 = receiver.try_recv().unwrap();
4501            let settlement1 = msg1.1.unwrap();
4502            assert_eq!(settlement1.checkpoint_seq, initial_seq);
4503
4504            // Push a second chunk that triggers a flush of chunk1's roots.
4505            let chunk2 = Chunk {
4506                schedulables: (0..max_tx / 2 + 1)
4507                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4508                    .collect(),
4509                settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
4510                height: 2,
4511            };
4512            let flushed = queue.push_chunk(
4513                chunk2,
4514                &versions,
4515                1000,
4516                make_commit_ref(2),
4517                Digest::default(),
4518            );
4519            assert_eq!(flushed.len(), 1);
4520            assert_eq!(flushed[0].details.checkpoint_seq, Some(initial_seq));
4521
4522            // The second settlement must have checkpoint_seq = initial_seq + 1,
4523            // because the flush incremented current_checkpoint_seq.
4524            let msg2 = receiver.try_recv().unwrap();
4525            let settlement2 = msg2.1.unwrap();
4526            assert_eq!(settlement2.checkpoint_seq, initial_seq + 1);
4527
4528            // Flush the remaining roots and verify the PendingCheckpointV2's seq
4529            // matches the settlement's seq.
4530            let pending = queue.flush_forced().unwrap();
4531            assert_eq!(
4532                pending.details.checkpoint_seq,
4533                Some(settlement2.checkpoint_seq)
4534            );
4535        }
4536
4537        #[test]
4538        fn test_checkpoint_seq_increments_on_flush() {
4539            let mut queue = CheckpointQueue::new_for_testing(0, 0, 10, 1000, 0);
4540            let versions = default_versions();
4541
4542            queue.push_chunk(
4543                make_chunk(5, 1),
4544                &versions,
4545                1000,
4546                make_commit_ref(1),
4547                Digest::default(),
4548            );
4549
4550            let pending = queue.flush(1000, true).unwrap();
4551
4552            assert_eq!(pending.details.checkpoint_seq, Some(10));
4553            assert_eq!(queue.current_checkpoint_seq, 11);
4554        }
4555
4556        #[test]
4557        fn test_multiple_chunks_with_overflow() {
4558            let max_tx = 10;
4559            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4560            let versions = default_versions();
4561
4562            let flushed1 = queue.push_chunk(
4563                make_chunk(max_tx / 2 + 1, 1),
4564                &versions,
4565                1000,
4566                make_commit_ref(1),
4567                Digest::default(),
4568            );
4569            let flushed2 = queue.push_chunk(
4570                make_chunk(max_tx / 2 + 1, 2),
4571                &versions,
4572                1000,
4573                make_commit_ref(1),
4574                Digest::default(),
4575            );
4576            let flushed3 = queue.push_chunk(
4577                make_chunk(max_tx / 2 + 1, 3),
4578                &versions,
4579                1000,
4580                make_commit_ref(1),
4581                Digest::default(),
4582            );
4583
4584            let all_flushed: Vec<_> = flushed1
4585                .into_iter()
4586                .chain(flushed2)
4587                .chain(flushed3)
4588                .collect();
4589            assert_eq!(all_flushed.len(), 2);
4590            assert_eq!(queue.pending_roots.len(), 1);
4591
4592            for p in &all_flushed {
4593                let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4594                assert!(tx_count <= max_tx);
4595            }
4596        }
4597    }
4598}