sui_core/
consensus_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet, VecDeque},
6    hash::Hash,
7    num::NonZeroUsize,
8    sync::{Arc, Mutex},
9    time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use arc_swap::ArcSwap;
13use consensus_config::Committee as ConsensusCommittee;
14use consensus_core::{CommitConsumerMonitor, CommitIndex, CommitRef};
15use consensus_types::block::TransactionIndex;
16use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
17use lru::LruCache;
18use mysten_common::{
19    assert_reachable, assert_sometimes, debug_fatal, random_util::randomize_cache_capacity_in_tests,
20};
21use mysten_metrics::{
22    monitored_future,
23    monitored_mpsc::{self, UnboundedReceiver},
24    monitored_scope, spawn_monitored_task,
25};
26use nonempty::NonEmpty;
27use parking_lot::RwLockWriteGuard;
28use serde::{Deserialize, Serialize};
29use sui_config::node::CongestionLogConfig;
30use sui_macros::{fail_point, fail_point_arg, fail_point_if};
31use sui_protocol_config::{PerObjectCongestionControlMode, ProtocolConfig};
32use sui_types::{
33    SUI_RANDOMNESS_STATE_OBJECT_ID,
34    authenticator_state::ActiveJwk,
35    base_types::{
36        AuthorityName, ConciseableName, ConsensusObjectSequenceKey, ObjectID, ObjectRef,
37        SequenceNumber, TransactionDigest,
38    },
39    crypto::RandomnessRound,
40    digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest, Digest},
41    executable_transaction::{
42        TrustedExecutableTransaction, VerifiedExecutableTransaction,
43        VerifiedExecutableTransactionWithAliases,
44    },
45    messages_checkpoint::{
46        CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointTimestamp,
47    },
48    messages_consensus::{
49        AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments,
50        ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
51        ExecutionTimeObservation,
52    },
53    sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
54    transaction::{
55        InputObjectKind, SenderSignedData, TransactionDataAPI, TransactionKey, VerifiedCertificate,
56        VerifiedTransaction, WithAliases,
57    },
58};
59use tokio::task::JoinSet;
60use tracing::{debug, error, info, instrument, trace, warn};
61
62use crate::{
63    authority::{
64        AuthorityMetrics, AuthorityState, ExecutionEnv,
65        authority_per_epoch_store::{
66            AuthorityPerEpochStore, CancelConsensusCertificateReason, ConsensusStats,
67            ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStatsV2,
68            consensus_quarantine::ConsensusCommitOutput,
69        },
70        backpressure::{BackpressureManager, BackpressureSubscriber},
71        congestion_log::CongestionCommitLogger,
72        consensus_tx_status_cache::ConsensusTxStatus,
73        epoch_start_configuration::EpochStartConfigTrait,
74        execution_time_estimator::ExecutionTimeEstimator,
75        shared_object_congestion_tracker::SharedObjectCongestionTracker,
76        shared_object_version_manager::{AssignedTxAndVersions, AssignedVersions, Schedulable},
77        transaction_deferral::{DeferralKey, DeferralReason, transaction_deferral_within_limit},
78    },
79    checkpoints::{
80        CheckpointHeight, CheckpointRoots, CheckpointService, CheckpointServiceNotify,
81        PendingCheckpoint, PendingCheckpointInfo, PendingCheckpointV2,
82    },
83    consensus_adapter::ConsensusAdapter,
84    consensus_throughput_calculator::ConsensusThroughputCalculator,
85    consensus_types::consensus_output_api::ConsensusCommitAPI,
86    epoch::{
87        randomness::{DkgStatus, RandomnessManager},
88        reconfiguration::ReconfigState,
89    },
90    execution_cache::ObjectCacheRead,
91    execution_scheduler::{SettlementBatchInfo, SettlementScheduler},
92    post_consensus_tx_reorder::PostConsensusTxReorder,
93    scoring_decision::update_low_scoring_authorities,
94    traffic_controller::{TrafficController, policies::TrafficTally},
95};
96
/// Output from filtering consensus transactions.
/// Contains the filtered transactions and any owned object locks acquired post-consensus.
struct FilteredConsensusOutput {
    /// Transactions that passed filtering, each paired with a `u32`.
    // NOTE(review): the meaning of the `u32` is not visible in this part of the file —
    // presumably an index/position associated with the transaction; confirm at the producer.
    transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
    /// Owned object locks acquired post-consensus, keyed by object reference.
    // NOTE(review): the value is presumably the digest of the transaction that took the
    // lock — confirm against the code that fills this map.
    owned_object_locks: HashMap<ObjectRef, TransactionDigest>,
}
103
/// Holds the shared handles from which a `ConsensusHandler` is built
/// (see `new_consensus_handler`).
pub struct ConsensusHandlerInitializer {
    /// Authority state; supplies the execution scheduler, cache readers, metrics and
    /// traffic controller that are passed to the handler.
    state: Arc<AuthorityState>,
    /// Checkpoint service handed to the handler.
    checkpoint_service: Arc<CheckpointService>,
    /// Epoch store; also the source of the consensus committee for the handler.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Consensus adapter handed to the handler.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Shared, swappable map of authority name to a `u64` score, handed to the handler.
    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    /// Throughput calculator handed to the handler.
    throughput_calculator: Arc<ConsensusThroughputCalculator>,
    /// Each created handler receives its own subscription from this manager.
    backpressure_manager: Arc<BackpressureManager>,
    /// Congestion commit logger; `None` when no congestion log config was supplied or
    /// when creating the logger failed.
    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
}
114
115impl ConsensusHandlerInitializer {
116    pub fn new(
117        state: Arc<AuthorityState>,
118        checkpoint_service: Arc<CheckpointService>,
119        epoch_store: Arc<AuthorityPerEpochStore>,
120        consensus_adapter: Arc<ConsensusAdapter>,
121        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
122        throughput_calculator: Arc<ConsensusThroughputCalculator>,
123        backpressure_manager: Arc<BackpressureManager>,
124        congestion_log_config: Option<CongestionLogConfig>,
125    ) -> Self {
126        let congestion_logger =
127            congestion_log_config.and_then(|config| match CongestionCommitLogger::new(&config) {
128                Ok(logger) => Some(Arc::new(Mutex::new(logger))),
129                Err(e) => {
130                    debug_fatal!("Failed to create congestion logger: {e}");
131                    None
132                }
133            });
134        Self {
135            state,
136            checkpoint_service,
137            epoch_store,
138            consensus_adapter,
139            low_scoring_authorities,
140            throughput_calculator,
141            backpressure_manager,
142            congestion_logger,
143        }
144    }
145
146    #[cfg(test)]
147    pub(crate) fn new_for_testing(
148        state: Arc<AuthorityState>,
149        checkpoint_service: Arc<CheckpointService>,
150    ) -> Self {
151        use crate::consensus_test_utils::make_consensus_adapter_for_test;
152        use std::collections::HashSet;
153
154        let backpressure_manager = BackpressureManager::new_for_tests();
155        let consensus_adapter =
156            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
157        Self {
158            state: state.clone(),
159            checkpoint_service,
160            epoch_store: state.epoch_store_for_testing().clone(),
161            consensus_adapter,
162            low_scoring_authorities: Arc::new(Default::default()),
163            throughput_calculator: Arc::new(ConsensusThroughputCalculator::new(
164                None,
165                state.metrics.clone(),
166            )),
167            backpressure_manager,
168            congestion_logger: None,
169        }
170    }
171
172    pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
173        let new_epoch_start_state = self.epoch_store.epoch_start_state();
174        let consensus_committee = new_epoch_start_state.get_consensus_committee();
175
176        let settlement_scheduler = SettlementScheduler::new(
177            self.state.execution_scheduler().as_ref().clone(),
178            self.state.get_transaction_cache_reader().clone(),
179        );
180        ConsensusHandler::new(
181            self.epoch_store.clone(),
182            self.checkpoint_service.clone(),
183            settlement_scheduler,
184            self.consensus_adapter.clone(),
185            self.state.get_object_cache_reader().clone(),
186            self.low_scoring_authorities.clone(),
187            consensus_committee,
188            self.state.metrics.clone(),
189            self.throughput_calculator.clone(),
190            self.backpressure_manager.subscribe(),
191            self.state.traffic_controller.clone(),
192            self.congestion_logger.clone(),
193        )
194    }
195}
196
197mod additional_consensus_state {
198    use std::marker::PhantomData;
199
200    use consensus_core::CommitRef;
201    use fastcrypto::hash::HashFunction as _;
202    use sui_types::{crypto::DefaultHash, digests::Digest};
203
204    use super::*;
    /// AdditionalConsensusState tracks any in-memory state that is retained by ConsensusHandler
    /// between consensus commits. Because of crash recovery, using such data is inherently risky.
    /// In order to do this safely, we must store data from a fixed number of previous commits.
    /// Then, at start-up, that same fixed number of already processed commits is replayed to
    /// reconstruct the state.
    ///
    /// To make sure that bugs in this process appear immediately, we record the digest of this
    /// state in ConsensusCommitPrologue, so that any deviation causes an immediate fork.
    ///
    /// The digest is computed over the BCS serialization of this struct (see `digest`), so
    /// every serialized field contributes to it.
    #[derive(Serialize, Deserialize)]
    pub(super) struct AdditionalConsensusState {
        /// Fed with every observed commit; provides the commit interval estimate used by
        /// `observe_commit`.
        commit_interval_observer: CommitIntervalObserver,
    }
217
218    impl AdditionalConsensusState {
219        pub fn new(additional_consensus_state_window_size: u32) -> Self {
220            Self {
221                commit_interval_observer: CommitIntervalObserver::new(
222                    additional_consensus_state_window_size,
223                ),
224            }
225        }
226
227        /// Update all internal state based on the new commit
228        pub(crate) fn observe_commit(
229            &mut self,
230            protocol_config: &ProtocolConfig,
231            epoch_start_time: u64,
232            consensus_commit: &impl ConsensusCommitAPI,
233        ) -> ConsensusCommitInfo {
234            self.commit_interval_observer
235                .observe_commit_time(consensus_commit);
236
237            let estimated_commit_period = self
238                .commit_interval_observer
239                .commit_interval_estimate()
240                .unwrap_or(Duration::from_millis(
241                    protocol_config.min_checkpoint_interval_ms(),
242                ));
243
244            info!("estimated commit rate: {:?}", estimated_commit_period);
245
246            self.commit_info_impl(
247                epoch_start_time,
248                consensus_commit,
249                Some(estimated_commit_period),
250            )
251        }
252
253        fn commit_info_impl(
254            &self,
255            epoch_start_time: u64,
256            consensus_commit: &impl ConsensusCommitAPI,
257            estimated_commit_period: Option<Duration>,
258        ) -> ConsensusCommitInfo {
259            let leader_author = consensus_commit.leader_author_index();
260            let timestamp = consensus_commit.commit_timestamp_ms();
261
262            let timestamp = if timestamp < epoch_start_time {
263                error!(
264                    "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start_time}, author {leader_author:?}"
265                );
266                epoch_start_time
267            } else {
268                timestamp
269            };
270
271            ConsensusCommitInfo {
272                _phantom: PhantomData,
273                round: consensus_commit.leader_round(),
274                timestamp,
275                leader_author,
276                consensus_commit_ref: consensus_commit.commit_ref(),
277                rejected_transactions_digest: consensus_commit.rejected_transactions_digest(),
278                additional_state_digest: Some(self.digest()),
279                estimated_commit_period,
280                skip_consensus_commit_prologue_in_test: false,
281            }
282        }
283
284        /// Get the digest of the current state.
285        fn digest(&self) -> AdditionalConsensusStateDigest {
286            let mut hash = DefaultHash::new();
287            bcs::serialize_into(&mut hash, self).unwrap();
288            AdditionalConsensusStateDigest::new(hash.finalize().into())
289        }
290    }
291
    /// Information about a single consensus commit, used to build the consensus commit
    /// prologue transaction.
    pub struct ConsensusCommitInfo {
        // prevent public construction
        _phantom: PhantomData<()>,

        /// Round of the commit (the leader round when built from a consensus commit).
        pub round: u64,
        /// Commit timestamp; when built from a consensus commit it is never earlier than
        /// the epoch start time.
        pub timestamp: u64,
        /// Index of the authority that authored the commit's leader.
        pub leader_author: AuthorityIndex,
        /// Reference to the consensus commit; its digest becomes the consensus commit digest.
        pub consensus_commit_ref: CommitRef,
        /// Rejected-transactions digest reported by the consensus commit.
        pub rejected_transactions_digest: Digest,

        /// Read through `additional_state_digest()`, which panics if this is `None`.
        additional_state_digest: Option<AdditionalConsensusStateDigest>,
        /// Read through `estimated_commit_period()`, which panics if this is `None`.
        estimated_commit_period: Option<Duration>,

        /// Set by the test constructors; always `false` for infos built from real commits.
        // NOTE(review): the consumer of this flag is outside this view — presumably it
        // suppresses prologue creation in tests; confirm at the use site.
        pub skip_consensus_commit_prologue_in_test: bool,
    }
307
308    impl ConsensusCommitInfo {
309        pub fn new_for_test(
310            commit_round: u64,
311            commit_timestamp: u64,
312            estimated_commit_period: Option<Duration>,
313            skip_consensus_commit_prologue_in_test: bool,
314        ) -> Self {
315            Self {
316                _phantom: PhantomData,
317                round: commit_round,
318                timestamp: commit_timestamp,
319                leader_author: 0,
320                consensus_commit_ref: CommitRef::default(),
321                rejected_transactions_digest: Digest::default(),
322                additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
323                estimated_commit_period,
324                skip_consensus_commit_prologue_in_test,
325            }
326        }
327
328        pub fn new_for_congestion_test(
329            commit_round: u64,
330            commit_timestamp: u64,
331            estimated_commit_period: Duration,
332        ) -> Self {
333            Self::new_for_test(
334                commit_round,
335                commit_timestamp,
336                Some(estimated_commit_period),
337                true,
338            )
339        }
340
341        pub fn additional_state_digest(&self) -> AdditionalConsensusStateDigest {
342            // this method cannot be called if stateless_commit_info is used
343            self.additional_state_digest
344                .expect("additional_state_digest is not available")
345        }
346
347        pub fn estimated_commit_period(&self) -> Duration {
348            // this method cannot be called if stateless_commit_info is used
349            self.estimated_commit_period
350                .expect("estimated commit period is not available")
351        }
352
353        fn consensus_commit_digest(&self) -> ConsensusCommitDigest {
354            ConsensusCommitDigest::new(self.consensus_commit_ref.digest.into_inner())
355        }
356
357        fn consensus_commit_prologue_transaction(
358            &self,
359            epoch: u64,
360        ) -> VerifiedExecutableTransaction {
361            let transaction = VerifiedTransaction::new_consensus_commit_prologue(
362                epoch,
363                self.round,
364                self.timestamp,
365            );
366            VerifiedExecutableTransaction::new_system(transaction, epoch)
367        }
368
369        fn consensus_commit_prologue_v2_transaction(
370            &self,
371            epoch: u64,
372        ) -> VerifiedExecutableTransaction {
373            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v2(
374                epoch,
375                self.round,
376                self.timestamp,
377                self.consensus_commit_digest(),
378            );
379            VerifiedExecutableTransaction::new_system(transaction, epoch)
380        }
381
382        fn consensus_commit_prologue_v3_transaction(
383            &self,
384            epoch: u64,
385            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
386        ) -> VerifiedExecutableTransaction {
387            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v3(
388                epoch,
389                self.round,
390                self.timestamp,
391                self.consensus_commit_digest(),
392                consensus_determined_version_assignments,
393            );
394            VerifiedExecutableTransaction::new_system(transaction, epoch)
395        }
396
397        fn consensus_commit_prologue_v4_transaction(
398            &self,
399            epoch: u64,
400            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
401            additional_state_digest: AdditionalConsensusStateDigest,
402        ) -> VerifiedExecutableTransaction {
403            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v4(
404                epoch,
405                self.round,
406                self.timestamp,
407                self.consensus_commit_digest(),
408                consensus_determined_version_assignments,
409                additional_state_digest,
410            );
411            VerifiedExecutableTransaction::new_system(transaction, epoch)
412        }
413
414        pub fn create_consensus_commit_prologue_transaction(
415            &self,
416            epoch: u64,
417            protocol_config: &ProtocolConfig,
418            cancelled_txn_version_assignment: Vec<(
419                TransactionDigest,
420                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
421            )>,
422            commit_info: &ConsensusCommitInfo,
423            indirect_state_observer: IndirectStateObserver,
424        ) -> VerifiedExecutableTransaction {
425            let version_assignments = if protocol_config
426                .record_consensus_determined_version_assignments_in_prologue_v2()
427            {
428                Some(
429                    ConsensusDeterminedVersionAssignments::CancelledTransactionsV2(
430                        cancelled_txn_version_assignment,
431                    ),
432                )
433            } else if protocol_config.record_consensus_determined_version_assignments_in_prologue()
434            {
435                Some(
436                    ConsensusDeterminedVersionAssignments::CancelledTransactions(
437                        cancelled_txn_version_assignment
438                            .into_iter()
439                            .map(|(tx_digest, versions)| {
440                                (
441                                    tx_digest,
442                                    versions.into_iter().map(|(id, v)| (id.0, v)).collect(),
443                                )
444                            })
445                            .collect(),
446                    ),
447                )
448            } else {
449                None
450            };
451
452            if protocol_config.record_additional_state_digest_in_prologue() {
453                let additional_state_digest =
454                    if protocol_config.additional_consensus_digest_indirect_state() {
455                        let d1 = commit_info.additional_state_digest();
456                        indirect_state_observer.fold_with(d1)
457                    } else {
458                        commit_info.additional_state_digest()
459                    };
460
461                self.consensus_commit_prologue_v4_transaction(
462                    epoch,
463                    version_assignments.unwrap(),
464                    additional_state_digest,
465                )
466            } else if let Some(version_assignments) = version_assignments {
467                self.consensus_commit_prologue_v3_transaction(epoch, version_assignments)
468            } else if protocol_config.include_consensus_digest_in_prologue() {
469                self.consensus_commit_prologue_v2_transaction(epoch)
470            } else {
471                self.consensus_commit_prologue_transaction(epoch)
472            }
473        }
474    }
475
476    #[derive(Default)]
477    pub struct IndirectStateObserver {
478        hash: DefaultHash,
479    }
480
481    impl IndirectStateObserver {
482        pub fn new() -> Self {
483            Self::default()
484        }
485
486        pub fn observe_indirect_state<T: Serialize>(&mut self, state: &T) {
487            bcs::serialize_into(&mut self.hash, state).unwrap();
488        }
489
490        pub fn fold_with(
491            self,
492            d1: AdditionalConsensusStateDigest,
493        ) -> AdditionalConsensusStateDigest {
494            let hash = self.hash.finalize();
495            let d2 = AdditionalConsensusStateDigest::new(hash.into());
496
497            let mut hasher = DefaultHash::new();
498            bcs::serialize_into(&mut hasher, &d1).unwrap();
499            bcs::serialize_into(&mut hasher, &d2).unwrap();
500            AdditionalConsensusStateDigest::new(hasher.finalize().into())
501        }
502    }
503
504    #[test]
505    fn test_additional_consensus_state() {
506        use crate::consensus_test_utils::TestConsensusCommit;
507
508        fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
509            let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
510            state.observe_commit(
511                &protocol_config,
512                100,
513                &TestConsensusCommit::empty(round, timestamp, 0),
514            );
515        }
516
517        let mut s1 = AdditionalConsensusState::new(3);
518        observe(&mut s1, 1, 1000);
519        observe(&mut s1, 2, 2000);
520        observe(&mut s1, 3, 3000);
521        observe(&mut s1, 4, 4000);
522
523        let mut s2 = AdditionalConsensusState::new(3);
524        // Because state uses a ring buffer, we should get the same digest
525        // even though we only added the 3 latest observations.
526        observe(&mut s2, 2, 2000);
527        observe(&mut s2, 3, 3000);
528        observe(&mut s2, 4, 4000);
529
530        assert_eq!(s1.digest(), s2.digest());
531
532        observe(&mut s1, 5, 5000);
533        observe(&mut s2, 5, 5000);
534
535        assert_eq!(s1.digest(), s2.digest());
536    }
537}
538use additional_consensus_state::AdditionalConsensusState;
539pub(crate) use additional_consensus_state::{ConsensusCommitInfo, IndirectStateObserver};
540
/// One entry buffered in `CheckpointQueue::pending_roots`: the checkpoint roots of a
/// pushed chunk together with the commit metadata passed to `push_chunk` alongside it.
struct QueuedCheckpointRoots {
    /// Roots (transaction keys, optional settlement key, height) of the chunk.
    roots: CheckpointRoots,
    /// Timestamp supplied with the chunk; the last queued entry's timestamp becomes the
    /// flushed checkpoint's `timestamp_ms`.
    timestamp: CheckpointTimestamp,
    /// Consensus commit reference supplied with the chunk.
    consensus_commit_ref: CommitRef,
    /// Rejected-transactions digest supplied with the chunk.
    rejected_transactions_digest: Digest,
}
547
548struct Chunk<
549    T: crate::authority::shared_object_version_manager::AsTx = VerifiedExecutableTransaction,
550> {
551    schedulables: Vec<Schedulable<T>>,
552    settlement: Option<Schedulable<T>>,
553    height: CheckpointHeight,
554}
555
556impl<T: crate::authority::shared_object_version_manager::AsTx + Clone> Chunk<T> {
557    fn all_schedulables(&self) -> impl Iterator<Item = &Schedulable<T>> + Clone {
558        self.schedulables.iter().chain(self.settlement.iter())
559    }
560
561    fn all_schedulables_from(chunks: &[Self]) -> impl Iterator<Item = &Schedulable<T>> + Clone {
562        chunks.iter().flat_map(|c| c.all_schedulables())
563    }
564
565    fn to_checkpoint_roots(&self) -> CheckpointRoots {
566        let tx_roots: Vec<_> = self.schedulables.iter().map(|s| s.key()).collect();
567        let settlement_root = self.settlement.as_ref().map(|s| s.key());
568        CheckpointRoots {
569            tx_roots,
570            settlement_root,
571            height: self.height,
572        }
573    }
574}
575
576impl From<Chunk<VerifiedExecutableTransactionWithAliases>> for Chunk {
577    fn from(chunk: Chunk<VerifiedExecutableTransactionWithAliases>) -> Self {
578        Chunk {
579            schedulables: chunk.schedulables.into_iter().map(|s| s.into()).collect(),
580            settlement: chunk.settlement.map(|s| s.into()),
581            height: chunk.height,
582        }
583    }
584}
585
// Accumulates checkpoint roots from consensus and flushes them into PendingCheckpointV2s.
// Roots are buffered in `pending_roots` until a flush is triggered (by max_tx overflow or
// time interval). A flush always drains all buffered roots into a single PendingCheckpointV2,
// so the queue only ever holds roots for one pending checkpoint at a time.
//
// Owns an ExecutionSchedulerSender so push_chunk can send schedulables and settlement info
// for execution in one shot.
pub(crate) struct CheckpointQueue {
    /// Timestamp of the last root in the most recently flushed checkpoint (or the value
    /// given at construction if nothing has been flushed yet).
    last_built_timestamp: CheckpointTimestamp,
    /// Roots buffered since the last flush, oldest first.
    pending_roots: VecDeque<QueuedCheckpointRoots>,
    /// Last height handed out by `next_height`.
    height: u64,
    /// Number of regular (non-settlement) schedulables buffered since the last flush.
    pending_tx_count: usize,
    /// Sequence number that the next flushed checkpoint will carry; incremented on
    /// every flush.
    current_checkpoint_seq: CheckpointSequenceNumber,
    /// Pushing a chunk that would raise a non-zero `pending_tx_count` above this value
    /// first flushes what is already buffered.
    max_tx: usize,
    /// Minimum time since `last_built_timestamp` before a non-forced `flush` builds a
    /// checkpoint.
    min_checkpoint_interval_ms: u64,
    /// Used by `push_chunk` to send schedulables and settlement info for execution.
    execution_scheduler_sender: ExecutionSchedulerSender,
}
603
604impl CheckpointQueue {
605    pub(crate) fn new(
606        last_built_timestamp: CheckpointTimestamp,
607        checkpoint_height: u64,
608        next_checkpoint_seq: CheckpointSequenceNumber,
609        max_tx: usize,
610        min_checkpoint_interval_ms: u64,
611        execution_scheduler_sender: ExecutionSchedulerSender,
612    ) -> Self {
613        Self {
614            last_built_timestamp,
615            pending_roots: VecDeque::new(),
616            height: checkpoint_height,
617            pending_tx_count: 0,
618            current_checkpoint_seq: next_checkpoint_seq,
619            max_tx,
620            min_checkpoint_interval_ms,
621            execution_scheduler_sender,
622        }
623    }
624
625    #[cfg(test)]
626    fn new_for_testing(
627        last_built_timestamp: CheckpointTimestamp,
628        checkpoint_height: u64,
629        next_checkpoint_seq: CheckpointSequenceNumber,
630        max_tx: usize,
631        min_checkpoint_interval_ms: u64,
632    ) -> Self {
633        let (sender, _receiver) = monitored_mpsc::unbounded_channel("test_checkpoint_queue_sender");
634        Self {
635            last_built_timestamp,
636            pending_roots: VecDeque::new(),
637            height: checkpoint_height,
638            pending_tx_count: 0,
639            current_checkpoint_seq: next_checkpoint_seq,
640            max_tx,
641            min_checkpoint_interval_ms,
642            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
643        }
644    }
645
646    #[cfg(test)]
647    fn new_for_testing_with_sender(
648        last_built_timestamp: CheckpointTimestamp,
649        checkpoint_height: u64,
650        next_checkpoint_seq: CheckpointSequenceNumber,
651        max_tx: usize,
652        min_checkpoint_interval_ms: u64,
653        sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
654    ) -> Self {
655        Self {
656            last_built_timestamp,
657            pending_roots: VecDeque::new(),
658            height: checkpoint_height,
659            pending_tx_count: 0,
660            current_checkpoint_seq: next_checkpoint_seq,
661            max_tx,
662            min_checkpoint_interval_ms,
663            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
664        }
665    }
666
667    pub(crate) fn last_built_timestamp(&self) -> CheckpointTimestamp {
668        self.last_built_timestamp
669    }
670
671    pub(crate) fn is_empty(&self) -> bool {
672        self.pending_roots.is_empty()
673    }
674
675    fn next_height(&mut self) -> u64 {
676        self.height += 1;
677        self.height
678    }
679
680    fn push_chunk(
681        &mut self,
682        chunk: Chunk,
683        assigned_versions: &HashMap<TransactionKey, AssignedVersions>,
684        timestamp: CheckpointTimestamp,
685        consensus_commit_ref: CommitRef,
686        rejected_transactions_digest: Digest,
687    ) -> Vec<PendingCheckpointV2> {
688        let max_tx = self.max_tx;
689        let user_tx_count = chunk.schedulables.len();
690
691        let roots = chunk.to_checkpoint_roots();
692
693        let schedulables: Vec<_> = chunk
694            .schedulables
695            .into_iter()
696            .map(|s| {
697                let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
698                (s, versions)
699            })
700            .collect();
701
702        let mut flushed_checkpoints = Vec::new();
703
704        if self.pending_tx_count > 0
705            && self.pending_tx_count + user_tx_count > max_tx
706            && let Some(checkpoint) = self.flush_forced()
707        {
708            flushed_checkpoints.push(checkpoint);
709        }
710
711        let settlement_info = chunk.settlement.as_ref().map(|s| {
712            let settlement_key = s.key();
713            let tx_keys: Vec<_> = schedulables.iter().map(|(s, _)| s.key()).collect();
714            SettlementBatchInfo {
715                settlement_key,
716                tx_keys,
717                checkpoint_height: chunk.height,
718                checkpoint_seq: self.current_checkpoint_seq,
719                assigned_versions: assigned_versions
720                    .get(&settlement_key)
721                    .cloned()
722                    .unwrap_or_default(),
723            }
724        });
725
726        self.execution_scheduler_sender
727            .send(schedulables, settlement_info);
728
729        self.pending_tx_count += user_tx_count;
730        self.pending_roots.push_back(QueuedCheckpointRoots {
731            roots,
732            timestamp,
733            consensus_commit_ref,
734            rejected_transactions_digest,
735        });
736
737        flushed_checkpoints
738    }
739
740    pub(crate) fn flush(
741        &mut self,
742        current_timestamp: CheckpointTimestamp,
743        force: bool,
744    ) -> Option<PendingCheckpointV2> {
745        if !force && current_timestamp < self.last_built_timestamp + self.min_checkpoint_interval_ms
746        {
747            return None;
748        }
749        self.flush_forced()
750    }
751
752    fn flush_forced(&mut self) -> Option<PendingCheckpointV2> {
753        if self.pending_roots.is_empty() {
754            return None;
755        }
756
757        let to_flush: Vec<_> = self.pending_roots.drain(..).collect();
758        let last_root = to_flush.last().unwrap();
759
760        let checkpoint = PendingCheckpointV2 {
761            roots: to_flush.iter().map(|q| q.roots.clone()).collect(),
762            details: PendingCheckpointInfo {
763                timestamp_ms: last_root.timestamp,
764                last_of_epoch: false,
765                checkpoint_height: last_root.roots.height,
766                consensus_commit_ref: last_root.consensus_commit_ref,
767                rejected_transactions_digest: last_root.rejected_transactions_digest,
768                checkpoint_seq: Some(self.current_checkpoint_seq),
769            },
770        };
771
772        self.last_built_timestamp = last_root.timestamp;
773        self.pending_tx_count = 0;
774        self.current_checkpoint_seq += 1;
775
776        Some(checkpoint)
777    }
778
779    pub(crate) fn checkpoint_seq(&self) -> CheckpointSequenceNumber {
780        self.current_checkpoint_seq
781            .checked_sub(1)
782            .expect("checkpoint_seq called before any checkpoint was assigned")
783    }
784}
785
/// Per-epoch state for handling consensus output. `C` is the type of the checkpoint
/// service held in `checkpoint_service`.
pub struct ConsensusHandler<C> {
    /// A store created for each epoch. ConsensusHandler is recreated each epoch, with the
    /// corresponding store. This store is also used to get the current epoch ID.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Holds the indices, hash and stats after the last consensus commit
    /// It is used for avoiding replaying already processed transactions,
    /// checking chain consistency, and accumulating per-epoch consensus output stats.
    last_consensus_stats: ExecutionIndicesWithStatsV2,
    /// Checkpoint service handle, shared with the rest of the node.
    checkpoint_service: Arc<C>,
    /// cache reader is needed when determining the next version to assign for shared objects.
    cache_reader: Arc<dyn ObjectCacheRead>,
    /// Reputation scores used by consensus adapter that we update, forwarded from consensus
    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    /// The consensus committee used to do stake computations for deciding set of low scoring authorities
    committee: ConsensusCommittee,
    // TODO: ConsensusHandler doesn't really share metrics with AuthorityState. We could define
    // a new metrics type here if we want to.
    metrics: Arc<AuthorityMetrics>,
    /// Lru cache to quickly discard transactions processed by consensus
    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
    /// Enqueues transactions to the execution scheduler via a separate task.
    execution_scheduler_sender: ExecutionSchedulerSender,
    /// Consensus adapter for submitting transactions to consensus
    consensus_adapter: Arc<ConsensusAdapter>,

    /// Using the throughput calculator to record the current consensus throughput
    throughput_calculator: Arc<ConsensusThroughputCalculator>,

    /// In-memory state retained between consensus commits; its digest is recorded in the
    /// consensus commit prologue (see `AdditionalConsensusState`).
    additional_consensus_state: AdditionalConsensusState,

    // NOTE(review): presumably used to wait while backpressure is applied — the use site
    // is outside this view; confirm.
    backpressure_subscriber: BackpressureSubscriber,

    /// Optional traffic controller, taken from the authority state.
    traffic_controller: Option<Arc<TrafficController>>,

    /// Optional congestion commit logger, shared behind a mutex.
    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,

    /// Buffers checkpoint roots and flushes them into pending checkpoints
    /// (see `CheckpointQueue`).
    checkpoint_queue: Mutex<CheckpointQueue>,
}
824
/// Nominal capacity (1024 * 1024 entries) of `ConsensusHandler::processed_cache`. The value
/// is passed through `randomize_cache_capacity_in_tests` before the LRU cache is created.
const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
826
827impl<C> ConsensusHandler<C> {
828    pub(crate) fn new(
829        epoch_store: Arc<AuthorityPerEpochStore>,
830        checkpoint_service: Arc<C>,
831        settlement_scheduler: SettlementScheduler,
832        consensus_adapter: Arc<ConsensusAdapter>,
833        cache_reader: Arc<dyn ObjectCacheRead>,
834        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
835        committee: ConsensusCommittee,
836        metrics: Arc<AuthorityMetrics>,
837        throughput_calculator: Arc<ConsensusThroughputCalculator>,
838        backpressure_subscriber: BackpressureSubscriber,
839        traffic_controller: Option<Arc<TrafficController>>,
840        congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
841    ) -> Self {
842        assert!(
843            matches!(
844                epoch_store
845                    .protocol_config()
846                    .per_object_congestion_control_mode(),
847                PerObjectCongestionControlMode::ExecutionTimeEstimate(_)
848            ),
849            "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
850        );
851
852        // Recover last_consensus_stats so it is consistent across validators.
853        let mut last_consensus_stats = epoch_store
854            .get_last_consensus_stats()
855            .expect("Should be able to read last consensus index");
856        // stats is empty at the beginning of epoch.
857        if !last_consensus_stats.stats.is_initialized() {
858            last_consensus_stats.stats = ConsensusStats::new(committee.size());
859            if epoch_store
860                .protocol_config()
861                .split_checkpoints_in_consensus_handler()
862            {
863                last_consensus_stats.checkpoint_seq = epoch_store.previous_epoch_last_checkpoint();
864            }
865        }
866        let max_tx = epoch_store
867            .protocol_config()
868            .max_transactions_per_checkpoint() as usize;
869        let min_checkpoint_interval_ms = epoch_store
870            .protocol_config()
871            .min_checkpoint_interval_ms_as_option()
872            .unwrap_or_default();
873        let execution_scheduler_sender =
874            ExecutionSchedulerSender::start(settlement_scheduler, epoch_store.clone());
875        let commit_rate_estimate_window_size = epoch_store
876            .protocol_config()
877            .get_consensus_commit_rate_estimation_window_size();
878        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
879        let checkpoint_height = last_consensus_stats.height;
880        let next_checkpoint_seq = if epoch_store
881            .protocol_config()
882            .split_checkpoints_in_consensus_handler()
883        {
884            last_consensus_stats.checkpoint_seq + 1
885        } else {
886            0
887        };
888        Self {
889            epoch_store,
890            last_consensus_stats,
891            checkpoint_service,
892            cache_reader,
893            low_scoring_authorities,
894            committee,
895            metrics,
896            processed_cache: LruCache::new(
897                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
898            ),
899            execution_scheduler_sender: execution_scheduler_sender.clone(),
900            consensus_adapter,
901            throughput_calculator,
902            additional_consensus_state: AdditionalConsensusState::new(
903                commit_rate_estimate_window_size,
904            ),
905            backpressure_subscriber,
906            traffic_controller,
907            congestion_logger,
908            checkpoint_queue: Mutex::new(CheckpointQueue::new(
909                last_built_timestamp,
910                checkpoint_height,
911                next_checkpoint_seq,
912                max_tx,
913                min_checkpoint_interval_ms,
914                execution_scheduler_sender,
915            )),
916        }
917    }
918
919    /// Returns the last subdag index processed by the handler.
920    pub(crate) fn last_processed_subdag_index(&self) -> u64 {
921        self.last_consensus_stats.index.sub_dag_index
922    }
923
924    pub(crate) fn new_for_testing(
925        epoch_store: Arc<AuthorityPerEpochStore>,
926        checkpoint_service: Arc<C>,
927        execution_scheduler_sender: ExecutionSchedulerSender,
928        consensus_adapter: Arc<ConsensusAdapter>,
929        cache_reader: Arc<dyn ObjectCacheRead>,
930        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
931        committee: ConsensusCommittee,
932        metrics: Arc<AuthorityMetrics>,
933        throughput_calculator: Arc<ConsensusThroughputCalculator>,
934        backpressure_subscriber: BackpressureSubscriber,
935        traffic_controller: Option<Arc<TrafficController>>,
936        last_consensus_stats: ExecutionIndicesWithStatsV2,
937    ) -> Self {
938        let commit_rate_estimate_window_size = epoch_store
939            .protocol_config()
940            .get_consensus_commit_rate_estimation_window_size();
941        let max_tx = epoch_store
942            .protocol_config()
943            .max_transactions_per_checkpoint() as usize;
944        let min_checkpoint_interval_ms = epoch_store
945            .protocol_config()
946            .min_checkpoint_interval_ms_as_option()
947            .unwrap_or_default();
948        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
949        let checkpoint_height = last_consensus_stats.height;
950        Self {
951            epoch_store,
952            last_consensus_stats,
953            checkpoint_service,
954            cache_reader,
955            low_scoring_authorities,
956            committee,
957            metrics,
958            processed_cache: LruCache::new(
959                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
960            ),
961            execution_scheduler_sender: execution_scheduler_sender.clone(),
962            consensus_adapter,
963            throughput_calculator,
964            additional_consensus_state: AdditionalConsensusState::new(
965                commit_rate_estimate_window_size,
966            ),
967            backpressure_subscriber,
968            traffic_controller,
969            congestion_logger: None,
970            checkpoint_queue: Mutex::new(CheckpointQueue::new(
971                last_built_timestamp,
972                checkpoint_height,
973                0,
974                max_tx,
975                min_checkpoint_interval_ms,
976                execution_scheduler_sender,
977            )),
978        }
979    }
980}
981
/// The transactions of one consensus commit, bucketed by kind.
///
/// Produced by `build_commit_handler_input` inside `handle_consensus_commit`, which then
/// hands each bucket to its dedicated processing step.
#[derive(Default)]
struct CommitHandlerInput {
    /// User transactions; forwarded to `process_transactions` or
    /// `collect_transactions_to_schedule`, depending on the checkpoint-splitting mode.
    user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    /// Forwarded to `process_capability_notifications`.
    capability_notifications: Vec<AuthorityCapabilitiesV2>,
    /// Forwarded to `process_execution_time_observations`.
    execution_time_observations: Vec<ExecutionTimeObservation>,
    /// Forwarded to `process_checkpoint_signature_messages`.
    checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
    /// DKG messages; forwarded to `process_dkg_updates`.
    randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
    /// DKG confirmations; forwarded to `process_dkg_updates`.
    randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
    /// Authority names carried by end-of-publish transactions; forwarded to `handle_eop`.
    end_of_publish_transactions: Vec<AuthorityName>,
    /// New JWK entries; forwarded to `process_jwks`.
    new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
}
993
/// Mutable scratch state for processing a single consensus commit. A fresh value is created
/// near the top of `handle_consensus_commit`.
struct CommitHandlerState {
    /// Set by `init_randomness`; true only when the DKG status is `Failed`.
    dkg_failed: bool,
    /// Randomness round reserved for this commit by `init_randomness`, if any.
    randomness_round: Option<RandomnessRound>,
    /// Output accumulated while handling the commit; pushed to the consensus quarantine once
    /// the commit has been processed.
    output: ConsensusCommitOutput,
    /// Initialized to `Some(IndirectStateObserver::new())` for every commit.
    /// NOTE(review): where it is consumed is not visible in this part of the file.
    indirect_state_observer: Option<IndirectStateObserver>,
    /// Clone of the reconfig state taken when processing of the commit begins.
    initial_reconfig_state: ReconfigState,
    // Occurrence counts for user transactions, used for unpaid amplification detection.
    occurrence_counts: HashMap<TransactionDigest, u32>,
}
1003
1004impl CommitHandlerState {
1005    fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
1006        self.output
1007            .get_consensus_messages_processed()
1008            .cloned()
1009            .collect()
1010    }
1011
1012    fn init_randomness<'a, 'epoch>(
1013        &'a mut self,
1014        epoch_store: &'epoch AuthorityPerEpochStore,
1015        commit_info: &'a ConsensusCommitInfo,
1016    ) -> Option<tokio::sync::MutexGuard<'epoch, RandomnessManager>> {
1017        let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
1018            rm.try_lock()
1019                .expect("should only ever be called from the commit handler thread")
1020        });
1021
1022        let mut dkg_failed = false;
1023        let randomness_round = if epoch_store.randomness_state_enabled() {
1024            let randomness_manager = randomness_manager
1025                .as_mut()
1026                .expect("randomness manager should exist if randomness is enabled");
1027            match randomness_manager.dkg_status() {
1028                DkgStatus::Pending => None,
1029                DkgStatus::Failed => {
1030                    dkg_failed = true;
1031                    None
1032                }
1033                DkgStatus::Successful => {
1034                    // Generate randomness for this commit if DKG is successful and we are still
1035                    // accepting certs.
1036                    if self.initial_reconfig_state.should_accept_tx() {
1037                        randomness_manager
1038                            // TODO: make infallible
1039                            .reserve_next_randomness(commit_info.timestamp, &mut self.output)
1040                            .expect("epoch ended")
1041                    } else {
1042                        None
1043                    }
1044                }
1045            }
1046        } else {
1047            None
1048        };
1049
1050        if randomness_round.is_some() {
1051            assert!(!dkg_failed); // invariant check
1052        }
1053
1054        self.randomness_round = randomness_round;
1055        self.dkg_failed = dkg_failed;
1056
1057        randomness_manager
1058    }
1059}
1060
1061impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
1062    /// Called during startup to allow us to observe commits we previously processed, for crash recovery.
1063    /// Any state computed here must be a pure function of the commits observed, it cannot depend on any
1064    /// state recorded in the epoch db.
1065    fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
1066        assert!(
1067            self.epoch_store
1068                .protocol_config()
1069                .record_additional_state_digest_in_prologue()
1070        );
1071        let protocol_config = self.epoch_store.protocol_config();
1072        let epoch_start_time = self
1073            .epoch_store
1074            .epoch_start_config()
1075            .epoch_start_timestamp_ms();
1076
1077        self.additional_consensus_state.observe_commit(
1078            protocol_config,
1079            epoch_start_time,
1080            &consensus_commit,
1081        );
1082    }
1083
1084    #[cfg(test)]
1085    pub(crate) async fn handle_consensus_commit_for_test(
1086        &mut self,
1087        consensus_commit: impl ConsensusCommitAPI,
1088    ) {
1089        self.handle_consensus_commit(consensus_commit).await;
1090    }
1091
    /// Processes one consensus commit from start to finish.
    ///
    /// In order, this:
    /// 1. asserts the protocol config flags this handler relies on and waits until
    ///    backpressure has cleared;
    /// 2. observes the commit to obtain its `ConsensusCommitInfo`, updates the in-memory
    ///    execution indices, the low-scoring-authorities set and metrics;
    /// 3. filters and deduplicates the commit's transactions, initializes randomness for the
    ///    commit, and processes JWKs, capability notifications, execution time observations,
    ///    checkpoint signature messages and DKG updates;
    /// 4. depending on `split_checkpoints_in_consensus_handler`, either processes
    ///    transactions, writes pending checkpoints and sends the schedulables to the
    ///    execution scheduler directly, or collects the schedulables and hands them to
    ///    `create_pending_checkpoints_v2`;
    /// 5. records the commit stats in the output, pushes the output to the consensus
    ///    quarantine, notifies the checkpoint service, triggers randomness generation if a
    ///    round was reserved, delivers notifications and updates the throughput calculator.
    ///
    /// Returns early, before step 5, when end-of-publish handling reports that transactions
    /// are no longer accepted and this is not the final round.
    ///
    /// # Panics
    ///
    /// Panics if a required protocol config flag is disabled, if the commit round does not
    /// exceed the last committed round, if the execution time estimator or randomness
    /// manager is already locked, or if pushing the consensus output or notifying the
    /// checkpoint service fails.
    #[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))]
    pub(crate) async fn handle_consensus_commit(
        &mut self,
        consensus_commit: impl ConsensusCommitAPI,
    ) {
        {
            let protocol_config = self.epoch_store.protocol_config();

            // Assert all protocol config settings for which we don't support old behavior.
            assert!(protocol_config.ignore_execution_time_observations_after_certs_closed());
            assert!(protocol_config.record_time_estimate_processed());
            assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints());
            assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest());
            assert!(protocol_config.authority_capabilities_v2());
            assert!(protocol_config.cancel_for_failed_dkg_early());
        }

        // This may block until one of two conditions happens:
        // - Number of uncommitted transactions in the writeback cache goes below the
        //   backpressure threshold.
        // - The highest executed checkpoint catches up to the highest certified checkpoint.
        self.backpressure_subscriber.await_no_backpressure().await;

        let epoch = self.epoch_store.epoch();

        let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit");

        // Round of the previously handled commit; read before the indices are advanced below.
        let last_committed_round = self.last_consensus_stats.index.last_committed_round;

        // Publish that previous round to the optional status / reject-reason caches.
        if let Some(consensus_tx_status_cache) = self.epoch_store.consensus_tx_status_cache.as_ref()
        {
            consensus_tx_status_cache
                .update_last_committed_leader_round(last_committed_round as u32)
                .await;
        }
        if let Some(tx_reject_reason_cache) = self.epoch_store.tx_reject_reason_cache.as_ref() {
            tx_reject_reason_cache.set_last_committed_leader_round(last_committed_round as u32);
        }

        let commit_info = self.additional_consensus_state.observe_commit(
            self.epoch_store.protocol_config(),
            self.epoch_store
                .epoch_start_config()
                .epoch_start_timestamp_ms(),
            &consensus_commit,
        );
        // Commit rounds must be strictly increasing.
        assert!(commit_info.round > last_committed_round);

        let (timestamp, leader_author, commit_sub_dag_index) =
            self.gather_commit_metadata(&consensus_commit);

        info!(
            %consensus_commit,
            "Received consensus output {}. Rejected transactions {}",
            consensus_commit.commit_ref(),
            consensus_commit.rejected_transactions_debug_string(),
        );

        // Advance the in-memory indices to this commit; the transaction index restarts at 0.
        self.last_consensus_stats.index = ExecutionIndices {
            last_committed_round: commit_info.round,
            sub_dag_index: commit_sub_dag_index,
            transaction_index: 0_u64,
        };

        update_low_scoring_authorities(
            self.low_scoring_authorities.clone(),
            self.epoch_store.committee(),
            &self.committee,
            consensus_commit.reputation_score_sorted_desc(),
            &self.metrics,
            self.epoch_store
                .protocol_config()
                .consensus_bad_nodes_stake_threshold(),
        );

        self.metrics
            .consensus_committed_subdags
            .with_label_values(&[&leader_author.to_string()])
            .inc();

        // Per-commit scratch state. The reconfig state is cloned here so later steps see the
        // value it had when processing of this commit began.
        let mut state = CommitHandlerState {
            output: ConsensusCommitOutput::new(commit_info.round),
            dkg_failed: false,
            randomness_round: None,
            indirect_state_observer: Some(IndirectStateObserver::new()),
            initial_reconfig_state: self
                .epoch_store
                .get_reconfig_state_read_lock_guard()
                .clone(),
            occurrence_counts: HashMap::new(),
        };

        let FilteredConsensusOutput {
            transactions,
            owned_object_locks,
        } = self.filter_consensus_txns(
            state.initial_reconfig_state.clone(),
            &commit_info,
            &consensus_commit,
        );
        // Buffer owned object locks for batch write when preconsensus locking is disabled
        if !owned_object_locks.is_empty() {
            state.output.set_owned_object_locks(owned_object_locks);
        }
        let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions);

        // Sets `state.randomness_round` / `state.dkg_failed` and returns the locked
        // randomness manager (if any), which is kept until the end of this function.
        let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info);

        let CommitHandlerInput {
            user_transactions,
            capability_notifications,
            execution_time_observations,
            checkpoint_signature_messages,
            randomness_dkg_messages,
            randomness_dkg_confirmations,
            end_of_publish_transactions,
            new_jwks,
        } = self.build_commit_handler_input(transactions);

        self.process_jwks(&mut state, &commit_info, new_jwks);
        self.process_capability_notifications(capability_notifications);
        self.process_execution_time_observations(&mut state, execution_time_observations);
        self.process_checkpoint_signature_messages(checkpoint_signature_messages);

        self.process_dkg_updates(
            &mut state,
            &commit_info,
            randomness_manager.as_deref_mut(),
            randomness_dkg_messages,
            randomness_dkg_confirmations,
        )
        .await;

        let mut execution_time_estimator = self
            .epoch_store
            .execution_time_estimator
            .try_lock()
            .expect("should only ever be called from the commit handler thread");

        let authenticator_state_update_transaction =
            self.create_authenticator_state_update(last_committed_round, &commit_info);

        let split_checkpoints = self
            .epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler();

        // Both branches yield the reconfig lock (if one was taken), the final-round flag,
        // the number of non-randomness schedulables, and the checkpoint height to record.
        let (lock, final_round, num_schedulables, checkpoint_height) = if !split_checkpoints {
            // Path used when checkpoints are not split in the consensus handler.
            let (schedulables, randomness_schedulables, assigned_versions) = self
                .process_transactions(
                    &mut state,
                    &mut execution_time_estimator,
                    &commit_info,
                    authenticator_state_update_transaction,
                    user_transactions,
                );

            let (should_accept_tx, lock, final_round) =
                self.handle_eop(&mut state, end_of_publish_transactions);

            let make_checkpoint = should_accept_tx || final_round;
            if !make_checkpoint {
                // No need for any further processing
                return;
            }

            // If this is the final round, record execution time observations for storage in the
            // end-of-epoch tx.
            if final_round {
                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
            }

            let checkpoint_height = self
                .epoch_store
                .calculate_pending_checkpoint_height(commit_info.round);

            self.create_pending_checkpoints(
                &mut state,
                &commit_info,
                &schedulables,
                &randomness_schedulables,
                final_round,
            );

            // Counted before the randomness schedulables are appended.
            let num_schedulables = schedulables.len();
            let mut all_schedulables = schedulables;
            all_schedulables.extend(randomness_schedulables);
            let assigned_versions = assigned_versions.into_map();
            // Pair every schedulable with its assigned versions, falling back to the default
            // value when none were assigned for its key.
            let paired: Vec<_> = all_schedulables
                .into_iter()
                .map(|s| {
                    let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
                    (s, versions)
                })
                .collect();
            self.execution_scheduler_sender.send(paired, None);

            (lock, final_round, num_schedulables, checkpoint_height)
        } else {
            // Path used when checkpoints are split in the consensus handler.
            let (
                transactions_to_schedule,
                randomness_transactions_to_schedule,
                cancelled_txns,
                randomness_state_update_transaction,
            ) = self.collect_transactions_to_schedule(
                &mut state,
                &mut execution_time_estimator,
                &commit_info,
                user_transactions,
            );

            let (should_accept_tx, lock, final_round) =
                self.handle_eop(&mut state, end_of_publish_transactions);

            let make_checkpoint = should_accept_tx || final_round;
            if !make_checkpoint {
                // No need for any further processing
                return;
            }

            // If this is the final round, record execution time observations for storage in the
            // end-of-epoch tx.
            if final_round {
                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
            }

            let epoch = self.epoch_store.epoch();
            // The prologue is omitted only when the commit info asks to skip it (test hook).
            let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
                .then_some(Schedulable::ConsensusCommitPrologue(
                    epoch,
                    commit_info.round,
                    commit_info.consensus_commit_ref.index,
                ));

            // Order: prologue, authenticator state update, then the collected transactions.
            let schedulables: Vec<_> = itertools::chain!(
                consensus_commit_prologue.into_iter(),
                authenticator_state_update_transaction
                    .into_iter()
                    .map(Schedulable::Transaction),
                transactions_to_schedule
                    .into_iter()
                    .map(Schedulable::Transaction),
            )
            .collect();

            // Order: randomness state update (if any), then the randomness transactions.
            let randomness_schedulables: Vec<_> = randomness_state_update_transaction
                .into_iter()
                .chain(
                    randomness_transactions_to_schedule
                        .into_iter()
                        .map(Schedulable::Transaction),
                )
                .collect();

            let num_schedulables = schedulables.len();
            let checkpoint_height = self.create_pending_checkpoints_v2(
                &mut state,
                &commit_info,
                schedulables,
                randomness_schedulables,
                &cancelled_txns,
                final_round,
            );

            (lock, final_round, num_schedulables, checkpoint_height)
        };

        // Snapshot the processed-message keys before `state.output` is moved below.
        let notifications = state.get_notifications();

        let mut stats_to_record = self.last_consensus_stats.clone();
        stats_to_record.height = checkpoint_height;
        if split_checkpoints {
            // In split mode the checkpoint queue's progress is recorded with the stats.
            let queue = self.checkpoint_queue.lock().unwrap();
            stats_to_record.last_checkpoint_flush_timestamp = queue.last_built_timestamp();
            stats_to_record.checkpoint_seq = queue.checkpoint_seq();
        }

        state.output.record_consensus_commit_stats(stats_to_record);

        self.record_deferral_deletion(&mut state);

        self.epoch_store
            .consensus_quarantine
            .write()
            .push_consensus_output(state.output, &self.epoch_store)
            .expect("push_consensus_output should not fail");

        debug!(
            ?commit_info.round,
            "Notifying checkpoint service about new pending checkpoint(s)",
        );
        self.checkpoint_service
            .notify_checkpoint()
            .expect("failed to notify checkpoint service");

        if let Some(randomness_round) = state.randomness_round {
            randomness_manager
                .as_ref()
                .expect("randomness manager should exist if randomness round is provided")
                .generate_randomness(epoch, randomness_round);
        }

        self.epoch_store.process_notifications(notifications.iter());

        // pass lock by value to ensure that it is held until this point
        self.log_final_round(lock, final_round);

        // update the calculated throughput
        self.throughput_calculator
            .add_transactions(timestamp, num_schedulables as u64);

        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
            let key = [commit_sub_dag_index, epoch];
            if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
                sui_simulator::task::kill_current_node(None);
            }
        });

        fail_point!("crash");

        self.send_end_of_publish_if_needed().await;
    }
1414
1415    fn handle_eop(
1416        &self,
1417        state: &mut CommitHandlerState,
1418        end_of_publish_transactions: Vec<AuthorityName>,
1419    ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
1420        let collected_eop =
1421            self.process_end_of_publish_transactions(state, end_of_publish_transactions);
1422        if collected_eop {
1423            let (lock, final_round) = self.advance_eop_state_machine(state);
1424            (lock.should_accept_tx(), Some(lock), final_round)
1425        } else {
1426            (true, None, false)
1427        }
1428    }
1429
1430    fn record_end_of_epoch_execution_time_observations(
1431        &self,
1432        estimator: &mut ExecutionTimeEstimator,
1433    ) {
1434        self.epoch_store
1435            .end_of_epoch_execution_time_observations
1436            .set(estimator.take_observations())
1437            .expect("`stored_execution_time_observations` should only be set once at end of epoch");
1438    }
1439
1440    fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
1441        let mut deferred_transactions = self
1442            .epoch_store
1443            .consensus_output_cache
1444            .deferred_transactions
1445            .lock();
1446        for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
1447            deferred_transactions.remove(&deleted_deferred_key);
1448        }
1449    }
1450
1451    fn log_final_round(&self, lock: Option<RwLockWriteGuard<ReconfigState>>, final_round: bool) {
1452        if final_round {
1453            let epoch = self.epoch_store.epoch();
1454            info!(
1455                ?epoch,
1456                lock=?lock.as_ref(),
1457                final_round=?final_round,
1458                "Notified last checkpoint"
1459            );
1460            self.epoch_store.record_end_of_message_quorum_time_metric();
1461        }
1462    }
1463
1464    fn create_pending_checkpoints(
1465        &self,
1466        state: &mut CommitHandlerState,
1467        commit_info: &ConsensusCommitInfo,
1468        schedulables: &[Schedulable],
1469        randomness_schedulables: &[Schedulable],
1470        final_round: bool,
1471    ) {
1472        assert!(
1473            !self
1474                .epoch_store
1475                .protocol_config()
1476                .split_checkpoints_in_consensus_handler()
1477        );
1478
1479        let checkpoint_height = self
1480            .epoch_store
1481            .calculate_pending_checkpoint_height(commit_info.round);
1482
1483        // Determine whether to write pending checkpoint for user tx with randomness.
1484        // - If randomness is not generated for this commit, we will skip the
1485        //   checkpoint with the associated height. Therefore checkpoint heights may
1486        //   not be contiguous.
1487        // - Exception: if DKG fails, we always need to write out a PendingCheckpoint
1488        //   for randomness tx that are canceled.
1489        let should_write_random_checkpoint = state.randomness_round.is_some()
1490            || (state.dkg_failed && !randomness_schedulables.is_empty());
1491
1492        let pending_checkpoint = PendingCheckpoint {
1493            roots: schedulables.iter().map(|s| s.key()).collect(),
1494            details: PendingCheckpointInfo {
1495                timestamp_ms: commit_info.timestamp,
1496                last_of_epoch: final_round && !should_write_random_checkpoint,
1497                checkpoint_height,
1498                consensus_commit_ref: commit_info.consensus_commit_ref,
1499                rejected_transactions_digest: commit_info.rejected_transactions_digest,
1500                checkpoint_seq: None,
1501            },
1502        };
1503        self.epoch_store
1504            .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1505            .expect("failed to write pending checkpoint");
1506
1507        info!(
1508            "Written pending checkpoint: {:?}",
1509            pending_checkpoint.details,
1510        );
1511
1512        if should_write_random_checkpoint {
1513            let pending_checkpoint = PendingCheckpoint {
1514                roots: randomness_schedulables.iter().map(|s| s.key()).collect(),
1515                details: PendingCheckpointInfo {
1516                    timestamp_ms: commit_info.timestamp,
1517                    last_of_epoch: final_round,
1518                    checkpoint_height: checkpoint_height + 1,
1519                    consensus_commit_ref: commit_info.consensus_commit_ref,
1520                    rejected_transactions_digest: commit_info.rejected_transactions_digest,
1521                    checkpoint_seq: None,
1522                },
1523            };
1524            self.epoch_store
1525                .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1526                .expect("failed to write pending checkpoint");
1527        }
1528    }
1529
1530    #[allow(clippy::type_complexity)]
1531    fn collect_transactions_to_schedule(
1532        &self,
1533        state: &mut CommitHandlerState,
1534        execution_time_estimator: &mut ExecutionTimeEstimator,
1535        commit_info: &ConsensusCommitInfo,
1536        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1537    ) -> (
1538        Vec<VerifiedExecutableTransactionWithAliases>,
1539        Vec<VerifiedExecutableTransactionWithAliases>,
1540        BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
1541        Option<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1542    ) {
1543        let protocol_config = self.epoch_store.protocol_config();
1544        let epoch = self.epoch_store.epoch();
1545
1546        let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) =
1547            self.merge_and_reorder_transactions(state, commit_info, user_transactions);
1548
1549        let mut shared_object_congestion_tracker =
1550            self.init_congestion_tracker(commit_info, false, &ordered_txns);
1551        let mut shared_object_using_randomness_congestion_tracker =
1552            self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns);
1553
1554        let randomness_state_update_transaction = state
1555            .randomness_round
1556            .map(|round| Schedulable::RandomnessStateUpdate(epoch, round));
1557        debug!(
1558            "Randomness state update transaction: {:?}",
1559            randomness_state_update_transaction
1560                .as_ref()
1561                .map(|t| t.key())
1562        );
1563
1564        let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len());
1565        let mut randomness_transactions_to_schedule =
1566            Vec::with_capacity(ordered_randomness_txns.len());
1567        let mut deferred_txns = BTreeMap::new();
1568        let mut cancelled_txns = BTreeMap::new();
1569
1570        for transaction in ordered_txns {
1571            self.handle_deferral_and_cancellation(
1572                state,
1573                &mut cancelled_txns,
1574                &mut deferred_txns,
1575                &mut transactions_to_schedule,
1576                protocol_config,
1577                commit_info,
1578                transaction,
1579                &mut shared_object_congestion_tracker,
1580                &previously_deferred_tx_digests,
1581                execution_time_estimator,
1582            );
1583        }
1584
1585        for transaction in ordered_randomness_txns {
1586            if state.dkg_failed {
1587                debug!(
1588                    "Canceling randomness-using transaction {:?} because DKG failed",
1589                    transaction.tx().digest(),
1590                );
1591                cancelled_txns.insert(
1592                    *transaction.tx().digest(),
1593                    CancelConsensusCertificateReason::DkgFailed,
1594                );
1595                randomness_transactions_to_schedule.push(transaction);
1596                continue;
1597            }
1598            self.handle_deferral_and_cancellation(
1599                state,
1600                &mut cancelled_txns,
1601                &mut deferred_txns,
1602                &mut randomness_transactions_to_schedule,
1603                protocol_config,
1604                commit_info,
1605                transaction,
1606                &mut shared_object_using_randomness_congestion_tracker,
1607                &previously_deferred_tx_digests,
1608                execution_time_estimator,
1609            );
1610        }
1611
1612        let mut total_deferred_txns = 0;
1613        {
1614            let mut deferred_transactions = self
1615                .epoch_store
1616                .consensus_output_cache
1617                .deferred_transactions
1618                .lock();
1619            for (key, txns) in deferred_txns.into_iter() {
1620                total_deferred_txns += txns.len();
1621                deferred_transactions.insert(key, txns.clone());
1622                state.output.defer_transactions(key, txns);
1623            }
1624        }
1625
1626        self.metrics
1627            .consensus_handler_deferred_transactions
1628            .inc_by(total_deferred_txns as u64);
1629        self.metrics
1630            .consensus_handler_cancelled_transactions
1631            .inc_by(cancelled_txns.len() as u64);
1632        self.metrics
1633            .consensus_handler_max_object_costs
1634            .with_label_values(&["regular_commit"])
1635            .set(shared_object_congestion_tracker.max_cost() as i64);
1636        self.metrics
1637            .consensus_handler_max_object_costs
1638            .with_label_values(&["randomness_commit"])
1639            .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);
1640
1641        let congestion_commit_data = shared_object_congestion_tracker.finish_commit(commit_info);
1642        let randomness_congestion_commit_data =
1643            shared_object_using_randomness_congestion_tracker.finish_commit(commit_info);
1644
1645        if let Some(logger) = &self.congestion_logger {
1646            let epoch = self.epoch_store.epoch();
1647            let mut logger = logger.lock().unwrap();
1648            logger.write_commit_log(epoch, commit_info, false, &congestion_commit_data);
1649            logger.write_commit_log(epoch, commit_info, true, &randomness_congestion_commit_data);
1650        }
1651
1652        if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get()
1653            && let Err(e) = tx_object_debts.try_send(
1654                congestion_commit_data
1655                    .accumulated_debts
1656                    .iter()
1657                    .chain(randomness_congestion_commit_data.accumulated_debts.iter())
1658                    .map(|(id, _)| *id)
1659                    .collect(),
1660            )
1661        {
1662            info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
1663        }
1664
1665        state
1666            .output
1667            .set_congestion_control_object_debts(congestion_commit_data.accumulated_debts);
1668        state.output.set_congestion_control_randomness_object_debts(
1669            randomness_congestion_commit_data.accumulated_debts,
1670        );
1671
1672        (
1673            transactions_to_schedule,
1674            randomness_transactions_to_schedule,
1675            cancelled_txns,
1676            randomness_state_update_transaction,
1677        )
1678    }
1679
1680    fn process_transactions(
1681        &self,
1682        state: &mut CommitHandlerState,
1683        execution_time_estimator: &mut ExecutionTimeEstimator,
1684        commit_info: &ConsensusCommitInfo,
1685        authenticator_state_update_transaction: Option<VerifiedExecutableTransactionWithAliases>,
1686        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1687    ) -> (Vec<Schedulable>, Vec<Schedulable>, AssignedTxAndVersions) {
1688        let protocol_config = self.epoch_store.protocol_config();
1689        assert!(!protocol_config.split_checkpoints_in_consensus_handler());
1690        let epoch = self.epoch_store.epoch();
1691
1692        let (
1693            transactions_to_schedule,
1694            randomness_transactions_to_schedule,
1695            cancelled_txns,
1696            randomness_state_update_transaction,
1697        ) = self.collect_transactions_to_schedule(
1698            state,
1699            execution_time_estimator,
1700            commit_info,
1701            user_transactions,
1702        );
1703
1704        let mut settlement = None;
1705        let mut randomness_settlement = None;
1706        if self.epoch_store.accumulators_enabled() {
1707            let checkpoint_height = self
1708                .epoch_store
1709                .calculate_pending_checkpoint_height(commit_info.round);
1710
1711            settlement = Some(Schedulable::AccumulatorSettlement(epoch, checkpoint_height));
1712
1713            if state.randomness_round.is_some() || !randomness_transactions_to_schedule.is_empty() {
1714                randomness_settlement = Some(Schedulable::AccumulatorSettlement(
1715                    epoch,
1716                    checkpoint_height + 1,
1717                ));
1718            }
1719        }
1720
1721        let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
1722            .then_some(Schedulable::ConsensusCommitPrologue(
1723                epoch,
1724                commit_info.round,
1725                commit_info.consensus_commit_ref.index,
1726            ));
1727
1728        let schedulables: Vec<_> = itertools::chain!(
1729            consensus_commit_prologue.into_iter(),
1730            authenticator_state_update_transaction
1731                .into_iter()
1732                .map(Schedulable::Transaction),
1733            transactions_to_schedule
1734                .into_iter()
1735                .map(Schedulable::Transaction),
1736            settlement,
1737        )
1738        .collect();
1739
1740        let randomness_schedulables: Vec<_> = randomness_state_update_transaction
1741            .into_iter()
1742            .chain(
1743                randomness_transactions_to_schedule
1744                    .into_iter()
1745                    .map(Schedulable::Transaction),
1746            )
1747            .chain(randomness_settlement)
1748            .collect();
1749
1750        let assigned_versions = self
1751            .epoch_store
1752            .process_consensus_transaction_shared_object_versions(
1753                self.cache_reader.as_ref(),
1754                schedulables.iter(),
1755                randomness_schedulables.iter(),
1756                &cancelled_txns,
1757                &mut state.output,
1758            )
1759            .expect("failed to assign shared object versions");
1760
1761        let consensus_commit_prologue =
1762            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);
1763
1764        let mut schedulables = schedulables;
1765        let mut assigned_versions = assigned_versions;
1766        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
1767            assert!(matches!(
1768                schedulables[0],
1769                Schedulable::ConsensusCommitPrologue(..)
1770            ));
1771            assert!(matches!(
1772                assigned_versions.0[0].0,
1773                TransactionKey::ConsensusCommitPrologue(..)
1774            ));
1775            assigned_versions.0[0].0 =
1776                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
1777            schedulables[0] = Schedulable::Transaction(consensus_commit_prologue);
1778        }
1779
1780        self.epoch_store
1781            .process_user_signatures(schedulables.iter().chain(randomness_schedulables.iter()));
1782
1783        // After this point we can throw away alias version information.
1784        let schedulables: Vec<Schedulable> = schedulables.into_iter().map(|s| s.into()).collect();
1785        let randomness_schedulables: Vec<Schedulable> = randomness_schedulables
1786            .into_iter()
1787            .map(|s| s.into())
1788            .collect();
1789
1790        (schedulables, randomness_schedulables, assigned_versions)
1791    }
1792
1793    #[allow(clippy::type_complexity)]
1794    fn create_pending_checkpoints_v2(
1795        &self,
1796        state: &mut CommitHandlerState,
1797        commit_info: &ConsensusCommitInfo,
1798        schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1799        randomness_schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1800        cancelled_txns: &BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
1801        final_round: bool,
1802    ) -> CheckpointHeight {
1803        let protocol_config = self.epoch_store.protocol_config();
1804        assert!(protocol_config.split_checkpoints_in_consensus_handler());
1805
1806        let epoch = self.epoch_store.epoch();
1807        let accumulators_enabled = self.epoch_store.accumulators_enabled();
1808        let max_transactions_per_checkpoint =
1809            protocol_config.max_transactions_per_checkpoint() as usize;
1810
1811        let should_write_random_checkpoint = state.randomness_round.is_some()
1812            || (state.dkg_failed && !randomness_schedulables.is_empty());
1813
1814        let mut checkpoint_queue = self.checkpoint_queue.lock().unwrap();
1815
1816        let build_chunks =
1817            |schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1818             queue: &mut CheckpointQueue|
1819             -> Vec<Chunk<VerifiedExecutableTransactionWithAliases>> {
1820                schedulables
1821                    .chunks(max_transactions_per_checkpoint)
1822                    .map(|chunk| {
1823                        let height = queue.next_height();
1824                        let schedulables: Vec<_> = chunk.to_vec();
1825                        let settlement = if accumulators_enabled {
1826                            Some(Schedulable::AccumulatorSettlement(epoch, height))
1827                        } else {
1828                            None
1829                        };
1830                        Chunk {
1831                            schedulables,
1832                            settlement,
1833                            height,
1834                        }
1835                    })
1836                    .collect()
1837            };
1838
1839        let num_schedulables = schedulables.len();
1840        let chunked_schedulables = build_chunks(schedulables, &mut checkpoint_queue);
1841        if chunked_schedulables.len() > 1 {
1842            info!(
1843                "Splitting transactions into {} checkpoint chunks (num_schedulables={}, max_tx={})",
1844                chunked_schedulables.len(),
1845                num_schedulables,
1846                max_transactions_per_checkpoint
1847            );
1848            assert_reachable!("checkpoint split due to transaction limit");
1849        }
1850        let chunked_randomness_schedulables = if should_write_random_checkpoint {
1851            build_chunks(randomness_schedulables, &mut checkpoint_queue)
1852        } else {
1853            vec![]
1854        };
1855
1856        let schedulables_for_version_assignment =
1857            Chunk::all_schedulables_from(&chunked_schedulables);
1858        let randomness_schedulables_for_version_assignment =
1859            Chunk::all_schedulables_from(&chunked_randomness_schedulables);
1860
1861        let assigned_versions = self
1862            .epoch_store
1863            .process_consensus_transaction_shared_object_versions(
1864                self.cache_reader.as_ref(),
1865                schedulables_for_version_assignment,
1866                randomness_schedulables_for_version_assignment,
1867                cancelled_txns,
1868                &mut state.output,
1869            )
1870            .expect("failed to assign shared object versions");
1871
1872        let consensus_commit_prologue =
1873            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);
1874
1875        let mut chunked_schedulables = chunked_schedulables;
1876        let mut assigned_versions = assigned_versions;
1877        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
1878            assert!(matches!(
1879                chunked_schedulables[0].schedulables[0],
1880                Schedulable::ConsensusCommitPrologue(..)
1881            ));
1882            assert!(matches!(
1883                assigned_versions.0[0].0,
1884                TransactionKey::ConsensusCommitPrologue(..)
1885            ));
1886            assigned_versions.0[0].0 =
1887                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
1888            chunked_schedulables[0].schedulables[0] =
1889                Schedulable::Transaction(consensus_commit_prologue);
1890        }
1891
1892        let assigned_versions = assigned_versions.into_map();
1893
1894        self.epoch_store.process_user_signatures(
1895            chunked_schedulables
1896                .iter()
1897                .flat_map(|c| c.all_schedulables())
1898                .chain(
1899                    chunked_randomness_schedulables
1900                        .iter()
1901                        .flat_map(|c| c.all_schedulables()),
1902                ),
1903        );
1904
1905        let commit_height = chunked_randomness_schedulables
1906            .last()
1907            .or(chunked_schedulables.last())
1908            .map(|c| c.height)
1909            .expect("at least one checkpoint root must be created per commit");
1910
1911        let mut pending_checkpoints = Vec::new();
1912        for chunk in chunked_schedulables {
1913            pending_checkpoints.extend(checkpoint_queue.push_chunk(
1914                chunk.into(),
1915                &assigned_versions,
1916                commit_info.timestamp,
1917                commit_info.consensus_commit_ref,
1918                commit_info.rejected_transactions_digest,
1919            ));
1920        }
1921
1922        let force = final_round || should_write_random_checkpoint;
1923        pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, force));
1924
1925        if should_write_random_checkpoint {
1926            for chunk in chunked_randomness_schedulables {
1927                pending_checkpoints.extend(checkpoint_queue.push_chunk(
1928                    chunk.into(),
1929                    &assigned_versions,
1930                    commit_info.timestamp,
1931                    commit_info.consensus_commit_ref,
1932                    commit_info.rejected_transactions_digest,
1933                ));
1934            }
1935
1936            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
1937        }
1938
1939        if final_round && let Some(last) = pending_checkpoints.last_mut() {
1940            last.details.last_of_epoch = true;
1941        }
1942
1943        let queue_drained = checkpoint_queue.is_empty();
1944        drop(checkpoint_queue);
1945
1946        for pending_checkpoint in pending_checkpoints {
1947            debug!(
1948                checkpoint_height = pending_checkpoint.details.checkpoint_height,
1949                roots_count = pending_checkpoint.num_roots(),
1950                "Writing pending checkpoint",
1951            );
1952            self.epoch_store
1953                .write_pending_checkpoint_v2(&mut state.output, &pending_checkpoint)
1954                .expect("failed to write pending checkpoint");
1955        }
1956
1957        state.output.set_checkpoint_queue_drained(queue_drained);
1958
1959        commit_height
1960    }
1961
1962    // Adds the consensus commit prologue transaction to the beginning of input `transactions` to update
1963    // the system clock used in all transactions in the current consensus commit.
1964    // Returns the root of the consensus commit prologue transaction if it was added to the input.
1965    fn add_consensus_commit_prologue_transaction<'a>(
1966        &'a self,
1967        state: &'a mut CommitHandlerState,
1968        commit_info: &'a ConsensusCommitInfo,
1969        assigned_versions: &AssignedTxAndVersions,
1970    ) -> Option<VerifiedExecutableTransactionWithAliases> {
1971        {
1972            if commit_info.skip_consensus_commit_prologue_in_test {
1973                return None;
1974            }
1975        }
1976
1977        let mut cancelled_txn_version_assignment = Vec::new();
1978
1979        let protocol_config = self.epoch_store.protocol_config();
1980
1981        for (txn_key, assigned_versions) in assigned_versions.0.iter() {
1982            let Some(d) = txn_key.as_digest() else {
1983                continue;
1984            };
1985
1986            if !protocol_config.include_cancelled_randomness_txns_in_prologue()
1987                && assigned_versions
1988                    .shared_object_versions
1989                    .iter()
1990                    .any(|((id, _), _)| *id == SUI_RANDOMNESS_STATE_OBJECT_ID)
1991            {
1992                continue;
1993            }
1994
1995            if assigned_versions
1996                .shared_object_versions
1997                .iter()
1998                .any(|(_, version)| version.is_cancelled())
1999            {
2000                assert_reachable!("cancelled transactions");
2001                cancelled_txn_version_assignment
2002                    .push((*d, assigned_versions.shared_object_versions.clone()));
2003            }
2004        }
2005
2006        fail_point_arg!(
2007            "additional_cancelled_txns_for_tests",
2008            |additional_cancelled_txns: Vec<(
2009                TransactionDigest,
2010                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
2011            )>| {
2012                cancelled_txn_version_assignment.extend(additional_cancelled_txns);
2013            }
2014        );
2015
2016        let transaction = commit_info.create_consensus_commit_prologue_transaction(
2017            self.epoch_store.epoch(),
2018            self.epoch_store.protocol_config(),
2019            cancelled_txn_version_assignment,
2020            commit_info,
2021            state.indirect_state_observer.take().unwrap(),
2022        );
2023        Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2024            transaction,
2025        ))
2026    }
2027
2028    fn handle_deferral_and_cancellation(
2029        &self,
2030        state: &mut CommitHandlerState,
2031        cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
2032        deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>,
2033        scheduled_txns: &mut Vec<VerifiedExecutableTransactionWithAliases>,
2034        protocol_config: &ProtocolConfig,
2035        commit_info: &ConsensusCommitInfo,
2036        transaction: VerifiedExecutableTransactionWithAliases,
2037        shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
2038        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
2039        execution_time_estimator: &ExecutionTimeEstimator,
2040    ) {
2041        // Check for unpaid amplification before other deferral checks.
2042        // SIP-45: Paid amplification allows (gas_price / RGP + 1) submissions.
2043        // Transactions with more duplicates than paid for are deferred.
2044        if protocol_config.defer_unpaid_amplification() {
2045            let occurrence_count = state
2046                .occurrence_counts
2047                .get(transaction.tx().digest())
2048                .copied()
2049                .unwrap_or(0);
2050
2051            let rgp = self.epoch_store.reference_gas_price();
2052            let gas_price = transaction.tx().transaction_data().gas_price();
2053            let allowed_count = (gas_price / rgp.max(1)) + 1;
2054
2055            if occurrence_count as u64 > allowed_count {
2056                self.metrics
2057                    .consensus_handler_unpaid_amplification_deferrals
2058                    .inc();
2059
2060                let deferred_from_round = previously_deferred_tx_digests
2061                    .get(transaction.tx().digest())
2062                    .map(|k| k.deferred_from_round())
2063                    .unwrap_or(commit_info.round);
2064
2065                let deferral_key = DeferralKey::new_for_consensus_round(
2066                    commit_info.round + 1,
2067                    deferred_from_round,
2068                );
2069
2070                if transaction_deferral_within_limit(
2071                    &deferral_key,
2072                    protocol_config.max_deferral_rounds_for_congestion_control(),
2073                ) {
2074                    assert_reachable!("unpaid amplification deferral");
2075                    debug!(
2076                        "Deferring transaction {:?} due to unpaid amplification (count={}, allowed={})",
2077                        transaction.tx().digest(),
2078                        occurrence_count,
2079                        allowed_count
2080                    );
2081                    deferred_txns
2082                        .entry(deferral_key)
2083                        .or_default()
2084                        .push(transaction);
2085                    return;
2086                }
2087            }
2088        }
2089
2090        let tx_cost = shared_object_congestion_tracker.get_tx_cost(
2091            execution_time_estimator,
2092            transaction.tx(),
2093            state.indirect_state_observer.as_mut().unwrap(),
2094        );
2095
2096        let deferral_info = self.epoch_store.should_defer(
2097            transaction.tx(),
2098            commit_info,
2099            state.dkg_failed,
2100            state.randomness_round.is_some(),
2101            previously_deferred_tx_digests,
2102            shared_object_congestion_tracker,
2103        );
2104
2105        if let Some((deferral_key, deferral_reason)) = deferral_info {
2106            debug!(
2107                "Deferring consensus certificate for transaction {:?} until {:?}",
2108                transaction.tx().digest(),
2109                deferral_key
2110            );
2111
2112            match deferral_reason {
2113                DeferralReason::RandomnessNotReady => {
2114                    deferred_txns
2115                        .entry(deferral_key)
2116                        .or_default()
2117                        .push(transaction);
2118                }
2119                DeferralReason::SharedObjectCongestion(congested_objects) => {
2120                    self.metrics.consensus_handler_congested_transactions.inc();
2121                    if transaction_deferral_within_limit(
2122                        &deferral_key,
2123                        protocol_config.max_deferral_rounds_for_congestion_control(),
2124                    ) {
2125                        deferred_txns
2126                            .entry(deferral_key)
2127                            .or_default()
2128                            .push(transaction);
2129                    } else {
2130                        assert_sometimes!(
2131                            transaction.tx().data().transaction_data().uses_randomness(),
2132                            "cancelled randomness-using transaction"
2133                        );
2134                        assert_sometimes!(
2135                            !transaction.tx().data().transaction_data().uses_randomness(),
2136                            "cancelled non-randomness-using transaction"
2137                        );
2138
2139                        // Cancel the transaction that has been deferred for too long.
2140                        debug!(
2141                            "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
2142                            transaction.tx().digest(),
2143                            deferral_key,
2144                            congested_objects
2145                        );
2146                        cancelled_txns.insert(
2147                            *transaction.tx().digest(),
2148                            CancelConsensusCertificateReason::CongestionOnObjects(
2149                                congested_objects,
2150                            ),
2151                        );
2152                        scheduled_txns.push(transaction);
2153                    }
2154                }
2155            }
2156        } else {
2157            // Update object execution cost for all scheduled transactions
2158            shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, transaction.tx());
2159            scheduled_txns.push(transaction);
2160        }
2161    }
2162
2163    fn merge_and_reorder_transactions(
2164        &self,
2165        state: &mut CommitHandlerState,
2166        commit_info: &ConsensusCommitInfo,
2167        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
2168    ) -> (
2169        Vec<VerifiedExecutableTransactionWithAliases>,
2170        Vec<VerifiedExecutableTransactionWithAliases>,
2171        HashMap<TransactionDigest, DeferralKey>,
2172    ) {
2173        let protocol_config = self.epoch_store.protocol_config();
2174
2175        let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
2176            self.load_deferred_transactions(state, commit_info);
2177
2178        txns.reserve(user_transactions.len());
2179        randomness_txns.reserve(user_transactions.len());
2180
2181        // There may be randomness transactions in `txns`, which were deferred due to congestion.
2182        // They must be placed back into `randomness_txns`.
2183        let mut txns: Vec<_> = txns
2184            .into_iter()
2185            .filter_map(|tx| {
2186                if tx.tx().transaction_data().uses_randomness() {
2187                    randomness_txns.push(tx);
2188                    None
2189                } else {
2190                    Some(tx)
2191                }
2192            })
2193            .collect();
2194
2195        for txn in user_transactions {
2196            if txn.tx().transaction_data().uses_randomness() {
2197                randomness_txns.push(txn);
2198            } else {
2199                txns.push(txn);
2200            }
2201        }
2202
2203        PostConsensusTxReorder::reorder(
2204            &mut txns,
2205            protocol_config.consensus_transaction_ordering(),
2206        );
2207        PostConsensusTxReorder::reorder(
2208            &mut randomness_txns,
2209            protocol_config.consensus_transaction_ordering(),
2210        );
2211
2212        (txns, randomness_txns, previously_deferred_tx_digests)
2213    }
2214
2215    fn load_deferred_transactions(
2216        &self,
2217        state: &mut CommitHandlerState,
2218        commit_info: &ConsensusCommitInfo,
2219    ) -> (
2220        Vec<VerifiedExecutableTransactionWithAliases>,
2221        Vec<VerifiedExecutableTransactionWithAliases>,
2222        HashMap<TransactionDigest, DeferralKey>,
2223    ) {
2224        let mut previously_deferred_tx_digests = HashMap::new();
2225
2226        let deferred_txs: Vec<_> = self
2227            .epoch_store
2228            .load_deferred_transactions_for_up_to_consensus_round_v2(
2229                &mut state.output,
2230                commit_info.round,
2231            )
2232            .expect("db error")
2233            .into_iter()
2234            .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2235            .map(|(key, tx)| {
2236                previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2237                tx
2238            })
2239            .collect();
2240        trace!(
2241            "loading deferred transactions: {:?}",
2242            deferred_txs.iter().map(|tx| tx.tx().digest())
2243        );
2244
2245        let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
2246            let txns: Vec<_> = self
2247                .epoch_store
2248                .load_deferred_transactions_for_randomness_v2(&mut state.output)
2249                .expect("db error")
2250                .into_iter()
2251                .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2252                .map(|(key, tx)| {
2253                    previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2254                    tx
2255                })
2256                .collect();
2257            trace!(
2258                "loading deferred randomness transactions: {:?}",
2259                txns.iter().map(|tx| tx.tx().digest())
2260            );
2261            txns
2262        } else {
2263            vec![]
2264        };
2265
2266        (
2267            deferred_txs,
2268            deferred_randomness_txs,
2269            previously_deferred_tx_digests,
2270        )
2271    }
2272
2273    fn init_congestion_tracker(
2274        &self,
2275        commit_info: &ConsensusCommitInfo,
2276        for_randomness: bool,
2277        txns: &[VerifiedExecutableTransactionWithAliases],
2278    ) -> SharedObjectCongestionTracker {
2279        #[allow(unused_mut)]
2280        let mut ret = SharedObjectCongestionTracker::from_protocol_config(
2281            self.epoch_store
2282                .consensus_quarantine
2283                .read()
2284                .load_initial_object_debts(
2285                    &self.epoch_store,
2286                    commit_info.round,
2287                    for_randomness,
2288                    txns,
2289                )
2290                .expect("db error"),
2291            self.epoch_store.protocol_config(),
2292            for_randomness,
2293            self.congestion_logger.is_some(),
2294        );
2295
2296        fail_point_arg!(
2297            "initial_congestion_tracker",
2298            |tracker: SharedObjectCongestionTracker| {
2299                info!(
2300                    "Initialize shared_object_congestion_tracker to  {:?}",
2301                    tracker
2302                );
2303                ret = tracker;
2304            }
2305        );
2306
2307        ret
2308    }
2309
2310    fn process_jwks(
2311        &self,
2312        state: &mut CommitHandlerState,
2313        commit_info: &ConsensusCommitInfo,
2314        new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
2315    ) {
2316        for (authority_name, jwk_id, jwk) in new_jwks {
2317            self.epoch_store.record_jwk_vote(
2318                &mut state.output,
2319                commit_info.round,
2320                authority_name,
2321                &jwk_id,
2322                &jwk,
2323            );
2324        }
2325    }
2326
2327    fn process_capability_notifications(
2328        &self,
2329        capability_notifications: Vec<AuthorityCapabilitiesV2>,
2330    ) {
2331        for capabilities in capability_notifications {
2332            self.epoch_store
2333                .record_capabilities_v2(&capabilities)
2334                .expect("db error");
2335        }
2336    }
2337
2338    fn process_execution_time_observations(
2339        &self,
2340        state: &mut CommitHandlerState,
2341        execution_time_observations: Vec<ExecutionTimeObservation>,
2342    ) {
2343        let mut execution_time_estimator = self
2344            .epoch_store
2345            .execution_time_estimator
2346            .try_lock()
2347            .expect("should only ever be called from the commit handler thread");
2348
2349        for ExecutionTimeObservation {
2350            authority,
2351            generation,
2352            estimates,
2353        } in execution_time_observations
2354        {
2355            let authority_index = self
2356                .epoch_store
2357                .committee()
2358                .authority_index(&authority)
2359                .unwrap();
2360            execution_time_estimator.process_observations_from_consensus(
2361                authority_index,
2362                Some(generation),
2363                &estimates,
2364            );
2365            state
2366                .output
2367                .insert_execution_time_observation(authority_index, generation, estimates);
2368        }
2369    }
2370
2371    fn process_checkpoint_signature_messages(
2372        &self,
2373        checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
2374    ) {
2375        for checkpoint_signature_message in checkpoint_signature_messages {
2376            self.checkpoint_service
2377                .notify_checkpoint_signature(&checkpoint_signature_message)
2378                .expect("db error");
2379        }
2380    }
2381
2382    async fn process_dkg_updates(
2383        &self,
2384        state: &mut CommitHandlerState,
2385        commit_info: &ConsensusCommitInfo,
2386        randomness_manager: Option<&mut RandomnessManager>,
2387        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2388        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2389    ) {
2390        if !self.epoch_store.randomness_state_enabled() {
2391            let num_dkg_messages = randomness_dkg_messages.len();
2392            let num_dkg_confirmations = randomness_dkg_confirmations.len();
2393            if num_dkg_messages + num_dkg_confirmations > 0 {
2394                debug_fatal!(
2395                    "received {} RandomnessDkgMessage and {} RandomnessDkgConfirmation messages when randomness is not enabled",
2396                    num_dkg_messages,
2397                    num_dkg_confirmations
2398                );
2399            }
2400            return;
2401        }
2402
2403        let randomness_manager =
2404            randomness_manager.expect("randomness manager should exist if randomness is enabled");
2405
2406        let randomness_dkg_updates =
2407            self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);
2408
2409        let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
2410            state,
2411            randomness_manager,
2412            randomness_dkg_confirmations,
2413        );
2414
2415        if randomness_dkg_updates || randomness_dkg_confirmation_updates {
2416            randomness_manager
2417                .advance_dkg(&mut state.output, commit_info.round)
2418                .await
2419                .expect("epoch ended");
2420        }
2421    }
2422
2423    fn process_randomness_dkg_messages(
2424        &self,
2425        randomness_manager: &mut RandomnessManager,
2426        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2427    ) -> bool /* randomness state updated */ {
2428        if randomness_dkg_messages.is_empty() {
2429            return false;
2430        }
2431
2432        let mut randomness_state_updated = false;
2433        for (authority, bytes) in randomness_dkg_messages {
2434            match bcs::from_bytes(&bytes) {
2435                Ok(message) => {
2436                    randomness_manager
2437                        .add_message(&authority, message)
2438                        // TODO: make infallible
2439                        .expect("epoch ended");
2440                    randomness_state_updated = true;
2441                }
2442
2443                Err(e) => {
2444                    warn!(
2445                        "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
2446                        authority.concise(),
2447                    );
2448                }
2449            }
2450        }
2451
2452        randomness_state_updated
2453    }
2454
2455    fn process_randomness_dkg_confirmations(
2456        &self,
2457        state: &mut CommitHandlerState,
2458        randomness_manager: &mut RandomnessManager,
2459        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2460    ) -> bool /* randomness state updated */ {
2461        if randomness_dkg_confirmations.is_empty() {
2462            return false;
2463        }
2464
2465        let mut randomness_state_updated = false;
2466        for (authority, bytes) in randomness_dkg_confirmations {
2467            match bcs::from_bytes(&bytes) {
2468                Ok(message) => {
2469                    randomness_manager
2470                        .add_confirmation(&mut state.output, &authority, message)
2471                        // TODO: make infallible
2472                        .expect("epoch ended");
2473                    randomness_state_updated = true;
2474                }
2475                Err(e) => {
2476                    warn!(
2477                        "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
2478                        authority.concise(),
2479                    );
2480                }
2481            }
2482        }
2483
2484        randomness_state_updated
2485    }
2486
2487    /// Returns true if we have collected a quorum of end of publish messages (either in this round or a previous round).
2488    fn process_end_of_publish_transactions(
2489        &self,
2490        state: &mut CommitHandlerState,
2491        end_of_publish_transactions: Vec<AuthorityName>,
2492    ) -> bool {
2493        let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
2494            "No contention on end_of_publish as it is only accessed from consensus handler",
2495        );
2496
2497        if eop_aggregator.has_quorum() {
2498            return true;
2499        }
2500
2501        if end_of_publish_transactions.is_empty() {
2502            return false;
2503        }
2504
2505        for authority in end_of_publish_transactions {
2506            info!("Received EndOfPublish from {:?}", authority.concise());
2507
2508            // It is ok to just release lock here as this function is the only place that transition into RejectAllCerts state
2509            // And this function itself is always executed from consensus task
2510            state.output.insert_end_of_publish(authority);
2511            if eop_aggregator
2512                .insert_generic(authority, ())
2513                .is_quorum_reached()
2514            {
2515                debug!(
2516                    "Collected enough end_of_publish messages with last message from validator {:?}",
2517                    authority.concise(),
2518                );
2519                return true;
2520            }
2521        }
2522
2523        false
2524    }
2525
2526    /// After we have collected 2f+1 EndOfPublish messages, we call this function every round until the epoch
2527    /// ends.
2528    fn advance_eop_state_machine(
2529        &self,
2530        state: &mut CommitHandlerState,
2531    ) -> (
2532        RwLockWriteGuard<'_, ReconfigState>,
2533        bool, // true if final round
2534    ) {
2535        let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
2536        let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
2537
2538        reconfig_state.close_all_certs();
2539
2540        let commit_has_deferred_txns = state.output.has_deferred_transactions();
2541        let previous_commits_have_deferred_txns = !self.epoch_store.deferred_transactions_empty();
2542
2543        if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
2544            if !start_state_is_reject_all_tx {
2545                info!("Transitioning to RejectAllTx");
2546            }
2547            reconfig_state.close_all_tx();
2548        } else {
2549            debug!(
2550                "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
2551                previous_commits_have_deferred_txns, commit_has_deferred_txns,
2552            );
2553        }
2554
2555        state.output.store_reconfig_state(reconfig_state.clone());
2556
2557        if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
2558            (reconfig_state, true)
2559        } else {
2560            (reconfig_state, false)
2561        }
2562    }
2563
2564    fn gather_commit_metadata(
2565        &self,
2566        consensus_commit: &impl ConsensusCommitAPI,
2567    ) -> (u64, AuthorityIndex, u64) {
2568        let timestamp = consensus_commit.commit_timestamp_ms();
2569        let leader_author = consensus_commit.leader_author_index();
2570        let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
2571
2572        let system_time_ms = SystemTime::now()
2573            .duration_since(UNIX_EPOCH)
2574            .unwrap()
2575            .as_millis() as i64;
2576
2577        let consensus_timestamp_bias_ms = system_time_ms - (timestamp as i64);
2578        let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
2579        self.metrics
2580            .consensus_timestamp_bias
2581            .observe(consensus_timestamp_bias_seconds);
2582
2583        let epoch_start = self
2584            .epoch_store
2585            .epoch_start_config()
2586            .epoch_start_timestamp_ms();
2587        let timestamp = if timestamp < epoch_start {
2588            error!(
2589                "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}"
2590            );
2591            epoch_start
2592        } else {
2593            timestamp
2594        };
2595
2596        (timestamp, leader_author, commit_sub_dag_index)
2597    }
2598
2599    fn create_authenticator_state_update(
2600        &self,
2601        last_committed_round: u64,
2602        commit_info: &ConsensusCommitInfo,
2603    ) -> Option<VerifiedExecutableTransactionWithAliases> {
2604        // Load all jwks that became active in the previous round, and commit them in this round.
2605        // We want to delay one round because none of the transactions in the previous round could
2606        // have been authenticated with the jwks that became active in that round.
2607        //
2608        // Because of this delay, jwks that become active in the last round of the epoch will
2609        // never be committed. That is ok, because in the new epoch, the validators should
2610        // immediately re-submit these jwks, and they can become active then.
2611        let new_jwks = self
2612            .epoch_store
2613            .get_new_jwks(last_committed_round)
2614            .expect("Unrecoverable error in consensus handler");
2615
2616        if !new_jwks.is_empty() {
2617            let authenticator_state_update_transaction = authenticator_state_update_transaction(
2618                &self.epoch_store,
2619                commit_info.round,
2620                new_jwks,
2621            );
2622            debug!(
2623                "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
2624                authenticator_state_update_transaction.digest(),
2625                authenticator_state_update_transaction,
2626            );
2627
2628            Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2629                authenticator_state_update_transaction,
2630            ))
2631        } else {
2632            None
2633        }
2634    }
2635
2636    // Filters out rejected or deprecated transactions.
2637    // Returns FilteredConsensusOutput containing transactions and owned_object_locks
2638    // (collected when preconsensus locking is disabled).
2639    #[instrument(level = "trace", skip_all)]
2640    fn filter_consensus_txns(
2641        &mut self,
2642        initial_reconfig_state: ReconfigState,
2643        commit_info: &ConsensusCommitInfo,
2644        consensus_commit: &impl ConsensusCommitAPI,
2645    ) -> FilteredConsensusOutput {
2646        let mut transactions = Vec::new();
2647        let mut owned_object_locks = HashMap::new();
2648        let epoch = self.epoch_store.epoch();
2649        let mut num_finalized_user_transactions = vec![0; self.committee.size()];
2650        let mut num_rejected_user_transactions = vec![0; self.committee.size()];
2651        for (block, parsed_transactions) in consensus_commit.transactions() {
2652            let author = block.author.value();
2653            // TODO: consider only messages within 1~3 rounds of the leader?
2654            self.last_consensus_stats.stats.inc_num_messages(author);
2655
2656            // Set the "ping" transaction status for this block. This is necessary as there might be some ping requests waiting for the ping transaction to be certified.
2657            self.epoch_store.set_consensus_tx_status(
2658                ConsensusPosition::ping(epoch, block),
2659                ConsensusTxStatus::Finalized,
2660            );
2661
2662            for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
2663                let position = ConsensusPosition {
2664                    epoch,
2665                    block,
2666                    index: tx_index as TransactionIndex,
2667                };
2668
2669                // Transaction has appeared in consensus output, we can increment the submission count
2670                // for this tx for DoS protection.
2671                if self.epoch_store.protocol_config().mysticeti_fastpath()
2672                    && let Some(tx) = parsed.transaction.kind.as_user_transaction()
2673                {
2674                    let digest = tx.digest();
2675                    if let Some((spam_weight, submitter_client_addrs)) = self
2676                        .epoch_store
2677                        .submitted_transaction_cache
2678                        .increment_submission_count(digest)
2679                    {
2680                        if let Some(ref traffic_controller) = self.traffic_controller {
2681                            debug!(
2682                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
2683                                submitter_client_addrs.len()
2684                            );
2685
2686                            // Apply spam weight to all client addresses that submitted this transaction
2687                            for addr in submitter_client_addrs {
2688                                traffic_controller.tally(TrafficTally::new(
2689                                    Some(addr),
2690                                    None,
2691                                    None,
2692                                    spam_weight.clone(),
2693                                ));
2694                            }
2695                        } else {
2696                            warn!(
2697                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
2698                                submitter_client_addrs.len()
2699                            );
2700                        }
2701                    }
2702                }
2703
2704                if parsed.rejected {
2705                    // TODO(fastpath): Add metrics for rejected transactions.
2706                    if matches!(
2707                        parsed.transaction.kind,
2708                        ConsensusTransactionKind::UserTransaction(_)
2709                            | ConsensusTransactionKind::UserTransactionV2(_)
2710                    ) {
2711                        self.epoch_store
2712                            .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
2713                        num_rejected_user_transactions[author] += 1;
2714                    }
2715                    // Skip processing rejected transactions.
2716                    continue;
2717                }
2718
2719                let kind = classify(&parsed.transaction);
2720                self.metrics
2721                    .consensus_handler_processed
2722                    .with_label_values(&[kind])
2723                    .inc();
2724                self.metrics
2725                    .consensus_handler_transaction_sizes
2726                    .with_label_values(&[kind])
2727                    .observe(parsed.serialized_len as f64);
2728                // UserTransaction exists only when mysticeti_fastpath is enabled in protocol config.
2729                if matches!(
2730                    &parsed.transaction.kind,
2731                    ConsensusTransactionKind::CertifiedTransaction(_)
2732                        | ConsensusTransactionKind::UserTransaction(_)
2733                        | ConsensusTransactionKind::UserTransactionV2(_)
2734                ) {
2735                    self.last_consensus_stats
2736                        .stats
2737                        .inc_num_user_transactions(author);
2738                }
2739
2740                if !initial_reconfig_state.should_accept_consensus_certs() {
2741                    // (Note: we no longer need to worry about the previously deferred condition, since we are only
2742                    // processing newly-received transactions at this time).
2743                    match &parsed.transaction.kind {
2744                        ConsensusTransactionKind::UserTransaction(_)
2745                        | ConsensusTransactionKind::UserTransactionV2(_)
2746                        | ConsensusTransactionKind::CertifiedTransaction(_)
2747                        // deprecated and ignore later, but added for exhaustive match
2748                        | ConsensusTransactionKind::CapabilityNotification(_)
2749                        | ConsensusTransactionKind::CapabilityNotificationV2(_)
2750                        | ConsensusTransactionKind::EndOfPublish(_)
2751                        // Note: we no longer have to check protocol_config.ignore_execution_time_observations_after_certs_closed()
2752                        | ConsensusTransactionKind::ExecutionTimeObservation(_)
2753                        | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
2754                            debug!(
2755                                "Ignoring consensus transaction {:?} because of end of epoch",
2756                                parsed.transaction.key()
2757                            );
2758                            continue;
2759                        }
2760
2761                        // These are the message types that are still processed even if !should_accept_consensus_certs()
2762                        ConsensusTransactionKind::CheckpointSignature(_)
2763                        | ConsensusTransactionKind::CheckpointSignatureV2(_)
2764                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2765                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
2766                        | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
2767                    }
2768                }
2769
2770                if !initial_reconfig_state.should_accept_tx() {
2771                    match &parsed.transaction.kind {
2772                        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
2773                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
2774                        _ => {}
2775                    }
2776                }
2777
2778                if parsed.transaction.is_user_transaction()
2779                    && !self.epoch_store.protocol_config().mysticeti_fastpath()
2780                {
2781                    debug!(
2782                        "Ignoring MFP transaction {:?} because MFP is disabled",
2783                        parsed.transaction.key()
2784                    );
2785                    continue;
2786                }
2787
2788                if let ConsensusTransactionKind::CertifiedTransaction(certificate) =
2789                    &parsed.transaction.kind
2790                    && certificate.epoch() != epoch
2791                {
2792                    debug!(
2793                        "Certificate epoch ({:?}) doesn't match the current epoch ({:?})",
2794                        certificate.epoch(),
2795                        epoch
2796                    );
2797                    continue;
2798                }
2799
2800                // Handle deprecated messages
2801                match &parsed.transaction.kind {
2802                    ConsensusTransactionKind::CapabilityNotification(_)
2803                    | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2804                    | ConsensusTransactionKind::CheckpointSignature(_) => {
2805                        debug_fatal!(
2806                            "BUG: saw deprecated tx {:?}for commit round {}",
2807                            parsed.transaction.key(),
2808                            commit_info.round
2809                        );
2810                        continue;
2811                    }
2812                    _ => {}
2813                }
2814
2815                if matches!(
2816                    &parsed.transaction.kind,
2817                    ConsensusTransactionKind::UserTransaction(_)
2818                        | ConsensusTransactionKind::UserTransactionV2(_)
2819                        | ConsensusTransactionKind::CertifiedTransaction(_)
2820                ) {
2821                    let author_name = self
2822                        .epoch_store
2823                        .committee()
2824                        .authority_by_index(author as u32)
2825                        .unwrap();
2826                    if self
2827                        .epoch_store
2828                        .has_received_end_of_publish_from(author_name)
2829                    {
2830                        // In some edge cases, consensus might resend previously seen certificate after EndOfPublish
2831                        // An honest validator should not send a new transaction after EndOfPublish. Whether the
2832                        // transaction is duplicate or not, we filter it out here.
2833                        warn!(
2834                            "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
2835                            author_name.concise(),
2836                            parsed.transaction.key(),
2837                        );
2838                        continue;
2839                    }
2840                }
2841
2842                // When preconsensus locking is disabled, perform post-consensus owned object
2843                // conflict detection. If lock acquisition fails, the transaction has
2844                // invalid/conflicting owned inputs and should be dropped.
2845                // This must happen AFTER all filtering checks above to avoid acquiring locks
2846                // for transactions that will be dropped (e.g., during epoch change).
2847                // Only applies to UserTransactionV2 - other transaction types don't need lock acquisition.
2848                if let ConsensusTransactionKind::UserTransactionV2(tx_with_claims) =
2849                    &parsed.transaction.kind
2850                {
2851                    let immutable_object_ids: HashSet<ObjectID> =
2852                        tx_with_claims.get_immutable_objects().into_iter().collect();
2853                    let tx = tx_with_claims.tx();
2854
2855                    let Ok(input_objects) = tx.transaction_data().input_objects() else {
2856                        debug_fatal!("Invalid input objects for transaction {}", tx.digest());
2857                        continue;
2858                    };
2859
2860                    // Filter ImmOrOwnedMoveObject inputs, excluding those claimed to be immutable.
2861                    // Immutable objects don't need lock acquisition as they can be used concurrently.
2862                    let owned_object_refs: Vec<_> = input_objects
2863                        .iter()
2864                        .filter_map(|obj| match obj {
2865                            InputObjectKind::ImmOrOwnedMoveObject(obj_ref)
2866                                if !immutable_object_ids.contains(&obj_ref.0) =>
2867                            {
2868                                Some(*obj_ref)
2869                            }
2870                            _ => None,
2871                        })
2872                        .collect();
2873
2874                    match self
2875                        .epoch_store
2876                        .try_acquire_owned_object_locks_post_consensus(
2877                            &owned_object_refs,
2878                            *tx.digest(),
2879                            &owned_object_locks,
2880                        ) {
2881                        Ok(new_locks) => {
2882                            owned_object_locks.extend(new_locks.into_iter());
2883                            // Lock acquisition succeeded - now set Finalized status
2884                            self.epoch_store
2885                                .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
2886                            num_finalized_user_transactions[author] += 1;
2887                        }
2888                        Err(e) => {
2889                            debug!("Dropping transaction {}: {}", tx.digest(), e);
2890                            self.epoch_store
2891                                .set_consensus_tx_status(position, ConsensusTxStatus::Dropped);
2892                            self.epoch_store.set_rejection_vote_reason(position, &e);
2893                            continue;
2894                        }
2895                    }
2896                }
2897
2898                let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
2899                transactions.push((transaction, author as u32));
2900            }
2901        }
2902
2903        for (i, authority) in self.committee.authorities() {
2904            let hostname = &authority.hostname;
2905            self.metrics
2906                .consensus_committed_messages
2907                .with_label_values(&[hostname])
2908                .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
2909            self.metrics
2910                .consensus_committed_user_transactions
2911                .with_label_values(&[hostname])
2912                .set(
2913                    self.last_consensus_stats
2914                        .stats
2915                        .get_num_user_transactions(i.value()) as i64,
2916                );
2917            self.metrics
2918                .consensus_finalized_user_transactions
2919                .with_label_values(&[hostname])
2920                .add(num_finalized_user_transactions[i.value()] as i64);
2921            self.metrics
2922                .consensus_rejected_user_transactions
2923                .with_label_values(&[hostname])
2924                .add(num_rejected_user_transactions[i.value()] as i64);
2925        }
2926
2927        FilteredConsensusOutput {
2928            transactions,
2929            owned_object_locks,
2930        }
2931    }
2932
    /// Verifies and deduplicates the transactions of one consensus commit.
    ///
    /// Each `(transaction, cert_origin)` pair is wrapped into a
    /// `SequencedConsensusTransaction` carrying its position in the commit, verified through
    /// the epoch store, and then dropped if its key was already seen earlier in this commit,
    /// is present in `processed_cache`, or is reported as processed by the epoch store.
    ///
    /// Side effects:
    /// - `self.last_consensus_stats.index` is advanced for every input transaction, including
    ///   ones that fail verification or are skipped as duplicates.
    /// - Every verified key is inserted into `processed_cache`, and user transaction digests
    ///   are passed to `cache_recently_finalized_transaction`, before any duplicate check.
    /// - Keys that survive all checks are recorded on `state.output` as processed.
    /// - `state.occurrence_counts` is filled with the per-commit occurrence count of every
    ///   key that has a user transaction digest (counts include skipped duplicates).
    ///
    /// Returns the surviving transactions in their original order.
    ///
    /// # Panics
    ///
    /// Panics if `cert_origin` is not a known authority index in the committee, if the
    /// processed-message lookup returns an error, or if `state.occurrence_counts` is not
    /// empty on entry.
    fn deduplicate_consensus_txns(
        &mut self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
    ) -> Vec<VerifiedSequencedConsensusTransaction> {
        let mut all_transactions = Vec::new();

        // Track occurrence counts for each transaction key within this commit.
        // Also serves as the deduplication set (count > 1 means duplicate within commit).
        let mut occurrence_counts: HashMap<SequencedConsensusTransactionKey, u32> = HashMap::new();
        // Keys being seen for the first time (not duplicates from previous commits).
        let mut first_commit_keys: HashSet<SequencedConsensusTransactionKey> = HashSet::new();

        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
            // In process_consensus_transactions_and_commit_boundary(), we will add a system consensus commit
            // prologue transaction, which will be the first transaction in this consensus commit batch.
            // Therefore, the transaction sequence number starts from 1 here.
            let current_tx_index = ExecutionIndices {
                last_committed_round: commit_info.round,
                sub_dag_index: commit_info.consensus_commit_ref.index.into(),
                transaction_index: (seq + 1) as u64,
            };

            // Updated before verification, so the stats index advances even for transactions
            // that are rejected or skipped below.
            self.last_consensus_stats.index = current_tx_index;

            // Panics if `cert_origin` does not map to an authority in the committee.
            let certificate_author = *self
                .epoch_store
                .committee()
                .authority_by_index(cert_origin)
                .unwrap();

            let sequenced_transaction = SequencedConsensusTransaction {
                certificate_author_index: cert_origin,
                certificate_author,
                consensus_index: current_tx_index,
                transaction,
            };

            // Transactions that fail verification are dropped without being counted.
            let Some(verified_transaction) = self
                .epoch_store
                .verify_consensus_transaction(sequenced_transaction)
            else {
                continue;
            };

            let key = verified_transaction.0.key();

            // Done for every verified occurrence, including duplicates skipped below.
            if let Some(tx_digest) = key.user_transaction_digest() {
                self.epoch_store
                    .cache_recently_finalized_transaction(tx_digest);
            }

            // Increment count and check if this is a duplicate within this commit.
            // This replaces the separate processed_set HashSet.
            let count = occurrence_counts.entry(key.clone()).or_insert(0);
            *count += 1;
            let in_commit = *count > 1;

            // The key is inserted into the cache unconditionally; a previous entry returned by
            // `put` is treated as a cache hit.
            let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
            if in_commit || in_cache {
                self.metrics.skipped_consensus_txns_cache_hit.inc();
                continue;
            }
            // Cache miss: fall back to the epoch store's record of processed messages.
            if self
                .epoch_store
                .is_consensus_message_processed(&key)
                .expect("db error")
            {
                self.metrics.skipped_consensus_txns.inc();
                continue;
            }

            first_commit_keys.insert(key.clone());

            state.output.record_consensus_message_processed(key);

            all_transactions.push(verified_transaction);
        }

        // Report the in-commit duplication factor, but only for keys that were actually
        // processed for the first time in this commit.
        for key in first_commit_keys {
            if let Some(&count) = occurrence_counts.get(&key)
                && count > 1
            {
                self.metrics
                    .consensus_handler_duplicate_tx_count
                    .observe(count as f64);
            }
        }

        // Copy user transaction occurrence counts to state for unpaid amplification detection.
        assert!(
            state.occurrence_counts.is_empty(),
            "occurrence_counts should be empty before populating"
        );
        state.occurrence_counts.reserve(occurrence_counts.len());
        // Keys without a user transaction digest are dropped here.
        state.occurrence_counts.extend(
            occurrence_counts
                .into_iter()
                .filter_map(|(key, count)| key.user_transaction_digest().map(|d| (d, count))),
        );

        all_transactions
    }
3037
3038    fn build_commit_handler_input(
3039        &self,
3040        transactions: Vec<VerifiedSequencedConsensusTransaction>,
3041    ) -> CommitHandlerInput {
3042        let epoch = self.epoch_store.epoch();
3043        let mut commit_handler_input = CommitHandlerInput::default();
3044
3045        for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
3046            match transaction.transaction {
3047                SequencedConsensusTransactionKind::External(consensus_transaction) => {
3048                    match consensus_transaction.kind {
3049                        // === User transactions ===
3050                        ConsensusTransactionKind::CertifiedTransaction(cert) => {
3051                            // Safe because signatures are verified when consensus called into SuiTxValidator::validate_batch.
3052                            let cert = VerifiedCertificate::new_unchecked(*cert);
3053                            let transaction =
3054                                VerifiedExecutableTransaction::new_from_certificate(cert);
3055                            commit_handler_input.user_transactions.push(
3056                                VerifiedExecutableTransactionWithAliases::no_aliases(transaction),
3057                            );
3058                        }
3059                        ConsensusTransactionKind::UserTransaction(tx) => {
3060                            // Safe because transactions are certified by consensus.
3061                            let tx = VerifiedTransaction::new_unchecked(*tx);
3062                            // TODO(fastpath): accept position in consensus, after plumbing consensus round, authority index, and transaction index here.
3063                            let transaction =
3064                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
3065                            commit_handler_input
3066                                .user_transactions
3067                                // Use of v1 UserTransaction implies commitment to no aliases.
3068                                .push(VerifiedExecutableTransactionWithAliases::no_aliases(
3069                                    transaction,
3070                                ));
3071                        }
3072                        ConsensusTransactionKind::UserTransactionV2(tx) => {
3073                            // Extract the aliases claim (required) from the claims
3074                            let used_alias_versions = if self
3075                                .epoch_store
3076                                .protocol_config()
3077                                .fix_checkpoint_signature_mapping()
3078                            {
3079                                tx.aliases()
3080                            } else {
3081                                // Convert V1 to V2 format using dummy signature indices
3082                                // which will be ignored with `fix_checkpoint_signature_mapping`
3083                                // disabled.
3084                                tx.aliases_v1().map(|a| {
3085                                    NonEmpty::from_vec(
3086                                        a.into_iter()
3087                                            .enumerate()
3088                                            .map(|(idx, (_, seq))| (idx as u8, seq))
3089                                            .collect(),
3090                                    )
3091                                    .unwrap()
3092                                })
3093                            };
3094                            let inner_tx = tx.into_tx();
3095                            // Safe because transactions are certified by consensus.
3096                            let tx = VerifiedTransaction::new_unchecked(inner_tx);
3097                            // TODO(fastpath): accept position in consensus, after plumbing consensus round, authority index, and transaction index here.
3098                            let transaction =
3099                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
3100                            if let Some(used_alias_versions) = used_alias_versions {
3101                                commit_handler_input
3102                                    .user_transactions
3103                                    .push(WithAliases::new(transaction, used_alias_versions));
3104                            } else {
3105                                commit_handler_input.user_transactions.push(
3106                                    VerifiedExecutableTransactionWithAliases::no_aliases(
3107                                        transaction,
3108                                    ),
3109                                );
3110                            }
3111                        }
3112
3113                        // === State machines ===
3114                        ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => {
3115                            commit_handler_input
3116                                .end_of_publish_transactions
3117                                .push(authority_public_key_bytes);
3118                        }
3119                        ConsensusTransactionKind::NewJWKFetched(
3120                            authority_public_key_bytes,
3121                            jwk_id,
3122                            jwk,
3123                        ) => {
3124                            commit_handler_input.new_jwks.push((
3125                                authority_public_key_bytes,
3126                                jwk_id,
3127                                jwk,
3128                            ));
3129                        }
3130                        ConsensusTransactionKind::RandomnessDkgMessage(
3131                            authority_public_key_bytes,
3132                            items,
3133                        ) => {
3134                            commit_handler_input
3135                                .randomness_dkg_messages
3136                                .push((authority_public_key_bytes, items));
3137                        }
3138                        ConsensusTransactionKind::RandomnessDkgConfirmation(
3139                            authority_public_key_bytes,
3140                            items,
3141                        ) => {
3142                            commit_handler_input
3143                                .randomness_dkg_confirmations
3144                                .push((authority_public_key_bytes, items));
3145                        }
3146                        ConsensusTransactionKind::CapabilityNotificationV2(
3147                            authority_capabilities_v2,
3148                        ) => {
3149                            commit_handler_input
3150                                .capability_notifications
3151                                .push(authority_capabilities_v2);
3152                        }
3153                        ConsensusTransactionKind::ExecutionTimeObservation(
3154                            execution_time_observation,
3155                        ) => {
3156                            commit_handler_input
3157                                .execution_time_observations
3158                                .push(execution_time_observation);
3159                        }
3160                        ConsensusTransactionKind::CheckpointSignatureV2(
3161                            checkpoint_signature_message,
3162                        ) => {
3163                            commit_handler_input
3164                                .checkpoint_signature_messages
3165                                .push(*checkpoint_signature_message);
3166                        }
3167
3168                        // Deprecated messages, filtered earlier by filter_consensus_txns()
3169                        ConsensusTransactionKind::CheckpointSignature(_)
3170                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
3171                        | ConsensusTransactionKind::CapabilityNotification(_) => {
3172                            unreachable!("filtered earlier")
3173                        }
3174                    }
3175                }
3176                // TODO: I think we can delete this, it was only used to inject randomness state update into the tx stream.
3177                SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
3178            }
3179        }
3180
3181        commit_handler_input
3182    }
3183
3184    async fn send_end_of_publish_if_needed(&self) {
3185        if !self.epoch_store.should_send_end_of_publish() {
3186            return;
3187        }
3188
3189        let end_of_publish = ConsensusTransaction::new_end_of_publish(self.epoch_store.name);
3190        if let Err(err) =
3191            self.consensus_adapter
3192                .submit(end_of_publish, None, &self.epoch_store, None, None)
3193        {
3194            warn!(
3195                "Error when sending EndOfPublish message from ConsensusHandler: {:?}",
3196                err
3197            );
3198        } else {
3199            info!(epoch=?self.epoch_store.epoch(), "Sending EndOfPublish message to consensus");
3200        }
3201    }
3202}
3203
/// Message carried on the channel of `ExecutionSchedulerSender`, which sends transactions
/// to the execution scheduler in a separate task, to avoid blocking consensus handler.
///
/// The first element is the batch of schedulables paired with their assigned versions.
/// The second element is optional settlement batch info; when present the receiving task
/// calls `enqueue_v2` instead of `enqueue`.
pub(crate) type SchedulerMessage = (
    Vec<(Schedulable, AssignedVersions)>,
    Option<SettlementBatchInfo>,
);
3210
/// Cloneable handle used to hand batches of transactions to the execution scheduler
/// through an unbounded channel.
#[derive(Clone)]
pub(crate) struct ExecutionSchedulerSender {
    // Sending half of the channel; each item is one `SchedulerMessage`.
    sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
}
3215
3216impl ExecutionSchedulerSender {
3217    fn start(
3218        settlement_scheduler: SettlementScheduler,
3219        epoch_store: Arc<AuthorityPerEpochStore>,
3220    ) -> Self {
3221        let (sender, recv) = monitored_mpsc::unbounded_channel("execution_scheduler_sender");
3222        spawn_monitored_task!(Self::run(recv, settlement_scheduler, epoch_store));
3223        Self { sender }
3224    }
3225
3226    pub(crate) fn new_for_testing(
3227        sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
3228    ) -> Self {
3229        Self { sender }
3230    }
3231
3232    fn send(
3233        &self,
3234        transactions: Vec<(Schedulable, AssignedVersions)>,
3235        settlement: Option<SettlementBatchInfo>,
3236    ) {
3237        let _ = self.sender.send((transactions, settlement));
3238    }
3239
3240    async fn run(
3241        mut recv: monitored_mpsc::UnboundedReceiver<SchedulerMessage>,
3242        settlement_scheduler: SettlementScheduler,
3243        epoch_store: Arc<AuthorityPerEpochStore>,
3244    ) {
3245        while let Some((transactions, settlement)) = recv.recv().await {
3246            let _guard = monitored_scope("ConsensusHandler::enqueue");
3247            let txns = transactions
3248                .into_iter()
3249                .map(|(txn, versions)| (txn, ExecutionEnv::new().with_assigned_versions(versions)))
3250                .collect();
3251            if let Some(settlement) = settlement {
3252                settlement_scheduler.enqueue_v2(txns, settlement, &epoch_store);
3253            } else {
3254                settlement_scheduler.enqueue(txns, &epoch_store);
3255            }
3256        }
3257    }
3258}
3259
/// Manages the lifetime of tasks handling the commits and transactions output by consensus.
pub(crate) struct MysticetiConsensusHandler {
    // Holds the commit-processing task spawned in `new`; shut down by `abort`.
    tasks: JoinSet<()>,
}
3264
3265impl MysticetiConsensusHandler {
3266    pub(crate) fn new(
3267        last_processed_commit_at_startup: CommitIndex,
3268        mut consensus_handler: ConsensusHandler<CheckpointService>,
3269        mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
3270        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
3271    ) -> Self {
3272        debug!(
3273            last_processed_commit_at_startup,
3274            "Starting consensus replay"
3275        );
3276        let mut tasks = JoinSet::new();
3277        tasks.spawn(monitored_future!(async move {
3278            // TODO: pause when execution is overloaded, so consensus can detect the backpressure.
3279            while let Some(consensus_commit) = commit_receiver.recv().await {
3280                let commit_index = consensus_commit.commit_ref.index;
3281                if commit_index <= last_processed_commit_at_startup {
3282                    consensus_handler.handle_prior_consensus_commit(consensus_commit);
3283                } else {
3284                    consensus_handler
3285                        .handle_consensus_commit(consensus_commit)
3286                        .await;
3287                }
3288                commit_consumer_monitor.set_highest_handled_commit(commit_index);
3289            }
3290        }));
3291        Self { tasks }
3292    }
3293
3294    pub(crate) async fn abort(&mut self) {
3295        self.tasks.shutdown().await;
3296    }
3297}
3298
3299fn authenticator_state_update_transaction(
3300    epoch_store: &AuthorityPerEpochStore,
3301    round: u64,
3302    mut new_active_jwks: Vec<ActiveJwk>,
3303) -> VerifiedExecutableTransaction {
3304    let epoch = epoch_store.epoch();
3305    new_active_jwks.sort();
3306
3307    info!("creating authenticator state update transaction");
3308    assert!(epoch_store.authenticator_state_enabled());
3309    let transaction = VerifiedTransaction::new_authenticator_state_update(
3310        epoch,
3311        round,
3312        new_active_jwks,
3313        epoch_store
3314            .epoch_start_config()
3315            .authenticator_obj_initial_shared_version()
3316            .expect("authenticator state obj must exist"),
3317    );
3318    VerifiedExecutableTransaction::new_system(transaction, epoch)
3319}
3320
3321pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
3322    match &transaction.kind {
3323        ConsensusTransactionKind::CertifiedTransaction(certificate) => {
3324            if certificate.is_consensus_tx() {
3325                "shared_certificate"
3326            } else {
3327                "owned_certificate"
3328            }
3329        }
3330        ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
3331        ConsensusTransactionKind::CheckpointSignatureV2(_) => "checkpoint_signature",
3332        ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
3333        ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
3334        ConsensusTransactionKind::CapabilityNotificationV2(_) => "capability_notification_v2",
3335        ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
3336        ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
3337        ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
3338        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
3339        ConsensusTransactionKind::UserTransaction(tx) => {
3340            if tx.is_consensus_tx() {
3341                "shared_user_transaction"
3342            } else {
3343                "owned_user_transaction"
3344            }
3345        }
3346        ConsensusTransactionKind::UserTransactionV2(tx) => {
3347            if tx.tx().is_consensus_tx() {
3348                "shared_user_transaction_v2"
3349            } else {
3350                "owned_user_transaction_v2"
3351            }
3352        }
3353        ConsensusTransactionKind::ExecutionTimeObservation(_) => "execution_time_observation",
3354    }
3355}
3356
/// A transaction taken from a consensus commit together with its origin and its position
/// in the consensus output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequencedConsensusTransaction {
    /// Index of the originating authority in the committee.
    pub certificate_author_index: AuthorityIndex,
    /// Name of the originating authority (looked up from the index when sequencing).
    pub certificate_author: AuthorityName,
    /// Round, sub-dag index and transaction index of this transaction within consensus.
    pub consensus_index: ExecutionIndices,
    /// The payload: either an external consensus transaction or a system transaction.
    pub transaction: SequencedConsensusTransactionKind,
}
3364
/// Payload of a sequenced consensus transaction.
///
/// Serialization is implemented by hand via a private mirror enum, because the `System`
/// payload is not directly serializable.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SequencedConsensusTransactionKind {
    /// A transaction that was submitted to consensus.
    External(ConsensusTransaction),
    /// An already-verified executable system transaction.
    System(VerifiedExecutableTransaction),
}
3371
3372impl Serialize for SequencedConsensusTransactionKind {
3373    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
3374        let serializable = SerializableSequencedConsensusTransactionKind::from(self);
3375        serializable.serialize(serializer)
3376    }
3377}
3378
3379impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
3380    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
3381        let serializable =
3382            SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
3383        Ok(serializable.into())
3384    }
3385}
3386
// We can't serialize SequencedConsensusTransactionKind directly because it contains a
// VerifiedExecutableTransaction, which is not serializable (by design). This wrapper allows us to
// convert to a serializable format easily.
//
// The variants mirror SequencedConsensusTransactionKind one-to-one; only the payload type of
// `System` differs.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
enum SerializableSequencedConsensusTransactionKind {
    // Same payload as the non-serializable counterpart.
    External(ConsensusTransaction),
    // Serializable form of the verified system transaction.
    System(TrustedExecutableTransaction),
}
3396
3397impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
3398    fn from(kind: &SequencedConsensusTransactionKind) -> Self {
3399        match kind {
3400            SequencedConsensusTransactionKind::External(ext) => {
3401                SerializableSequencedConsensusTransactionKind::External(ext.clone())
3402            }
3403            SequencedConsensusTransactionKind::System(txn) => {
3404                SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
3405            }
3406        }
3407    }
3408}
3409
3410impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
3411    fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
3412        match kind {
3413            SerializableSequencedConsensusTransactionKind::External(ext) => {
3414                SequencedConsensusTransactionKind::External(ext)
3415            }
3416            SerializableSequencedConsensusTransactionKind::System(txn) => {
3417                SequencedConsensusTransactionKind::System(txn.into())
3418            }
3419        }
3420    }
3421}
3422
/// Identifies a sequenced consensus transaction for deduplication and
/// processed-message tracking.
#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
pub enum SequencedConsensusTransactionKey {
    /// Key of a transaction that was submitted to consensus.
    External(ConsensusTransactionKey),
    /// Digest of a system transaction.
    System(TransactionDigest),
}
3428
3429impl SequencedConsensusTransactionKey {
3430    pub fn user_transaction_digest(&self) -> Option<TransactionDigest> {
3431        match self {
3432            SequencedConsensusTransactionKey::External(key) => match key {
3433                ConsensusTransactionKey::Certificate(digest) => Some(*digest),
3434                _ => None,
3435            },
3436            SequencedConsensusTransactionKey::System(_) => None,
3437        }
3438    }
3439}
3440
3441impl SequencedConsensusTransactionKind {
3442    pub fn key(&self) -> SequencedConsensusTransactionKey {
3443        match self {
3444            SequencedConsensusTransactionKind::External(ext) => {
3445                SequencedConsensusTransactionKey::External(ext.key())
3446            }
3447            SequencedConsensusTransactionKind::System(txn) => {
3448                SequencedConsensusTransactionKey::System(*txn.digest())
3449            }
3450        }
3451    }
3452
3453    pub fn get_tracking_id(&self) -> u64 {
3454        match self {
3455            SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
3456            SequencedConsensusTransactionKind::System(_txn) => 0,
3457        }
3458    }
3459
3460    pub fn is_executable_transaction(&self) -> bool {
3461        match self {
3462            SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
3463            SequencedConsensusTransactionKind::System(_) => true,
3464        }
3465    }
3466
3467    pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
3468        match self {
3469            SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
3470                ConsensusTransactionKind::CertifiedTransaction(txn) => Some(*txn.digest()),
3471                ConsensusTransactionKind::UserTransaction(txn) => Some(*txn.digest()),
3472                ConsensusTransactionKind::UserTransactionV2(txn) => Some(*txn.tx().digest()),
3473                _ => None,
3474            },
3475            SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
3476        }
3477    }
3478
3479    pub fn is_end_of_publish(&self) -> bool {
3480        match self {
3481            SequencedConsensusTransactionKind::External(ext) => {
3482                matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
3483            }
3484            SequencedConsensusTransactionKind::System(_) => false,
3485        }
3486    }
3487}
3488
3489impl SequencedConsensusTransaction {
3490    pub fn sender_authority(&self) -> AuthorityName {
3491        self.certificate_author
3492    }
3493
3494    pub fn key(&self) -> SequencedConsensusTransactionKey {
3495        self.transaction.key()
3496    }
3497
3498    pub fn is_end_of_publish(&self) -> bool {
3499        if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
3500            matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
3501        } else {
3502            false
3503        }
3504    }
3505
3506    pub fn try_take_execution_time_observation(&mut self) -> Option<ExecutionTimeObservation> {
3507        if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
3508            kind: ConsensusTransactionKind::ExecutionTimeObservation(observation),
3509            ..
3510        }) = &mut self.transaction
3511        {
3512            Some(std::mem::take(observation))
3513        } else {
3514            None
3515        }
3516    }
3517
3518    pub fn is_system(&self) -> bool {
3519        matches!(
3520            self.transaction,
3521            SequencedConsensusTransactionKind::System(_)
3522        )
3523    }
3524
3525    pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
3526        if !randomness_state_enabled {
3527            // If randomness is disabled, these should be processed same as a tx without randomness,
3528            // which will eventually fail when the randomness state object is not found.
3529            return false;
3530        }
3531        match &self.transaction {
3532            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3533                kind: ConsensusTransactionKind::CertifiedTransaction(cert),
3534                ..
3535            }) => cert.transaction_data().uses_randomness(),
3536            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3537                kind: ConsensusTransactionKind::UserTransaction(txn),
3538                ..
3539            }) => txn.transaction_data().uses_randomness(),
3540            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3541                kind: ConsensusTransactionKind::UserTransactionV2(txn),
3542                ..
3543            }) => txn.tx().transaction_data().uses_randomness(),
3544            _ => false,
3545        }
3546    }
3547
3548    pub fn as_consensus_txn(&self) -> Option<&SenderSignedData> {
3549        match &self.transaction {
3550            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3551                kind: ConsensusTransactionKind::CertifiedTransaction(certificate),
3552                ..
3553            }) if certificate.is_consensus_tx() => Some(certificate.data()),
3554            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3555                kind: ConsensusTransactionKind::UserTransaction(txn),
3556                ..
3557            }) if txn.is_consensus_tx() => Some(txn.data()),
3558            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3559                kind: ConsensusTransactionKind::UserTransactionV2(txn),
3560                ..
3561            }) if txn.tx().is_consensus_tx() => Some(txn.tx().data()),
3562            SequencedConsensusTransactionKind::System(txn) if txn.is_consensus_tx() => {
3563                Some(txn.data())
3564            }
3565            _ => None,
3566        }
3567    }
3568}
3569
/// Newtype around a [`SequencedConsensusTransaction`] marking it as verified.
///
/// In this file, values of this type come out of the epoch store's
/// `verify_consensus_transaction` during deduplication.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
3572
#[cfg(test)]
impl VerifiedSequencedConsensusTransaction {
    /// Test-only constructor: wraps `transaction` with placeholder sequencing metadata
    /// and marks it as verified without running any verification.
    pub fn new_test(transaction: ConsensusTransaction) -> Self {
        let sequenced = SequencedConsensusTransaction::new_test(transaction);
        Self(sequenced)
    }
}
3579
3580impl SequencedConsensusTransaction {
3581    pub fn new_test(transaction: ConsensusTransaction) -> Self {
3582        Self {
3583            certificate_author_index: 0,
3584            certificate_author: AuthorityName::ZERO,
3585            consensus_index: Default::default(),
3586            transaction: SequencedConsensusTransactionKind::External(transaction),
3587        }
3588    }
3589}
3590
/// Keeps a sliding window of recent commit timestamps and derives an estimate of the
/// interval between commits from it.
#[derive(Serialize, Deserialize)]
pub(crate) struct CommitIntervalObserver {
    // Commit timestamps in milliseconds, oldest at the front and newest at the back.
    ring_buffer: VecDeque<u64>,
}
3595
3596impl CommitIntervalObserver {
3597    pub fn new(window_size: u32) -> Self {
3598        Self {
3599            ring_buffer: VecDeque::with_capacity(window_size as usize),
3600        }
3601    }
3602
3603    pub fn observe_commit_time(&mut self, consensus_commit: &impl ConsensusCommitAPI) {
3604        let commit_time = consensus_commit.commit_timestamp_ms();
3605        if self.ring_buffer.len() == self.ring_buffer.capacity() {
3606            self.ring_buffer.pop_front();
3607        }
3608        self.ring_buffer.push_back(commit_time);
3609    }
3610
3611    pub fn commit_interval_estimate(&self) -> Option<Duration> {
3612        if self.ring_buffer.len() <= 1 {
3613            None
3614        } else {
3615            let first = self.ring_buffer.front().unwrap();
3616            let last = self.ring_buffer.back().unwrap();
3617            let duration = last.saturating_sub(*first);
3618            let num_commits = self.ring_buffer.len() as u64;
3619            Some(Duration::from_millis(duration.div_ceil(num_commits)))
3620        }
3621    }
3622}
3623
3624#[cfg(test)]
3625mod tests {
3626    use std::collections::HashSet;
3627
3628    use consensus_core::{
3629        BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock,
3630    };
3631    use futures::pin_mut;
3632    use prometheus::Registry;
3633    use sui_protocol_config::{ConsensusTransactionOrdering, ProtocolConfig};
3634    use sui_types::{
3635        base_types::ExecutionDigests,
3636        base_types::{AuthorityName, FullObjectRef, ObjectID, SuiAddress, random_object_ref},
3637        committee::Committee,
3638        crypto::deterministic_random_account_key,
3639        gas::GasCostSummary,
3640        message_envelope::Message,
3641        messages_checkpoint::{
3642            CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
3643            SignedCheckpointSummary,
3644        },
3645        messages_consensus::ConsensusTransaction,
3646        object::Object,
3647        transaction::{
3648            CertifiedTransaction, TransactionData, TransactionDataAPI, VerifiedCertificate,
3649        },
3650    };
3651
3652    use super::*;
3653    use crate::{
3654        authority::{
3655            authority_per_epoch_store::ConsensusStatsAPI,
3656            test_authority_builder::TestAuthorityBuilder,
3657        },
3658        checkpoints::CheckpointServiceNoop,
3659        consensus_adapter::consensus_tests::test_user_transaction,
3660        consensus_test_utils::make_consensus_adapter_for_test,
3661        post_consensus_tx_reorder::PostConsensusTxReorder,
3662    };
3663
3664    #[tokio::test(flavor = "current_thread", start_paused = true)]
3665    async fn test_consensus_commit_handler() {
3666        telemetry_subscribers::init_for_testing();
3667
3668        // GIVEN
3669        // 1 account keypair
3670        let (sender, keypair) = deterministic_random_account_key();
3671        // 12 gas objects.
3672        let gas_objects: Vec<Object> = (0..12)
3673            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3674            .collect();
3675        // 4 owned objects.
3676        let owned_objects: Vec<Object> = (0..4)
3677            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3678            .collect();
3679        // 6 shared objects.
3680        let shared_objects: Vec<Object> = (0..6)
3681            .map(|_| Object::shared_for_testing())
3682            .collect::<Vec<_>>();
3683        let mut all_objects = gas_objects.clone();
3684        all_objects.extend(owned_objects.clone());
3685        all_objects.extend(shared_objects.clone());
3686
3687        let network_config =
3688            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
3689                .with_objects(all_objects.clone())
3690                .build();
3691
3692        let state = TestAuthorityBuilder::new()
3693            .with_network_config(&network_config, 0)
3694            .build()
3695            .await;
3696
3697        let epoch_store = state.epoch_store_for_testing().clone();
3698        let new_epoch_start_state = epoch_store.epoch_start_state();
3699        let consensus_committee = new_epoch_start_state.get_consensus_committee();
3700
3701        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3702
3703        let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
3704
3705        let backpressure_manager = BackpressureManager::new_for_tests();
3706        let consensus_adapter =
3707            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3708        let settlement_scheduler = SettlementScheduler::new(
3709            state.execution_scheduler().as_ref().clone(),
3710            state.get_transaction_cache_reader().clone(),
3711        );
3712        let mut consensus_handler = ConsensusHandler::new(
3713            epoch_store,
3714            Arc::new(CheckpointServiceNoop {}),
3715            settlement_scheduler,
3716            consensus_adapter,
3717            state.get_object_cache_reader().clone(),
3718            Arc::new(ArcSwap::default()),
3719            consensus_committee.clone(),
3720            metrics,
3721            Arc::new(throughput_calculator),
3722            backpressure_manager.subscribe(),
3723            state.traffic_controller.clone(),
3724            None,
3725        );
3726
3727        // AND create test user transactions alternating between owned and shared input.
3728        let mut user_transactions = vec![];
3729        for (i, gas_object) in gas_objects[0..8].iter().enumerate() {
3730            let input_object = if i % 2 == 0 {
3731                owned_objects.get(i / 2).unwrap().clone()
3732            } else {
3733                shared_objects.get(i / 2).unwrap().clone()
3734            };
3735            let transaction = test_user_transaction(
3736                &state,
3737                sender,
3738                &keypair,
3739                gas_object.clone(),
3740                vec![input_object],
3741            )
3742            .await;
3743            user_transactions.push(transaction);
3744        }
3745
3746        // AND create 4 more user transactions with remaining gas objects and 2 shared objects.
3747        // Having more txns on the same shared object may get deferred.
3748        for (i, gas_object) in gas_objects[8..12].iter().enumerate() {
3749            let shared_object = if i < 2 {
3750                shared_objects[4].clone()
3751            } else {
3752                shared_objects[5].clone()
3753            };
3754            let transaction = test_user_transaction(
3755                &state,
3756                sender,
3757                &keypair,
3758                gas_object.clone(),
3759                vec![shared_object],
3760            )
3761            .await;
3762            user_transactions.push(transaction);
3763        }
3764
3765        // AND create block for each user transaction
3766        let mut blocks = Vec::new();
3767        for (i, consensus_transaction) in user_transactions
3768            .iter()
3769            .cloned()
3770            .map(|t| ConsensusTransaction::new_user_transaction_v2_message(&state.name, t.into()))
3771            .enumerate()
3772        {
3773            let transaction_bytes = bcs::to_bytes(&consensus_transaction).unwrap();
3774            let block = VerifiedBlock::new_for_test(
3775                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
3776                    .set_transactions(vec![Transaction::new(transaction_bytes)])
3777                    .build(),
3778            );
3779
3780            blocks.push(block);
3781        }
3782
3783        // AND create the consensus commit
3784        let leader_block = blocks[0].clone();
3785        let committed_sub_dag = CommittedSubDag::new(
3786            leader_block.reference(),
3787            blocks.clone(),
3788            leader_block.timestamp_ms(),
3789            CommitRef::new(10, CommitDigest::MIN),
3790            true,
3791        );
3792
3793        // Test that the consensus handler respects backpressure.
3794        backpressure_manager.set_backpressure(true);
3795        // Default watermarks are 0,0 which will suppress the backpressure.
3796        backpressure_manager.update_highest_certified_checkpoint(1);
3797
3798        // AND process the consensus commit once
3799        {
3800            let waiter = consensus_handler.handle_consensus_commit(committed_sub_dag.clone());
3801            pin_mut!(waiter);
3802
3803            // waiter should not complete within 5 seconds
3804            tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
3805                .await
3806                .unwrap_err();
3807
3808            // lift backpressure
3809            backpressure_manager.set_backpressure(false);
3810
3811            // waiter completes now.
3812            tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
3813                .await
3814                .unwrap();
3815        }
3816
3817        // THEN check the consensus stats
3818        let num_blocks = blocks.len();
3819        let num_transactions = user_transactions.len();
3820        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
3821        assert_eq!(
3822            last_consensus_stats_1.index.transaction_index,
3823            num_transactions as u64
3824        );
3825        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
3826        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
3827        assert_eq!(
3828            last_consensus_stats_1.stats.get_num_messages(0),
3829            num_blocks as u64
3830        );
3831        assert_eq!(
3832            last_consensus_stats_1.stats.get_num_user_transactions(0),
3833            num_transactions as u64
3834        );
3835
3836        // THEN check for execution status of user transactions.
3837        for (i, t) in user_transactions.iter().enumerate() {
3838            let digest = t.tx().digest();
3839            if tokio::time::timeout(
3840                std::time::Duration::from_secs(10),
3841                state.notify_read_effects_for_testing("", *digest),
3842            )
3843            .await
3844            .is_ok()
3845            {
3846                // Effects exist as expected.
3847            } else {
3848                panic!("User transaction {} {} did not execute", i, digest);
3849            }
3850        }
3851
3852        // THEN check for no inflight or suspended transactions.
3853        state.execution_scheduler().check_empty_for_testing().await;
3854    }
3855
3856    fn to_short_strings(txs: Vec<VerifiedExecutableTransactionWithAliases>) -> Vec<String> {
3857        txs.into_iter()
3858            .map(|tx| format!("transaction({})", tx.tx().transaction_data().gas_price()))
3859            .collect()
3860    }
3861
3862    #[test]
3863    fn test_order_by_gas_price() {
3864        let mut v = vec![user_txn(42), user_txn(100)];
3865        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3866        assert_eq!(
3867            to_short_strings(v),
3868            vec![
3869                "transaction(100)".to_string(),
3870                "transaction(42)".to_string(),
3871            ]
3872        );
3873
3874        let mut v = vec![
3875            user_txn(1200),
3876            user_txn(12),
3877            user_txn(1000),
3878            user_txn(42),
3879            user_txn(100),
3880            user_txn(1000),
3881        ];
3882        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3883        assert_eq!(
3884            to_short_strings(v),
3885            vec![
3886                "transaction(1200)".to_string(),
3887                "transaction(1000)".to_string(),
3888                "transaction(1000)".to_string(),
3889                "transaction(100)".to_string(),
3890                "transaction(42)".to_string(),
3891                "transaction(12)".to_string(),
3892            ]
3893        );
3894    }
3895
3896    #[tokio::test(flavor = "current_thread")]
3897    async fn test_checkpoint_signature_dedup() {
3898        telemetry_subscribers::init_for_testing();
3899
3900        let network_config =
3901            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3902        let state = TestAuthorityBuilder::new()
3903            .with_network_config(&network_config, 0)
3904            .build()
3905            .await;
3906
3907        let epoch_store = state.epoch_store_for_testing().clone();
3908        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3909
3910        let make_signed = || {
3911            let epoch = epoch_store.epoch();
3912            let contents =
3913                CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
3914            let summary = CheckpointSummary::new(
3915                &ProtocolConfig::get_for_max_version_UNSAFE(),
3916                epoch,
3917                42, // sequence number
3918                10, // network_total_transactions
3919                &contents,
3920                None, // previous_digest
3921                GasCostSummary::default(),
3922                None,       // end_of_epoch_data
3923                0,          // timestamp
3924                Vec::new(), // randomness_rounds
3925                Vec::new(), // checkpoint_artifact_digests
3926            );
3927            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name)
3928        };
3929
3930        // Prepare V2 pair: same (authority, seq), different digests => different keys
3931        let v2_s1 = make_signed();
3932        let v2_s1_clone = v2_s1.clone();
3933        let v2_digest_a = v2_s1.data().digest();
3934        let v2_a =
3935            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3936                summary: v2_s1,
3937            });
3938
3939        let v2_s2 = make_signed();
3940        let v2_digest_b = v2_s2.data().digest();
3941        let v2_b =
3942            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3943                summary: v2_s2,
3944            });
3945
3946        assert_ne!(v2_digest_a, v2_digest_b);
3947
3948        // Create an exact duplicate with same digest to exercise valid dedup
3949        assert_eq!(v2_s1_clone.data().digest(), v2_digest_a);
3950        let v2_dup =
3951            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3952                summary: v2_s1_clone,
3953            });
3954
3955        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
3956        let block = VerifiedBlock::new_for_test(
3957            TestBlock::new(100, 0)
3958                .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)])
3959                .build(),
3960        );
3961        let commit = CommittedSubDag::new(
3962            block.reference(),
3963            vec![block.clone()],
3964            block.timestamp_ms(),
3965            CommitRef::new(10, CommitDigest::MIN),
3966            true,
3967        );
3968
3969        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3970        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
3971        let backpressure = BackpressureManager::new_for_tests();
3972        let consensus_adapter =
3973            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3974        let settlement_scheduler = SettlementScheduler::new(
3975            state.execution_scheduler().as_ref().clone(),
3976            state.get_transaction_cache_reader().clone(),
3977        );
3978        let mut handler = ConsensusHandler::new(
3979            epoch_store.clone(),
3980            Arc::new(CheckpointServiceNoop {}),
3981            settlement_scheduler,
3982            consensus_adapter,
3983            state.get_object_cache_reader().clone(),
3984            Arc::new(ArcSwap::default()),
3985            consensus_committee.clone(),
3986            metrics,
3987            Arc::new(throughput),
3988            backpressure.subscribe(),
3989            state.traffic_controller.clone(),
3990            None,
3991        );
3992
3993        handler.handle_consensus_commit(commit).await;
3994
3995        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
3996        use sui_types::messages_consensus::ConsensusTransactionKey as CK;
3997
3998        // V2 distinct digests: both must be processed. If these were collapsed to one CheckpointSeq num, only one would process.
3999        let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a));
4000        let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b));
4001        assert!(
4002            epoch_store
4003                .is_consensus_message_processed(&v2_key_a)
4004                .unwrap()
4005        );
4006        assert!(
4007            epoch_store
4008                .is_consensus_message_processed(&v2_key_b)
4009                .unwrap()
4010        );
4011    }
4012
4013    #[tokio::test(flavor = "current_thread")]
4014    async fn test_verify_consensus_transaction_filters_mismatched_authorities() {
4015        telemetry_subscribers::init_for_testing();
4016
4017        let network_config =
4018            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
4019        let state = TestAuthorityBuilder::new()
4020            .with_network_config(&network_config, 0)
4021            .build()
4022            .await;
4023
4024        let epoch_store = state.epoch_store_for_testing().clone();
4025        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
4026
4027        // Create a different authority than our test authority
4028        use fastcrypto::traits::KeyPair;
4029        let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair();
4030        let wrong_authority: AuthorityName = wrong_keypair.public().into();
4031
4032        // Create EndOfPublish transaction with mismatched authority
4033        let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority);
4034
4035        // Create valid EndOfPublish transaction with correct authority
4036        let valid_eop = ConsensusTransaction::new_end_of_publish(state.name);
4037
4038        // Create CheckpointSignature with mismatched authority
4039        let epoch = epoch_store.epoch();
4040        let contents =
4041            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
4042        let summary = CheckpointSummary::new(
4043            &ProtocolConfig::get_for_max_version_UNSAFE(),
4044            epoch,
4045            42, // sequence number
4046            10, // network_total_transactions
4047            &contents,
4048            None, // previous_digest
4049            GasCostSummary::default(),
4050            None,       // end_of_epoch_data
4051            0,          // timestamp
4052            Vec::new(), // randomness_rounds
4053            Vec::new(), // checkpoint commitments
4054        );
4055
4056        // Create a signed checkpoint with the wrong authority
4057        let mismatched_checkpoint_signed =
4058            SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority);
4059        let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest();
4060        let mismatched_checkpoint =
4061            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
4062                summary: mismatched_checkpoint_signed,
4063            });
4064
4065        // Create a valid checkpoint signature with correct authority
4066        let valid_checkpoint_signed =
4067            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name);
4068        let valid_checkpoint_digest = valid_checkpoint_signed.data().digest();
4069        let valid_checkpoint =
4070            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
4071                summary: valid_checkpoint_signed,
4072            });
4073
4074        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
4075
4076        // Create a block with both valid and invalid transactions
4077        let block = VerifiedBlock::new_for_test(
4078            TestBlock::new(100, 0)
4079                .set_transactions(vec![
4080                    to_tx(&mismatched_eop),
4081                    to_tx(&valid_eop),
4082                    to_tx(&mismatched_checkpoint),
4083                    to_tx(&valid_checkpoint),
4084                ])
4085                .build(),
4086        );
4087        let commit = CommittedSubDag::new(
4088            block.reference(),
4089            vec![block.clone()],
4090            block.timestamp_ms(),
4091            CommitRef::new(10, CommitDigest::MIN),
4092            true,
4093        );
4094
4095        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
4096        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
4097        let backpressure = BackpressureManager::new_for_tests();
4098        let consensus_adapter =
4099            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
4100        let settlement_scheduler = SettlementScheduler::new(
4101            state.execution_scheduler().as_ref().clone(),
4102            state.get_transaction_cache_reader().clone(),
4103        );
4104        let mut handler = ConsensusHandler::new(
4105            epoch_store.clone(),
4106            Arc::new(CheckpointServiceNoop {}),
4107            settlement_scheduler,
4108            consensus_adapter,
4109            state.get_object_cache_reader().clone(),
4110            Arc::new(ArcSwap::default()),
4111            consensus_committee.clone(),
4112            metrics,
4113            Arc::new(throughput),
4114            backpressure.subscribe(),
4115            state.traffic_controller.clone(),
4116            None,
4117        );
4118
4119        handler.handle_consensus_commit(commit).await;
4120
4121        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
4122        use sui_types::messages_consensus::ConsensusTransactionKey as CK;
4123
4124        // Check that valid transactions were processed
4125        let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
4126        assert!(
4127            epoch_store
4128                .is_consensus_message_processed(&valid_eop_key)
4129                .unwrap(),
4130            "Valid EndOfPublish should have been processed"
4131        );
4132
4133        let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
4134            state.name,
4135            42,
4136            valid_checkpoint_digest,
4137        ));
4138        assert!(
4139            epoch_store
4140                .is_consensus_message_processed(&valid_checkpoint_key)
4141                .unwrap(),
4142            "Valid CheckpointSignature should have been processed"
4143        );
4144
4145        // Check that mismatched authority transactions were NOT processed (filtered out by verify_consensus_transaction)
4146        let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
4147        assert!(
4148            !epoch_store
4149                .is_consensus_message_processed(&mismatched_eop_key)
4150                .unwrap(),
4151            "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
4152        );
4153
4154        let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
4155            wrong_authority,
4156            42,
4157            mismatched_checkpoint_digest,
4158        ));
4159        assert!(
4160            !epoch_store
4161                .is_consensus_message_processed(&mismatched_checkpoint_key)
4162                .unwrap(),
4163            "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
4164        );
4165    }
4166
4167    fn user_txn(gas_price: u64) -> VerifiedExecutableTransactionWithAliases {
4168        let (committee, keypairs) = Committee::new_simple_test_committee();
4169        let (sender, sender_keypair) = deterministic_random_account_key();
4170        let tx = sui_types::transaction::Transaction::from_data_and_signer(
4171            TransactionData::new_transfer(
4172                SuiAddress::default(),
4173                FullObjectRef::from_fastpath_ref(random_object_ref()),
4174                sender,
4175                random_object_ref(),
4176                1000 * gas_price,
4177                gas_price,
4178            ),
4179            vec![&sender_keypair],
4180        );
4181        let tx = VerifiedExecutableTransaction::new_from_certificate(
4182            VerifiedCertificate::new_unchecked(
4183                CertifiedTransaction::new_from_keypairs_for_testing(
4184                    tx.into_data(),
4185                    &keypairs,
4186                    &committee,
4187                ),
4188            ),
4189        );
4190        VerifiedExecutableTransactionWithAliases::no_aliases(tx)
4191    }
4192
4193    mod checkpoint_queue_tests {
4194        use super::*;
4195        use consensus_core::CommitRef;
4196        use sui_types::digests::Digest;
4197
4198        fn make_chunk(tx_count: usize, height: u64) -> Chunk {
4199            Chunk {
4200                schedulables: (0..tx_count)
4201                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4202                    .collect(),
4203                settlement: None,
4204                height,
4205            }
4206        }
4207
4208        fn make_commit_ref(index: u32) -> CommitRef {
4209            CommitRef {
4210                index,
4211                digest: CommitDigest::MIN,
4212            }
4213        }
4214
4215        fn default_versions() -> HashMap<TransactionKey, AssignedVersions> {
4216            HashMap::new()
4217        }
4218
4219        #[test]
4220        fn test_flush_all_checkpoint_roots() {
4221            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4222            let versions = default_versions();
4223
4224            queue.push_chunk(
4225                make_chunk(5, 1),
4226                &versions,
4227                1000,
4228                make_commit_ref(1),
4229                Digest::default(),
4230            );
4231            queue.push_chunk(
4232                make_chunk(3, 2),
4233                &versions,
4234                1000,
4235                make_commit_ref(1),
4236                Digest::default(),
4237            );
4238
4239            let pending = queue.flush(1000, true);
4240
4241            assert!(pending.is_some());
4242            assert!(queue.pending_roots.is_empty());
4243        }
4244
4245        #[test]
4246        fn test_flush_respects_min_checkpoint_interval() {
4247            let min_interval = 200;
4248            let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, 1000, min_interval);
4249            let versions = default_versions();
4250
4251            queue.push_chunk(
4252                make_chunk(5, 1),
4253                &versions,
4254                1000,
4255                make_commit_ref(1),
4256                Digest::default(),
4257            );
4258
4259            let pending = queue.flush(1000 + min_interval - 1, false);
4260            assert!(pending.is_none());
4261            assert_eq!(queue.pending_roots.len(), 1);
4262
4263            let pending = queue.flush(1000 + min_interval, false);
4264            assert!(pending.is_some());
4265            assert!(queue.pending_roots.is_empty());
4266        }
4267
4268        #[test]
4269        fn test_push_chunk_flushes_when_exceeds_max() {
4270            let max_tx = 10;
4271            let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, max_tx, 0);
4272            let versions = default_versions();
4273
4274            queue.push_chunk(
4275                make_chunk(max_tx / 2 + 1, 1),
4276                &versions,
4277                1000,
4278                make_commit_ref(1),
4279                Digest::default(),
4280            );
4281
4282            let flushed = queue.push_chunk(
4283                make_chunk(max_tx / 2 + 1, 2),
4284                &versions,
4285                1000,
4286                make_commit_ref(2),
4287                Digest::default(),
4288            );
4289
4290            assert_eq!(flushed.len(), 1);
4291            assert_eq!(queue.pending_roots.len(), 1);
4292        }
4293
4294        #[test]
4295        fn test_multiple_chunks_merged_into_one_checkpoint() {
4296            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 200);
4297            let versions = default_versions();
4298
4299            queue.push_chunk(
4300                make_chunk(10, 1),
4301                &versions,
4302                1000,
4303                make_commit_ref(1),
4304                Digest::default(),
4305            );
4306            queue.push_chunk(
4307                make_chunk(10, 2),
4308                &versions,
4309                1000,
4310                make_commit_ref(2),
4311                Digest::default(),
4312            );
4313            queue.push_chunk(
4314                make_chunk(10, 3),
4315                &versions,
4316                1000,
4317                make_commit_ref(3),
4318                Digest::default(),
4319            );
4320
4321            let pending = queue.flush(1000, true).unwrap();
4322
4323            assert_eq!(pending.roots.len(), 3);
4324        }
4325
4326        #[test]
4327        fn test_push_chunk_handles_overflow() {
4328            let max_tx = 10;
4329            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4330            let versions = default_versions();
4331
4332            let flushed1 = queue.push_chunk(
4333                make_chunk(max_tx / 2, 1),
4334                &versions,
4335                1000,
4336                make_commit_ref(1),
4337                Digest::default(),
4338            );
4339            assert!(flushed1.is_empty());
4340
4341            let flushed2 = queue.push_chunk(
4342                make_chunk(max_tx / 2, 2),
4343                &versions,
4344                1000,
4345                make_commit_ref(2),
4346                Digest::default(),
4347            );
4348            assert!(flushed2.is_empty());
4349
4350            let flushed3 = queue.push_chunk(
4351                make_chunk(max_tx / 2, 3),
4352                &versions,
4353                1000,
4354                make_commit_ref(3),
4355                Digest::default(),
4356            );
4357            assert_eq!(flushed3.len(), 1);
4358
4359            let pending = queue.flush(1000, true);
4360
4361            for p in pending.iter().chain(flushed3.iter()) {
4362                let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4363                assert!(tx_count <= max_tx);
4364            }
4365        }
4366
4367        #[test]
4368        fn test_checkpoint_uses_last_chunk_height() {
4369            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4370            let versions = default_versions();
4371
4372            queue.push_chunk(
4373                make_chunk(10, 100),
4374                &versions,
4375                1000,
4376                make_commit_ref(1),
4377                Digest::default(),
4378            );
4379            queue.push_chunk(
4380                make_chunk(10, 200),
4381                &versions,
4382                1000,
4383                make_commit_ref(2),
4384                Digest::default(),
4385            );
4386
4387            let pending = queue.flush(1000, true).unwrap();
4388
4389            assert_eq!(pending.details.checkpoint_height, 200);
4390        }
4391
4392        #[test]
4393        fn test_last_built_timestamp_updated_on_flush() {
4394            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4395            let versions = default_versions();
4396
4397            queue.push_chunk(
4398                make_chunk(10, 1),
4399                &versions,
4400                5000,
4401                make_commit_ref(1),
4402                Digest::default(),
4403            );
4404
4405            assert_eq!(queue.last_built_timestamp, 0);
4406
4407            let _ = queue.flush(5000, true);
4408
4409            assert_eq!(queue.last_built_timestamp, 5000);
4410        }
4411
4412        #[test]
4413        fn test_settlement_info_sent_through_channel() {
4414            let mut queue = CheckpointQueue::new_for_testing(0, 0, 5, 1000, 0);
4415            let versions = default_versions();
4416
4417            let chunk1 = Chunk {
4418                schedulables: vec![
4419                    Schedulable::ConsensusCommitPrologue(0, 1, 0),
4420                    Schedulable::ConsensusCommitPrologue(0, 2, 0),
4421                    Schedulable::ConsensusCommitPrologue(0, 3, 0),
4422                ],
4423                settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
4424                height: 1,
4425            };
4426
4427            let chunk2 = Chunk {
4428                schedulables: vec![
4429                    Schedulable::ConsensusCommitPrologue(0, 4, 0),
4430                    Schedulable::ConsensusCommitPrologue(0, 5, 0),
4431                ],
4432                settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
4433                height: 2,
4434            };
4435
4436            queue.push_chunk(
4437                chunk1,
4438                &versions,
4439                1000,
4440                make_commit_ref(1),
4441                Digest::default(),
4442            );
4443            queue.push_chunk(
4444                chunk2,
4445                &versions,
4446                1000,
4447                make_commit_ref(1),
4448                Digest::default(),
4449            );
4450        }
4451
4452        #[test]
4453        fn test_settlement_checkpoint_seq_correct_after_flush() {
4454            let max_tx = 10;
4455            let initial_seq = 5;
4456            let (sender, mut receiver) = monitored_mpsc::unbounded_channel("test_settlement_seq");
4457            let mut queue =
4458                CheckpointQueue::new_for_testing_with_sender(0, 0, initial_seq, max_tx, 0, sender);
4459            let versions = default_versions();
4460
4461            // Push a chunk that partially fills the queue (no flush).
4462            let chunk1 = Chunk {
4463                schedulables: (0..max_tx / 2 + 1)
4464                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4465                    .collect(),
4466                settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
4467                height: 1,
4468            };
4469            queue.push_chunk(
4470                chunk1,
4471                &versions,
4472                1000,
4473                make_commit_ref(1),
4474                Digest::default(),
4475            );
4476
4477            // Drain the first message from the channel.
4478            let msg1 = receiver.try_recv().unwrap();
4479            let settlement1 = msg1.1.unwrap();
4480            assert_eq!(settlement1.checkpoint_seq, initial_seq);
4481
4482            // Push a second chunk that triggers a flush of chunk1's roots.
4483            let chunk2 = Chunk {
4484                schedulables: (0..max_tx / 2 + 1)
4485                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4486                    .collect(),
4487                settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
4488                height: 2,
4489            };
4490            let flushed = queue.push_chunk(
4491                chunk2,
4492                &versions,
4493                1000,
4494                make_commit_ref(2),
4495                Digest::default(),
4496            );
4497            assert_eq!(flushed.len(), 1);
4498            assert_eq!(flushed[0].details.checkpoint_seq, Some(initial_seq));
4499
4500            // The second settlement must have checkpoint_seq = initial_seq + 1,
4501            // because the flush incremented current_checkpoint_seq.
4502            let msg2 = receiver.try_recv().unwrap();
4503            let settlement2 = msg2.1.unwrap();
4504            assert_eq!(settlement2.checkpoint_seq, initial_seq + 1);
4505
4506            // Flush the remaining roots and verify the PendingCheckpointV2's seq
4507            // matches the settlement's seq.
4508            let pending = queue.flush_forced().unwrap();
4509            assert_eq!(
4510                pending.details.checkpoint_seq,
4511                Some(settlement2.checkpoint_seq)
4512            );
4513        }
4514
4515        #[test]
4516        fn test_checkpoint_seq_increments_on_flush() {
4517            let mut queue = CheckpointQueue::new_for_testing(0, 0, 10, 1000, 0);
4518            let versions = default_versions();
4519
4520            queue.push_chunk(
4521                make_chunk(5, 1),
4522                &versions,
4523                1000,
4524                make_commit_ref(1),
4525                Digest::default(),
4526            );
4527
4528            let pending = queue.flush(1000, true).unwrap();
4529
4530            assert_eq!(pending.details.checkpoint_seq, Some(10));
4531            assert_eq!(queue.current_checkpoint_seq, 11);
4532        }
4533
4534        #[test]
4535        fn test_multiple_chunks_with_overflow() {
4536            let max_tx = 10;
4537            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4538            let versions = default_versions();
4539
4540            let flushed1 = queue.push_chunk(
4541                make_chunk(max_tx / 2 + 1, 1),
4542                &versions,
4543                1000,
4544                make_commit_ref(1),
4545                Digest::default(),
4546            );
4547            let flushed2 = queue.push_chunk(
4548                make_chunk(max_tx / 2 + 1, 2),
4549                &versions,
4550                1000,
4551                make_commit_ref(1),
4552                Digest::default(),
4553            );
4554            let flushed3 = queue.push_chunk(
4555                make_chunk(max_tx / 2 + 1, 3),
4556                &versions,
4557                1000,
4558                make_commit_ref(1),
4559                Digest::default(),
4560            );
4561
4562            let all_flushed: Vec<_> = flushed1
4563                .into_iter()
4564                .chain(flushed2)
4565                .chain(flushed3)
4566                .collect();
4567            assert_eq!(all_flushed.len(), 2);
4568            assert_eq!(queue.pending_roots.len(), 1);
4569
4570            for p in &all_flushed {
4571                let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4572                assert!(tx_count <= max_tx);
4573            }
4574        }
4575    }
4576}