sui_core/
consensus_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet, VecDeque},
6    hash::Hash,
7    num::NonZeroUsize,
8    sync::{Arc, Mutex},
9    time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use consensus_config::Committee as ConsensusCommittee;
13use consensus_core::{CommitConsumerMonitor, CommitIndex, CommitRef};
14use consensus_types::block::BlockRef;
15use consensus_types::block::TransactionIndex;
16use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
17use lru::LruCache;
18use mysten_common::{
19    assert_reachable, assert_sometimes, debug_fatal, random_util::randomize_cache_capacity_in_tests,
20};
21use mysten_metrics::{
22    monitored_future,
23    monitored_mpsc::{self, UnboundedReceiver},
24    monitored_scope, spawn_monitored_task,
25};
26use nonempty::NonEmpty;
27use parking_lot::RwLockWriteGuard;
28use serde::{Deserialize, Serialize};
29use sui_config::node::CongestionLogConfig;
30use sui_macros::{fail_point, fail_point_arg, fail_point_if};
31use sui_protocol_config::{Chain, PerObjectCongestionControlMode, ProtocolConfig};
32use sui_types::{
33    SUI_RANDOMNESS_STATE_OBJECT_ID,
34    authenticator_state::ActiveJwk,
35    base_types::{
36        AuthorityName, ConciseableName, ConsensusObjectSequenceKey, ObjectID, ObjectRef,
37        SequenceNumber, TransactionDigest,
38    },
39    crypto::RandomnessRound,
40    digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest, Digest},
41    executable_transaction::{
42        TrustedExecutableTransaction, VerifiedExecutableTransaction,
43        VerifiedExecutableTransactionWithAliases,
44    },
45    messages_checkpoint::{
46        CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointTimestamp,
47    },
48    messages_consensus::{
49        AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments,
50        ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
51        ExecutionTimeObservation,
52    },
53    sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
54    transaction::{
55        InputObjectKind, PlainTransactionWithClaims, SenderSignedData, TransactionDataAPI,
56        TransactionKey, VerifiedTransaction, WithAliases,
57    },
58};
59use tokio::task::JoinSet;
60use tracing::{debug, error, info, instrument, trace, warn};
61
62use crate::{
63    authority::{
64        AuthorityMetrics, AuthorityState, ExecutionEnv,
65        authority_per_epoch_store::{
66            AuthorityPerEpochStore, CancelConsensusCertificateReason, ConsensusStats,
67            ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStatsV2,
68            consensus_quarantine::ConsensusCommitOutput,
69        },
70        backpressure::{BackpressureManager, BackpressureSubscriber},
71        congestion_log::CongestionCommitLogger,
72        consensus_tx_status_cache::ConsensusTxStatus,
73        execution_time_estimator::ExecutionTimeEstimator,
74        shared_object_congestion_tracker::SharedObjectCongestionTracker,
75        shared_object_version_manager::{AssignedTxAndVersions, AssignedVersions, Schedulable},
76        transaction_deferral::{DeferralKey, DeferralReason, transaction_deferral_within_limit},
77    },
78    checkpoints::{
79        CheckpointHeight, CheckpointRoots, CheckpointService, CheckpointServiceNotify,
80        PendingCheckpoint, PendingCheckpointInfo,
81    },
82    consensus_adapter::ConsensusAdapter,
83    consensus_throughput_calculator::ConsensusThroughputCalculator,
84    consensus_types::consensus_output_api::{ConsensusCommitAPI, ParsedTransaction},
85    epoch::{
86        randomness::{DkgStatus, RandomnessManager},
87        reconfiguration::ReconfigState,
88    },
89    execution_cache::ObjectCacheRead,
90    execution_scheduler::{SettlementBatchInfo, SettlementScheduler},
91    gasless_rate_limiter::ConsensusGaslessCounter,
92    post_consensus_tx_reorder::PostConsensusTxReorder,
93    traffic_controller::{TrafficController, policies::TrafficTally},
94};
95
96/// Output from filtering consensus transactions.
97/// Contains the filtered transactions and any owned object locks acquired post-consensus.
98struct FilteredConsensusOutput {
99    transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
100    owned_object_locks: HashMap<ObjectRef, TransactionDigest>,
101    dropped_transaction_keys: Vec<ConsensusTransactionKey>,
102}
103
104pub struct ConsensusHandlerInitializer {
105    state: Arc<AuthorityState>,
106    checkpoint_service: Arc<CheckpointService>,
107    epoch_store: Arc<AuthorityPerEpochStore>,
108    consensus_adapter: Arc<ConsensusAdapter>,
109    throughput_calculator: Arc<ConsensusThroughputCalculator>,
110    backpressure_manager: Arc<BackpressureManager>,
111    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
112    consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
113}
114
115impl ConsensusHandlerInitializer {
116    pub fn new(
117        state: Arc<AuthorityState>,
118        checkpoint_service: Arc<CheckpointService>,
119        epoch_store: Arc<AuthorityPerEpochStore>,
120        consensus_adapter: Arc<ConsensusAdapter>,
121        throughput_calculator: Arc<ConsensusThroughputCalculator>,
122        backpressure_manager: Arc<BackpressureManager>,
123        congestion_log_config: Option<CongestionLogConfig>,
124    ) -> Self {
125        let congestion_logger =
126            congestion_log_config.and_then(|config| match CongestionCommitLogger::new(&config) {
127                Ok(logger) => Some(Arc::new(Mutex::new(logger))),
128                Err(e) => {
129                    debug_fatal!("Failed to create congestion logger: {e}");
130                    None
131                }
132            });
133        let consensus_gasless_counter = state.consensus_gasless_counter.clone();
134        Self {
135            state,
136            checkpoint_service,
137            epoch_store,
138            consensus_adapter,
139            throughput_calculator,
140            backpressure_manager,
141            congestion_logger,
142            consensus_gasless_counter,
143        }
144    }
145
146    #[cfg(test)]
147    pub(crate) fn new_for_testing(
148        state: Arc<AuthorityState>,
149        checkpoint_service: Arc<CheckpointService>,
150    ) -> Self {
151        use crate::consensus_test_utils::make_consensus_adapter_for_test;
152        use std::collections::HashSet;
153
154        let backpressure_manager = BackpressureManager::new_for_tests();
155        let consensus_adapter =
156            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
157        let consensus_gasless_counter = state.consensus_gasless_counter.clone();
158        Self {
159            state: state.clone(),
160            checkpoint_service,
161            epoch_store: state.epoch_store_for_testing().clone(),
162            consensus_adapter,
163            throughput_calculator: Arc::new(ConsensusThroughputCalculator::new(
164                None,
165                state.metrics.clone(),
166            )),
167            backpressure_manager,
168            congestion_logger: None,
169            consensus_gasless_counter,
170        }
171    }
172
173    pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
174        let new_epoch_start_state = self.epoch_store.epoch_start_state();
175        let consensus_committee = new_epoch_start_state.get_consensus_committee();
176
177        let settlement_scheduler = SettlementScheduler::new(
178            self.state.execution_scheduler().as_ref().clone(),
179            self.state.get_transaction_cache_reader().clone(),
180            self.state.metrics.clone(),
181        );
182        ConsensusHandler::new(
183            self.epoch_store.clone(),
184            self.checkpoint_service.clone(),
185            settlement_scheduler,
186            self.consensus_adapter.clone(),
187            self.state.get_object_cache_reader().clone(),
188            consensus_committee,
189            self.state.metrics.clone(),
190            self.throughput_calculator.clone(),
191            self.backpressure_manager.subscribe(),
192            self.state.traffic_controller.clone(),
193            self.congestion_logger.clone(),
194            self.consensus_gasless_counter.clone(),
195        )
196    }
197}
198
199mod additional_consensus_state {
200    use std::marker::PhantomData;
201
202    use consensus_core::CommitRef;
203    use fastcrypto::hash::HashFunction as _;
204    use sui_types::{crypto::DefaultHash, digests::Digest};
205
206    use super::*;
207    /// AdditionalConsensusState tracks any in-memory state that is retained by ConsensusHandler
208    /// between consensus commits. Because of crash recovery, using such data is inherently risky.
209    /// In order to do this safely, we must store data from a fixed number of previous commits.
210    /// Then, at start-up, that same fixed number of already processed commits is replayed to
211    /// reconstruct the state.
212    ///
213    /// To make sure that bugs in this process appear immediately, we record the digest of this
214    /// state in ConsensusCommitPrologue, so that any deviation causes an immediate fork.
215    #[derive(Serialize, Deserialize)]
216    pub(super) struct AdditionalConsensusState {
217        commit_interval_observer: CommitIntervalObserver,
218    }
219
220    impl AdditionalConsensusState {
221        pub fn new(additional_consensus_state_window_size: u32) -> Self {
222            Self {
223                commit_interval_observer: CommitIntervalObserver::new(
224                    additional_consensus_state_window_size,
225                ),
226            }
227        }
228
229        /// Update all internal state based on the new commit
230        pub(crate) fn observe_commit(
231            &mut self,
232            protocol_config: &ProtocolConfig,
233            epoch_start_time: u64,
234            consensus_commit: &impl ConsensusCommitAPI,
235        ) -> ConsensusCommitInfo {
236            self.commit_interval_observer
237                .observe_commit_time(consensus_commit);
238
239            let estimated_commit_period = self
240                .commit_interval_observer
241                .commit_interval_estimate()
242                .unwrap_or(Duration::from_millis(
243                    protocol_config.min_checkpoint_interval_ms(),
244                ));
245
246            info!("estimated commit rate: {:?}", estimated_commit_period);
247
248            self.commit_info_impl(
249                epoch_start_time,
250                consensus_commit,
251                Some(estimated_commit_period),
252            )
253        }
254
255        fn commit_info_impl(
256            &self,
257            epoch_start_time: u64,
258            consensus_commit: &impl ConsensusCommitAPI,
259            estimated_commit_period: Option<Duration>,
260        ) -> ConsensusCommitInfo {
261            let leader_author = consensus_commit.leader_author_index();
262            let timestamp = consensus_commit.commit_timestamp_ms();
263
264            let timestamp = if timestamp < epoch_start_time {
265                error!(
266                    "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start_time}, author {leader_author:?}"
267                );
268                epoch_start_time
269            } else {
270                timestamp
271            };
272
273            ConsensusCommitInfo {
274                _phantom: PhantomData,
275                round: consensus_commit.leader_round(),
276                timestamp,
277                leader_author,
278                consensus_commit_ref: consensus_commit.commit_ref(),
279                rejected_transactions_digest: consensus_commit.rejected_transactions_digest(),
280                additional_state_digest: Some(self.digest()),
281                estimated_commit_period,
282                skip_consensus_commit_prologue_in_test: false,
283            }
284        }
285
286        /// Get the digest of the current state.
287        fn digest(&self) -> AdditionalConsensusStateDigest {
288            let mut hash = DefaultHash::new();
289            bcs::serialize_into(&mut hash, self).unwrap();
290            AdditionalConsensusStateDigest::new(hash.finalize().into())
291        }
292    }
293
294    pub struct ConsensusCommitInfo {
295        // prevent public construction
296        _phantom: PhantomData<()>,
297
298        pub round: u64,
299        pub timestamp: u64,
300        pub leader_author: AuthorityIndex,
301        pub consensus_commit_ref: CommitRef,
302        pub rejected_transactions_digest: Digest,
303
304        additional_state_digest: Option<AdditionalConsensusStateDigest>,
305        estimated_commit_period: Option<Duration>,
306
307        pub skip_consensus_commit_prologue_in_test: bool,
308    }
309
310    impl ConsensusCommitInfo {
311        pub fn new_for_test(
312            commit_round: u64,
313            commit_timestamp: u64,
314            estimated_commit_period: Option<Duration>,
315            skip_consensus_commit_prologue_in_test: bool,
316        ) -> Self {
317            Self {
318                _phantom: PhantomData,
319                round: commit_round,
320                timestamp: commit_timestamp,
321                leader_author: 0,
322                consensus_commit_ref: CommitRef::default(),
323                rejected_transactions_digest: Digest::default(),
324                additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
325                estimated_commit_period,
326                skip_consensus_commit_prologue_in_test,
327            }
328        }
329
330        pub fn new_for_congestion_test(
331            commit_round: u64,
332            commit_timestamp: u64,
333            estimated_commit_period: Duration,
334        ) -> Self {
335            Self::new_for_test(
336                commit_round,
337                commit_timestamp,
338                Some(estimated_commit_period),
339                true,
340            )
341        }
342
343        pub fn additional_state_digest(&self) -> AdditionalConsensusStateDigest {
344            // this method cannot be called if stateless_commit_info is used
345            self.additional_state_digest
346                .expect("additional_state_digest is not available")
347        }
348
349        pub fn estimated_commit_period(&self) -> Duration {
350            // this method cannot be called if stateless_commit_info is used
351            self.estimated_commit_period
352                .expect("estimated commit period is not available")
353        }
354
355        fn consensus_commit_digest(&self) -> ConsensusCommitDigest {
356            ConsensusCommitDigest::new(self.consensus_commit_ref.digest.into_inner())
357        }
358
359        fn consensus_commit_prologue_transaction(
360            &self,
361            epoch: u64,
362        ) -> VerifiedExecutableTransaction {
363            let transaction = VerifiedTransaction::new_consensus_commit_prologue(
364                epoch,
365                self.round,
366                self.timestamp,
367            );
368            VerifiedExecutableTransaction::new_system(transaction, epoch)
369        }
370
371        fn consensus_commit_prologue_v2_transaction(
372            &self,
373            epoch: u64,
374        ) -> VerifiedExecutableTransaction {
375            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v2(
376                epoch,
377                self.round,
378                self.timestamp,
379                self.consensus_commit_digest(),
380            );
381            VerifiedExecutableTransaction::new_system(transaction, epoch)
382        }
383
384        fn consensus_commit_prologue_v3_transaction(
385            &self,
386            epoch: u64,
387            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
388        ) -> VerifiedExecutableTransaction {
389            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v3(
390                epoch,
391                self.round,
392                self.timestamp,
393                self.consensus_commit_digest(),
394                consensus_determined_version_assignments,
395            );
396            VerifiedExecutableTransaction::new_system(transaction, epoch)
397        }
398
399        fn consensus_commit_prologue_v4_transaction(
400            &self,
401            epoch: u64,
402            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
403            additional_state_digest: AdditionalConsensusStateDigest,
404        ) -> VerifiedExecutableTransaction {
405            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v4(
406                epoch,
407                self.round,
408                self.timestamp,
409                self.consensus_commit_digest(),
410                consensus_determined_version_assignments,
411                additional_state_digest,
412            );
413            VerifiedExecutableTransaction::new_system(transaction, epoch)
414        }
415
416        pub fn create_consensus_commit_prologue_transaction(
417            &self,
418            epoch: u64,
419            protocol_config: &ProtocolConfig,
420            cancelled_txn_version_assignment: Vec<(
421                TransactionDigest,
422                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
423            )>,
424            commit_info: &ConsensusCommitInfo,
425            indirect_state_observer: IndirectStateObserver,
426        ) -> VerifiedExecutableTransaction {
427            let version_assignments = if protocol_config
428                .record_consensus_determined_version_assignments_in_prologue_v2()
429            {
430                Some(
431                    ConsensusDeterminedVersionAssignments::CancelledTransactionsV2(
432                        cancelled_txn_version_assignment,
433                    ),
434                )
435            } else if protocol_config.record_consensus_determined_version_assignments_in_prologue()
436            {
437                Some(
438                    ConsensusDeterminedVersionAssignments::CancelledTransactions(
439                        cancelled_txn_version_assignment
440                            .into_iter()
441                            .map(|(tx_digest, versions)| {
442                                (
443                                    tx_digest,
444                                    versions.into_iter().map(|(id, v)| (id.0, v)).collect(),
445                                )
446                            })
447                            .collect(),
448                    ),
449                )
450            } else {
451                None
452            };
453
454            if protocol_config.record_additional_state_digest_in_prologue() {
455                let additional_state_digest =
456                    if protocol_config.additional_consensus_digest_indirect_state() {
457                        let d1 = commit_info.additional_state_digest();
458                        indirect_state_observer.fold_with(d1)
459                    } else {
460                        commit_info.additional_state_digest()
461                    };
462
463                self.consensus_commit_prologue_v4_transaction(
464                    epoch,
465                    version_assignments.unwrap(),
466                    additional_state_digest,
467                )
468            } else if let Some(version_assignments) = version_assignments {
469                self.consensus_commit_prologue_v3_transaction(epoch, version_assignments)
470            } else if protocol_config.include_consensus_digest_in_prologue() {
471                self.consensus_commit_prologue_v2_transaction(epoch)
472            } else {
473                self.consensus_commit_prologue_transaction(epoch)
474            }
475        }
476    }
477
478    #[derive(Default)]
479    pub struct IndirectStateObserver {
480        hash: DefaultHash,
481    }
482
483    impl IndirectStateObserver {
484        pub fn new() -> Self {
485            Self::default()
486        }
487
488        pub fn observe_indirect_state<T: Serialize>(&mut self, state: &T) {
489            bcs::serialize_into(&mut self.hash, state).unwrap();
490        }
491
492        pub fn fold_with(
493            self,
494            d1: AdditionalConsensusStateDigest,
495        ) -> AdditionalConsensusStateDigest {
496            let hash = self.hash.finalize();
497            let d2 = AdditionalConsensusStateDigest::new(hash.into());
498
499            let mut hasher = DefaultHash::new();
500            bcs::serialize_into(&mut hasher, &d1).unwrap();
501            bcs::serialize_into(&mut hasher, &d2).unwrap();
502            AdditionalConsensusStateDigest::new(hasher.finalize().into())
503        }
504    }
505
506    #[test]
507    fn test_additional_consensus_state() {
508        use crate::consensus_test_utils::TestConsensusCommit;
509
510        fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
511            let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
512            state.observe_commit(
513                &protocol_config,
514                100,
515                &TestConsensusCommit::empty(round, timestamp, 0),
516            );
517        }
518
519        let mut s1 = AdditionalConsensusState::new(3);
520        observe(&mut s1, 1, 1000);
521        observe(&mut s1, 2, 2000);
522        observe(&mut s1, 3, 3000);
523        observe(&mut s1, 4, 4000);
524
525        let mut s2 = AdditionalConsensusState::new(3);
526        // Because state uses a ring buffer, we should get the same digest
527        // even though we only added the 3 latest observations.
528        observe(&mut s2, 2, 2000);
529        observe(&mut s2, 3, 3000);
530        observe(&mut s2, 4, 4000);
531
532        assert_eq!(s1.digest(), s2.digest());
533
534        observe(&mut s1, 5, 5000);
535        observe(&mut s2, 5, 5000);
536
537        assert_eq!(s1.digest(), s2.digest());
538    }
539}
540use additional_consensus_state::AdditionalConsensusState;
541pub(crate) use additional_consensus_state::{ConsensusCommitInfo, IndirectStateObserver};
542
543struct QueuedCheckpointRoots {
544    roots: CheckpointRoots,
545    timestamp: CheckpointTimestamp,
546    consensus_commit_ref: CommitRef,
547    rejected_transactions_digest: Digest,
548}
549
550struct Chunk<
551    T: crate::authority::shared_object_version_manager::AsTx = VerifiedExecutableTransaction,
552> {
553    schedulables: Vec<Schedulable<T>>,
554    settlement: Option<Schedulable<T>>,
555    height: CheckpointHeight,
556}
557
558impl<T: crate::authority::shared_object_version_manager::AsTx + Clone> Chunk<T> {
559    fn all_schedulables(&self) -> impl Iterator<Item = &Schedulable<T>> + Clone {
560        self.schedulables.iter().chain(self.settlement.iter())
561    }
562
563    fn all_schedulables_from(chunks: &[Self]) -> impl Iterator<Item = &Schedulable<T>> + Clone {
564        chunks.iter().flat_map(|c| c.all_schedulables())
565    }
566
567    fn to_checkpoint_roots(&self) -> CheckpointRoots {
568        let tx_roots: Vec<_> = self.schedulables.iter().map(|s| s.key()).collect();
569        let settlement_root = self.settlement.as_ref().map(|s| s.key());
570        CheckpointRoots {
571            tx_roots,
572            settlement_root,
573            height: self.height,
574        }
575    }
576}
577
578impl From<Chunk<VerifiedExecutableTransactionWithAliases>> for Chunk {
579    fn from(chunk: Chunk<VerifiedExecutableTransactionWithAliases>) -> Self {
580        Chunk {
581            schedulables: chunk.schedulables.into_iter().map(|s| s.into()).collect(),
582            settlement: chunk.settlement.map(|s| s.into()),
583            height: chunk.height,
584        }
585    }
586}
587
588// Accumulates checkpoint roots from consensus and flushes them into PendingCheckpoints.
589// Roots are buffered in `pending_roots` until a flush is triggered (by max_tx overflow or
590// time interval). A flush always drains all buffered roots into a single PendingCheckpoint,
591// so the queue only ever holds roots for one pending checkpoint at a time.
592//
593// Owns an ExecutionSchedulerSender so push_chunk can send schedulables and settlement info
594// for execution in one shot.
595pub(crate) struct CheckpointQueue {
596    last_built_timestamp: CheckpointTimestamp,
597    pending_roots: VecDeque<QueuedCheckpointRoots>,
598    height: u64,
599    pending_tx_count: usize,
600    current_checkpoint_seq: CheckpointSequenceNumber,
601    max_tx: usize,
602    min_checkpoint_interval_ms: u64,
603    execution_scheduler_sender: ExecutionSchedulerSender,
604}
605
606impl CheckpointQueue {
607    pub(crate) fn new(
608        last_built_timestamp: CheckpointTimestamp,
609        checkpoint_height: u64,
610        next_checkpoint_seq: CheckpointSequenceNumber,
611        max_tx: usize,
612        min_checkpoint_interval_ms: u64,
613        execution_scheduler_sender: ExecutionSchedulerSender,
614    ) -> Self {
615        Self {
616            last_built_timestamp,
617            pending_roots: VecDeque::new(),
618            height: checkpoint_height,
619            pending_tx_count: 0,
620            current_checkpoint_seq: next_checkpoint_seq,
621            max_tx,
622            min_checkpoint_interval_ms,
623            execution_scheduler_sender,
624        }
625    }
626
627    #[cfg(test)]
628    fn new_for_testing(
629        last_built_timestamp: CheckpointTimestamp,
630        checkpoint_height: u64,
631        next_checkpoint_seq: CheckpointSequenceNumber,
632        max_tx: usize,
633        min_checkpoint_interval_ms: u64,
634    ) -> Self {
635        let (sender, _receiver) = monitored_mpsc::unbounded_channel("test_checkpoint_queue_sender");
636        Self {
637            last_built_timestamp,
638            pending_roots: VecDeque::new(),
639            height: checkpoint_height,
640            pending_tx_count: 0,
641            current_checkpoint_seq: next_checkpoint_seq,
642            max_tx,
643            min_checkpoint_interval_ms,
644            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
645        }
646    }
647
648    #[cfg(test)]
649    fn new_for_testing_with_sender(
650        last_built_timestamp: CheckpointTimestamp,
651        checkpoint_height: u64,
652        next_checkpoint_seq: CheckpointSequenceNumber,
653        max_tx: usize,
654        min_checkpoint_interval_ms: u64,
655        sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
656    ) -> Self {
657        Self {
658            last_built_timestamp,
659            pending_roots: VecDeque::new(),
660            height: checkpoint_height,
661            pending_tx_count: 0,
662            current_checkpoint_seq: next_checkpoint_seq,
663            max_tx,
664            min_checkpoint_interval_ms,
665            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
666        }
667    }
668
669    pub(crate) fn last_built_timestamp(&self) -> CheckpointTimestamp {
670        self.last_built_timestamp
671    }
672
673    pub(crate) fn is_empty(&self) -> bool {
674        self.pending_roots.is_empty()
675    }
676
677    fn next_height(&mut self) -> u64 {
678        self.height += 1;
679        self.height
680    }
681
682    fn push_chunk(
683        &mut self,
684        chunk: Chunk,
685        assigned_versions: &HashMap<TransactionKey, AssignedVersions>,
686        timestamp: CheckpointTimestamp,
687        consensus_commit_ref: CommitRef,
688        rejected_transactions_digest: Digest,
689    ) -> Vec<PendingCheckpoint> {
690        let max_tx = self.max_tx;
691        let user_tx_count = chunk.schedulables.len();
692
693        let roots = chunk.to_checkpoint_roots();
694
695        let schedulables: Vec<_> = chunk
696            .schedulables
697            .into_iter()
698            .map(|s| {
699                let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
700                (s, versions)
701            })
702            .collect();
703
704        let mut flushed_checkpoints = Vec::new();
705
706        if self.pending_tx_count > 0
707            && self.pending_tx_count + user_tx_count > max_tx
708            && let Some(checkpoint) = self.flush_forced()
709        {
710            flushed_checkpoints.push(checkpoint);
711        }
712
713        let settlement_info = chunk.settlement.as_ref().map(|s| {
714            let settlement_key = s.key();
715            let tx_keys: Vec<_> = schedulables.iter().map(|(s, _)| s.key()).collect();
716            SettlementBatchInfo {
717                settlement_key,
718                tx_keys,
719                checkpoint_height: chunk.height,
720                checkpoint_seq: self.current_checkpoint_seq,
721                assigned_versions: assigned_versions
722                    .get(&settlement_key)
723                    .cloned()
724                    .unwrap_or_default(),
725            }
726        });
727
728        self.execution_scheduler_sender
729            .send(schedulables, settlement_info);
730
731        self.pending_tx_count += user_tx_count;
732        self.pending_roots.push_back(QueuedCheckpointRoots {
733            roots,
734            timestamp,
735            consensus_commit_ref,
736            rejected_transactions_digest,
737        });
738
739        flushed_checkpoints
740    }
741
742    pub(crate) fn flush(
743        &mut self,
744        current_timestamp: CheckpointTimestamp,
745        force: bool,
746    ) -> Option<PendingCheckpoint> {
747        if !force && current_timestamp < self.last_built_timestamp + self.min_checkpoint_interval_ms
748        {
749            return None;
750        }
751        self.flush_forced()
752    }
753
754    fn flush_forced(&mut self) -> Option<PendingCheckpoint> {
755        if self.pending_roots.is_empty() {
756            return None;
757        }
758
759        let to_flush: Vec<_> = self.pending_roots.drain(..).collect();
760        let last_root = to_flush.last().unwrap();
761
762        let checkpoint = PendingCheckpoint {
763            roots: to_flush.iter().map(|q| q.roots.clone()).collect(),
764            details: PendingCheckpointInfo {
765                timestamp_ms: last_root.timestamp,
766                last_of_epoch: false,
767                checkpoint_height: last_root.roots.height,
768                consensus_commit_ref: last_root.consensus_commit_ref,
769                rejected_transactions_digest: last_root.rejected_transactions_digest,
770                checkpoint_seq: self.current_checkpoint_seq,
771            },
772        };
773
774        self.last_built_timestamp = last_root.timestamp;
775        self.pending_tx_count = 0;
776        self.current_checkpoint_seq += 1;
777
778        Some(checkpoint)
779    }
780
781    pub(crate) fn checkpoint_seq(&self) -> CheckpointSequenceNumber {
782        self.current_checkpoint_seq
783            .checked_sub(1)
784            .expect("checkpoint_seq called before any checkpoint was assigned")
785    }
786}
787
788pub struct ConsensusHandler<C> {
789    /// A store created for each epoch. ConsensusHandler is recreated each epoch, with the
790    /// corresponding store. This store is also used to get the current epoch ID.
791    epoch_store: Arc<AuthorityPerEpochStore>,
792    /// Holds the indices, hash and stats after the last consensus commit
793    /// It is used for avoiding replaying already processed transactions,
794    /// checking chain consistency, and accumulating per-epoch consensus output stats.
795    last_consensus_stats: ExecutionIndicesWithStatsV2,
796    checkpoint_service: Arc<C>,
797    /// cache reader is needed when determining the next version to assign for shared objects.
798    cache_reader: Arc<dyn ObjectCacheRead>,
799    /// The consensus committee used to do stake computations for deciding set of low scoring authorities
800    committee: ConsensusCommittee,
801    // TODO: ConsensusHandler doesn't really share metrics with AuthorityState. We could define
802    // a new metrics type here if we want to.
803    metrics: Arc<AuthorityMetrics>,
804    /// Lru cache to quickly discard transactions processed by consensus
805    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
806    /// Consensus adapter for submitting transactions to consensus
807    consensus_adapter: Arc<ConsensusAdapter>,
808
809    /// Using the throughput calculator to record the current consensus throughput
810    throughput_calculator: Arc<ConsensusThroughputCalculator>,
811
812    additional_consensus_state: AdditionalConsensusState,
813
814    backpressure_subscriber: BackpressureSubscriber,
815
816    traffic_controller: Option<Arc<TrafficController>>,
817
818    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
819
820    consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
821
822    checkpoint_queue: Mutex<CheckpointQueue>,
823}
824
825const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
826
827impl<C> ConsensusHandler<C> {
828    pub(crate) fn new(
829        epoch_store: Arc<AuthorityPerEpochStore>,
830        checkpoint_service: Arc<C>,
831        settlement_scheduler: SettlementScheduler,
832        consensus_adapter: Arc<ConsensusAdapter>,
833        cache_reader: Arc<dyn ObjectCacheRead>,
834        committee: ConsensusCommittee,
835        metrics: Arc<AuthorityMetrics>,
836        throughput_calculator: Arc<ConsensusThroughputCalculator>,
837        backpressure_subscriber: BackpressureSubscriber,
838        traffic_controller: Option<Arc<TrafficController>>,
839        congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
840        consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
841    ) -> Self {
842        assert!(
843            matches!(
844                epoch_store
845                    .protocol_config()
846                    .per_object_congestion_control_mode(),
847                PerObjectCongestionControlMode::ExecutionTimeEstimate(_)
848            ),
849            "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
850        );
851
852        assert!(
853            epoch_store
854                .protocol_config()
855                .split_checkpoints_in_consensus_handler(),
856            "support for splitting checkpoints outside of consensus handler has been removed"
857        );
858
859        // Recover last_consensus_stats so it is consistent across validators.
860        let mut last_consensus_stats = epoch_store
861            .get_last_consensus_stats()
862            .expect("Should be able to read last consensus index");
863        // stats is empty at the beginning of epoch.
864        if !last_consensus_stats.stats.is_initialized() {
865            last_consensus_stats.stats = ConsensusStats::new(committee.size());
866            last_consensus_stats.checkpoint_seq = epoch_store.previous_epoch_last_checkpoint();
867        }
868        let max_tx = epoch_store
869            .protocol_config()
870            .max_transactions_per_checkpoint() as usize;
871        let min_checkpoint_interval_ms = epoch_store
872            .protocol_config()
873            .min_checkpoint_interval_ms_as_option()
874            .unwrap_or_default();
875        let execution_scheduler_sender =
876            ExecutionSchedulerSender::start(settlement_scheduler, epoch_store.clone());
877        let commit_rate_estimate_window_size = epoch_store
878            .protocol_config()
879            .get_consensus_commit_rate_estimation_window_size();
880        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
881        let checkpoint_height = last_consensus_stats.height;
882        let next_checkpoint_seq = last_consensus_stats.checkpoint_seq + 1;
883        Self {
884            epoch_store,
885            last_consensus_stats,
886            checkpoint_service,
887            cache_reader,
888            committee,
889            metrics,
890            processed_cache: LruCache::new(
891                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
892            ),
893            consensus_adapter,
894            throughput_calculator,
895            additional_consensus_state: AdditionalConsensusState::new(
896                commit_rate_estimate_window_size,
897            ),
898            backpressure_subscriber,
899            traffic_controller,
900            congestion_logger,
901            consensus_gasless_counter,
902            checkpoint_queue: Mutex::new(CheckpointQueue::new(
903                last_built_timestamp,
904                checkpoint_height,
905                next_checkpoint_seq,
906                max_tx,
907                min_checkpoint_interval_ms,
908                execution_scheduler_sender,
909            )),
910        }
911    }
912
913    /// Returns the last subdag index processed by the handler.
914    pub(crate) fn last_processed_subdag_index(&self) -> u64 {
915        self.last_consensus_stats.index.sub_dag_index
916    }
917
918    pub(crate) fn new_for_testing(
919        epoch_store: Arc<AuthorityPerEpochStore>,
920        checkpoint_service: Arc<C>,
921        execution_scheduler_sender: ExecutionSchedulerSender,
922        consensus_adapter: Arc<ConsensusAdapter>,
923        cache_reader: Arc<dyn ObjectCacheRead>,
924        committee: ConsensusCommittee,
925        metrics: Arc<AuthorityMetrics>,
926        throughput_calculator: Arc<ConsensusThroughputCalculator>,
927        backpressure_subscriber: BackpressureSubscriber,
928        traffic_controller: Option<Arc<TrafficController>>,
929        last_consensus_stats: ExecutionIndicesWithStatsV2,
930    ) -> Self {
931        let commit_rate_estimate_window_size = epoch_store
932            .protocol_config()
933            .get_consensus_commit_rate_estimation_window_size();
934        let max_tx = epoch_store
935            .protocol_config()
936            .max_transactions_per_checkpoint() as usize;
937        let min_checkpoint_interval_ms = epoch_store
938            .protocol_config()
939            .min_checkpoint_interval_ms_as_option()
940            .unwrap_or_default();
941        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
942        let checkpoint_height = last_consensus_stats.height;
943        Self {
944            epoch_store,
945            last_consensus_stats,
946            checkpoint_service,
947            cache_reader,
948            committee,
949            metrics,
950            processed_cache: LruCache::new(
951                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
952            ),
953            consensus_adapter,
954            throughput_calculator,
955            additional_consensus_state: AdditionalConsensusState::new(
956                commit_rate_estimate_window_size,
957            ),
958            backpressure_subscriber,
959            traffic_controller,
960            congestion_logger: None,
961            consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
962            checkpoint_queue: Mutex::new(CheckpointQueue::new(
963                last_built_timestamp,
964                checkpoint_height,
965                0,
966                max_tx,
967                min_checkpoint_interval_ms,
968                execution_scheduler_sender,
969            )),
970        }
971    }
972}
973
974#[derive(Default)]
975struct CommitHandlerInput {
976    user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
977    capability_notifications: Vec<AuthorityCapabilitiesV2>,
978    execution_time_observations: Vec<ExecutionTimeObservation>,
979    checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
980    randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
981    randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
982    end_of_publish_transactions: Vec<AuthorityName>,
983    new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
984}
985
986struct CommitHandlerState {
987    dkg_failed: bool,
988    randomness_round: Option<RandomnessRound>,
989    output: ConsensusCommitOutput,
990    indirect_state_observer: Option<IndirectStateObserver>,
991    initial_reconfig_state: ReconfigState,
992    // Occurrence counts for user transactions, used for unpaid amplification detection.
993    occurrence_counts: HashMap<TransactionDigest, u32>,
994}
995
996impl CommitHandlerState {
997    fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
998        self.output
999            .get_consensus_messages_processed()
1000            .cloned()
1001            .collect()
1002    }
1003
1004    fn init_randomness<'a, 'epoch>(
1005        &'a mut self,
1006        epoch_store: &'epoch AuthorityPerEpochStore,
1007        commit_info: &'a ConsensusCommitInfo,
1008    ) -> Option<tokio::sync::MutexGuard<'epoch, RandomnessManager>> {
1009        let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
1010            rm.try_lock()
1011                .expect("should only ever be called from the commit handler thread")
1012        });
1013
1014        let mut dkg_failed = false;
1015        let randomness_round = if epoch_store.randomness_state_enabled() {
1016            let randomness_manager = randomness_manager
1017                .as_mut()
1018                .expect("randomness manager should exist if randomness is enabled");
1019            match randomness_manager.dkg_status() {
1020                DkgStatus::Pending => None,
1021                DkgStatus::Failed => {
1022                    dkg_failed = true;
1023                    None
1024                }
1025                DkgStatus::Successful => {
1026                    // Generate randomness for this commit if DKG is successful and we are still
1027                    // accepting certs.
1028                    if self.initial_reconfig_state.should_accept_tx() {
1029                        randomness_manager
1030                            // TODO: make infallible
1031                            .reserve_next_randomness(commit_info.timestamp, &mut self.output)
1032                            .expect("epoch ended")
1033                    } else {
1034                        None
1035                    }
1036                }
1037            }
1038        } else {
1039            None
1040        };
1041
1042        if randomness_round.is_some() {
1043            assert!(!dkg_failed); // invariant check
1044        }
1045
1046        self.randomness_round = randomness_round;
1047        self.dkg_failed = dkg_failed;
1048
1049        randomness_manager
1050    }
1051}
1052
1053impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
1054    /// Called during startup to allow us to observe commits we previously processed, for crash recovery.
1055    /// Any state computed here must be a pure function of the commits observed, it cannot depend on any
1056    /// state recorded in the epoch db.
1057    fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
1058        assert!(
1059            self.epoch_store
1060                .protocol_config()
1061                .record_additional_state_digest_in_prologue()
1062        );
1063        let protocol_config = self.epoch_store.protocol_config();
1064        let epoch_start_time = self
1065            .epoch_store
1066            .epoch_start_config()
1067            .epoch_start_timestamp_ms();
1068
1069        self.additional_consensus_state.observe_commit(
1070            protocol_config,
1071            epoch_start_time,
1072            &consensus_commit,
1073        );
1074    }
1075
1076    #[cfg(test)]
1077    pub(crate) async fn handle_consensus_commit_for_test(
1078        &mut self,
1079        consensus_commit: impl ConsensusCommitAPI,
1080    ) {
1081        let transactions = consensus_commit.transactions();
1082        self.handle_consensus_commit(consensus_commit, transactions)
1083            .await;
1084    }
1085
1086    #[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))]
1087    pub(crate) async fn handle_consensus_commit(
1088        &mut self,
1089        consensus_commit: impl ConsensusCommitAPI,
1090        transactions: ParsedConsensusTransactions,
1091    ) {
1092        {
1093            let protocol_config = self.epoch_store.protocol_config();
1094
1095            // Assert all protocol config settings for which we don't support old behavior.
1096            assert!(protocol_config.ignore_execution_time_observations_after_certs_closed());
1097            assert!(protocol_config.record_time_estimate_processed());
1098            assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints());
1099            assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest());
1100            assert!(protocol_config.authority_capabilities_v2());
1101            assert!(protocol_config.cancel_for_failed_dkg_early());
1102        }
1103
1104        // This may block until one of two conditions happens:
1105        // - Number of uncommitted transactions in the writeback cache goes below the
1106        //   backpressure threshold.
1107        // - The highest executed checkpoint catches up to the highest certified checkpoint.
1108        self.backpressure_subscriber.await_no_backpressure().await;
1109
1110        let epoch = self.epoch_store.epoch();
1111
1112        let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit");
1113
1114        let last_committed_round = self.last_consensus_stats.index.last_committed_round;
1115
1116        self.epoch_store
1117            .consensus_tx_status_cache
1118            .update_last_committed_leader_round(last_committed_round as u32);
1119        self.epoch_store
1120            .tx_reject_reason_cache
1121            .set_last_committed_leader_round(last_committed_round as u32);
1122
1123        let commit_info = self.additional_consensus_state.observe_commit(
1124            self.epoch_store.protocol_config(),
1125            self.epoch_store
1126                .epoch_start_config()
1127                .epoch_start_timestamp_ms(),
1128            &consensus_commit,
1129        );
1130        assert!(commit_info.round > last_committed_round);
1131
1132        let (timestamp, leader_author, commit_sub_dag_index) =
1133            self.gather_commit_metadata(&consensus_commit);
1134
1135        info!(
1136            %consensus_commit,
1137            "Received consensus output {}. Rejected transactions {}",
1138            consensus_commit.commit_ref(),
1139            consensus_commit.rejected_transactions_debug_string(),
1140        );
1141
1142        self.last_consensus_stats.index = ExecutionIndices {
1143            last_committed_round: commit_info.round,
1144            sub_dag_index: commit_sub_dag_index,
1145            transaction_index: 0_u64,
1146        };
1147
1148        self.metrics
1149            .consensus_committed_subdags
1150            .with_label_values(&[&leader_author.to_string()])
1151            .inc();
1152
1153        let mut state = CommitHandlerState {
1154            output: ConsensusCommitOutput::new(commit_info.round),
1155            dkg_failed: false,
1156            randomness_round: None,
1157            indirect_state_observer: Some(IndirectStateObserver::new()),
1158            initial_reconfig_state: self
1159                .epoch_store
1160                .get_reconfig_state_read_lock_guard()
1161                .clone(),
1162            occurrence_counts: HashMap::new(),
1163        };
1164
1165        let FilteredConsensusOutput {
1166            transactions,
1167            owned_object_locks,
1168            dropped_transaction_keys,
1169        } = self.filter_consensus_txns(
1170            state.initial_reconfig_state.clone(),
1171            &commit_info,
1172            transactions,
1173        );
1174        // Buffer owned object locks for batch write.
1175        if !owned_object_locks.is_empty() {
1176            state.output.set_owned_object_locks(owned_object_locks);
1177        }
1178
1179        // Still record the dropped transactions as consensus message processed.
1180        for key in dropped_transaction_keys {
1181            state.output.record_consensus_message_processed(
1182                SequencedConsensusTransactionKey::External(key),
1183            );
1184        }
1185        let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions);
1186
1187        let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info);
1188
1189        let CommitHandlerInput {
1190            user_transactions,
1191            capability_notifications,
1192            execution_time_observations,
1193            checkpoint_signature_messages,
1194            randomness_dkg_messages,
1195            randomness_dkg_confirmations,
1196            end_of_publish_transactions,
1197            new_jwks,
1198        } = self.build_commit_handler_input(transactions);
1199
1200        self.process_gasless_transactions(&commit_info, &user_transactions);
1201        self.process_jwks(&mut state, &commit_info, new_jwks);
1202        self.process_capability_notifications(capability_notifications);
1203        self.process_execution_time_observations(&mut state, execution_time_observations);
1204        self.process_checkpoint_signature_messages(checkpoint_signature_messages);
1205
1206        self.process_dkg_updates(
1207            &mut state,
1208            &commit_info,
1209            randomness_manager.as_deref_mut(),
1210            randomness_dkg_messages,
1211            randomness_dkg_confirmations,
1212        )
1213        .await;
1214
1215        let mut execution_time_estimator = self
1216            .epoch_store
1217            .execution_time_estimator
1218            .try_lock()
1219            .expect("should only ever be called from the commit handler thread");
1220
1221        let authenticator_state_update_transaction =
1222            self.create_authenticator_state_update(last_committed_round, &commit_info);
1223
1224        let (
1225            transactions_to_schedule,
1226            randomness_transactions_to_schedule,
1227            cancelled_txns,
1228            randomness_state_update_transaction,
1229        ) = self.collect_transactions_to_schedule(
1230            &mut state,
1231            &mut execution_time_estimator,
1232            &commit_info,
1233            user_transactions,
1234        );
1235
1236        let (should_accept_tx, lock, final_round) =
1237            self.handle_close_epoch(&mut state, &commit_info, end_of_publish_transactions);
1238
1239        let make_checkpoint = should_accept_tx || final_round;
1240        if !make_checkpoint {
1241            // No need for any further processing
1242            return;
1243        }
1244
1245        // If this is the final round, record execution time observations for storage in the
1246        // end-of-epoch tx.
1247        if final_round {
1248            self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
1249        }
1250
1251        let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
1252            .then_some(Schedulable::ConsensusCommitPrologue(
1253                epoch,
1254                commit_info.round,
1255                commit_info.consensus_commit_ref.index,
1256            ));
1257
1258        let schedulables: Vec<_> = itertools::chain!(
1259            consensus_commit_prologue.into_iter(),
1260            authenticator_state_update_transaction
1261                .into_iter()
1262                .map(Schedulable::Transaction),
1263            transactions_to_schedule
1264                .into_iter()
1265                .map(Schedulable::Transaction),
1266        )
1267        .collect();
1268
1269        let randomness_schedulables: Vec<_> = randomness_state_update_transaction
1270            .into_iter()
1271            .chain(
1272                randomness_transactions_to_schedule
1273                    .into_iter()
1274                    .map(Schedulable::Transaction),
1275            )
1276            .collect();
1277
1278        let num_schedulables = schedulables.len();
1279        let checkpoint_height = self.create_pending_checkpoints(
1280            &mut state,
1281            &commit_info,
1282            schedulables,
1283            randomness_schedulables,
1284            &cancelled_txns,
1285            final_round,
1286        );
1287
1288        let notifications = state.get_notifications();
1289
1290        let mut stats_to_record = self.last_consensus_stats.clone();
1291        stats_to_record.height = checkpoint_height;
1292        {
1293            let queue = self.checkpoint_queue.lock().unwrap();
1294            stats_to_record.last_checkpoint_flush_timestamp = queue.last_built_timestamp();
1295            stats_to_record.checkpoint_seq = queue.checkpoint_seq();
1296        }
1297
1298        state.output.record_consensus_commit_stats(stats_to_record);
1299
1300        self.record_deferral_deletion(&mut state);
1301
1302        self.epoch_store
1303            .consensus_quarantine
1304            .write()
1305            .push_consensus_output(state.output, &self.epoch_store)
1306            .expect("push_consensus_output should not fail");
1307
1308        debug!(
1309            ?commit_info.round,
1310            "Notifying checkpoint service about new pending checkpoint(s)",
1311        );
1312        self.checkpoint_service
1313            .notify_checkpoint()
1314            .expect("failed to notify checkpoint service");
1315
1316        if let Some(randomness_round) = state.randomness_round {
1317            randomness_manager
1318                .as_ref()
1319                .expect("randomness manager should exist if randomness round is provided")
1320                .generate_randomness(epoch, randomness_round);
1321        }
1322
1323        self.epoch_store.process_notifications(notifications.iter());
1324
1325        // pass lock by value to ensure that it is held until this point
1326        self.log_final_round(lock, final_round);
1327
1328        // update the calculated throughput
1329        self.throughput_calculator
1330            .add_transactions(timestamp, num_schedulables as u64);
1331
1332        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
1333            let key = [commit_sub_dag_index, epoch];
1334            if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
1335                sui_simulator::task::kill_current_node(None);
1336            }
1337        });
1338
1339        fail_point!("crash");
1340
1341        self.send_end_of_publish_if_needed().await;
1342    }
1343
1344    fn handle_close_epoch(
1345        &self,
1346        state: &mut CommitHandlerState,
1347        commit_info: &ConsensusCommitInfo,
1348        end_of_publish_transactions: Vec<AuthorityName>,
1349    ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
1350        let timestamp_based_epoch_close = self
1351            .epoch_store
1352            .protocol_config()
1353            .timestamp_based_epoch_close();
1354        let timestamp_triggered = timestamp_based_epoch_close
1355            && commit_info.timestamp >= self.epoch_store.next_reconfiguration_timestamp_ms();
1356        if timestamp_triggered {
1357            // close_user_certs() is only needed here when timestamp_based_epoch_close is enabled.
1358            let reconfig_guard = self.epoch_store.get_reconfig_state_write_lock_guard();
1359            if reconfig_guard.should_accept_user_certs() {
1360                self.epoch_store.close_user_certs(reconfig_guard);
1361            }
1362        }
1363        let collected_eop_quorum =
1364            self.process_end_of_publish_transactions(state, end_of_publish_transactions);
1365        if timestamp_triggered || collected_eop_quorum {
1366            let (lock, final_round) = self.advance_eop_state_machine(state);
1367            (lock.should_accept_tx(), Some(lock), final_round)
1368        } else {
1369            (true, None, false)
1370        }
1371    }
1372
1373    fn record_end_of_epoch_execution_time_observations(
1374        &self,
1375        estimator: &mut ExecutionTimeEstimator,
1376    ) {
1377        self.epoch_store
1378            .end_of_epoch_execution_time_observations
1379            .set(estimator.take_observations())
1380            .expect("`stored_execution_time_observations` should only be set once at end of epoch");
1381    }
1382
1383    fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
1384        let mut deferred_transactions = self
1385            .epoch_store
1386            .consensus_output_cache
1387            .deferred_transactions
1388            .lock();
1389        for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
1390            deferred_transactions.remove(&deleted_deferred_key);
1391        }
1392    }
1393
1394    fn log_final_round(&self, lock: Option<RwLockWriteGuard<ReconfigState>>, final_round: bool) {
1395        if final_round {
1396            let epoch = self.epoch_store.epoch();
1397            info!(
1398                ?epoch,
1399                lock=?lock.as_ref(),
1400                final_round=?final_round,
1401                "Notified last checkpoint"
1402            );
1403            self.epoch_store.record_end_of_message_quorum_time_metric();
1404        }
1405    }
1406
1407    #[allow(clippy::type_complexity)]
1408    fn collect_transactions_to_schedule(
1409        &self,
1410        state: &mut CommitHandlerState,
1411        execution_time_estimator: &mut ExecutionTimeEstimator,
1412        commit_info: &ConsensusCommitInfo,
1413        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1414    ) -> (
1415        Vec<VerifiedExecutableTransactionWithAliases>,
1416        Vec<VerifiedExecutableTransactionWithAliases>,
1417        BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
1418        Option<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1419    ) {
1420        let _scope = monitored_scope("ConsensusCommitHandler::collect_transactions_to_schedule");
1421        let protocol_config = self.epoch_store.protocol_config();
1422        let epoch = self.epoch_store.epoch();
1423
1424        let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) =
1425            self.merge_and_reorder_transactions(state, commit_info, user_transactions);
1426
1427        let mut shared_object_congestion_tracker =
1428            self.init_congestion_tracker(commit_info, false, &ordered_txns);
1429        let mut shared_object_using_randomness_congestion_tracker =
1430            self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns);
1431
1432        let randomness_state_update_transaction = state
1433            .randomness_round
1434            .map(|round| Schedulable::RandomnessStateUpdate(epoch, round));
1435        debug!(
1436            "Randomness state update transaction: {:?}",
1437            randomness_state_update_transaction
1438                .as_ref()
1439                .map(|t| t.key())
1440        );
1441
1442        let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len());
1443        let mut randomness_transactions_to_schedule =
1444            Vec::with_capacity(ordered_randomness_txns.len());
1445        let mut deferred_txns = BTreeMap::new();
1446        let mut cancelled_txns = BTreeMap::new();
1447
1448        for transaction in ordered_txns {
1449            self.handle_deferral_and_cancellation(
1450                state,
1451                &mut cancelled_txns,
1452                &mut deferred_txns,
1453                &mut transactions_to_schedule,
1454                protocol_config,
1455                commit_info,
1456                transaction,
1457                &mut shared_object_congestion_tracker,
1458                &previously_deferred_tx_digests,
1459                execution_time_estimator,
1460            );
1461        }
1462
1463        for transaction in ordered_randomness_txns {
1464            if state.dkg_failed {
1465                debug!(
1466                    "Canceling randomness-using transaction {:?} because DKG failed",
1467                    transaction.tx().digest(),
1468                );
1469                cancelled_txns.insert(
1470                    *transaction.tx().digest(),
1471                    CancelConsensusCertificateReason::DkgFailed,
1472                );
1473                randomness_transactions_to_schedule.push(transaction);
1474                continue;
1475            }
1476            self.handle_deferral_and_cancellation(
1477                state,
1478                &mut cancelled_txns,
1479                &mut deferred_txns,
1480                &mut randomness_transactions_to_schedule,
1481                protocol_config,
1482                commit_info,
1483                transaction,
1484                &mut shared_object_using_randomness_congestion_tracker,
1485                &previously_deferred_tx_digests,
1486                execution_time_estimator,
1487            );
1488        }
1489
1490        let mut total_deferred_txns = 0;
1491        {
1492            let mut deferred_transactions = self
1493                .epoch_store
1494                .consensus_output_cache
1495                .deferred_transactions
1496                .lock();
1497            for (key, txns) in deferred_txns.into_iter() {
1498                total_deferred_txns += txns.len();
1499                deferred_transactions.insert(key, txns.clone());
1500                state.output.defer_transactions(key, txns);
1501            }
1502        }
1503
1504        self.metrics
1505            .consensus_handler_deferred_transactions
1506            .inc_by(total_deferred_txns as u64);
1507        self.metrics
1508            .consensus_handler_cancelled_transactions
1509            .inc_by(cancelled_txns.len() as u64);
1510        self.metrics
1511            .consensus_handler_max_object_costs
1512            .with_label_values(&["regular_commit"])
1513            .set(shared_object_congestion_tracker.max_cost() as i64);
1514        self.metrics
1515            .consensus_handler_max_object_costs
1516            .with_label_values(&["randomness_commit"])
1517            .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);
1518
1519        let congestion_commit_data = shared_object_congestion_tracker.finish_commit(commit_info);
1520        let randomness_congestion_commit_data =
1521            shared_object_using_randomness_congestion_tracker.finish_commit(commit_info);
1522
1523        if let Some(logger) = &self.congestion_logger {
1524            let epoch = self.epoch_store.epoch();
1525            let mut logger = logger.lock().unwrap();
1526            logger.write_commit_log(epoch, commit_info, false, &congestion_commit_data);
1527            logger.write_commit_log(epoch, commit_info, true, &randomness_congestion_commit_data);
1528        }
1529
1530        if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get()
1531            && let Err(e) = tx_object_debts.try_send(
1532                congestion_commit_data
1533                    .accumulated_debts
1534                    .iter()
1535                    .chain(randomness_congestion_commit_data.accumulated_debts.iter())
1536                    .map(|(id, _)| *id)
1537                    .collect(),
1538            )
1539        {
1540            info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
1541        }
1542
1543        state
1544            .output
1545            .set_congestion_control_object_debts(congestion_commit_data.accumulated_debts);
1546        state.output.set_congestion_control_randomness_object_debts(
1547            randomness_congestion_commit_data.accumulated_debts,
1548        );
1549
1550        (
1551            transactions_to_schedule,
1552            randomness_transactions_to_schedule,
1553            cancelled_txns,
1554            randomness_state_update_transaction,
1555        )
1556    }
1557
1558    #[allow(clippy::type_complexity)]
1559    fn create_pending_checkpoints(
1560        &self,
1561        state: &mut CommitHandlerState,
1562        commit_info: &ConsensusCommitInfo,
1563        schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1564        randomness_schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1565        cancelled_txns: &BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
1566        final_round: bool,
1567    ) -> CheckpointHeight {
1568        let _scope = monitored_scope("ConsensusCommitHandler::create_pending_checkpoints");
1569        let protocol_config = self.epoch_store.protocol_config();
1570        let epoch = self.epoch_store.epoch();
1571        let accumulators_enabled = self.epoch_store.accumulators_enabled();
1572        let max_transactions_per_checkpoint =
1573            protocol_config.max_transactions_per_checkpoint() as usize;
1574
1575        let should_write_random_checkpoint = state.randomness_round.is_some()
1576            || (state.dkg_failed && !randomness_schedulables.is_empty());
1577
1578        let mut checkpoint_queue = self.checkpoint_queue.lock().unwrap();
1579
1580        let build_chunks =
1581            |schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1582             queue: &mut CheckpointQueue|
1583             -> Vec<Chunk<VerifiedExecutableTransactionWithAliases>> {
1584                schedulables
1585                    .chunks(max_transactions_per_checkpoint)
1586                    .map(|chunk| {
1587                        let height = queue.next_height();
1588                        let schedulables: Vec<_> = chunk.to_vec();
1589                        let settlement = if accumulators_enabled {
1590                            Some(Schedulable::AccumulatorSettlement(epoch, height))
1591                        } else {
1592                            None
1593                        };
1594                        Chunk {
1595                            schedulables,
1596                            settlement,
1597                            height,
1598                        }
1599                    })
1600                    .collect()
1601            };
1602
1603        let num_schedulables = schedulables.len();
1604        let chunked_schedulables = build_chunks(schedulables, &mut checkpoint_queue);
1605        if chunked_schedulables.len() > 1 {
1606            info!(
1607                "Splitting transactions into {} checkpoint chunks (num_schedulables={}, max_tx={})",
1608                chunked_schedulables.len(),
1609                num_schedulables,
1610                max_transactions_per_checkpoint
1611            );
1612            assert_reachable!("checkpoint split due to transaction limit");
1613        }
1614        let chunked_randomness_schedulables = if should_write_random_checkpoint {
1615            build_chunks(randomness_schedulables, &mut checkpoint_queue)
1616        } else {
1617            vec![]
1618        };
1619
1620        let schedulables_for_version_assignment =
1621            Chunk::all_schedulables_from(&chunked_schedulables);
1622        let randomness_schedulables_for_version_assignment =
1623            Chunk::all_schedulables_from(&chunked_randomness_schedulables);
1624
1625        let assigned_versions = self
1626            .epoch_store
1627            .process_consensus_transaction_shared_object_versions(
1628                self.cache_reader.as_ref(),
1629                schedulables_for_version_assignment,
1630                randomness_schedulables_for_version_assignment,
1631                cancelled_txns,
1632                &mut state.output,
1633            )
1634            .expect("failed to assign shared object versions");
1635
1636        let consensus_commit_prologue =
1637            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);
1638
1639        let mut chunked_schedulables = chunked_schedulables;
1640        let mut assigned_versions = assigned_versions;
1641        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
1642            assert!(matches!(
1643                chunked_schedulables[0].schedulables[0],
1644                Schedulable::ConsensusCommitPrologue(..)
1645            ));
1646            assert!(matches!(
1647                assigned_versions.0[0].0,
1648                TransactionKey::ConsensusCommitPrologue(..)
1649            ));
1650            assigned_versions.0[0].0 =
1651                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
1652            chunked_schedulables[0].schedulables[0] =
1653                Schedulable::Transaction(consensus_commit_prologue);
1654        }
1655
1656        let assigned_versions = assigned_versions.into_map();
1657
1658        self.epoch_store.process_user_signatures(
1659            chunked_schedulables
1660                .iter()
1661                .flat_map(|c| c.all_schedulables())
1662                .chain(
1663                    chunked_randomness_schedulables
1664                        .iter()
1665                        .flat_map(|c| c.all_schedulables()),
1666                ),
1667        );
1668
1669        let commit_height = chunked_randomness_schedulables
1670            .last()
1671            .or(chunked_schedulables.last())
1672            .map(|c| c.height)
1673            .expect("at least one checkpoint root must be created per commit");
1674
1675        let mut pending_checkpoints = Vec::new();
1676        for chunk in chunked_schedulables {
1677            pending_checkpoints.extend(checkpoint_queue.push_chunk(
1678                chunk.into(),
1679                &assigned_versions,
1680                commit_info.timestamp,
1681                commit_info.consensus_commit_ref,
1682                commit_info.rejected_transactions_digest,
1683            ));
1684        }
1685
1686        if protocol_config.merge_randomness_into_checkpoint() {
1687            // We don't want to block checkpoint formation for non-randomness schedulables
1688            // on randomness state update. Therefore, we include randomness chunks in the
1689            // subsequent checkpoint. First we flush the queue, then enqueue randomness
1690            // for merging into the subsequent commit.
1691            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, final_round));
1692
1693            if should_write_random_checkpoint {
1694                for chunk in chunked_randomness_schedulables {
1695                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
1696                        chunk.into(),
1697                        &assigned_versions,
1698                        commit_info.timestamp,
1699                        commit_info.consensus_commit_ref,
1700                        commit_info.rejected_transactions_digest,
1701                    ));
1702                }
1703                if final_round {
1704                    pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
1705                }
1706            }
1707        } else {
1708            let force = final_round || should_write_random_checkpoint;
1709            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, force));
1710
1711            if should_write_random_checkpoint {
1712                for chunk in chunked_randomness_schedulables {
1713                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
1714                        chunk.into(),
1715                        &assigned_versions,
1716                        commit_info.timestamp,
1717                        commit_info.consensus_commit_ref,
1718                        commit_info.rejected_transactions_digest,
1719                    ));
1720                }
1721                pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
1722            }
1723        }
1724
1725        if final_round && let Some(last) = pending_checkpoints.last_mut() {
1726            last.details.last_of_epoch = true;
1727        }
1728
1729        let queue_drained = checkpoint_queue.is_empty();
1730        drop(checkpoint_queue);
1731
1732        for pending_checkpoint in pending_checkpoints {
1733            debug!(
1734                checkpoint_height = pending_checkpoint.details.checkpoint_height,
1735                roots_count = pending_checkpoint.num_roots(),
1736                "Writing pending checkpoint",
1737            );
1738            self.epoch_store
1739                .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1740                .expect("failed to write pending checkpoint");
1741        }
1742
1743        state.output.set_checkpoint_queue_drained(queue_drained);
1744
1745        commit_height
1746    }
1747
1748    // Adds the consensus commit prologue transaction to the beginning of input `transactions` to update
1749    // the system clock used in all transactions in the current consensus commit.
1750    // Returns the root of the consensus commit prologue transaction if it was added to the input.
1751    fn add_consensus_commit_prologue_transaction<'a>(
1752        &'a self,
1753        state: &'a mut CommitHandlerState,
1754        commit_info: &'a ConsensusCommitInfo,
1755        assigned_versions: &AssignedTxAndVersions,
1756    ) -> Option<VerifiedExecutableTransactionWithAliases> {
1757        {
1758            if commit_info.skip_consensus_commit_prologue_in_test {
1759                return None;
1760            }
1761        }
1762
1763        let mut cancelled_txn_version_assignment = Vec::new();
1764
1765        let protocol_config = self.epoch_store.protocol_config();
1766
1767        for (txn_key, assigned_versions) in assigned_versions.0.iter() {
1768            let Some(d) = txn_key.as_digest() else {
1769                continue;
1770            };
1771
1772            if !protocol_config.include_cancelled_randomness_txns_in_prologue()
1773                && assigned_versions
1774                    .shared_object_versions
1775                    .iter()
1776                    .any(|((id, _), _)| *id == SUI_RANDOMNESS_STATE_OBJECT_ID)
1777            {
1778                continue;
1779            }
1780
1781            if assigned_versions
1782                .shared_object_versions
1783                .iter()
1784                .any(|(_, version)| version.is_cancelled())
1785            {
1786                assert_reachable!("cancelled transactions");
1787                cancelled_txn_version_assignment
1788                    .push((*d, assigned_versions.shared_object_versions.clone()));
1789            }
1790        }
1791
1792        fail_point_arg!(
1793            "additional_cancelled_txns_for_tests",
1794            |additional_cancelled_txns: Vec<(
1795                TransactionDigest,
1796                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
1797            )>| {
1798                cancelled_txn_version_assignment.extend(additional_cancelled_txns);
1799            }
1800        );
1801
1802        let transaction = commit_info.create_consensus_commit_prologue_transaction(
1803            self.epoch_store.epoch(),
1804            self.epoch_store.protocol_config(),
1805            cancelled_txn_version_assignment,
1806            commit_info,
1807            state.indirect_state_observer.take().unwrap(),
1808        );
1809        Some(VerifiedExecutableTransactionWithAliases::no_aliases(
1810            transaction,
1811        ))
1812    }
1813
1814    fn handle_deferral_and_cancellation(
1815        &self,
1816        state: &mut CommitHandlerState,
1817        cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
1818        deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>,
1819        scheduled_txns: &mut Vec<VerifiedExecutableTransactionWithAliases>,
1820        protocol_config: &ProtocolConfig,
1821        commit_info: &ConsensusCommitInfo,
1822        transaction: VerifiedExecutableTransactionWithAliases,
1823        shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
1824        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
1825        execution_time_estimator: &ExecutionTimeEstimator,
1826    ) {
1827        // Check for unpaid amplification before other deferral checks.
1828        // SIP-45: Paid amplification allows (gas_price / RGP + 1) submissions.
1829        // Transactions with more duplicates than paid for are deferred.
1830        if protocol_config.defer_unpaid_amplification() {
1831            let occurrence_count = state
1832                .occurrence_counts
1833                .get(transaction.tx().digest())
1834                .copied()
1835                .unwrap_or(0);
1836
1837            let rgp = self.epoch_store.reference_gas_price();
1838            let gas_price = transaction.tx().transaction_data().gas_price();
1839            let allowed_count = (gas_price / rgp.max(1)) + 1;
1840
1841            if occurrence_count as u64 > allowed_count {
1842                self.metrics
1843                    .consensus_handler_unpaid_amplification_deferrals
1844                    .inc();
1845
1846                let deferred_from_round = previously_deferred_tx_digests
1847                    .get(transaction.tx().digest())
1848                    .map(|k| k.deferred_from_round())
1849                    .unwrap_or(commit_info.round);
1850
1851                let deferral_key = DeferralKey::new_for_consensus_round(
1852                    commit_info.round + 1,
1853                    deferred_from_round,
1854                );
1855
1856                if transaction_deferral_within_limit(
1857                    &deferral_key,
1858                    protocol_config.max_deferral_rounds_for_congestion_control(),
1859                ) {
1860                    assert_reachable!("unpaid amplification deferral");
1861                    debug!(
1862                        "Deferring transaction {:?} due to unpaid amplification (count={}, allowed={})",
1863                        transaction.tx().digest(),
1864                        occurrence_count,
1865                        allowed_count
1866                    );
1867                    deferred_txns
1868                        .entry(deferral_key)
1869                        .or_default()
1870                        .push(transaction);
1871                    return;
1872                }
1873            }
1874        }
1875
1876        let tx_cost = shared_object_congestion_tracker.get_tx_cost(
1877            execution_time_estimator,
1878            transaction.tx(),
1879            state.indirect_state_observer.as_mut().unwrap(),
1880        );
1881
1882        let deferral_info = self.epoch_store.should_defer(
1883            transaction.tx(),
1884            commit_info,
1885            state.dkg_failed,
1886            state.randomness_round.is_some(),
1887            previously_deferred_tx_digests,
1888            shared_object_congestion_tracker,
1889        );
1890
1891        if let Some((deferral_key, deferral_reason)) = deferral_info {
1892            debug!(
1893                "Deferring consensus certificate for transaction {:?} until {:?}",
1894                transaction.tx().digest(),
1895                deferral_key
1896            );
1897
1898            match deferral_reason {
1899                DeferralReason::RandomnessNotReady => {
1900                    deferred_txns
1901                        .entry(deferral_key)
1902                        .or_default()
1903                        .push(transaction);
1904                }
1905                DeferralReason::SharedObjectCongestion(congested_objects) => {
1906                    self.metrics.consensus_handler_congested_transactions.inc();
1907                    if transaction_deferral_within_limit(
1908                        &deferral_key,
1909                        protocol_config.max_deferral_rounds_for_congestion_control(),
1910                    ) {
1911                        deferred_txns
1912                            .entry(deferral_key)
1913                            .or_default()
1914                            .push(transaction);
1915                    } else {
1916                        assert_sometimes!(
1917                            transaction.tx().data().transaction_data().uses_randomness(),
1918                            "cancelled randomness-using transaction"
1919                        );
1920                        assert_sometimes!(
1921                            !transaction.tx().data().transaction_data().uses_randomness(),
1922                            "cancelled non-randomness-using transaction"
1923                        );
1924
1925                        // Cancel the transaction that has been deferred for too long.
1926                        debug!(
1927                            "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
1928                            transaction.tx().digest(),
1929                            deferral_key,
1930                            congested_objects
1931                        );
1932                        cancelled_txns.insert(
1933                            *transaction.tx().digest(),
1934                            CancelConsensusCertificateReason::CongestionOnObjects(
1935                                congested_objects,
1936                            ),
1937                        );
1938                        scheduled_txns.push(transaction);
1939                    }
1940                }
1941            }
1942        } else {
1943            // Update object execution cost for all scheduled transactions
1944            shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, transaction.tx());
1945            scheduled_txns.push(transaction);
1946        }
1947    }
1948
1949    fn merge_and_reorder_transactions(
1950        &self,
1951        state: &mut CommitHandlerState,
1952        commit_info: &ConsensusCommitInfo,
1953        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1954    ) -> (
1955        Vec<VerifiedExecutableTransactionWithAliases>,
1956        Vec<VerifiedExecutableTransactionWithAliases>,
1957        HashMap<TransactionDigest, DeferralKey>,
1958    ) {
1959        let protocol_config = self.epoch_store.protocol_config();
1960
1961        let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
1962            self.load_deferred_transactions(state, commit_info);
1963
1964        txns.reserve(user_transactions.len());
1965        randomness_txns.reserve(user_transactions.len());
1966
1967        // There may be randomness transactions in `txns`, which were deferred due to congestion.
1968        // They must be placed back into `randomness_txns`.
1969        let mut txns: Vec<_> = txns
1970            .into_iter()
1971            .filter_map(|tx| {
1972                if tx.tx().transaction_data().uses_randomness() {
1973                    randomness_txns.push(tx);
1974                    None
1975                } else {
1976                    Some(tx)
1977                }
1978            })
1979            .collect();
1980
1981        for txn in user_transactions {
1982            if txn.tx().transaction_data().uses_randomness() {
1983                randomness_txns.push(txn);
1984            } else {
1985                txns.push(txn);
1986            }
1987        }
1988
1989        PostConsensusTxReorder::reorder(
1990            &mut txns,
1991            protocol_config.consensus_transaction_ordering(),
1992        );
1993        PostConsensusTxReorder::reorder(
1994            &mut randomness_txns,
1995            protocol_config.consensus_transaction_ordering(),
1996        );
1997
1998        (txns, randomness_txns, previously_deferred_tx_digests)
1999    }
2000
2001    fn load_deferred_transactions(
2002        &self,
2003        state: &mut CommitHandlerState,
2004        commit_info: &ConsensusCommitInfo,
2005    ) -> (
2006        Vec<VerifiedExecutableTransactionWithAliases>,
2007        Vec<VerifiedExecutableTransactionWithAliases>,
2008        HashMap<TransactionDigest, DeferralKey>,
2009    ) {
2010        let mut previously_deferred_tx_digests = HashMap::new();
2011
2012        let deferred_txs: Vec<_> = self
2013            .epoch_store
2014            .load_deferred_transactions_for_up_to_consensus_round_v2(
2015                &mut state.output,
2016                commit_info.round,
2017            )
2018            .expect("db error")
2019            .into_iter()
2020            .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2021            .map(|(key, tx)| {
2022                previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2023                tx
2024            })
2025            .collect();
2026        trace!(
2027            "loading deferred transactions: {:?}",
2028            deferred_txs.iter().map(|tx| tx.tx().digest())
2029        );
2030
2031        let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
2032            let txns: Vec<_> = self
2033                .epoch_store
2034                .load_deferred_transactions_for_randomness_v2(&mut state.output)
2035                .expect("db error")
2036                .into_iter()
2037                .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2038                .map(|(key, tx)| {
2039                    previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2040                    tx
2041                })
2042                .collect();
2043            trace!(
2044                "loading deferred randomness transactions: {:?}",
2045                txns.iter().map(|tx| tx.tx().digest())
2046            );
2047            txns
2048        } else {
2049            vec![]
2050        };
2051
2052        (
2053            deferred_txs,
2054            deferred_randomness_txs,
2055            previously_deferred_tx_digests,
2056        )
2057    }
2058
2059    fn init_congestion_tracker(
2060        &self,
2061        commit_info: &ConsensusCommitInfo,
2062        for_randomness: bool,
2063        txns: &[VerifiedExecutableTransactionWithAliases],
2064    ) -> SharedObjectCongestionTracker {
2065        #[allow(unused_mut)]
2066        let mut ret = SharedObjectCongestionTracker::from_protocol_config(
2067            self.epoch_store
2068                .consensus_quarantine
2069                .read()
2070                .load_initial_object_debts(
2071                    &self.epoch_store,
2072                    commit_info.round,
2073                    for_randomness,
2074                    txns,
2075                )
2076                .expect("db error"),
2077            self.epoch_store.protocol_config(),
2078            for_randomness,
2079            self.congestion_logger.is_some(),
2080        );
2081
2082        fail_point_arg!(
2083            "initial_congestion_tracker",
2084            |tracker: SharedObjectCongestionTracker| {
2085                info!(
2086                    "Initialize shared_object_congestion_tracker to  {:?}",
2087                    tracker
2088                );
2089                ret = tracker;
2090            }
2091        );
2092
2093        ret
2094    }
2095
2096    fn process_gasless_transactions(
2097        &self,
2098        commit_info: &ConsensusCommitInfo,
2099        user_transactions: &[VerifiedExecutableTransactionWithAliases],
2100    ) {
2101        let gasless_count = user_transactions
2102            .iter()
2103            .filter(|txn| txn.tx().transaction_data().is_gasless_transaction())
2104            .count() as u64;
2105        self.consensus_gasless_counter
2106            .record_commit(commit_info.timestamp, gasless_count);
2107    }
2108
2109    fn process_jwks(
2110        &self,
2111        state: &mut CommitHandlerState,
2112        commit_info: &ConsensusCommitInfo,
2113        new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
2114    ) {
2115        for (authority_name, jwk_id, jwk) in new_jwks {
2116            self.epoch_store.record_jwk_vote(
2117                &mut state.output,
2118                commit_info.round,
2119                authority_name,
2120                &jwk_id,
2121                &jwk,
2122            );
2123        }
2124    }
2125
2126    fn process_capability_notifications(
2127        &self,
2128        capability_notifications: Vec<AuthorityCapabilitiesV2>,
2129    ) {
2130        for capabilities in capability_notifications {
2131            self.epoch_store
2132                .record_capabilities_v2(&capabilities)
2133                .expect("db error");
2134        }
2135    }
2136
2137    fn process_execution_time_observations(
2138        &self,
2139        state: &mut CommitHandlerState,
2140        execution_time_observations: Vec<ExecutionTimeObservation>,
2141    ) {
2142        let _scope = monitored_scope("ConsensusCommitHandler::process_execution_time_observations");
2143        let mut execution_time_estimator = self
2144            .epoch_store
2145            .execution_time_estimator
2146            .try_lock()
2147            .expect("should only ever be called from the commit handler thread");
2148
2149        for ExecutionTimeObservation {
2150            authority,
2151            generation,
2152            estimates,
2153        } in execution_time_observations
2154        {
2155            let authority_index = self
2156                .epoch_store
2157                .committee()
2158                .authority_index(&authority)
2159                .unwrap();
2160            execution_time_estimator.process_observations_from_consensus(
2161                authority_index,
2162                Some(generation),
2163                &estimates,
2164            );
2165            state
2166                .output
2167                .insert_execution_time_observation(authority_index, generation, estimates);
2168        }
2169    }
2170
2171    fn process_checkpoint_signature_messages(
2172        &self,
2173        checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
2174    ) {
2175        for checkpoint_signature_message in checkpoint_signature_messages {
2176            self.checkpoint_service
2177                .notify_checkpoint_signature(&checkpoint_signature_message)
2178                .expect("db error");
2179        }
2180    }
2181
2182    async fn process_dkg_updates(
2183        &self,
2184        state: &mut CommitHandlerState,
2185        commit_info: &ConsensusCommitInfo,
2186        randomness_manager: Option<&mut RandomnessManager>,
2187        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2188        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2189    ) {
2190        if !self.epoch_store.randomness_state_enabled() {
2191            let num_dkg_messages = randomness_dkg_messages.len();
2192            let num_dkg_confirmations = randomness_dkg_confirmations.len();
2193            if num_dkg_messages + num_dkg_confirmations > 0 {
2194                debug_fatal!(
2195                    "received {} RandomnessDkgMessage and {} RandomnessDkgConfirmation messages when randomness is not enabled",
2196                    num_dkg_messages,
2197                    num_dkg_confirmations
2198                );
2199            }
2200            return;
2201        }
2202
2203        let randomness_manager =
2204            randomness_manager.expect("randomness manager should exist if randomness is enabled");
2205
2206        let randomness_dkg_updates =
2207            self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);
2208
2209        let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
2210            state,
2211            randomness_manager,
2212            randomness_dkg_confirmations,
2213        );
2214
2215        // Keep advancing the DKG state machine until it is resolved, regardless of
2216        // whether new messages/confirmations were processed this commit. Preserve
2217        // the mainnet epoch fallback until the protocol flag is active everywhere.
2218        let always_advance_dkg_to_resolution = (self
2219            .epoch_store
2220            .protocol_config()
2221            .always_advance_dkg_to_resolution()
2222            || (self.epoch_store.get_chain() == Chain::Mainnet
2223                && self.epoch_store.epoch() >= 1143))
2224            && randomness_manager.dkg_status() == DkgStatus::Pending;
2225
2226        if randomness_dkg_updates
2227            || randomness_dkg_confirmation_updates
2228            || always_advance_dkg_to_resolution
2229        {
2230            randomness_manager
2231                .advance_dkg(&mut state.output, commit_info.round)
2232                .await
2233                .expect("epoch ended");
2234        }
2235    }
2236
2237    fn process_randomness_dkg_messages(
2238        &self,
2239        randomness_manager: &mut RandomnessManager,
2240        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2241    ) -> bool /* randomness state updated */ {
2242        if randomness_dkg_messages.is_empty() {
2243            return false;
2244        }
2245
2246        let mut randomness_state_updated = false;
2247        for (authority, bytes) in randomness_dkg_messages {
2248            match bcs::from_bytes(&bytes) {
2249                Ok(message) => {
2250                    randomness_manager
2251                        .add_message(&authority, message)
2252                        // TODO: make infallible
2253                        .expect("epoch ended");
2254                    randomness_state_updated = true;
2255                }
2256
2257                Err(e) => {
2258                    warn!(
2259                        "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
2260                        authority.concise(),
2261                    );
2262                }
2263            }
2264        }
2265
2266        randomness_state_updated
2267    }
2268
2269    fn process_randomness_dkg_confirmations(
2270        &self,
2271        state: &mut CommitHandlerState,
2272        randomness_manager: &mut RandomnessManager,
2273        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2274    ) -> bool /* randomness state updated */ {
2275        if randomness_dkg_confirmations.is_empty() {
2276            return false;
2277        }
2278
2279        let mut randomness_state_updated = false;
2280        for (authority, bytes) in randomness_dkg_confirmations {
2281            match bcs::from_bytes(&bytes) {
2282                Ok(message) => {
2283                    randomness_manager
2284                        .add_confirmation(&mut state.output, &authority, message)
2285                        // TODO: make infallible
2286                        .expect("epoch ended");
2287                    randomness_state_updated = true;
2288                }
2289                Err(e) => {
2290                    warn!(
2291                        "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
2292                        authority.concise(),
2293                    );
2294                }
2295            }
2296        }
2297
2298        randomness_state_updated
2299    }
2300
2301    /// Returns true if we have collected a quorum of end of publish messages (either in this round or a previous round).
2302    fn process_end_of_publish_transactions(
2303        &self,
2304        state: &mut CommitHandlerState,
2305        end_of_publish_transactions: Vec<AuthorityName>,
2306    ) -> bool {
2307        let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
2308            "No contention on end_of_publish as it is only accessed from consensus handler",
2309        );
2310
2311        if eop_aggregator.has_quorum() {
2312            return true;
2313        }
2314
2315        if end_of_publish_transactions.is_empty() {
2316            return false;
2317        }
2318
2319        for authority in end_of_publish_transactions {
2320            info!("Received EndOfPublish from {:?}", authority.concise());
2321
2322            // It is ok to just release lock here as this function is the only place that transition into RejectAllCerts state
2323            // And this function itself is always executed from consensus task
2324            state.output.insert_end_of_publish(authority);
2325            if eop_aggregator
2326                .insert_generic(authority, ())
2327                .is_quorum_reached()
2328            {
2329                debug!(
2330                    "Collected enough end_of_publish messages with last message from validator {:?}",
2331                    authority.concise(),
2332                );
2333                return true;
2334            }
2335        }
2336
2337        false
2338    }
2339
2340    /// After we have collected 2f+1 EndOfPublish messages, we call this function every round until the epoch
2341    /// ends.
2342    fn advance_eop_state_machine(
2343        &self,
2344        state: &mut CommitHandlerState,
2345    ) -> (
2346        RwLockWriteGuard<'_, ReconfigState>,
2347        bool, // true if final round
2348    ) {
2349        let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
2350        let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
2351
2352        reconfig_state.close_all_certs();
2353
2354        let commit_has_deferred_txns = state.output.has_deferred_transactions();
2355        let previous_commits_have_deferred_txns = !self.epoch_store.deferred_transactions_empty();
2356
2357        if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
2358            if !start_state_is_reject_all_tx {
2359                info!("Transitioning to RejectAllTx");
2360            }
2361            reconfig_state.close_all_tx();
2362        } else {
2363            debug!(
2364                "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
2365                previous_commits_have_deferred_txns, commit_has_deferred_txns,
2366            );
2367        }
2368
2369        state.output.store_reconfig_state(reconfig_state.clone());
2370
2371        if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
2372            (reconfig_state, true)
2373        } else {
2374            (reconfig_state, false)
2375        }
2376    }
2377
2378    fn gather_commit_metadata(
2379        &self,
2380        consensus_commit: &impl ConsensusCommitAPI,
2381    ) -> (u64, AuthorityIndex, u64) {
2382        let timestamp = consensus_commit.commit_timestamp_ms();
2383        let leader_author = consensus_commit.leader_author_index();
2384        let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
2385
2386        let system_time_ms = SystemTime::now()
2387            .duration_since(UNIX_EPOCH)
2388            .unwrap()
2389            .as_millis() as i64;
2390
2391        let consensus_timestamp_bias_ms = system_time_ms - (timestamp as i64);
2392        let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
2393        self.metrics
2394            .consensus_timestamp_bias
2395            .observe(consensus_timestamp_bias_seconds);
2396
2397        let epoch_start = self
2398            .epoch_store
2399            .epoch_start_config()
2400            .epoch_start_timestamp_ms();
2401        let timestamp = if timestamp < epoch_start {
2402            error!(
2403                "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}"
2404            );
2405            epoch_start
2406        } else {
2407            timestamp
2408        };
2409
2410        (timestamp, leader_author, commit_sub_dag_index)
2411    }
2412
2413    fn create_authenticator_state_update(
2414        &self,
2415        last_committed_round: u64,
2416        commit_info: &ConsensusCommitInfo,
2417    ) -> Option<VerifiedExecutableTransactionWithAliases> {
2418        // Load all jwks that became active in the previous round, and commit them in this round.
2419        // We want to delay one round because none of the transactions in the previous round could
2420        // have been authenticated with the jwks that became active in that round.
2421        //
2422        // Because of this delay, jwks that become active in the last round of the epoch will
2423        // never be committed. That is ok, because in the new epoch, the validators should
2424        // immediately re-submit these jwks, and they can become active then.
2425        let new_jwks = self
2426            .epoch_store
2427            .get_new_jwks(last_committed_round)
2428            .expect("Unrecoverable error in consensus handler");
2429
2430        if !new_jwks.is_empty() {
2431            let authenticator_state_update_transaction = authenticator_state_update_transaction(
2432                &self.epoch_store,
2433                commit_info.round,
2434                new_jwks,
2435            );
2436            debug!(
2437                "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
2438                authenticator_state_update_transaction.digest(),
2439                authenticator_state_update_transaction,
2440            );
2441
2442            Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2443                authenticator_state_update_transaction,
2444            ))
2445        } else {
2446            None
2447        }
2448    }
2449
2450    // Filters out rejected or deprecated transactions.
2451    // Returns FilteredConsensusOutput containing transactions and owned_object_locks.
2452    #[instrument(level = "trace", skip_all)]
2453    fn filter_consensus_txns(
2454        &mut self,
2455        initial_reconfig_state: ReconfigState,
2456        commit_info: &ConsensusCommitInfo,
2457        block_transactions: ParsedConsensusTransactions,
2458    ) -> FilteredConsensusOutput {
2459        let _scope = monitored_scope("ConsensusCommitHandler::filter_consensus_txns");
2460        let mut transactions = Vec::new();
2461        let mut owned_object_locks = HashMap::new();
2462        let mut dropped_transaction_keys = Vec::new();
2463        // Consensus transaction status updates are collected here and flushed in a
2464        // single batched write (one lock acquisition, notifications outside the lock)
2465        // at the end of the commit, rather than one write+notify per transaction.
2466        let mut status_updates: Vec<(ConsensusPosition, ConsensusTxStatus)> = Vec::new();
2467        let epoch = self.epoch_store.epoch();
2468        let mut num_finalized_user_transactions = vec![0; self.committee.size()];
2469        let mut num_rejected_user_transactions = vec![0; self.committee.size()];
2470
2471        // Prefetch the cross-commit owned-object lock state for the whole commit in one
2472        // batched read. These locks are constant for the duration of a commit (new locks
2473        // are only written at commit end), so this replaces the per-transaction
2474        // quarantine+DB lookup that try_acquire_owned_object_locks_post_consensus used to
2475        // do. Over-reading refs of transactions that are later filtered out is harmless.
2476        let existing_locks = {
2477            let mut prefetch_refs: Vec<ObjectRef> = Vec::new();
2478            for (_block, parsed_transactions) in &block_transactions {
2479                for parsed in parsed_transactions {
2480                    if let ConsensusTransactionKind::UserTransactionV2(tx_with_claims) =
2481                        &parsed.transaction.kind
2482                        && let Some(refs) = owned_object_refs_to_lock(tx_with_claims)
2483                    {
2484                        prefetch_refs.extend(refs);
2485                    }
2486                }
2487            }
2488            prefetch_refs.sort();
2489            prefetch_refs.dedup();
2490            // On a read error fall back to an empty map (treat refs as unlocked) — the
2491            // same lenient behavior the per-transaction read had.
2492            self.epoch_store
2493                .get_owned_object_locks_map(&prefetch_refs)
2494                .unwrap_or_default()
2495        };
2496
2497        for (block, parsed_transactions) in block_transactions {
2498            let author = block.author.value();
2499            let author_hostname = self.committee.authority(block.author).hostname.as_str();
2500            // TODO: consider only messages within 1~3 rounds of the leader?
2501            self.last_consensus_stats.stats.inc_num_messages(author);
2502
2503            // Set the "ping" transaction status for this block. This is necessary as there might be some ping requests waiting for the ping transaction to be certified.
2504            status_updates.push((
2505                ConsensusPosition::ping(epoch, block),
2506                ConsensusTxStatus::Finalized,
2507            ));
2508
2509            for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
2510                let position = ConsensusPosition {
2511                    epoch,
2512                    block,
2513                    index: tx_index as TransactionIndex,
2514                };
2515
2516                // Transaction has appeared in consensus output, we can increment the submission count
2517                // for this tx for DoS protection.
2518                if let Some(tx) = parsed.transaction.kind.as_user_transaction() {
2519                    let digest = tx.digest();
2520                    if let Some((spam_weight, submitter_client_addrs)) = self
2521                        .epoch_store
2522                        .submitted_transaction_cache
2523                        .increment_submission_count(digest)
2524                    {
2525                        if let Some(ref traffic_controller) = self.traffic_controller {
2526                            debug!(
2527                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
2528                                submitter_client_addrs.len()
2529                            );
2530
2531                            // Apply spam weight to all client addresses that submitted this transaction
2532                            for addr in submitter_client_addrs {
2533                                traffic_controller.tally(
2534                                    TrafficTally::new(Some(addr), None, None, spam_weight.clone())
2535                                        .with_method(
2536                                            "consensus_submission_limit_exceeded".to_string(),
2537                                        ),
2538                                );
2539                            }
2540                        } else {
2541                            warn!(
2542                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
2543                                submitter_client_addrs.len()
2544                            );
2545                        }
2546                    }
2547                }
2548
2549                // Record metrics for every committed transaction, regardless of whether it was
2550                // accepted or rejected by consensus, so we measure the full committed output.
2551                let kind = classify(&parsed.transaction);
2552                let outcome = if parsed.rejected {
2553                    "rejected"
2554                } else {
2555                    "accepted"
2556                };
2557                self.metrics
2558                    .consensus_handler_processed
2559                    .with_label_values(&[kind, outcome])
2560                    .inc();
2561                self.metrics
2562                    .consensus_handler_transaction_sizes
2563                    .with_label_values(&[kind, outcome])
2564                    .observe(parsed.serialized_len as f64);
2565                // Per-author breakdown is only tracked for user transactions, since that is where
2566                // rejections and author-attributable spam are meaningful. Keeping the block author
2567                // label scoped to user transactions also bounds metric cardinality.
2568                if parsed.transaction.is_user_transaction() {
2569                    self.metrics
2570                        .consensus_handler_processed_user_transactions
2571                        .with_label_values(&[outcome, author_hostname])
2572                        .inc();
2573                }
2574
2575                if parsed.rejected {
2576                    if parsed.transaction.is_user_transaction() {
2577                        status_updates.push((position, ConsensusTxStatus::Rejected));
2578                        num_rejected_user_transactions[author] += 1;
2579                    }
2580                    // Skip processing rejected transactions.
2581                    continue;
2582                }
2583
2584                if parsed.transaction.is_user_transaction() {
2585                    self.last_consensus_stats
2586                        .stats
2587                        .inc_num_user_transactions(author);
2588                }
2589
2590                if !initial_reconfig_state.should_accept_consensus_certs() {
2591                    // (Note: we no longer need to worry about the previously deferred condition, since we are only
2592                    // processing newly-received transactions at this time).
2593                    match &parsed.transaction.kind {
2594                        ConsensusTransactionKind::UserTransactionV2(_)
2595                        // deprecated and ignore later, but added for exhaustive match
2596                        | ConsensusTransactionKind::UserTransaction(_)
2597                        | ConsensusTransactionKind::CertifiedTransaction(_)
2598                        | ConsensusTransactionKind::CapabilityNotification(_)
2599                        | ConsensusTransactionKind::CapabilityNotificationV2(_)
2600                        | ConsensusTransactionKind::EndOfPublish(_)
2601                        // Note: we no longer have to check protocol_config.ignore_execution_time_observations_after_certs_closed()
2602                        | ConsensusTransactionKind::ExecutionTimeObservation(_)
2603                        | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
2604                            debug!(
2605                                "Ignoring consensus transaction {:?} because of end of epoch",
2606                                parsed.transaction.key()
2607                            );
2608                            continue;
2609                        }
2610
2611                        // These are the message types that are still processed even if !should_accept_consensus_certs()
2612                        ConsensusTransactionKind::CheckpointSignature(_)
2613                        | ConsensusTransactionKind::CheckpointSignatureV2(_)
2614                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2615                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
2616                        | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
2617                    }
2618                }
2619
2620                if !initial_reconfig_state.should_accept_tx() {
2621                    match &parsed.transaction.kind {
2622                        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
2623                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
2624                        _ => {}
2625                    }
2626                }
2627
2628                // Handle deprecated messages
2629                match &parsed.transaction.kind {
2630                    ConsensusTransactionKind::CapabilityNotification(_)
2631                    | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2632                    | ConsensusTransactionKind::CheckpointSignature(_) => {
2633                        debug_fatal!(
2634                            "BUG: saw deprecated tx {:?}for commit round {}",
2635                            parsed.transaction.key(),
2636                            commit_info.round
2637                        );
2638                        continue;
2639                    }
2640                    _ => {}
2641                }
2642
2643                if parsed.transaction.is_user_transaction() {
2644                    let author_name = self
2645                        .epoch_store
2646                        .committee()
2647                        .authority_by_index(author as u32)
2648                        .unwrap();
2649                    if self
2650                        .epoch_store
2651                        .has_received_end_of_publish_from(author_name)
2652                    {
2653                        // In some edge cases, consensus might resend previously seen certificate after EndOfPublish
2654                        // An honest validator should not send a new transaction after EndOfPublish. Whether the
2655                        // transaction is duplicate or not, we filter it out here.
2656                        warn!(
2657                            "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
2658                            author_name.concise(),
2659                            parsed.transaction.key(),
2660                        );
2661                        continue;
2662                    }
2663                }
2664
2665                // Perform post-consensus owned object conflict detection. If lock acquisition
2666                // fails, the transaction has invalid/conflicting owned inputs and should be dropped.
2667                // This must happen AFTER all filtering checks above to avoid acquiring locks
2668                // for transactions that will be dropped (e.g., during epoch change).
2669                // Only applies to UserTransactionV2 - other transaction types don't need lock acquisition.
2670                if let ConsensusTransactionKind::UserTransactionV2(tx_with_claims) =
2671                    &parsed.transaction.kind
2672                {
2673                    let tx = tx_with_claims.tx();
2674                    let Some(owned_object_refs) = owned_object_refs_to_lock(tx_with_claims) else {
2675                        debug_fatal!("Invalid input objects for transaction {}", tx.digest());
2676                        continue;
2677                    };
2678
2679                    match self
2680                        .epoch_store
2681                        .try_acquire_owned_object_locks_post_consensus(
2682                            &owned_object_refs,
2683                            *tx.digest(),
2684                            &owned_object_locks,
2685                            &existing_locks,
2686                        ) {
2687                        Ok(new_locks) => {
2688                            owned_object_locks.extend(new_locks.into_iter());
2689                            // Lock acquisition succeeded - now set Finalized status
2690                            status_updates.push((position, ConsensusTxStatus::Finalized));
2691                            num_finalized_user_transactions[author] += 1;
2692                        }
2693                        Err(e) => {
2694                            debug!("Dropping transaction {}: {}", tx.digest(), e);
2695                            self.metrics.consensus_handler_dropped_transactions.inc();
2696                            status_updates.push((position, ConsensusTxStatus::Dropped));
2697                            self.epoch_store.set_rejection_vote_reason(position, &e);
2698                            dropped_transaction_keys.push(parsed.transaction.key());
2699                            continue;
2700                        }
2701                    }
2702                }
2703
2704                let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
2705                transactions.push((transaction, author as u32));
2706            }
2707        }
2708
2709        // Flush all collected status updates in one batched write. None of the logic
2710        // above reads transaction status back, so deferring visibility to here (still
2711        // before dedup/processing) is equivalent to setting each status inline.
2712        self.epoch_store.set_consensus_tx_statuses(status_updates);
2713
2714        for (i, authority) in self.committee.authorities() {
2715            let hostname = &authority.hostname;
2716            self.metrics
2717                .consensus_committed_messages
2718                .with_label_values(&[hostname])
2719                .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
2720            self.metrics
2721                .consensus_committed_user_transactions
2722                .with_label_values(&[hostname])
2723                .set(
2724                    self.last_consensus_stats
2725                        .stats
2726                        .get_num_user_transactions(i.value()) as i64,
2727                );
2728            self.metrics
2729                .consensus_finalized_user_transactions
2730                .with_label_values(&[hostname])
2731                .add(num_finalized_user_transactions[i.value()] as i64);
2732            self.metrics
2733                .consensus_rejected_user_transactions
2734                .with_label_values(&[hostname])
2735                .add(num_rejected_user_transactions[i.value()] as i64);
2736        }
2737
2738        FilteredConsensusOutput {
2739            transactions,
2740            owned_object_locks,
2741            dropped_transaction_keys,
2742        }
2743    }
2744
2745    fn deduplicate_consensus_txns(
2746        &mut self,
2747        state: &mut CommitHandlerState,
2748        commit_info: &ConsensusCommitInfo,
2749        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
2750    ) -> Vec<VerifiedSequencedConsensusTransaction> {
2751        let _scope = monitored_scope("ConsensusCommitHandler::deduplicate_consensus_txns");
2752        let mut all_transactions = Vec::new();
2753
2754        // Track occurrence counts for each transaction key within this commit.
2755        // Also serves as the deduplication set (count > 1 means duplicate within commit).
2756        let mut occurrence_counts: HashMap<SequencedConsensusTransactionKey, u32> = HashMap::new();
2757        // Keys being seen for the first time (not duplicates from previous commits).
2758        let mut first_commit_keys: HashSet<SequencedConsensusTransactionKey> = HashSet::new();
2759
2760        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
2761            // In process_consensus_transactions_and_commit_boundary(), we will add a system consensus commit
2762            // prologue transaction, which will be the first transaction in this consensus commit batch.
2763            // Therefore, the transaction sequence number starts from 1 here.
2764            let current_tx_index = ExecutionIndices {
2765                last_committed_round: commit_info.round,
2766                sub_dag_index: commit_info.consensus_commit_ref.index.into(),
2767                transaction_index: (seq + 1) as u64,
2768            };
2769
2770            self.last_consensus_stats.index = current_tx_index;
2771
2772            let certificate_author = *self
2773                .epoch_store
2774                .committee()
2775                .authority_by_index(cert_origin)
2776                .unwrap();
2777
2778            let sequenced_transaction = SequencedConsensusTransaction {
2779                certificate_author_index: cert_origin,
2780                certificate_author,
2781                consensus_index: current_tx_index,
2782                transaction,
2783            };
2784
2785            let Some(verified_transaction) = self
2786                .epoch_store
2787                .verify_consensus_transaction(sequenced_transaction)
2788            else {
2789                continue;
2790            };
2791
2792            let key = verified_transaction.0.key();
2793
2794            if let Some(tx_digest) = key.user_transaction_digest() {
2795                self.epoch_store
2796                    .cache_recently_finalized_transaction(tx_digest);
2797            }
2798
2799            // Increment count and check if this is a duplicate within this commit.
2800            // This replaces the separate processed_set HashSet.
2801            let count = occurrence_counts.entry(key.clone()).or_insert(0);
2802            *count += 1;
2803            let in_commit = *count > 1;
2804
2805            let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
2806            if in_commit || in_cache {
2807                self.metrics.skipped_consensus_txns_cache_hit.inc();
2808                continue;
2809            }
2810            if self
2811                .epoch_store
2812                .is_consensus_message_processed(&key)
2813                .expect("db error")
2814            {
2815                self.metrics.skipped_consensus_txns.inc();
2816                continue;
2817            }
2818
2819            first_commit_keys.insert(key.clone());
2820
2821            state.output.record_consensus_message_processed(key);
2822
2823            all_transactions.push(verified_transaction);
2824        }
2825
2826        for key in first_commit_keys {
2827            if let Some(&count) = occurrence_counts.get(&key)
2828                && count > 1
2829            {
2830                self.metrics
2831                    .consensus_handler_duplicate_tx_count
2832                    .observe(count as f64);
2833            }
2834        }
2835
2836        // Copy user transaction occurrence counts to state for unpaid amplification detection.
2837        assert!(
2838            state.occurrence_counts.is_empty(),
2839            "occurrence_counts should be empty before populating"
2840        );
2841        state.occurrence_counts.reserve(occurrence_counts.len());
2842        state.occurrence_counts.extend(
2843            occurrence_counts
2844                .into_iter()
2845                .filter_map(|(key, count)| key.user_transaction_digest().map(|d| (d, count))),
2846        );
2847
2848        all_transactions
2849    }
2850
2851    fn build_commit_handler_input(
2852        &self,
2853        transactions: Vec<VerifiedSequencedConsensusTransaction>,
2854    ) -> CommitHandlerInput {
2855        let _scope = monitored_scope("ConsensusCommitHandler::build_commit_handler_input");
2856        let epoch = self.epoch_store.epoch();
2857        let mut commit_handler_input = CommitHandlerInput::default();
2858
2859        for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
2860            match transaction.transaction {
2861                SequencedConsensusTransactionKind::External(consensus_transaction) => {
2862                    match consensus_transaction.kind {
2863                        // === User transactions ===
2864                        ConsensusTransactionKind::UserTransactionV2(tx) => {
2865                            // Extract the aliases claim (required) from the claims
2866                            let used_alias_versions = if self
2867                                .epoch_store
2868                                .protocol_config()
2869                                .fix_checkpoint_signature_mapping()
2870                            {
2871                                tx.aliases()
2872                            } else {
2873                                // Convert V1 to V2 format using dummy signature indices
2874                                // which will be ignored with `fix_checkpoint_signature_mapping`
2875                                // disabled.
2876                                tx.aliases_v1().map(|a| {
2877                                    NonEmpty::from_vec(
2878                                        a.into_iter()
2879                                            .enumerate()
2880                                            .map(|(idx, (_, seq))| (idx as u8, seq))
2881                                            .collect(),
2882                                    )
2883                                    .unwrap()
2884                                })
2885                            };
2886                            let inner_tx = tx.into_tx();
2887                            // Safe because transactions are certified by consensus.
2888                            let tx = VerifiedTransaction::new_unchecked(inner_tx);
2889                            // TODO(fastpath): accept position in consensus, after plumbing consensus round, authority index, and transaction index here.
2890                            let transaction =
2891                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
2892                            if let Some(used_alias_versions) = used_alias_versions {
2893                                commit_handler_input
2894                                    .user_transactions
2895                                    .push(WithAliases::new(transaction, used_alias_versions));
2896                            } else {
2897                                commit_handler_input.user_transactions.push(
2898                                    VerifiedExecutableTransactionWithAliases::no_aliases(
2899                                        transaction,
2900                                    ),
2901                                );
2902                            }
2903                        }
2904
2905                        // === State machines ===
2906                        ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => {
2907                            commit_handler_input
2908                                .end_of_publish_transactions
2909                                .push(authority_public_key_bytes);
2910                        }
2911                        ConsensusTransactionKind::NewJWKFetched(
2912                            authority_public_key_bytes,
2913                            jwk_id,
2914                            jwk,
2915                        ) => {
2916                            commit_handler_input.new_jwks.push((
2917                                authority_public_key_bytes,
2918                                jwk_id,
2919                                jwk,
2920                            ));
2921                        }
2922                        ConsensusTransactionKind::RandomnessDkgMessage(
2923                            authority_public_key_bytes,
2924                            items,
2925                        ) => {
2926                            commit_handler_input
2927                                .randomness_dkg_messages
2928                                .push((authority_public_key_bytes, items));
2929                        }
2930                        ConsensusTransactionKind::RandomnessDkgConfirmation(
2931                            authority_public_key_bytes,
2932                            items,
2933                        ) => {
2934                            commit_handler_input
2935                                .randomness_dkg_confirmations
2936                                .push((authority_public_key_bytes, items));
2937                        }
2938                        ConsensusTransactionKind::CapabilityNotificationV2(
2939                            authority_capabilities_v2,
2940                        ) => {
2941                            commit_handler_input
2942                                .capability_notifications
2943                                .push(authority_capabilities_v2);
2944                        }
2945                        ConsensusTransactionKind::ExecutionTimeObservation(
2946                            execution_time_observation,
2947                        ) => {
2948                            commit_handler_input
2949                                .execution_time_observations
2950                                .push(execution_time_observation);
2951                        }
2952                        ConsensusTransactionKind::CheckpointSignatureV2(
2953                            checkpoint_signature_message,
2954                        ) => {
2955                            commit_handler_input
2956                                .checkpoint_signature_messages
2957                                .push(*checkpoint_signature_message);
2958                        }
2959
2960                        // Deprecated messages, filtered earlier by filter_consensus_txns()
2961                        // or rejected by SuiTxValidator. Kept for exhaustiveness.
2962                        ConsensusTransactionKind::CheckpointSignature(_)
2963                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2964                        | ConsensusTransactionKind::CapabilityNotification(_)
2965                        | ConsensusTransactionKind::CertifiedTransaction(_)
2966                        | ConsensusTransactionKind::UserTransaction(_) => {
2967                            unreachable!("filtered earlier")
2968                        }
2969                    }
2970                }
2971                // TODO: I think we can delete this, it was only used to inject randomness state update into the tx stream.
2972                SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
2973            }
2974        }
2975
2976        commit_handler_input
2977    }
2978
2979    async fn send_end_of_publish_if_needed(&self) {
2980        if self
2981            .epoch_store
2982            .protocol_config()
2983            .timestamp_based_epoch_close()
2984        {
2985            return;
2986        }
2987        if !self.epoch_store.should_send_end_of_publish() {
2988            return;
2989        }
2990
2991        let end_of_publish = ConsensusTransaction::new_end_of_publish(self.epoch_store.name);
2992        if let Err(err) =
2993            self.consensus_adapter
2994                .submit(end_of_publish, None, &self.epoch_store, None, None)
2995        {
2996            warn!(
2997                "Error when sending EndOfPublish message from ConsensusHandler: {:?}",
2998                err
2999            );
3000        } else {
3001            info!(epoch=?self.epoch_store.epoch(), "Sending EndOfPublish message to consensus");
3002        }
3003    }
3004}
3005
3006/// Sends transactions to the execution scheduler in a separate task,
3007/// to avoid blocking consensus handler.
3008pub(crate) type SchedulerMessage = (
3009    Vec<(Schedulable, AssignedVersions)>,
3010    Option<SettlementBatchInfo>,
3011);
3012
3013#[derive(Clone)]
3014pub(crate) struct ExecutionSchedulerSender {
3015    sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
3016}
3017
3018impl ExecutionSchedulerSender {
3019    fn start(
3020        settlement_scheduler: SettlementScheduler,
3021        epoch_store: Arc<AuthorityPerEpochStore>,
3022    ) -> Self {
3023        let (sender, recv) = monitored_mpsc::unbounded_channel("execution_scheduler_sender");
3024        spawn_monitored_task!(Self::run(recv, settlement_scheduler, epoch_store));
3025        Self { sender }
3026    }
3027
3028    pub(crate) fn new_for_testing(
3029        sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
3030    ) -> Self {
3031        Self { sender }
3032    }
3033
3034    fn send(
3035        &self,
3036        transactions: Vec<(Schedulable, AssignedVersions)>,
3037        settlement: Option<SettlementBatchInfo>,
3038    ) {
3039        let _ = self.sender.send((transactions, settlement));
3040    }
3041
3042    async fn run(
3043        mut recv: monitored_mpsc::UnboundedReceiver<SchedulerMessage>,
3044        settlement_scheduler: SettlementScheduler,
3045        epoch_store: Arc<AuthorityPerEpochStore>,
3046    ) {
3047        while let Some((transactions, settlement)) = recv.recv().await {
3048            let _guard = monitored_scope("ConsensusHandler::enqueue");
3049            let txns = transactions
3050                .into_iter()
3051                .map(|(txn, versions)| (txn, ExecutionEnv::new().with_assigned_versions(versions)))
3052                .collect();
3053            if let Some(settlement) = settlement {
3054                settlement_scheduler.enqueue_v2(txns, settlement, &epoch_store);
3055            } else {
3056                settlement_scheduler.enqueue(txns, &epoch_store);
3057            }
3058        }
3059    }
3060}
3061
3062/// Capacity of the channel from the deserialize worker to the commit handler. Small/bounded: lets
3063/// the worker prepare ~1 commit ahead (pipelining) while applying backpressure when the handler is
3064/// behind, instead of buffering parsed commits unboundedly.
3065const CONSENSUS_HANDLER_DESERIALIZE_CHANNEL_CAPACITY: usize = 2;
3066
3067/// Transactions BCS-deserialized out of a consensus commit, grouped by block. Produced by the
3068/// deserialize worker and consumed by the commit handler, so parsing stays off the handler's
3069/// critical path.
3070type ParsedConsensusTransactions = Vec<(BlockRef, Vec<ParsedTransaction>)>;
3071
3072/// Manages the lifetime of tasks handling the commits and transactions output by consensus.
3073pub(crate) struct MysticetiConsensusHandler {
3074    tasks: JoinSet<()>,
3075}
3076
3077impl MysticetiConsensusHandler {
3078    pub(crate) fn new(
3079        last_processed_commit_at_startup: CommitIndex,
3080        mut consensus_handler: ConsensusHandler<CheckpointService>,
3081        mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
3082        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
3083    ) -> Self {
3084        debug!(
3085            last_processed_commit_at_startup,
3086            "Starting consensus replay"
3087        );
3088        let mut tasks = JoinSet::new();
3089
3090        // Stage 1 — deserialize worker: BCS-parses each commit's transactions off the handler's
3091        // critical path, so parsing overlaps the handler processing the previous commit. The
3092        // channel is bounded (small) so the worker prepares ~1 commit ahead but applies
3093        // backpressure (rather than buffering parsed commits unboundedly) when the handler is the
3094        // bottleneck. Single-threaded; ordering is preserved (one worker, FIFO channel, one handler).
3095        let (parsed_sender, mut parsed_receiver) = monitored_mpsc::channel(
3096            "consensus_deserialized_commits",
3097            CONSENSUS_HANDLER_DESERIALIZE_CHANNEL_CAPACITY,
3098        );
3099        tasks.spawn(monitored_future!(async move {
3100            while let Some(consensus_commit) = commit_receiver.recv().await {
3101                let transactions: ParsedConsensusTransactions = {
3102                    let _scope = monitored_scope("ConsensusCommitHandler::deserialize_worker");
3103                    consensus_commit.transactions()
3104                };
3105                // The send is intentionally outside the scope above: on the bounded channel it
3106                // blocks when the handler is the bottleneck, and that idle-wait would otherwise
3107                // inflate the deserialize_worker scope (making it read as CPU work, not waiting).
3108                if parsed_sender
3109                    .send((consensus_commit, transactions))
3110                    .await
3111                    .is_err()
3112                {
3113                    break;
3114                }
3115            }
3116        }));
3117
3118        // Stage 2 — commit handler: processes pre-parsed commits in order.
3119        tasks.spawn(monitored_future!(async move {
3120            // TODO: pause when execution is overloaded, so consensus can detect the backpressure.
3121            while let Some((consensus_commit, transactions)) = parsed_receiver.recv().await {
3122                let commit_index = consensus_commit.commit_ref.index;
3123                if commit_index <= last_processed_commit_at_startup {
3124                    consensus_handler.handle_prior_consensus_commit(consensus_commit);
3125                } else {
3126                    consensus_handler
3127                        .handle_consensus_commit(consensus_commit, transactions)
3128                        .await;
3129                }
3130                commit_consumer_monitor.set_highest_handled_commit(commit_index);
3131            }
3132        }));
3133        Self { tasks }
3134    }
3135
3136    pub(crate) async fn abort(&mut self) {
3137        self.tasks.shutdown().await;
3138    }
3139}
3140
3141fn authenticator_state_update_transaction(
3142    epoch_store: &AuthorityPerEpochStore,
3143    round: u64,
3144    mut new_active_jwks: Vec<ActiveJwk>,
3145) -> VerifiedExecutableTransaction {
3146    let epoch = epoch_store.epoch();
3147    new_active_jwks.sort();
3148
3149    info!("creating authenticator state update transaction");
3150    assert!(epoch_store.authenticator_state_enabled());
3151    let transaction = VerifiedTransaction::new_authenticator_state_update(
3152        epoch,
3153        round,
3154        new_active_jwks,
3155        epoch_store
3156            .epoch_start_config()
3157            .authenticator_obj_initial_shared_version()
3158            .expect("authenticator state obj must exist"),
3159    );
3160    VerifiedExecutableTransaction::new_system(transaction, epoch)
3161}
3162
3163/// The owned (non-immutable `ImmOrOwnedMoveObject`) object refs that a
3164/// `UserTransactionV2` must lock post-consensus. Immutable objects are excluded as
3165/// they can be used concurrently. Returns `None` if the transaction's input objects
3166/// are invalid. Used both to prefetch existing locks for the whole commit and, per
3167/// transaction, to perform conflict detection — keeping the two in sync.
3168fn owned_object_refs_to_lock(
3169    tx_with_claims: &PlainTransactionWithClaims,
3170) -> Option<Vec<ObjectRef>> {
3171    let immutable_object_ids: HashSet<ObjectID> =
3172        tx_with_claims.get_immutable_objects().into_iter().collect();
3173    let input_objects = tx_with_claims
3174        .tx()
3175        .transaction_data()
3176        .input_objects()
3177        .ok()?;
3178    Some(
3179        input_objects
3180            .iter()
3181            .filter_map(|obj| match obj {
3182                InputObjectKind::ImmOrOwnedMoveObject(obj_ref)
3183                    if !immutable_object_ids.contains(&obj_ref.0) =>
3184                {
3185                    Some(*obj_ref)
3186                }
3187                _ => None,
3188            })
3189            .collect(),
3190    )
3191}
3192
3193pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
3194    match &transaction.kind {
3195        // Deprecated and rejected by SuiTxValidator; never classified in practice.
3196        ConsensusTransactionKind::CertifiedTransaction(_) => "_deprecated_certificate",
3197        ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
3198        ConsensusTransactionKind::CheckpointSignatureV2(_) => "checkpoint_signature",
3199        ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
3200        ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
3201        ConsensusTransactionKind::CapabilityNotificationV2(_) => "capability_notification_v2",
3202        ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
3203        ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
3204        ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
3205        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
3206        ConsensusTransactionKind::UserTransaction(_) => "_deprecated_user_transaction",
3207        ConsensusTransactionKind::UserTransactionV2(tx) => {
3208            if tx.tx().is_consensus_tx() {
3209                "shared_user_transaction_v2"
3210            } else {
3211                "owned_user_transaction_v2"
3212            }
3213        }
3214        ConsensusTransactionKind::ExecutionTimeObservation(_) => "execution_time_observation",
3215    }
3216}
3217
3218#[derive(Debug, Clone, Serialize, Deserialize)]
3219pub struct SequencedConsensusTransaction {
3220    pub certificate_author_index: AuthorityIndex,
3221    pub certificate_author: AuthorityName,
3222    pub consensus_index: ExecutionIndices,
3223    pub transaction: SequencedConsensusTransactionKind,
3224}
3225
3226#[derive(Debug, Clone)]
3227#[allow(clippy::large_enum_variant)]
3228pub enum SequencedConsensusTransactionKind {
3229    External(ConsensusTransaction),
3230    System(VerifiedExecutableTransaction),
3231}
3232
3233impl Serialize for SequencedConsensusTransactionKind {
3234    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
3235        let serializable = SerializableSequencedConsensusTransactionKind::from(self);
3236        serializable.serialize(serializer)
3237    }
3238}
3239
3240impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
3241    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
3242        let serializable =
3243            SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
3244        Ok(serializable.into())
3245    }
3246}
3247
3248// We can't serialize SequencedConsensusTransactionKind directly because it contains a
3249// VerifiedExecutableTransaction, which is not serializable (by design). This wrapper allows us to
3250// convert to a serializable format easily.
3251#[derive(Debug, Clone, Serialize, Deserialize)]
3252#[allow(clippy::large_enum_variant)]
3253enum SerializableSequencedConsensusTransactionKind {
3254    External(ConsensusTransaction),
3255    System(TrustedExecutableTransaction),
3256}
3257
3258impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
3259    fn from(kind: &SequencedConsensusTransactionKind) -> Self {
3260        match kind {
3261            SequencedConsensusTransactionKind::External(ext) => {
3262                SerializableSequencedConsensusTransactionKind::External(ext.clone())
3263            }
3264            SequencedConsensusTransactionKind::System(txn) => {
3265                SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
3266            }
3267        }
3268    }
3269}
3270
3271impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
3272    fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
3273        match kind {
3274            SerializableSequencedConsensusTransactionKind::External(ext) => {
3275                SequencedConsensusTransactionKind::External(ext)
3276            }
3277            SerializableSequencedConsensusTransactionKind::System(txn) => {
3278                SequencedConsensusTransactionKind::System(txn.into())
3279            }
3280        }
3281    }
3282}
3283
3284#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
3285pub enum SequencedConsensusTransactionKey {
3286    External(ConsensusTransactionKey),
3287    System(TransactionDigest),
3288}
3289
3290impl SequencedConsensusTransactionKey {
3291    pub fn user_transaction_digest(&self) -> Option<TransactionDigest> {
3292        match self {
3293            SequencedConsensusTransactionKey::External(key) => match key {
3294                ConsensusTransactionKey::Certificate(digest) => Some(*digest),
3295                _ => None,
3296            },
3297            SequencedConsensusTransactionKey::System(_) => None,
3298        }
3299    }
3300}
3301
3302impl SequencedConsensusTransactionKind {
3303    pub fn key(&self) -> SequencedConsensusTransactionKey {
3304        match self {
3305            SequencedConsensusTransactionKind::External(ext) => {
3306                SequencedConsensusTransactionKey::External(ext.key())
3307            }
3308            SequencedConsensusTransactionKind::System(txn) => {
3309                SequencedConsensusTransactionKey::System(*txn.digest())
3310            }
3311        }
3312    }
3313
3314    pub fn get_tracking_id(&self) -> u64 {
3315        match self {
3316            SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
3317            SequencedConsensusTransactionKind::System(_txn) => 0,
3318        }
3319    }
3320
3321    pub fn is_executable_transaction(&self) -> bool {
3322        match self {
3323            SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
3324            SequencedConsensusTransactionKind::System(_) => true,
3325        }
3326    }
3327
3328    pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
3329        match self {
3330            SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
3331                ConsensusTransactionKind::UserTransactionV2(txn) => Some(*txn.tx().digest()),
3332                _ => None,
3333            },
3334            SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
3335        }
3336    }
3337
3338    pub fn is_end_of_publish(&self) -> bool {
3339        match self {
3340            SequencedConsensusTransactionKind::External(ext) => {
3341                matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
3342            }
3343            SequencedConsensusTransactionKind::System(_) => false,
3344        }
3345    }
3346}
3347
3348impl SequencedConsensusTransaction {
3349    pub fn sender_authority(&self) -> AuthorityName {
3350        self.certificate_author
3351    }
3352
3353    pub fn key(&self) -> SequencedConsensusTransactionKey {
3354        self.transaction.key()
3355    }
3356
3357    pub fn is_end_of_publish(&self) -> bool {
3358        if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
3359            matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
3360        } else {
3361            false
3362        }
3363    }
3364
3365    pub fn try_take_execution_time_observation(&mut self) -> Option<ExecutionTimeObservation> {
3366        if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
3367            kind: ConsensusTransactionKind::ExecutionTimeObservation(observation),
3368            ..
3369        }) = &mut self.transaction
3370        {
3371            Some(std::mem::take(observation))
3372        } else {
3373            None
3374        }
3375    }
3376
3377    pub fn is_system(&self) -> bool {
3378        matches!(
3379            self.transaction,
3380            SequencedConsensusTransactionKind::System(_)
3381        )
3382    }
3383
3384    pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
3385        if !randomness_state_enabled {
3386            // If randomness is disabled, these should be processed same as a tx without randomness,
3387            // which will eventually fail when the randomness state object is not found.
3388            return false;
3389        }
3390        match &self.transaction {
3391            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3392                kind: ConsensusTransactionKind::UserTransactionV2(txn),
3393                ..
3394            }) => txn.tx().transaction_data().uses_randomness(),
3395            _ => false,
3396        }
3397    }
3398
3399    pub fn as_consensus_txn(&self) -> Option<&SenderSignedData> {
3400        match &self.transaction {
3401            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3402                kind: ConsensusTransactionKind::UserTransactionV2(txn),
3403                ..
3404            }) if txn.tx().is_consensus_tx() => Some(txn.tx().data()),
3405            SequencedConsensusTransactionKind::System(txn) if txn.is_consensus_tx() => {
3406                Some(txn.data())
3407            }
3408            _ => None,
3409        }
3410    }
3411}
3412
3413#[derive(Debug, Clone, Serialize, Deserialize)]
3414pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
3415
3416#[cfg(test)]
3417impl VerifiedSequencedConsensusTransaction {
3418    pub fn new_test(transaction: ConsensusTransaction) -> Self {
3419        Self(SequencedConsensusTransaction::new_test(transaction))
3420    }
3421}
3422
3423impl SequencedConsensusTransaction {
3424    pub fn new_test(transaction: ConsensusTransaction) -> Self {
3425        Self {
3426            certificate_author_index: 0,
3427            certificate_author: AuthorityName::ZERO,
3428            consensus_index: Default::default(),
3429            transaction: SequencedConsensusTransactionKind::External(transaction),
3430        }
3431    }
3432}
3433
3434#[derive(Serialize, Deserialize)]
3435pub(crate) struct CommitIntervalObserver {
3436    ring_buffer: VecDeque<u64>,
3437}
3438
3439impl CommitIntervalObserver {
3440    pub fn new(window_size: u32) -> Self {
3441        Self {
3442            ring_buffer: VecDeque::with_capacity(window_size as usize),
3443        }
3444    }
3445
3446    pub fn observe_commit_time(&mut self, consensus_commit: &impl ConsensusCommitAPI) {
3447        let commit_time = consensus_commit.commit_timestamp_ms();
3448        if self.ring_buffer.len() == self.ring_buffer.capacity() {
3449            self.ring_buffer.pop_front();
3450        }
3451        self.ring_buffer.push_back(commit_time);
3452    }
3453
3454    pub fn commit_interval_estimate(&self) -> Option<Duration> {
3455        if self.ring_buffer.len() <= 1 {
3456            None
3457        } else {
3458            let first = self.ring_buffer.front().unwrap();
3459            let last = self.ring_buffer.back().unwrap();
3460            let duration = last.saturating_sub(*first);
3461            let num_commits = self.ring_buffer.len() as u64;
3462            Some(Duration::from_millis(duration.div_ceil(num_commits)))
3463        }
3464    }
3465}
3466
3467#[cfg(test)]
3468mod tests {
3469    use std::collections::HashSet;
3470
3471    use consensus_core::{
3472        BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock,
3473    };
3474    use futures::pin_mut;
3475    use prometheus::Registry;
3476    use sui_protocol_config::{ConsensusTransactionOrdering, ProtocolConfig};
3477    use sui_types::{
3478        base_types::ExecutionDigests,
3479        base_types::{AuthorityName, FullObjectRef, ObjectID, SuiAddress, random_object_ref},
3480        committee::Committee,
3481        crypto::deterministic_random_account_key,
3482        gas::GasCostSummary,
3483        message_envelope::Message,
3484        messages_checkpoint::{
3485            CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
3486            SignedCheckpointSummary,
3487        },
3488        messages_consensus::ConsensusTransaction,
3489        object::Object,
3490        transaction::{
3491            CertifiedTransaction, TransactionData, TransactionDataAPI, VerifiedCertificate,
3492        },
3493    };
3494
3495    use super::*;
3496    use crate::{
3497        authority::{
3498            authority_per_epoch_store::ConsensusStatsAPI,
3499            consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
3500            test_authority_builder::TestAuthorityBuilder,
3501        },
3502        checkpoints::CheckpointServiceNoop,
3503        consensus_adapter::consensus_tests::test_user_transaction,
3504        consensus_test_utils::{
3505            TestConsensusCommit, make_consensus_adapter_for_test,
3506            setup_consensus_handler_for_testing,
3507        },
3508        post_consensus_tx_reorder::PostConsensusTxReorder,
3509    };
3510
3511    #[tokio::test(flavor = "current_thread", start_paused = true)]
3512    async fn test_consensus_commit_handler() {
3513        telemetry_subscribers::init_for_testing();
3514
3515        // GIVEN
3516        // 1 account keypair
3517        let (sender, keypair) = deterministic_random_account_key();
3518        // 12 gas objects.
3519        let gas_objects: Vec<Object> = (0..12)
3520            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3521            .collect();
3522        // 4 owned objects.
3523        let owned_objects: Vec<Object> = (0..4)
3524            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3525            .collect();
3526        // 6 shared objects.
3527        let shared_objects: Vec<Object> = (0..6)
3528            .map(|_| Object::shared_for_testing())
3529            .collect::<Vec<_>>();
3530        let mut all_objects = gas_objects.clone();
3531        all_objects.extend(owned_objects.clone());
3532        all_objects.extend(shared_objects.clone());
3533
3534        let network_config =
3535            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
3536                .with_objects(all_objects.clone())
3537                .build();
3538
3539        let state = TestAuthorityBuilder::new()
3540            .with_network_config(&network_config, 0)
3541            .build()
3542            .await;
3543
3544        let epoch_store = state.epoch_store_for_testing().clone();
3545        let new_epoch_start_state = epoch_store.epoch_start_state();
3546        let consensus_committee = new_epoch_start_state.get_consensus_committee();
3547
3548        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3549
3550        let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
3551
3552        let backpressure_manager = BackpressureManager::new_for_tests();
3553        let consensus_adapter =
3554            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3555        let settlement_scheduler = SettlementScheduler::new(
3556            state.execution_scheduler().as_ref().clone(),
3557            state.get_transaction_cache_reader().clone(),
3558            state.metrics.clone(),
3559        );
3560        let mut consensus_handler = ConsensusHandler::new(
3561            epoch_store,
3562            Arc::new(CheckpointServiceNoop {}),
3563            settlement_scheduler,
3564            consensus_adapter,
3565            state.get_object_cache_reader().clone(),
3566            consensus_committee.clone(),
3567            metrics,
3568            Arc::new(throughput_calculator),
3569            backpressure_manager.subscribe(),
3570            state.traffic_controller.clone(),
3571            None,
3572            state.consensus_gasless_counter.clone(),
3573        );
3574
3575        // AND create test user transactions alternating between owned and shared input.
3576        let mut user_transactions = vec![];
3577        for (i, gas_object) in gas_objects[0..8].iter().enumerate() {
3578            let input_object = if i % 2 == 0 {
3579                owned_objects.get(i / 2).unwrap().clone()
3580            } else {
3581                shared_objects.get(i / 2).unwrap().clone()
3582            };
3583            let transaction = test_user_transaction(
3584                &state,
3585                sender,
3586                &keypair,
3587                gas_object.clone(),
3588                vec![input_object],
3589            )
3590            .await;
3591            user_transactions.push(transaction);
3592        }
3593
3594        // AND create 4 more user transactions with remaining gas objects and 2 shared objects.
3595        // Having more txns on the same shared object may get deferred.
3596        for (i, gas_object) in gas_objects[8..12].iter().enumerate() {
3597            let shared_object = if i < 2 {
3598                shared_objects[4].clone()
3599            } else {
3600                shared_objects[5].clone()
3601            };
3602            let transaction = test_user_transaction(
3603                &state,
3604                sender,
3605                &keypair,
3606                gas_object.clone(),
3607                vec![shared_object],
3608            )
3609            .await;
3610            user_transactions.push(transaction);
3611        }
3612
3613        // AND create block for each user transaction
3614        let mut blocks = Vec::new();
3615        for (i, consensus_transaction) in user_transactions
3616            .iter()
3617            .cloned()
3618            .map(|t| ConsensusTransaction::new_user_transaction_v2_message(&state.name, t.into()))
3619            .enumerate()
3620        {
3621            let transaction_bytes = bcs::to_bytes(&consensus_transaction).unwrap();
3622            let block = VerifiedBlock::new_for_test(
3623                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
3624                    .set_transactions(vec![Transaction::new(transaction_bytes)])
3625                    .build(),
3626            );
3627
3628            blocks.push(block);
3629        }
3630
3631        // AND create the consensus commit
3632        let leader_block = blocks[0].clone();
3633        let committed_sub_dag = CommittedSubDag::new(
3634            leader_block.reference(),
3635            blocks.clone(),
3636            leader_block.timestamp_ms(),
3637            CommitRef::new(10, CommitDigest::MIN),
3638        );
3639
3640        // Test that the consensus handler respects backpressure.
3641        backpressure_manager.set_backpressure(true);
3642        // Default watermarks are 0,0 which will suppress the backpressure.
3643        backpressure_manager.update_highest_certified_checkpoint(1);
3644
3645        // AND process the consensus commit once
3646        {
3647            let waiter =
3648                consensus_handler.handle_consensus_commit_for_test(committed_sub_dag.clone());
3649            pin_mut!(waiter);
3650
3651            // waiter should not complete within 5 seconds
3652            tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
3653                .await
3654                .unwrap_err();
3655
3656            // lift backpressure
3657            backpressure_manager.set_backpressure(false);
3658
3659            // waiter completes now.
3660            tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
3661                .await
3662                .unwrap();
3663        }
3664
3665        // THEN check the consensus stats
3666        let num_blocks = blocks.len();
3667        let num_transactions = user_transactions.len();
3668        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
3669        assert_eq!(
3670            last_consensus_stats_1.index.transaction_index,
3671            num_transactions as u64
3672        );
3673        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
3674        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
3675        assert_eq!(
3676            last_consensus_stats_1.stats.get_num_messages(0),
3677            num_blocks as u64
3678        );
3679        assert_eq!(
3680            last_consensus_stats_1.stats.get_num_user_transactions(0),
3681            num_transactions as u64
3682        );
3683
3684        // THEN check for execution status of user transactions.
3685        for (i, t) in user_transactions.iter().enumerate() {
3686            let digest = t.tx().digest();
3687            if tokio::time::timeout(
3688                std::time::Duration::from_secs(10),
3689                state.notify_read_effects_for_testing("", *digest),
3690            )
3691            .await
3692            .is_ok()
3693            {
3694                // Effects exist as expected.
3695            } else {
3696                panic!("User transaction {} {} did not execute", i, digest);
3697            }
3698        }
3699
3700        // THEN check for no inflight or suspended transactions.
3701        state.execution_scheduler().check_empty_for_testing().await;
3702    }
3703
3704    #[tokio::test(flavor = "current_thread")]
3705    async fn test_dropped_owned_object_lock_conflict_is_marked_processed() {
3706        telemetry_subscribers::init_for_testing();
3707
3708        let (sender, keypair) = deterministic_random_account_key();
3709        let gas_objects: Vec<Object> = (0..2)
3710            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3711            .collect();
3712        let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), sender);
3713        let mut all_objects = gas_objects.clone();
3714        all_objects.push(owned_object.clone());
3715
3716        let state = TestAuthorityBuilder::new()
3717            .with_starting_objects(&all_objects)
3718            .skip_rpc_index_init()
3719            .skip_genesis_owner_index()
3720            .build()
3721            .await;
3722        let epoch_store = state.epoch_store_for_testing();
3723        let owned_object_ref = state
3724            .get_object(&owned_object.id())
3725            .unwrap()
3726            .compute_object_reference();
3727
3728        let winner = test_user_transaction(
3729            &state,
3730            sender,
3731            &keypair,
3732            gas_objects[0].clone(),
3733            vec![owned_object.clone()],
3734        )
3735        .await;
3736        let loser = test_user_transaction(
3737            &state,
3738            sender,
3739            &keypair,
3740            gas_objects[1].clone(),
3741            vec![owned_object.clone()],
3742        )
3743        .await;
3744
3745        let winner_digest = *winner.tx().digest();
3746        let loser_digest = *loser.tx().digest();
3747        assert_ne!(winner_digest, loser_digest);
3748
3749        let winner_consensus_tx =
3750            ConsensusTransaction::new_user_transaction_v2_message(&state.name, winner.into());
3751        let loser_consensus_tx =
3752            ConsensusTransaction::new_user_transaction_v2_message(&state.name, loser.into());
3753        let winner_key = SequencedConsensusTransactionKey::External(winner_consensus_tx.key());
3754        let loser_key = SequencedConsensusTransactionKey::External(loser_consensus_tx.key());
3755
3756        let round = 100;
3757        let commit = TestConsensusCommit::new(
3758            vec![winner_consensus_tx, loser_consensus_tx],
3759            round as u64,
3760            1_000,
3761            10,
3762        );
3763        let mut setup = setup_consensus_handler_for_testing(&state).await;
3764        setup
3765            .consensus_handler
3766            .handle_consensus_commit_for_test(commit)
3767            .await;
3768        assert_eq!(
3769            setup
3770                .consensus_handler
3771                .metrics
3772                .consensus_handler_dropped_transactions
3773                .get(),
3774            1
3775        );
3776
3777        let block = BlockRef {
3778            author: consensus_config::AuthorityIndex::ZERO,
3779            round,
3780            digest: Default::default(),
3781        };
3782        assert!(matches!(
3783            epoch_store
3784                .consensus_tx_status_cache
3785                .notify_read_transaction_status(ConsensusPosition {
3786                    epoch: epoch_store.epoch(),
3787                    block,
3788                    index: 0,
3789                })
3790                .await,
3791            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
3792        ));
3793        assert!(matches!(
3794            epoch_store
3795                .consensus_tx_status_cache
3796                .notify_read_transaction_status(ConsensusPosition {
3797                    epoch: epoch_store.epoch(),
3798                    block,
3799                    index: 1,
3800                })
3801                .await,
3802            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Dropped)
3803        ));
3804
3805        let locks = epoch_store
3806            .get_owned_object_locks_map(&[owned_object_ref])
3807            .unwrap();
3808        assert_eq!(locks.get(&owned_object_ref), Some(&winner_digest));
3809        assert!(
3810            epoch_store
3811                .is_consensus_message_processed(&winner_key)
3812                .unwrap()
3813        );
3814        assert!(
3815            epoch_store
3816                .is_consensus_message_processed(&loser_key)
3817                .unwrap()
3818        );
3819    }
3820
3821    fn to_short_strings(txs: Vec<VerifiedExecutableTransactionWithAliases>) -> Vec<String> {
3822        txs.into_iter()
3823            .map(|tx| format!("transaction({})", tx.tx().transaction_data().gas_price()))
3824            .collect()
3825    }
3826
3827    #[test]
3828    fn test_order_by_gas_price() {
3829        let mut v = vec![user_txn(42), user_txn(100)];
3830        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3831        assert_eq!(
3832            to_short_strings(v),
3833            vec![
3834                "transaction(100)".to_string(),
3835                "transaction(42)".to_string(),
3836            ]
3837        );
3838
3839        let mut v = vec![
3840            user_txn(1200),
3841            user_txn(12),
3842            user_txn(1000),
3843            user_txn(42),
3844            user_txn(100),
3845            user_txn(1000),
3846        ];
3847        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3848        assert_eq!(
3849            to_short_strings(v),
3850            vec![
3851                "transaction(1200)".to_string(),
3852                "transaction(1000)".to_string(),
3853                "transaction(1000)".to_string(),
3854                "transaction(100)".to_string(),
3855                "transaction(42)".to_string(),
3856                "transaction(12)".to_string(),
3857            ]
3858        );
3859    }
3860
3861    #[tokio::test(flavor = "current_thread")]
3862    async fn test_checkpoint_signature_dedup() {
3863        telemetry_subscribers::init_for_testing();
3864
3865        let network_config =
3866            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3867        let state = TestAuthorityBuilder::new()
3868            .with_network_config(&network_config, 0)
3869            .build()
3870            .await;
3871
3872        let epoch_store = state.epoch_store_for_testing().clone();
3873        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3874
3875        let make_signed = || {
3876            let epoch = epoch_store.epoch();
3877            let contents =
3878                CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
3879            let summary = CheckpointSummary::new(
3880                &ProtocolConfig::get_for_max_version_UNSAFE(),
3881                epoch,
3882                42, // sequence number
3883                10, // network_total_transactions
3884                &contents,
3885                None, // previous_digest
3886                GasCostSummary::default(),
3887                None,       // end_of_epoch_data
3888                0,          // timestamp
3889                Vec::new(), // randomness_rounds
3890                Vec::new(), // checkpoint_artifact_digests
3891            );
3892            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name)
3893        };
3894
3895        // Prepare V2 pair: same (authority, seq), different digests => different keys
3896        let v2_s1 = make_signed();
3897        let v2_s1_clone = v2_s1.clone();
3898        let v2_digest_a = v2_s1.data().digest();
3899        let v2_a =
3900            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3901                summary: v2_s1,
3902            });
3903
3904        let v2_s2 = make_signed();
3905        let v2_digest_b = v2_s2.data().digest();
3906        let v2_b =
3907            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3908                summary: v2_s2,
3909            });
3910
3911        assert_ne!(v2_digest_a, v2_digest_b);
3912
3913        // Create an exact duplicate with same digest to exercise valid dedup
3914        assert_eq!(v2_s1_clone.data().digest(), v2_digest_a);
3915        let v2_dup =
3916            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3917                summary: v2_s1_clone,
3918            });
3919
3920        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
3921        let block = VerifiedBlock::new_for_test(
3922            TestBlock::new(100, 0)
3923                .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)])
3924                .build(),
3925        );
3926        let commit = CommittedSubDag::new(
3927            block.reference(),
3928            vec![block.clone()],
3929            block.timestamp_ms(),
3930            CommitRef::new(10, CommitDigest::MIN),
3931        );
3932
3933        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3934        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
3935        let backpressure = BackpressureManager::new_for_tests();
3936        let consensus_adapter =
3937            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3938        let settlement_scheduler = SettlementScheduler::new(
3939            state.execution_scheduler().as_ref().clone(),
3940            state.get_transaction_cache_reader().clone(),
3941            state.metrics.clone(),
3942        );
3943        let mut handler = ConsensusHandler::new(
3944            epoch_store.clone(),
3945            Arc::new(CheckpointServiceNoop {}),
3946            settlement_scheduler,
3947            consensus_adapter,
3948            state.get_object_cache_reader().clone(),
3949            consensus_committee.clone(),
3950            metrics,
3951            Arc::new(throughput),
3952            backpressure.subscribe(),
3953            state.traffic_controller.clone(),
3954            None,
3955            state.consensus_gasless_counter.clone(),
3956        );
3957
3958        handler.handle_consensus_commit_for_test(commit).await;
3959
3960        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
3961        use sui_types::messages_consensus::ConsensusTransactionKey as CK;
3962
3963        // V2 distinct digests: both must be processed. If these were collapsed to one CheckpointSeq num, only one would process.
3964        let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a));
3965        let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b));
3966        assert!(
3967            epoch_store
3968                .is_consensus_message_processed(&v2_key_a)
3969                .unwrap()
3970        );
3971        assert!(
3972            epoch_store
3973                .is_consensus_message_processed(&v2_key_b)
3974                .unwrap()
3975        );
3976    }
3977
3978    #[tokio::test(flavor = "current_thread")]
3979    async fn test_verify_consensus_transaction_filters_mismatched_authorities() {
3980        telemetry_subscribers::init_for_testing();
3981
3982        let network_config =
3983            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3984        let state = TestAuthorityBuilder::new()
3985            .with_network_config(&network_config, 0)
3986            .build()
3987            .await;
3988
3989        let epoch_store = state.epoch_store_for_testing().clone();
3990        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3991
3992        // Create a different authority than our test authority
3993        use fastcrypto::traits::KeyPair;
3994        let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair();
3995        let wrong_authority: AuthorityName = wrong_keypair.public().into();
3996
3997        // Create EndOfPublish transaction with mismatched authority
3998        let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority);
3999
4000        // Create valid EndOfPublish transaction with correct authority
4001        let valid_eop = ConsensusTransaction::new_end_of_publish(state.name);
4002
4003        // Create CheckpointSignature with mismatched authority
4004        let epoch = epoch_store.epoch();
4005        let contents =
4006            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
4007        let summary = CheckpointSummary::new(
4008            &ProtocolConfig::get_for_max_version_UNSAFE(),
4009            epoch,
4010            42, // sequence number
4011            10, // network_total_transactions
4012            &contents,
4013            None, // previous_digest
4014            GasCostSummary::default(),
4015            None,       // end_of_epoch_data
4016            0,          // timestamp
4017            Vec::new(), // randomness_rounds
4018            Vec::new(), // checkpoint commitments
4019        );
4020
4021        // Create a signed checkpoint with the wrong authority
4022        let mismatched_checkpoint_signed =
4023            SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority);
4024        let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest();
4025        let mismatched_checkpoint =
4026            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
4027                summary: mismatched_checkpoint_signed,
4028            });
4029
4030        // Create a valid checkpoint signature with correct authority
4031        let valid_checkpoint_signed =
4032            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name);
4033        let valid_checkpoint_digest = valid_checkpoint_signed.data().digest();
4034        let valid_checkpoint =
4035            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
4036                summary: valid_checkpoint_signed,
4037            });
4038
4039        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
4040
4041        // Create a block with both valid and invalid transactions
4042        let block = VerifiedBlock::new_for_test(
4043            TestBlock::new(100, 0)
4044                .set_transactions(vec![
4045                    to_tx(&mismatched_eop),
4046                    to_tx(&valid_eop),
4047                    to_tx(&mismatched_checkpoint),
4048                    to_tx(&valid_checkpoint),
4049                ])
4050                .build(),
4051        );
4052        let commit = CommittedSubDag::new(
4053            block.reference(),
4054            vec![block.clone()],
4055            block.timestamp_ms(),
4056            CommitRef::new(10, CommitDigest::MIN),
4057        );
4058
4059        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
4060        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
4061        let backpressure = BackpressureManager::new_for_tests();
4062        let consensus_adapter =
4063            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
4064        let settlement_scheduler = SettlementScheduler::new(
4065            state.execution_scheduler().as_ref().clone(),
4066            state.get_transaction_cache_reader().clone(),
4067            state.metrics.clone(),
4068        );
4069        let mut handler = ConsensusHandler::new(
4070            epoch_store.clone(),
4071            Arc::new(CheckpointServiceNoop {}),
4072            settlement_scheduler,
4073            consensus_adapter,
4074            state.get_object_cache_reader().clone(),
4075            consensus_committee.clone(),
4076            metrics,
4077            Arc::new(throughput),
4078            backpressure.subscribe(),
4079            state.traffic_controller.clone(),
4080            None,
4081            state.consensus_gasless_counter.clone(),
4082        );
4083
4084        handler.handle_consensus_commit_for_test(commit).await;
4085
4086        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
4087        use sui_types::messages_consensus::ConsensusTransactionKey as CK;
4088
4089        // Check that valid transactions were processed
4090        let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
4091        assert!(
4092            epoch_store
4093                .is_consensus_message_processed(&valid_eop_key)
4094                .unwrap(),
4095            "Valid EndOfPublish should have been processed"
4096        );
4097
4098        let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
4099            state.name,
4100            42,
4101            valid_checkpoint_digest,
4102        ));
4103        assert!(
4104            epoch_store
4105                .is_consensus_message_processed(&valid_checkpoint_key)
4106                .unwrap(),
4107            "Valid CheckpointSignature should have been processed"
4108        );
4109
4110        // Check that mismatched authority transactions were NOT processed (filtered out by verify_consensus_transaction)
4111        let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
4112        assert!(
4113            !epoch_store
4114                .is_consensus_message_processed(&mismatched_eop_key)
4115                .unwrap(),
4116            "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
4117        );
4118
4119        let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
4120            wrong_authority,
4121            42,
4122            mismatched_checkpoint_digest,
4123        ));
4124        assert!(
4125            !epoch_store
4126                .is_consensus_message_processed(&mismatched_checkpoint_key)
4127                .unwrap(),
4128            "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
4129        );
4130    }
4131
4132    fn user_txn(gas_price: u64) -> VerifiedExecutableTransactionWithAliases {
4133        let (committee, keypairs) = Committee::new_simple_test_committee();
4134        let (sender, sender_keypair) = deterministic_random_account_key();
4135        let tx = sui_types::transaction::Transaction::from_data_and_signer(
4136            TransactionData::new_transfer(
4137                SuiAddress::default(),
4138                FullObjectRef::from_fastpath_ref(random_object_ref()),
4139                sender,
4140                random_object_ref(),
4141                1000 * gas_price,
4142                gas_price,
4143            ),
4144            vec![&sender_keypair],
4145        );
4146        let tx = VerifiedExecutableTransaction::new_from_certificate(
4147            VerifiedCertificate::new_unchecked(
4148                CertifiedTransaction::new_from_keypairs_for_testing(
4149                    tx.into_data(),
4150                    &keypairs,
4151                    &committee,
4152                ),
4153            ),
4154        );
4155        VerifiedExecutableTransactionWithAliases::no_aliases(tx)
4156    }
4157
4158    mod checkpoint_queue_tests {
4159        use super::*;
4160        use consensus_core::CommitRef;
4161        use sui_types::digests::Digest;
4162
4163        fn make_chunk(tx_count: usize, height: u64) -> Chunk {
4164            Chunk {
4165                schedulables: (0..tx_count)
4166                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4167                    .collect(),
4168                settlement: None,
4169                height,
4170            }
4171        }
4172
4173        fn make_commit_ref(index: u32) -> CommitRef {
4174            CommitRef {
4175                index,
4176                digest: CommitDigest::MIN,
4177            }
4178        }
4179
4180        fn default_versions() -> HashMap<TransactionKey, AssignedVersions> {
4181            HashMap::new()
4182        }
4183
4184        #[test]
4185        fn test_flush_all_checkpoint_roots() {
4186            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4187            let versions = default_versions();
4188
4189            queue.push_chunk(
4190                make_chunk(5, 1),
4191                &versions,
4192                1000,
4193                make_commit_ref(1),
4194                Digest::default(),
4195            );
4196            queue.push_chunk(
4197                make_chunk(3, 2),
4198                &versions,
4199                1000,
4200                make_commit_ref(1),
4201                Digest::default(),
4202            );
4203
4204            let pending = queue.flush(1000, true);
4205
4206            assert!(pending.is_some());
4207            assert!(queue.pending_roots.is_empty());
4208        }
4209
4210        #[test]
4211        fn test_flush_respects_min_checkpoint_interval() {
4212            let min_interval = 200;
4213            let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, 1000, min_interval);
4214            let versions = default_versions();
4215
4216            queue.push_chunk(
4217                make_chunk(5, 1),
4218                &versions,
4219                1000,
4220                make_commit_ref(1),
4221                Digest::default(),
4222            );
4223
4224            let pending = queue.flush(1000 + min_interval - 1, false);
4225            assert!(pending.is_none());
4226            assert_eq!(queue.pending_roots.len(), 1);
4227
4228            let pending = queue.flush(1000 + min_interval, false);
4229            assert!(pending.is_some());
4230            assert!(queue.pending_roots.is_empty());
4231        }
4232
4233        #[test]
4234        fn test_push_chunk_flushes_when_exceeds_max() {
4235            let max_tx = 10;
4236            let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, max_tx, 0);
4237            let versions = default_versions();
4238
4239            queue.push_chunk(
4240                make_chunk(max_tx / 2 + 1, 1),
4241                &versions,
4242                1000,
4243                make_commit_ref(1),
4244                Digest::default(),
4245            );
4246
4247            let flushed = queue.push_chunk(
4248                make_chunk(max_tx / 2 + 1, 2),
4249                &versions,
4250                1000,
4251                make_commit_ref(2),
4252                Digest::default(),
4253            );
4254
4255            assert_eq!(flushed.len(), 1);
4256            assert_eq!(queue.pending_roots.len(), 1);
4257        }
4258
4259        #[test]
4260        fn test_multiple_chunks_merged_into_one_checkpoint() {
4261            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 200);
4262            let versions = default_versions();
4263
4264            queue.push_chunk(
4265                make_chunk(10, 1),
4266                &versions,
4267                1000,
4268                make_commit_ref(1),
4269                Digest::default(),
4270            );
4271            queue.push_chunk(
4272                make_chunk(10, 2),
4273                &versions,
4274                1000,
4275                make_commit_ref(2),
4276                Digest::default(),
4277            );
4278            queue.push_chunk(
4279                make_chunk(10, 3),
4280                &versions,
4281                1000,
4282                make_commit_ref(3),
4283                Digest::default(),
4284            );
4285
4286            let pending = queue.flush(1000, true).unwrap();
4287
4288            assert_eq!(pending.roots.len(), 3);
4289        }
4290
4291        #[test]
4292        fn test_push_chunk_handles_overflow() {
4293            let max_tx = 10;
4294            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4295            let versions = default_versions();
4296
4297            let flushed1 = queue.push_chunk(
4298                make_chunk(max_tx / 2, 1),
4299                &versions,
4300                1000,
4301                make_commit_ref(1),
4302                Digest::default(),
4303            );
4304            assert!(flushed1.is_empty());
4305
4306            let flushed2 = queue.push_chunk(
4307                make_chunk(max_tx / 2, 2),
4308                &versions,
4309                1000,
4310                make_commit_ref(2),
4311                Digest::default(),
4312            );
4313            assert!(flushed2.is_empty());
4314
4315            let flushed3 = queue.push_chunk(
4316                make_chunk(max_tx / 2, 3),
4317                &versions,
4318                1000,
4319                make_commit_ref(3),
4320                Digest::default(),
4321            );
4322            assert_eq!(flushed3.len(), 1);
4323
4324            let pending = queue.flush(1000, true);
4325
4326            for p in pending.iter().chain(flushed3.iter()) {
4327                let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4328                assert!(tx_count <= max_tx);
4329            }
4330        }
4331
4332        #[test]
4333        fn test_checkpoint_uses_last_chunk_height() {
4334            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4335            let versions = default_versions();
4336
4337            queue.push_chunk(
4338                make_chunk(10, 100),
4339                &versions,
4340                1000,
4341                make_commit_ref(1),
4342                Digest::default(),
4343            );
4344            queue.push_chunk(
4345                make_chunk(10, 200),
4346                &versions,
4347                1000,
4348                make_commit_ref(2),
4349                Digest::default(),
4350            );
4351
4352            let pending = queue.flush(1000, true).unwrap();
4353
4354            assert_eq!(pending.details.checkpoint_height, 200);
4355        }
4356
4357        #[test]
4358        fn test_last_built_timestamp_updated_on_flush() {
4359            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4360            let versions = default_versions();
4361
4362            queue.push_chunk(
4363                make_chunk(10, 1),
4364                &versions,
4365                5000,
4366                make_commit_ref(1),
4367                Digest::default(),
4368            );
4369
4370            assert_eq!(queue.last_built_timestamp, 0);
4371
4372            let _ = queue.flush(5000, true);
4373
4374            assert_eq!(queue.last_built_timestamp, 5000);
4375        }
4376
4377        #[test]
4378        fn test_settlement_info_sent_through_channel() {
4379            let mut queue = CheckpointQueue::new_for_testing(0, 0, 5, 1000, 0);
4380            let versions = default_versions();
4381
4382            let chunk1 = Chunk {
4383                schedulables: vec![
4384                    Schedulable::ConsensusCommitPrologue(0, 1, 0),
4385                    Schedulable::ConsensusCommitPrologue(0, 2, 0),
4386                    Schedulable::ConsensusCommitPrologue(0, 3, 0),
4387                ],
4388                settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
4389                height: 1,
4390            };
4391
4392            let chunk2 = Chunk {
4393                schedulables: vec![
4394                    Schedulable::ConsensusCommitPrologue(0, 4, 0),
4395                    Schedulable::ConsensusCommitPrologue(0, 5, 0),
4396                ],
4397                settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
4398                height: 2,
4399            };
4400
4401            queue.push_chunk(
4402                chunk1,
4403                &versions,
4404                1000,
4405                make_commit_ref(1),
4406                Digest::default(),
4407            );
4408            queue.push_chunk(
4409                chunk2,
4410                &versions,
4411                1000,
4412                make_commit_ref(1),
4413                Digest::default(),
4414            );
4415        }
4416
4417        #[test]
4418        fn test_settlement_checkpoint_seq_correct_after_flush() {
4419            let max_tx = 10;
4420            let initial_seq = 5;
4421            let (sender, mut receiver) = monitored_mpsc::unbounded_channel("test_settlement_seq");
4422            let mut queue =
4423                CheckpointQueue::new_for_testing_with_sender(0, 0, initial_seq, max_tx, 0, sender);
4424            let versions = default_versions();
4425
4426            // Push a chunk that partially fills the queue (no flush).
4427            let chunk1 = Chunk {
4428                schedulables: (0..max_tx / 2 + 1)
4429                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4430                    .collect(),
4431                settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
4432                height: 1,
4433            };
4434            queue.push_chunk(
4435                chunk1,
4436                &versions,
4437                1000,
4438                make_commit_ref(1),
4439                Digest::default(),
4440            );
4441
4442            // Drain the first message from the channel.
4443            let msg1 = receiver.try_recv().unwrap();
4444            let settlement1 = msg1.1.unwrap();
4445            assert_eq!(settlement1.checkpoint_seq, initial_seq);
4446
4447            // Push a second chunk that triggers a flush of chunk1's roots.
4448            let chunk2 = Chunk {
4449                schedulables: (0..max_tx / 2 + 1)
4450                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4451                    .collect(),
4452                settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
4453                height: 2,
4454            };
4455            let flushed = queue.push_chunk(
4456                chunk2,
4457                &versions,
4458                1000,
4459                make_commit_ref(2),
4460                Digest::default(),
4461            );
4462            assert_eq!(flushed.len(), 1);
4463            assert_eq!(flushed[0].details.checkpoint_seq, initial_seq);
4464
4465            // The second settlement must have checkpoint_seq = initial_seq + 1,
4466            // because the flush incremented current_checkpoint_seq.
4467            let msg2 = receiver.try_recv().unwrap();
4468            let settlement2 = msg2.1.unwrap();
4469            assert_eq!(settlement2.checkpoint_seq, initial_seq + 1);
4470
4471            // Flush the remaining roots and verify the PendingCheckpoint's seq
4472            // matches the settlement's seq.
4473            let pending = queue.flush_forced().unwrap();
4474            assert_eq!(pending.details.checkpoint_seq, settlement2.checkpoint_seq);
4475        }
4476
4477        #[test]
4478        fn test_checkpoint_seq_increments_on_flush() {
4479            let mut queue = CheckpointQueue::new_for_testing(0, 0, 10, 1000, 0);
4480            let versions = default_versions();
4481
4482            queue.push_chunk(
4483                make_chunk(5, 1),
4484                &versions,
4485                1000,
4486                make_commit_ref(1),
4487                Digest::default(),
4488            );
4489
4490            let pending = queue.flush(1000, true).unwrap();
4491
4492            assert_eq!(pending.details.checkpoint_seq, 10);
4493            assert_eq!(queue.current_checkpoint_seq, 11);
4494        }
4495
4496        #[test]
4497        fn test_multiple_chunks_with_overflow() {
4498            let max_tx = 10;
4499            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4500            let versions = default_versions();
4501
4502            let flushed1 = queue.push_chunk(
4503                make_chunk(max_tx / 2 + 1, 1),
4504                &versions,
4505                1000,
4506                make_commit_ref(1),
4507                Digest::default(),
4508            );
4509            let flushed2 = queue.push_chunk(
4510                make_chunk(max_tx / 2 + 1, 2),
4511                &versions,
4512                1000,
4513                make_commit_ref(1),
4514                Digest::default(),
4515            );
4516            let flushed3 = queue.push_chunk(
4517                make_chunk(max_tx / 2 + 1, 3),
4518                &versions,
4519                1000,
4520                make_commit_ref(1),
4521                Digest::default(),
4522            );
4523
4524            let all_flushed: Vec<_> = flushed1
4525                .into_iter()
4526                .chain(flushed2)
4527                .chain(flushed3)
4528                .collect();
4529            assert_eq!(all_flushed.len(), 2);
4530            assert_eq!(queue.pending_roots.len(), 1);
4531
4532            for p in &all_flushed {
4533                let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4534                assert!(tx_count <= max_tx);
4535            }
4536        }
4537    }
4538}