sui_core/
consensus_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashMap, HashSet, VecDeque},
6    hash::Hash,
7    num::NonZeroUsize,
8    sync::{Arc, Mutex},
9    time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use consensus_config::Committee as ConsensusCommittee;
13use consensus_core::{CommitConsumerMonitor, CommitIndex, CommitRef};
14use consensus_types::block::TransactionIndex;
15use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
16use lru::LruCache;
17use mysten_common::{
18    assert_reachable, assert_sometimes, debug_fatal, random_util::randomize_cache_capacity_in_tests,
19};
20use mysten_metrics::{
21    monitored_future,
22    monitored_mpsc::{self, UnboundedReceiver},
23    monitored_scope, spawn_monitored_task,
24};
25use nonempty::NonEmpty;
26use parking_lot::RwLockWriteGuard;
27use serde::{Deserialize, Serialize};
28use sui_config::node::CongestionLogConfig;
29use sui_macros::{fail_point, fail_point_arg, fail_point_if};
30use sui_protocol_config::{Chain, PerObjectCongestionControlMode, ProtocolConfig};
31use sui_types::{
32    SUI_RANDOMNESS_STATE_OBJECT_ID,
33    authenticator_state::ActiveJwk,
34    base_types::{
35        AuthorityName, ConciseableName, ConsensusObjectSequenceKey, ObjectID, ObjectRef,
36        SequenceNumber, TransactionDigest,
37    },
38    crypto::RandomnessRound,
39    digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest, Digest},
40    executable_transaction::{
41        TrustedExecutableTransaction, VerifiedExecutableTransaction,
42        VerifiedExecutableTransactionWithAliases,
43    },
44    messages_checkpoint::{
45        CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointTimestamp,
46    },
47    messages_consensus::{
48        AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments,
49        ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
50        ExecutionTimeObservation,
51    },
52    node_role::NodeRole,
53    sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
54    transaction::{
55        InputObjectKind, SenderSignedData, TransactionDataAPI, TransactionKey, VerifiedTransaction,
56        WithAliases,
57    },
58};
59use tokio::task::JoinSet;
60use tracing::{debug, error, info, instrument, trace, warn};
61
62use crate::{
63    authority::{
64        AuthorityMetrics, AuthorityState, ExecutionEnv,
65        authority_per_epoch_store::{
66            AuthorityPerEpochStore, CancelConsensusCertificateReason, ConsensusStats,
67            ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStatsV2,
68            consensus_quarantine::ConsensusCommitOutput,
69        },
70        backpressure::{BackpressureManager, BackpressureSubscriber},
71        congestion_log::CongestionCommitLogger,
72        consensus_tx_status_cache::ConsensusTxStatus,
73        epoch_start_configuration::EpochStartConfigTrait,
74        execution_time_estimator::ExecutionTimeEstimator,
75        shared_object_congestion_tracker::SharedObjectCongestionTracker,
76        shared_object_version_manager::{AssignedTxAndVersions, AssignedVersions, Schedulable},
77        transaction_deferral::{DeferralKey, DeferralReason, transaction_deferral_within_limit},
78    },
79    checkpoints::{
80        CheckpointHeight, CheckpointRoots, CheckpointService, CheckpointServiceNotify,
81        PendingCheckpoint, PendingCheckpointInfo, PendingCheckpointV2,
82    },
83    consensus_adapter::ConsensusAdapter,
84    consensus_throughput_calculator::ConsensusThroughputCalculator,
85    consensus_types::consensus_output_api::ConsensusCommitAPI,
86    epoch::{
87        randomness::{DkgStatus, RandomnessManager},
88        reconfiguration::ReconfigState,
89    },
90    execution_cache::ObjectCacheRead,
91    execution_scheduler::{SettlementBatchInfo, SettlementScheduler},
92    gasless_rate_limiter::ConsensusGaslessCounter,
93    post_consensus_tx_reorder::PostConsensusTxReorder,
94    traffic_controller::{TrafficController, policies::TrafficTally},
95};
96
97/// Output from filtering consensus transactions.
98/// Contains the filtered transactions and any owned object locks acquired post-consensus.
99struct FilteredConsensusOutput {
100    transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
101    owned_object_locks: HashMap<ObjectRef, TransactionDigest>,
102}
103
104pub struct ConsensusHandlerInitializer {
105    state: Arc<AuthorityState>,
106    checkpoint_service: Arc<CheckpointService>,
107    epoch_store: Arc<AuthorityPerEpochStore>,
108    consensus_adapter: Arc<ConsensusAdapter>,
109    throughput_calculator: Arc<ConsensusThroughputCalculator>,
110    backpressure_manager: Arc<BackpressureManager>,
111    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
112    consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
113}
114
115impl ConsensusHandlerInitializer {
116    pub fn new(
117        state: Arc<AuthorityState>,
118        checkpoint_service: Arc<CheckpointService>,
119        epoch_store: Arc<AuthorityPerEpochStore>,
120        consensus_adapter: Arc<ConsensusAdapter>,
121        throughput_calculator: Arc<ConsensusThroughputCalculator>,
122        backpressure_manager: Arc<BackpressureManager>,
123        congestion_log_config: Option<CongestionLogConfig>,
124    ) -> Self {
125        let congestion_logger =
126            congestion_log_config.and_then(|config| match CongestionCommitLogger::new(&config) {
127                Ok(logger) => Some(Arc::new(Mutex::new(logger))),
128                Err(e) => {
129                    debug_fatal!("Failed to create congestion logger: {e}");
130                    None
131                }
132            });
133        let consensus_gasless_counter = state.consensus_gasless_counter.clone();
134        Self {
135            state,
136            checkpoint_service,
137            epoch_store,
138            consensus_adapter,
139            throughput_calculator,
140            backpressure_manager,
141            congestion_logger,
142            consensus_gasless_counter,
143        }
144    }
145
146    #[cfg(test)]
147    pub(crate) fn new_for_testing(
148        state: Arc<AuthorityState>,
149        checkpoint_service: Arc<CheckpointService>,
150    ) -> Self {
151        use crate::consensus_test_utils::make_consensus_adapter_for_test;
152        use std::collections::HashSet;
153
154        let backpressure_manager = BackpressureManager::new_for_tests();
155        let consensus_adapter =
156            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
157        let consensus_gasless_counter = state.consensus_gasless_counter.clone();
158        Self {
159            state: state.clone(),
160            checkpoint_service,
161            epoch_store: state.epoch_store_for_testing().clone(),
162            consensus_adapter,
163            throughput_calculator: Arc::new(ConsensusThroughputCalculator::new(
164                None,
165                state.metrics.clone(),
166            )),
167            backpressure_manager,
168            congestion_logger: None,
169            consensus_gasless_counter,
170        }
171    }
172
173    pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
174        let new_epoch_start_state = self.epoch_store.epoch_start_state();
175        let consensus_committee = new_epoch_start_state.get_consensus_committee();
176
177        let settlement_scheduler = SettlementScheduler::new(
178            self.state.execution_scheduler().as_ref().clone(),
179            self.state.get_transaction_cache_reader().clone(),
180            self.state.metrics.clone(),
181        );
182        ConsensusHandler::new(
183            self.epoch_store.clone(),
184            self.checkpoint_service.clone(),
185            settlement_scheduler,
186            self.consensus_adapter.clone(),
187            self.state.get_object_cache_reader().clone(),
188            consensus_committee,
189            self.state.metrics.clone(),
190            self.throughput_calculator.clone(),
191            self.backpressure_manager.subscribe(),
192            self.state.traffic_controller.clone(),
193            self.congestion_logger.clone(),
194            self.consensus_gasless_counter.clone(),
195        )
196    }
197}
198
199mod additional_consensus_state {
200    use std::marker::PhantomData;
201
202    use consensus_core::CommitRef;
203    use fastcrypto::hash::HashFunction as _;
204    use sui_types::{crypto::DefaultHash, digests::Digest};
205
206    use super::*;
207    /// AdditionalConsensusState tracks any in-memory state that is retained by ConsensusHandler
208    /// between consensus commits. Because of crash recovery, using such data is inherently risky.
209    /// In order to do this safely, we must store data from a fixed number of previous commits.
210    /// Then, at start-up, that same fixed number of already processed commits is replayed to
211    /// reconstruct the state.
212    ///
213    /// To make sure that bugs in this process appear immediately, we record the digest of this
214    /// state in ConsensusCommitPrologue, so that any deviation causes an immediate fork.
215    #[derive(Serialize, Deserialize)]
216    pub(super) struct AdditionalConsensusState {
217        commit_interval_observer: CommitIntervalObserver,
218    }
219
220    impl AdditionalConsensusState {
221        pub fn new(additional_consensus_state_window_size: u32) -> Self {
222            Self {
223                commit_interval_observer: CommitIntervalObserver::new(
224                    additional_consensus_state_window_size,
225                ),
226            }
227        }
228
229        /// Update all internal state based on the new commit
230        pub(crate) fn observe_commit(
231            &mut self,
232            protocol_config: &ProtocolConfig,
233            epoch_start_time: u64,
234            consensus_commit: &impl ConsensusCommitAPI,
235        ) -> ConsensusCommitInfo {
236            self.commit_interval_observer
237                .observe_commit_time(consensus_commit);
238
239            let estimated_commit_period = self
240                .commit_interval_observer
241                .commit_interval_estimate()
242                .unwrap_or(Duration::from_millis(
243                    protocol_config.min_checkpoint_interval_ms(),
244                ));
245
246            info!("estimated commit rate: {:?}", estimated_commit_period);
247
248            self.commit_info_impl(
249                epoch_start_time,
250                consensus_commit,
251                Some(estimated_commit_period),
252            )
253        }
254
255        fn commit_info_impl(
256            &self,
257            epoch_start_time: u64,
258            consensus_commit: &impl ConsensusCommitAPI,
259            estimated_commit_period: Option<Duration>,
260        ) -> ConsensusCommitInfo {
261            let leader_author = consensus_commit.leader_author_index();
262            let timestamp = consensus_commit.commit_timestamp_ms();
263
264            let timestamp = if timestamp < epoch_start_time {
265                error!(
266                    "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start_time}, author {leader_author:?}"
267                );
268                epoch_start_time
269            } else {
270                timestamp
271            };
272
273            ConsensusCommitInfo {
274                _phantom: PhantomData,
275                round: consensus_commit.leader_round(),
276                timestamp,
277                leader_author,
278                consensus_commit_ref: consensus_commit.commit_ref(),
279                rejected_transactions_digest: consensus_commit.rejected_transactions_digest(),
280                additional_state_digest: Some(self.digest()),
281                estimated_commit_period,
282                skip_consensus_commit_prologue_in_test: false,
283            }
284        }
285
286        /// Get the digest of the current state.
287        fn digest(&self) -> AdditionalConsensusStateDigest {
288            let mut hash = DefaultHash::new();
289            bcs::serialize_into(&mut hash, self).unwrap();
290            AdditionalConsensusStateDigest::new(hash.finalize().into())
291        }
292    }
293
294    pub struct ConsensusCommitInfo {
295        // prevent public construction
296        _phantom: PhantomData<()>,
297
298        pub round: u64,
299        pub timestamp: u64,
300        pub leader_author: AuthorityIndex,
301        pub consensus_commit_ref: CommitRef,
302        pub rejected_transactions_digest: Digest,
303
304        additional_state_digest: Option<AdditionalConsensusStateDigest>,
305        estimated_commit_period: Option<Duration>,
306
307        pub skip_consensus_commit_prologue_in_test: bool,
308    }
309
310    impl ConsensusCommitInfo {
311        pub fn new_for_test(
312            commit_round: u64,
313            commit_timestamp: u64,
314            estimated_commit_period: Option<Duration>,
315            skip_consensus_commit_prologue_in_test: bool,
316        ) -> Self {
317            Self {
318                _phantom: PhantomData,
319                round: commit_round,
320                timestamp: commit_timestamp,
321                leader_author: 0,
322                consensus_commit_ref: CommitRef::default(),
323                rejected_transactions_digest: Digest::default(),
324                additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
325                estimated_commit_period,
326                skip_consensus_commit_prologue_in_test,
327            }
328        }
329
330        pub fn new_for_congestion_test(
331            commit_round: u64,
332            commit_timestamp: u64,
333            estimated_commit_period: Duration,
334        ) -> Self {
335            Self::new_for_test(
336                commit_round,
337                commit_timestamp,
338                Some(estimated_commit_period),
339                true,
340            )
341        }
342
343        pub fn additional_state_digest(&self) -> AdditionalConsensusStateDigest {
344            // this method cannot be called if stateless_commit_info is used
345            self.additional_state_digest
346                .expect("additional_state_digest is not available")
347        }
348
349        pub fn estimated_commit_period(&self) -> Duration {
350            // this method cannot be called if stateless_commit_info is used
351            self.estimated_commit_period
352                .expect("estimated commit period is not available")
353        }
354
355        fn consensus_commit_digest(&self) -> ConsensusCommitDigest {
356            ConsensusCommitDigest::new(self.consensus_commit_ref.digest.into_inner())
357        }
358
359        fn consensus_commit_prologue_transaction(
360            &self,
361            epoch: u64,
362        ) -> VerifiedExecutableTransaction {
363            let transaction = VerifiedTransaction::new_consensus_commit_prologue(
364                epoch,
365                self.round,
366                self.timestamp,
367            );
368            VerifiedExecutableTransaction::new_system(transaction, epoch)
369        }
370
371        fn consensus_commit_prologue_v2_transaction(
372            &self,
373            epoch: u64,
374        ) -> VerifiedExecutableTransaction {
375            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v2(
376                epoch,
377                self.round,
378                self.timestamp,
379                self.consensus_commit_digest(),
380            );
381            VerifiedExecutableTransaction::new_system(transaction, epoch)
382        }
383
384        fn consensus_commit_prologue_v3_transaction(
385            &self,
386            epoch: u64,
387            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
388        ) -> VerifiedExecutableTransaction {
389            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v3(
390                epoch,
391                self.round,
392                self.timestamp,
393                self.consensus_commit_digest(),
394                consensus_determined_version_assignments,
395            );
396            VerifiedExecutableTransaction::new_system(transaction, epoch)
397        }
398
399        fn consensus_commit_prologue_v4_transaction(
400            &self,
401            epoch: u64,
402            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
403            additional_state_digest: AdditionalConsensusStateDigest,
404        ) -> VerifiedExecutableTransaction {
405            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v4(
406                epoch,
407                self.round,
408                self.timestamp,
409                self.consensus_commit_digest(),
410                consensus_determined_version_assignments,
411                additional_state_digest,
412            );
413            VerifiedExecutableTransaction::new_system(transaction, epoch)
414        }
415
416        pub fn create_consensus_commit_prologue_transaction(
417            &self,
418            epoch: u64,
419            protocol_config: &ProtocolConfig,
420            cancelled_txn_version_assignment: Vec<(
421                TransactionDigest,
422                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
423            )>,
424            commit_info: &ConsensusCommitInfo,
425            indirect_state_observer: IndirectStateObserver,
426        ) -> VerifiedExecutableTransaction {
427            let version_assignments = if protocol_config
428                .record_consensus_determined_version_assignments_in_prologue_v2()
429            {
430                Some(
431                    ConsensusDeterminedVersionAssignments::CancelledTransactionsV2(
432                        cancelled_txn_version_assignment,
433                    ),
434                )
435            } else if protocol_config.record_consensus_determined_version_assignments_in_prologue()
436            {
437                Some(
438                    ConsensusDeterminedVersionAssignments::CancelledTransactions(
439                        cancelled_txn_version_assignment
440                            .into_iter()
441                            .map(|(tx_digest, versions)| {
442                                (
443                                    tx_digest,
444                                    versions.into_iter().map(|(id, v)| (id.0, v)).collect(),
445                                )
446                            })
447                            .collect(),
448                    ),
449                )
450            } else {
451                None
452            };
453
454            if protocol_config.record_additional_state_digest_in_prologue() {
455                let additional_state_digest =
456                    if protocol_config.additional_consensus_digest_indirect_state() {
457                        let d1 = commit_info.additional_state_digest();
458                        indirect_state_observer.fold_with(d1)
459                    } else {
460                        commit_info.additional_state_digest()
461                    };
462
463                self.consensus_commit_prologue_v4_transaction(
464                    epoch,
465                    version_assignments.unwrap(),
466                    additional_state_digest,
467                )
468            } else if let Some(version_assignments) = version_assignments {
469                self.consensus_commit_prologue_v3_transaction(epoch, version_assignments)
470            } else if protocol_config.include_consensus_digest_in_prologue() {
471                self.consensus_commit_prologue_v2_transaction(epoch)
472            } else {
473                self.consensus_commit_prologue_transaction(epoch)
474            }
475        }
476    }
477
478    #[derive(Default)]
479    pub struct IndirectStateObserver {
480        hash: DefaultHash,
481    }
482
483    impl IndirectStateObserver {
484        pub fn new() -> Self {
485            Self::default()
486        }
487
488        pub fn observe_indirect_state<T: Serialize>(&mut self, state: &T) {
489            bcs::serialize_into(&mut self.hash, state).unwrap();
490        }
491
492        pub fn fold_with(
493            self,
494            d1: AdditionalConsensusStateDigest,
495        ) -> AdditionalConsensusStateDigest {
496            let hash = self.hash.finalize();
497            let d2 = AdditionalConsensusStateDigest::new(hash.into());
498
499            let mut hasher = DefaultHash::new();
500            bcs::serialize_into(&mut hasher, &d1).unwrap();
501            bcs::serialize_into(&mut hasher, &d2).unwrap();
502            AdditionalConsensusStateDigest::new(hasher.finalize().into())
503        }
504    }
505
506    #[test]
507    fn test_additional_consensus_state() {
508        use crate::consensus_test_utils::TestConsensusCommit;
509
510        fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
511            let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
512            state.observe_commit(
513                &protocol_config,
514                100,
515                &TestConsensusCommit::empty(round, timestamp, 0),
516            );
517        }
518
519        let mut s1 = AdditionalConsensusState::new(3);
520        observe(&mut s1, 1, 1000);
521        observe(&mut s1, 2, 2000);
522        observe(&mut s1, 3, 3000);
523        observe(&mut s1, 4, 4000);
524
525        let mut s2 = AdditionalConsensusState::new(3);
526        // Because state uses a ring buffer, we should get the same digest
527        // even though we only added the 3 latest observations.
528        observe(&mut s2, 2, 2000);
529        observe(&mut s2, 3, 3000);
530        observe(&mut s2, 4, 4000);
531
532        assert_eq!(s1.digest(), s2.digest());
533
534        observe(&mut s1, 5, 5000);
535        observe(&mut s2, 5, 5000);
536
537        assert_eq!(s1.digest(), s2.digest());
538    }
539}
540use additional_consensus_state::AdditionalConsensusState;
541pub(crate) use additional_consensus_state::{ConsensusCommitInfo, IndirectStateObserver};
542
543struct QueuedCheckpointRoots {
544    roots: CheckpointRoots,
545    timestamp: CheckpointTimestamp,
546    consensus_commit_ref: CommitRef,
547    rejected_transactions_digest: Digest,
548}
549
550struct Chunk<
551    T: crate::authority::shared_object_version_manager::AsTx = VerifiedExecutableTransaction,
552> {
553    schedulables: Vec<Schedulable<T>>,
554    settlement: Option<Schedulable<T>>,
555    height: CheckpointHeight,
556}
557
558impl<T: crate::authority::shared_object_version_manager::AsTx + Clone> Chunk<T> {
559    fn all_schedulables(&self) -> impl Iterator<Item = &Schedulable<T>> + Clone {
560        self.schedulables.iter().chain(self.settlement.iter())
561    }
562
563    fn all_schedulables_from(chunks: &[Self]) -> impl Iterator<Item = &Schedulable<T>> + Clone {
564        chunks.iter().flat_map(|c| c.all_schedulables())
565    }
566
567    fn to_checkpoint_roots(&self) -> CheckpointRoots {
568        let tx_roots: Vec<_> = self.schedulables.iter().map(|s| s.key()).collect();
569        let settlement_root = self.settlement.as_ref().map(|s| s.key());
570        CheckpointRoots {
571            tx_roots,
572            settlement_root,
573            height: self.height,
574        }
575    }
576}
577
578impl From<Chunk<VerifiedExecutableTransactionWithAliases>> for Chunk {
579    fn from(chunk: Chunk<VerifiedExecutableTransactionWithAliases>) -> Self {
580        Chunk {
581            schedulables: chunk.schedulables.into_iter().map(|s| s.into()).collect(),
582            settlement: chunk.settlement.map(|s| s.into()),
583            height: chunk.height,
584        }
585    }
586}
587
588// Accumulates checkpoint roots from consensus and flushes them into PendingCheckpointV2s.
589// Roots are buffered in `pending_roots` until a flush is triggered (by max_tx overflow or
590// time interval). A flush always drains all buffered roots into a single PendingCheckpointV2,
591// so the queue only ever holds roots for one pending checkpoint at a time.
592//
593// Owns an ExecutionSchedulerSender so push_chunk can send schedulables and settlement info
594// for execution in one shot.
595pub(crate) struct CheckpointQueue {
596    last_built_timestamp: CheckpointTimestamp,
597    pending_roots: VecDeque<QueuedCheckpointRoots>,
598    height: u64,
599    pending_tx_count: usize,
600    current_checkpoint_seq: CheckpointSequenceNumber,
601    max_tx: usize,
602    min_checkpoint_interval_ms: u64,
603    execution_scheduler_sender: ExecutionSchedulerSender,
604}
605
606impl CheckpointQueue {
607    pub(crate) fn new(
608        last_built_timestamp: CheckpointTimestamp,
609        checkpoint_height: u64,
610        next_checkpoint_seq: CheckpointSequenceNumber,
611        max_tx: usize,
612        min_checkpoint_interval_ms: u64,
613        execution_scheduler_sender: ExecutionSchedulerSender,
614    ) -> Self {
615        Self {
616            last_built_timestamp,
617            pending_roots: VecDeque::new(),
618            height: checkpoint_height,
619            pending_tx_count: 0,
620            current_checkpoint_seq: next_checkpoint_seq,
621            max_tx,
622            min_checkpoint_interval_ms,
623            execution_scheduler_sender,
624        }
625    }
626
627    #[cfg(test)]
628    fn new_for_testing(
629        last_built_timestamp: CheckpointTimestamp,
630        checkpoint_height: u64,
631        next_checkpoint_seq: CheckpointSequenceNumber,
632        max_tx: usize,
633        min_checkpoint_interval_ms: u64,
634    ) -> Self {
635        let (sender, _receiver) = monitored_mpsc::unbounded_channel("test_checkpoint_queue_sender");
636        Self {
637            last_built_timestamp,
638            pending_roots: VecDeque::new(),
639            height: checkpoint_height,
640            pending_tx_count: 0,
641            current_checkpoint_seq: next_checkpoint_seq,
642            max_tx,
643            min_checkpoint_interval_ms,
644            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
645        }
646    }
647
648    #[cfg(test)]
649    fn new_for_testing_with_sender(
650        last_built_timestamp: CheckpointTimestamp,
651        checkpoint_height: u64,
652        next_checkpoint_seq: CheckpointSequenceNumber,
653        max_tx: usize,
654        min_checkpoint_interval_ms: u64,
655        sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
656    ) -> Self {
657        Self {
658            last_built_timestamp,
659            pending_roots: VecDeque::new(),
660            height: checkpoint_height,
661            pending_tx_count: 0,
662            current_checkpoint_seq: next_checkpoint_seq,
663            max_tx,
664            min_checkpoint_interval_ms,
665            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
666        }
667    }
668
669    pub(crate) fn last_built_timestamp(&self) -> CheckpointTimestamp {
670        self.last_built_timestamp
671    }
672
673    pub(crate) fn is_empty(&self) -> bool {
674        self.pending_roots.is_empty()
675    }
676
677    fn next_height(&mut self) -> u64 {
678        self.height += 1;
679        self.height
680    }
681
682    fn push_chunk(
683        &mut self,
684        chunk: Chunk,
685        assigned_versions: &HashMap<TransactionKey, AssignedVersions>,
686        timestamp: CheckpointTimestamp,
687        consensus_commit_ref: CommitRef,
688        rejected_transactions_digest: Digest,
689    ) -> Vec<PendingCheckpointV2> {
690        let max_tx = self.max_tx;
691        let user_tx_count = chunk.schedulables.len();
692
693        let roots = chunk.to_checkpoint_roots();
694
695        let schedulables: Vec<_> = chunk
696            .schedulables
697            .into_iter()
698            .map(|s| {
699                let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
700                (s, versions)
701            })
702            .collect();
703
704        let mut flushed_checkpoints = Vec::new();
705
706        if self.pending_tx_count > 0
707            && self.pending_tx_count + user_tx_count > max_tx
708            && let Some(checkpoint) = self.flush_forced()
709        {
710            flushed_checkpoints.push(checkpoint);
711        }
712
713        let settlement_info = chunk.settlement.as_ref().map(|s| {
714            let settlement_key = s.key();
715            let tx_keys: Vec<_> = schedulables.iter().map(|(s, _)| s.key()).collect();
716            SettlementBatchInfo {
717                settlement_key,
718                tx_keys,
719                checkpoint_height: chunk.height,
720                checkpoint_seq: self.current_checkpoint_seq,
721                assigned_versions: assigned_versions
722                    .get(&settlement_key)
723                    .cloned()
724                    .unwrap_or_default(),
725            }
726        });
727
728        self.execution_scheduler_sender
729            .send(schedulables, settlement_info);
730
731        self.pending_tx_count += user_tx_count;
732        self.pending_roots.push_back(QueuedCheckpointRoots {
733            roots,
734            timestamp,
735            consensus_commit_ref,
736            rejected_transactions_digest,
737        });
738
739        flushed_checkpoints
740    }
741
742    pub(crate) fn flush(
743        &mut self,
744        current_timestamp: CheckpointTimestamp,
745        force: bool,
746    ) -> Option<PendingCheckpointV2> {
747        if !force && current_timestamp < self.last_built_timestamp + self.min_checkpoint_interval_ms
748        {
749            return None;
750        }
751        self.flush_forced()
752    }
753
754    fn flush_forced(&mut self) -> Option<PendingCheckpointV2> {
755        if self.pending_roots.is_empty() {
756            return None;
757        }
758
759        let to_flush: Vec<_> = self.pending_roots.drain(..).collect();
760        let last_root = to_flush.last().unwrap();
761
762        let checkpoint = PendingCheckpointV2 {
763            roots: to_flush.iter().map(|q| q.roots.clone()).collect(),
764            details: PendingCheckpointInfo {
765                timestamp_ms: last_root.timestamp,
766                last_of_epoch: false,
767                checkpoint_height: last_root.roots.height,
768                consensus_commit_ref: last_root.consensus_commit_ref,
769                rejected_transactions_digest: last_root.rejected_transactions_digest,
770                checkpoint_seq: Some(self.current_checkpoint_seq),
771            },
772        };
773
774        self.last_built_timestamp = last_root.timestamp;
775        self.pending_tx_count = 0;
776        self.current_checkpoint_seq += 1;
777
778        Some(checkpoint)
779    }
780
781    pub(crate) fn checkpoint_seq(&self) -> CheckpointSequenceNumber {
782        self.current_checkpoint_seq
783            .checked_sub(1)
784            .expect("checkpoint_seq called before any checkpoint was assigned")
785    }
786}
787
/// Per-epoch handler that turns consensus commits into scheduled transactions,
/// pending checkpoints and persisted consensus output.
pub struct ConsensusHandler<C> {
    /// A store created for each epoch. ConsensusHandler is recreated each epoch, with the
    /// corresponding store. This store is also used to get the current epoch ID.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Holds the indices, hash and stats after the last consensus commit
    /// It is used for avoiding replaying already processed transactions,
    /// checking chain consistency, and accumulating per-epoch consensus output stats.
    last_consensus_stats: ExecutionIndicesWithStatsV2,
    /// Notified (via `notify_checkpoint`) after each commit's output has been pushed.
    checkpoint_service: Arc<C>,
    /// cache reader is needed when determining the next version to assign for shared objects.
    cache_reader: Arc<dyn ObjectCacheRead>,
    /// The consensus committee used to do stake computations for deciding set of low scoring authorities
    committee: ConsensusCommittee,
    // TODO: ConsensusHandler doesn't really share metrics with AuthorityState. We could define
    // a new metrics type here if we want to.
    metrics: Arc<AuthorityMetrics>,
    /// Lru cache to quickly discard transactions processed by consensus
    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
    /// Enqueues transactions to the execution scheduler via a separate task.
    execution_scheduler_sender: ExecutionSchedulerSender,
    /// Consensus adapter for submitting transactions to consensus
    consensus_adapter: Arc<ConsensusAdapter>,

    /// Using the throughput calculator to record the current consensus throughput
    throughput_calculator: Arc<ConsensusThroughputCalculator>,

    /// Observes every commit (including replayed prior commits) to derive the
    /// per-commit `ConsensusCommitInfo`.
    additional_consensus_state: AdditionalConsensusState,

    /// Awaited at the start of each commit until there is no backpressure.
    backpressure_subscriber: BackpressureSubscriber,

    /// Optional traffic controller handle.
    traffic_controller: Option<Arc<TrafficController>>,

    /// Optional congestion commit logger; `new_for_testing` leaves it as `None`.
    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,

    /// Shared counter for gasless transactions seen in consensus
    /// (NOTE(review): exact semantics not visible in this section).
    consensus_gasless_counter: Arc<ConsensusGaslessCounter>,

    /// Checkpoint queue used when `split_checkpoints_in_consensus_handler` is enabled;
    /// its last flush timestamp and checkpoint sequence are copied into the recorded
    /// commit stats.
    checkpoint_queue: Mutex<CheckpointQueue>,
}
826
/// Base capacity (2^20 entries) of the LRU cache of processed consensus transaction
/// keys; passed through `randomize_cache_capacity_in_tests` before use.
const PROCESSED_CACHE_CAP: usize = 1 << 20;
828
829impl<C> ConsensusHandler<C> {
830    pub(crate) fn new(
831        epoch_store: Arc<AuthorityPerEpochStore>,
832        checkpoint_service: Arc<C>,
833        settlement_scheduler: SettlementScheduler,
834        consensus_adapter: Arc<ConsensusAdapter>,
835        cache_reader: Arc<dyn ObjectCacheRead>,
836        committee: ConsensusCommittee,
837        metrics: Arc<AuthorityMetrics>,
838        throughput_calculator: Arc<ConsensusThroughputCalculator>,
839        backpressure_subscriber: BackpressureSubscriber,
840        traffic_controller: Option<Arc<TrafficController>>,
841        congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
842        consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
843    ) -> Self {
844        assert!(
845            matches!(
846                epoch_store
847                    .protocol_config()
848                    .per_object_congestion_control_mode(),
849                PerObjectCongestionControlMode::ExecutionTimeEstimate(_)
850            ),
851            "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
852        );
853
854        // Recover last_consensus_stats so it is consistent across validators.
855        let mut last_consensus_stats = epoch_store
856            .get_last_consensus_stats()
857            .expect("Should be able to read last consensus index");
858        // stats is empty at the beginning of epoch.
859        if !last_consensus_stats.stats.is_initialized() {
860            last_consensus_stats.stats = ConsensusStats::new(committee.size());
861            if epoch_store
862                .protocol_config()
863                .split_checkpoints_in_consensus_handler()
864            {
865                last_consensus_stats.checkpoint_seq = epoch_store.previous_epoch_last_checkpoint();
866            }
867        }
868        let max_tx = epoch_store
869            .protocol_config()
870            .max_transactions_per_checkpoint() as usize;
871        let min_checkpoint_interval_ms = epoch_store
872            .protocol_config()
873            .min_checkpoint_interval_ms_as_option()
874            .unwrap_or_default();
875        let execution_scheduler_sender =
876            ExecutionSchedulerSender::start(settlement_scheduler, epoch_store.clone());
877        let commit_rate_estimate_window_size = epoch_store
878            .protocol_config()
879            .get_consensus_commit_rate_estimation_window_size();
880        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
881        let checkpoint_height = last_consensus_stats.height;
882        let next_checkpoint_seq = if epoch_store
883            .protocol_config()
884            .split_checkpoints_in_consensus_handler()
885        {
886            last_consensus_stats.checkpoint_seq + 1
887        } else {
888            0
889        };
890        Self {
891            epoch_store,
892            last_consensus_stats,
893            checkpoint_service,
894            cache_reader,
895            committee,
896            metrics,
897            processed_cache: LruCache::new(
898                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
899            ),
900            execution_scheduler_sender: execution_scheduler_sender.clone(),
901            consensus_adapter,
902            throughput_calculator,
903            additional_consensus_state: AdditionalConsensusState::new(
904                commit_rate_estimate_window_size,
905            ),
906            backpressure_subscriber,
907            traffic_controller,
908            congestion_logger,
909            consensus_gasless_counter,
910            checkpoint_queue: Mutex::new(CheckpointQueue::new(
911                last_built_timestamp,
912                checkpoint_height,
913                next_checkpoint_seq,
914                max_tx,
915                min_checkpoint_interval_ms,
916                execution_scheduler_sender,
917            )),
918        }
919    }
920
921    /// Returns the last subdag index processed by the handler.
922    pub(crate) fn last_processed_subdag_index(&self) -> u64 {
923        self.last_consensus_stats.index.sub_dag_index
924    }
925
926    pub(crate) fn new_for_testing(
927        epoch_store: Arc<AuthorityPerEpochStore>,
928        checkpoint_service: Arc<C>,
929        execution_scheduler_sender: ExecutionSchedulerSender,
930        consensus_adapter: Arc<ConsensusAdapter>,
931        cache_reader: Arc<dyn ObjectCacheRead>,
932        committee: ConsensusCommittee,
933        metrics: Arc<AuthorityMetrics>,
934        throughput_calculator: Arc<ConsensusThroughputCalculator>,
935        backpressure_subscriber: BackpressureSubscriber,
936        traffic_controller: Option<Arc<TrafficController>>,
937        last_consensus_stats: ExecutionIndicesWithStatsV2,
938    ) -> Self {
939        let commit_rate_estimate_window_size = epoch_store
940            .protocol_config()
941            .get_consensus_commit_rate_estimation_window_size();
942        let max_tx = epoch_store
943            .protocol_config()
944            .max_transactions_per_checkpoint() as usize;
945        let min_checkpoint_interval_ms = epoch_store
946            .protocol_config()
947            .min_checkpoint_interval_ms_as_option()
948            .unwrap_or_default();
949        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
950        let checkpoint_height = last_consensus_stats.height;
951        Self {
952            epoch_store,
953            last_consensus_stats,
954            checkpoint_service,
955            cache_reader,
956            committee,
957            metrics,
958            processed_cache: LruCache::new(
959                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
960            ),
961            execution_scheduler_sender: execution_scheduler_sender.clone(),
962            consensus_adapter,
963            throughput_calculator,
964            additional_consensus_state: AdditionalConsensusState::new(
965                commit_rate_estimate_window_size,
966            ),
967            backpressure_subscriber,
968            traffic_controller,
969            congestion_logger: None,
970            consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
971            checkpoint_queue: Mutex::new(CheckpointQueue::new(
972                last_built_timestamp,
973                checkpoint_height,
974                0,
975                max_tx,
976                min_checkpoint_interval_ms,
977                execution_scheduler_sender,
978            )),
979        }
980    }
981}
982
/// The messages of a single commit, grouped by kind.
///
/// Produced by `build_commit_handler_input` from the deduplicated transactions.
#[derive(Default)]
struct CommitHandlerInput {
    // User transactions to schedule for execution.
    user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    // Authority capability notifications.
    capability_notifications: Vec<AuthorityCapabilitiesV2>,
    // Execution time observations reported by authorities.
    execution_time_observations: Vec<ExecutionTimeObservation>,
    // Checkpoint signature messages.
    checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
    // Randomness DKG messages, keyed by sending authority (payload kept as raw bytes).
    randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
    // Randomness DKG confirmations, keyed by sending authority (payload kept as raw bytes).
    randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
    // Authorities that sent an end-of-publish message in this commit.
    end_of_publish_transactions: Vec<AuthorityName>,
    // JWKs proposed by authorities.
    new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
}
994
/// Mutable state threaded through the handling of a single consensus commit.
struct CommitHandlerState {
    // Set by `init_randomness` when the randomness manager reports DKG failure.
    dkg_failed: bool,
    // Randomness round reserved for this commit, if any (set by `init_randomness`).
    randomness_round: Option<RandomnessRound>,
    // Accumulated writes for this commit; pushed to the consensus quarantine at the end.
    output: ConsensusCommitOutput,
    // Created fresh for every commit (NOTE(review): consumer not visible in this section).
    indirect_state_observer: Option<IndirectStateObserver>,
    // Clone of the reconfig state taken when handling of this commit began.
    initial_reconfig_state: ReconfigState,
    // Occurrence counts for user transactions, used for unpaid amplification detection.
    occurrence_counts: HashMap<TransactionDigest, u32>,
}
1004
1005impl CommitHandlerState {
1006    fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
1007        self.output
1008            .get_consensus_messages_processed()
1009            .cloned()
1010            .collect()
1011    }
1012
1013    fn init_randomness<'a, 'epoch>(
1014        &'a mut self,
1015        epoch_store: &'epoch AuthorityPerEpochStore,
1016        commit_info: &'a ConsensusCommitInfo,
1017    ) -> Option<tokio::sync::MutexGuard<'epoch, RandomnessManager>> {
1018        let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
1019            rm.try_lock()
1020                .expect("should only ever be called from the commit handler thread")
1021        });
1022
1023        let mut dkg_failed = false;
1024        let randomness_round = if epoch_store.randomness_state_enabled() {
1025            let randomness_manager = randomness_manager
1026                .as_mut()
1027                .expect("randomness manager should exist if randomness is enabled");
1028            match randomness_manager.dkg_status() {
1029                DkgStatus::Pending => None,
1030                DkgStatus::Failed => {
1031                    dkg_failed = true;
1032                    None
1033                }
1034                DkgStatus::Successful => {
1035                    // Generate randomness for this commit if DKG is successful and we are still
1036                    // accepting certs.
1037                    if self.initial_reconfig_state.should_accept_tx() {
1038                        randomness_manager
1039                            // TODO: make infallible
1040                            .reserve_next_randomness(commit_info.timestamp, &mut self.output)
1041                            .expect("epoch ended")
1042                    } else {
1043                        None
1044                    }
1045                }
1046            }
1047        } else {
1048            None
1049        };
1050
1051        if randomness_round.is_some() {
1052            assert!(!dkg_failed); // invariant check
1053        }
1054
1055        self.randomness_round = randomness_round;
1056        self.dkg_failed = dkg_failed;
1057
1058        randomness_manager
1059    }
1060}
1061
1062impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
1063    /// Called during startup to allow us to observe commits we previously processed, for crash recovery.
1064    /// Any state computed here must be a pure function of the commits observed, it cannot depend on any
1065    /// state recorded in the epoch db.
1066    fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
1067        assert!(
1068            self.epoch_store
1069                .protocol_config()
1070                .record_additional_state_digest_in_prologue()
1071        );
1072        let protocol_config = self.epoch_store.protocol_config();
1073        let epoch_start_time = self
1074            .epoch_store
1075            .epoch_start_config()
1076            .epoch_start_timestamp_ms();
1077
1078        self.additional_consensus_state.observe_commit(
1079            protocol_config,
1080            epoch_start_time,
1081            &consensus_commit,
1082        );
1083    }
1084
1085    #[cfg(test)]
1086    pub(crate) async fn handle_consensus_commit_for_test(
1087        &mut self,
1088        consensus_commit: impl ConsensusCommitAPI,
1089    ) {
1090        self.handle_consensus_commit(consensus_commit).await;
1091    }
1092
    /// Processes a single consensus commit end to end.
    ///
    /// The steps, in order:
    /// 1. Wait for backpressure to clear.
    /// 2. Observe the commit to derive `commit_info` and advance the in-memory
    ///    execution index.
    /// 3. Filter and deduplicate the commit's transactions, and reserve randomness if
    ///    applicable.
    /// 4. Apply gasless, JWK, capability, execution-time-observation,
    ///    checkpoint-signature and DKG messages.
    /// 5. Schedule user transactions and build pending checkpoint(s). The legacy path or
    ///    the split path is chosen by `split_checkpoints_in_consensus_handler`.
    /// 6. Push the accumulated output to the consensus quarantine, notify the checkpoint
    ///    service, generate randomness for a reserved round, and process notifications.
    ///
    /// If epoch close leaves the node neither accepting transactions nor on the final
    /// round, the method returns early, before the output is pushed to the quarantine.
    ///
    /// # Panics
    /// Panics if any required protocol-config feature flag is disabled, if the commit
    /// round does not advance past the last committed round, or if one of the
    /// `expect`ed persistence/notification steps fails.
    #[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))]
    pub(crate) async fn handle_consensus_commit(
        &mut self,
        consensus_commit: impl ConsensusCommitAPI,
    ) {
        {
            let protocol_config = self.epoch_store.protocol_config();

            // Assert all protocol config settings for which we don't support old behavior.
            assert!(protocol_config.ignore_execution_time_observations_after_certs_closed());
            assert!(protocol_config.record_time_estimate_processed());
            assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints());
            assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest());
            assert!(protocol_config.authority_capabilities_v2());
            assert!(protocol_config.cancel_for_failed_dkg_early());
        }

        // This may block until one of two conditions happens:
        // - Number of uncommitted transactions in the writeback cache goes below the
        //   backpressure threshold.
        // - The highest executed checkpoint catches up to the highest certified checkpoint.
        self.backpressure_subscriber.await_no_backpressure().await;

        let epoch = self.epoch_store.epoch();

        let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit");

        // Round of the previously handled commit; read before the index is advanced below.
        let last_committed_round = self.last_consensus_stats.index.last_committed_round;

        self.epoch_store
            .consensus_tx_status_cache
            .update_last_committed_leader_round(last_committed_round as u32);
        self.epoch_store
            .tx_reject_reason_cache
            .set_last_committed_leader_round(last_committed_round as u32);

        // Derive per-commit info; its round must strictly advance past the last one.
        let commit_info = self.additional_consensus_state.observe_commit(
            self.epoch_store.protocol_config(),
            self.epoch_store
                .epoch_start_config()
                .epoch_start_timestamp_ms(),
            &consensus_commit,
        );
        assert!(commit_info.round > last_committed_round);

        let (timestamp, leader_author, commit_sub_dag_index) =
            self.gather_commit_metadata(&consensus_commit);

        info!(
            %consensus_commit,
            "Received consensus output {}. Rejected transactions {}",
            consensus_commit.commit_ref(),
            consensus_commit.rejected_transactions_debug_string(),
        );

        // Advance the in-memory execution index. It is persisted further down via
        // `record_consensus_commit_stats` (unless this method returns early).
        self.last_consensus_stats.index = ExecutionIndices {
            last_committed_round: commit_info.round,
            sub_dag_index: commit_sub_dag_index,
            transaction_index: 0_u64,
        };

        self.metrics
            .consensus_committed_subdags
            .with_label_values(&[&leader_author.to_string()])
            .inc();

        // Per-commit state; `initial_reconfig_state` is a snapshot taken now.
        let mut state = CommitHandlerState {
            output: ConsensusCommitOutput::new(commit_info.round),
            dkg_failed: false,
            randomness_round: None,
            indirect_state_observer: Some(IndirectStateObserver::new()),
            initial_reconfig_state: self
                .epoch_store
                .get_reconfig_state_read_lock_guard()
                .clone(),
            occurrence_counts: HashMap::new(),
        };

        let FilteredConsensusOutput {
            transactions,
            owned_object_locks,
        } = self.filter_consensus_txns(
            state.initial_reconfig_state.clone(),
            &commit_info,
            &consensus_commit,
        );
        // Buffer owned object locks for batch write.
        if !owned_object_locks.is_empty() {
            state.output.set_owned_object_locks(owned_object_locks);
        }
        let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions);

        // The randomness manager guard (if any) is kept until the end of this method.
        let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info);

        let CommitHandlerInput {
            user_transactions,
            capability_notifications,
            execution_time_observations,
            checkpoint_signature_messages,
            randomness_dkg_messages,
            randomness_dkg_confirmations,
            end_of_publish_transactions,
            new_jwks,
        } = self.build_commit_handler_input(transactions);

        self.process_gasless_transactions(&commit_info, &user_transactions);
        self.process_jwks(&mut state, &commit_info, new_jwks);
        self.process_capability_notifications(capability_notifications);
        self.process_execution_time_observations(&mut state, execution_time_observations);
        self.process_checkpoint_signature_messages(checkpoint_signature_messages);

        self.process_dkg_updates(
            &mut state,
            &commit_info,
            randomness_manager.as_deref_mut(),
            randomness_dkg_messages,
            randomness_dkg_confirmations,
        )
        .await;

        // Held until the end of this method.
        let mut execution_time_estimator = self
            .epoch_store
            .execution_time_estimator
            .try_lock()
            .expect("should only ever be called from the commit handler thread");

        let authenticator_state_update_transaction =
            self.create_authenticator_state_update(last_committed_round, &commit_info);

        let split_checkpoints = self
            .epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler();

        // Legacy path: versions are assigned here and pending checkpoints are written
        // directly. Split path: schedulables go to `create_pending_checkpoints_v2`, which
        // returns the checkpoint height.
        let (lock, final_round, num_schedulables, checkpoint_height) = if !split_checkpoints {
            let (schedulables, randomness_schedulables, assigned_versions) = self
                .process_transactions(
                    &mut state,
                    &mut execution_time_estimator,
                    &commit_info,
                    authenticator_state_update_transaction,
                    user_transactions,
                );

            let (should_accept_tx, lock, final_round) =
                self.handle_close_epoch(&mut state, &commit_info, end_of_publish_transactions);

            let make_checkpoint = should_accept_tx || final_round;
            if !make_checkpoint {
                // No need for any further processing
                return;
            }

            // If this is the final round, record execution time observations for storage in the
            // end-of-epoch tx.
            if final_round {
                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
            }

            let checkpoint_height = self
                .epoch_store
                .calculate_pending_checkpoint_height(commit_info.round);

            self.create_pending_checkpoints(
                &mut state,
                &commit_info,
                &schedulables,
                &randomness_schedulables,
                final_round,
            );

            // Pair each schedulable (regular first, then randomness) with its assigned
            // versions; schedulables with no entry get the `Default` value.
            let num_schedulables = schedulables.len();
            let mut all_schedulables = schedulables;
            all_schedulables.extend(randomness_schedulables);
            let assigned_versions = assigned_versions.into_map();
            let paired: Vec<_> = all_schedulables
                .into_iter()
                .map(|s| {
                    let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
                    (s, versions)
                })
                .collect();
            self.execution_scheduler_sender.send(paired, None);

            (lock, final_round, num_schedulables, checkpoint_height)
        } else {
            let (
                transactions_to_schedule,
                randomness_transactions_to_schedule,
                cancelled_txns,
                randomness_state_update_transaction,
            ) = self.collect_transactions_to_schedule(
                &mut state,
                &mut execution_time_estimator,
                &commit_info,
                user_transactions,
            );

            let (should_accept_tx, lock, final_round) =
                self.handle_close_epoch(&mut state, &commit_info, end_of_publish_transactions);

            let make_checkpoint = should_accept_tx || final_round;
            if !make_checkpoint {
                // No need for any further processing
                return;
            }

            // If this is the final round, record execution time observations for storage in the
            // end-of-epoch tx.
            if final_round {
                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
            }

            // Order of regular schedulables: commit prologue (unless skipped in tests),
            // authenticator state update, then user transactions.
            let epoch = self.epoch_store.epoch();
            let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
                .then_some(Schedulable::ConsensusCommitPrologue(
                    epoch,
                    commit_info.round,
                    commit_info.consensus_commit_ref.index,
                ));

            let schedulables: Vec<_> = itertools::chain!(
                consensus_commit_prologue.into_iter(),
                authenticator_state_update_transaction
                    .into_iter()
                    .map(Schedulable::Transaction),
                transactions_to_schedule
                    .into_iter()
                    .map(Schedulable::Transaction),
            )
            .collect();

            // Randomness schedulables: randomness state update (if any) first.
            let randomness_schedulables: Vec<_> = randomness_state_update_transaction
                .into_iter()
                .chain(
                    randomness_transactions_to_schedule
                        .into_iter()
                        .map(Schedulable::Transaction),
                )
                .collect();

            let num_schedulables = schedulables.len();
            let checkpoint_height = self.create_pending_checkpoints_v2(
                &mut state,
                &commit_info,
                schedulables,
                randomness_schedulables,
                &cancelled_txns,
                final_round,
            );

            (lock, final_round, num_schedulables, checkpoint_height)
        };

        let notifications = state.get_notifications();

        // Record this commit's stats. With split checkpoints, also capture the queue's
        // last flush timestamp and the sequence just below its next one to assign.
        let mut stats_to_record = self.last_consensus_stats.clone();
        stats_to_record.height = checkpoint_height;
        if split_checkpoints {
            let queue = self.checkpoint_queue.lock().unwrap();
            stats_to_record.last_checkpoint_flush_timestamp = queue.last_built_timestamp();
            stats_to_record.checkpoint_seq = queue.checkpoint_seq();
        }

        state.output.record_consensus_commit_stats(stats_to_record);

        self.record_deferral_deletion(&mut state);

        self.epoch_store
            .consensus_quarantine
            .write()
            .push_consensus_output(state.output, &self.epoch_store)
            .expect("push_consensus_output should not fail");

        debug!(
            ?commit_info.round,
            "Notifying checkpoint service about new pending checkpoint(s)",
        );
        self.checkpoint_service
            .notify_checkpoint()
            .expect("failed to notify checkpoint service");

        if let Some(randomness_round) = state.randomness_round {
            randomness_manager
                .as_ref()
                .expect("randomness manager should exist if randomness round is provided")
                .generate_randomness(epoch, randomness_round);
        }

        self.epoch_store.process_notifications(notifications.iter());

        // pass lock by value to ensure that it is held until this point
        self.log_final_round(lock, final_round);

        // update the calculated throughput
        self.throughput_calculator
            .add_transactions(timestamp, num_schedulables as u64);

        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
            let key = [commit_sub_dag_index, epoch];
            if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
                sui_simulator::task::kill_current_node(None);
            }
        });

        fail_point!("crash");

        self.send_end_of_publish_if_needed().await;
    }
1402
1403    fn handle_close_epoch(
1404        &self,
1405        state: &mut CommitHandlerState,
1406        commit_info: &ConsensusCommitInfo,
1407        end_of_publish_transactions: Vec<AuthorityName>,
1408    ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
1409        let timestamp_based_epoch_close = self
1410            .epoch_store
1411            .protocol_config()
1412            .timestamp_based_epoch_close();
1413        let timestamp_triggered = timestamp_based_epoch_close
1414            && commit_info.timestamp >= self.epoch_store.next_reconfiguration_timestamp_ms();
1415        if timestamp_triggered {
1416            // close_user_certs() is only needed here when timestamp_based_epoch_close is enabled.
1417            let reconfig_guard = self.epoch_store.get_reconfig_state_write_lock_guard();
1418            if reconfig_guard.should_accept_user_certs() {
1419                self.epoch_store.close_user_certs(reconfig_guard);
1420            }
1421        }
1422        let collected_eop_quorum =
1423            self.process_end_of_publish_transactions(state, end_of_publish_transactions);
1424        if timestamp_triggered || collected_eop_quorum {
1425            let (lock, final_round) = self.advance_eop_state_machine(state);
1426            (lock.should_accept_tx(), Some(lock), final_round)
1427        } else {
1428            (true, None, false)
1429        }
1430    }
1431
1432    fn record_end_of_epoch_execution_time_observations(
1433        &self,
1434        estimator: &mut ExecutionTimeEstimator,
1435    ) {
1436        self.epoch_store
1437            .end_of_epoch_execution_time_observations
1438            .set(estimator.take_observations())
1439            .expect("`stored_execution_time_observations` should only be set once at end of epoch");
1440    }
1441
1442    fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
1443        let mut deferred_transactions = self
1444            .epoch_store
1445            .consensus_output_cache
1446            .deferred_transactions
1447            .lock();
1448        for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
1449            deferred_transactions.remove(&deleted_deferred_key);
1450        }
1451    }
1452
1453    fn log_final_round(&self, lock: Option<RwLockWriteGuard<ReconfigState>>, final_round: bool) {
1454        if final_round {
1455            let epoch = self.epoch_store.epoch();
1456            info!(
1457                ?epoch,
1458                lock=?lock.as_ref(),
1459                final_round=?final_round,
1460                "Notified last checkpoint"
1461            );
1462            self.epoch_store.record_end_of_message_quorum_time_metric();
1463        }
1464    }
1465
1466    fn create_pending_checkpoints(
1467        &self,
1468        state: &mut CommitHandlerState,
1469        commit_info: &ConsensusCommitInfo,
1470        schedulables: &[Schedulable],
1471        randomness_schedulables: &[Schedulable],
1472        final_round: bool,
1473    ) {
1474        assert!(
1475            !self
1476                .epoch_store
1477                .protocol_config()
1478                .split_checkpoints_in_consensus_handler()
1479        );
1480
1481        let checkpoint_height = self
1482            .epoch_store
1483            .calculate_pending_checkpoint_height(commit_info.round);
1484
1485        // Determine whether to write pending checkpoint for user tx with randomness.
1486        // - If randomness is not generated for this commit, we will skip the
1487        //   checkpoint with the associated height. Therefore checkpoint heights may
1488        //   not be contiguous.
1489        // - Exception: if DKG fails, we always need to write out a PendingCheckpoint
1490        //   for randomness tx that are canceled.
1491        let should_write_random_checkpoint = state.randomness_round.is_some()
1492            || (state.dkg_failed && !randomness_schedulables.is_empty());
1493
1494        let pending_checkpoint = PendingCheckpoint {
1495            roots: schedulables.iter().map(|s| s.key()).collect(),
1496            details: PendingCheckpointInfo {
1497                timestamp_ms: commit_info.timestamp,
1498                last_of_epoch: final_round && !should_write_random_checkpoint,
1499                checkpoint_height,
1500                consensus_commit_ref: commit_info.consensus_commit_ref,
1501                rejected_transactions_digest: commit_info.rejected_transactions_digest,
1502                checkpoint_seq: None,
1503            },
1504        };
1505        self.epoch_store
1506            .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1507            .expect("failed to write pending checkpoint");
1508
1509        info!(
1510            "Written pending checkpoint: {:?}",
1511            pending_checkpoint.details,
1512        );
1513
1514        if should_write_random_checkpoint {
1515            let pending_checkpoint = PendingCheckpoint {
1516                roots: randomness_schedulables.iter().map(|s| s.key()).collect(),
1517                details: PendingCheckpointInfo {
1518                    timestamp_ms: commit_info.timestamp,
1519                    last_of_epoch: final_round,
1520                    checkpoint_height: checkpoint_height + 1,
1521                    consensus_commit_ref: commit_info.consensus_commit_ref,
1522                    rejected_transactions_digest: commit_info.rejected_transactions_digest,
1523                    checkpoint_seq: None,
1524                },
1525            };
1526            self.epoch_store
1527                .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1528                .expect("failed to write pending checkpoint");
1529        }
1530    }
1531
1532    #[allow(clippy::type_complexity)]
1533    fn collect_transactions_to_schedule(
1534        &self,
1535        state: &mut CommitHandlerState,
1536        execution_time_estimator: &mut ExecutionTimeEstimator,
1537        commit_info: &ConsensusCommitInfo,
1538        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1539    ) -> (
1540        Vec<VerifiedExecutableTransactionWithAliases>,
1541        Vec<VerifiedExecutableTransactionWithAliases>,
1542        BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
1543        Option<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1544    ) {
1545        let protocol_config = self.epoch_store.protocol_config();
1546        let epoch = self.epoch_store.epoch();
1547
1548        let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) =
1549            self.merge_and_reorder_transactions(state, commit_info, user_transactions);
1550
1551        let mut shared_object_congestion_tracker =
1552            self.init_congestion_tracker(commit_info, false, &ordered_txns);
1553        let mut shared_object_using_randomness_congestion_tracker =
1554            self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns);
1555
1556        let randomness_state_update_transaction = state
1557            .randomness_round
1558            .map(|round| Schedulable::RandomnessStateUpdate(epoch, round));
1559        debug!(
1560            "Randomness state update transaction: {:?}",
1561            randomness_state_update_transaction
1562                .as_ref()
1563                .map(|t| t.key())
1564        );
1565
1566        let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len());
1567        let mut randomness_transactions_to_schedule =
1568            Vec::with_capacity(ordered_randomness_txns.len());
1569        let mut deferred_txns = BTreeMap::new();
1570        let mut cancelled_txns = BTreeMap::new();
1571
1572        for transaction in ordered_txns {
1573            self.handle_deferral_and_cancellation(
1574                state,
1575                &mut cancelled_txns,
1576                &mut deferred_txns,
1577                &mut transactions_to_schedule,
1578                protocol_config,
1579                commit_info,
1580                transaction,
1581                &mut shared_object_congestion_tracker,
1582                &previously_deferred_tx_digests,
1583                execution_time_estimator,
1584            );
1585        }
1586
1587        for transaction in ordered_randomness_txns {
1588            if state.dkg_failed {
1589                debug!(
1590                    "Canceling randomness-using transaction {:?} because DKG failed",
1591                    transaction.tx().digest(),
1592                );
1593                cancelled_txns.insert(
1594                    *transaction.tx().digest(),
1595                    CancelConsensusCertificateReason::DkgFailed,
1596                );
1597                randomness_transactions_to_schedule.push(transaction);
1598                continue;
1599            }
1600            self.handle_deferral_and_cancellation(
1601                state,
1602                &mut cancelled_txns,
1603                &mut deferred_txns,
1604                &mut randomness_transactions_to_schedule,
1605                protocol_config,
1606                commit_info,
1607                transaction,
1608                &mut shared_object_using_randomness_congestion_tracker,
1609                &previously_deferred_tx_digests,
1610                execution_time_estimator,
1611            );
1612        }
1613
1614        let mut total_deferred_txns = 0;
1615        {
1616            let mut deferred_transactions = self
1617                .epoch_store
1618                .consensus_output_cache
1619                .deferred_transactions
1620                .lock();
1621            for (key, txns) in deferred_txns.into_iter() {
1622                total_deferred_txns += txns.len();
1623                deferred_transactions.insert(key, txns.clone());
1624                state.output.defer_transactions(key, txns);
1625            }
1626        }
1627
1628        self.metrics
1629            .consensus_handler_deferred_transactions
1630            .inc_by(total_deferred_txns as u64);
1631        self.metrics
1632            .consensus_handler_cancelled_transactions
1633            .inc_by(cancelled_txns.len() as u64);
1634        self.metrics
1635            .consensus_handler_max_object_costs
1636            .with_label_values(&["regular_commit"])
1637            .set(shared_object_congestion_tracker.max_cost() as i64);
1638        self.metrics
1639            .consensus_handler_max_object_costs
1640            .with_label_values(&["randomness_commit"])
1641            .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);
1642
1643        let congestion_commit_data = shared_object_congestion_tracker.finish_commit(commit_info);
1644        let randomness_congestion_commit_data =
1645            shared_object_using_randomness_congestion_tracker.finish_commit(commit_info);
1646
1647        if let Some(logger) = &self.congestion_logger {
1648            let epoch = self.epoch_store.epoch();
1649            let mut logger = logger.lock().unwrap();
1650            logger.write_commit_log(epoch, commit_info, false, &congestion_commit_data);
1651            logger.write_commit_log(epoch, commit_info, true, &randomness_congestion_commit_data);
1652        }
1653
1654        if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get()
1655            && let Err(e) = tx_object_debts.try_send(
1656                congestion_commit_data
1657                    .accumulated_debts
1658                    .iter()
1659                    .chain(randomness_congestion_commit_data.accumulated_debts.iter())
1660                    .map(|(id, _)| *id)
1661                    .collect(),
1662            )
1663        {
1664            info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
1665        }
1666
1667        state
1668            .output
1669            .set_congestion_control_object_debts(congestion_commit_data.accumulated_debts);
1670        state.output.set_congestion_control_randomness_object_debts(
1671            randomness_congestion_commit_data.accumulated_debts,
1672        );
1673
1674        (
1675            transactions_to_schedule,
1676            randomness_transactions_to_schedule,
1677            cancelled_txns,
1678            randomness_state_update_transaction,
1679        )
1680    }
1681
1682    fn process_transactions(
1683        &self,
1684        state: &mut CommitHandlerState,
1685        execution_time_estimator: &mut ExecutionTimeEstimator,
1686        commit_info: &ConsensusCommitInfo,
1687        authenticator_state_update_transaction: Option<VerifiedExecutableTransactionWithAliases>,
1688        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1689    ) -> (Vec<Schedulable>, Vec<Schedulable>, AssignedTxAndVersions) {
1690        let protocol_config = self.epoch_store.protocol_config();
1691        assert!(!protocol_config.split_checkpoints_in_consensus_handler());
1692        let epoch = self.epoch_store.epoch();
1693
1694        let (
1695            transactions_to_schedule,
1696            randomness_transactions_to_schedule,
1697            cancelled_txns,
1698            randomness_state_update_transaction,
1699        ) = self.collect_transactions_to_schedule(
1700            state,
1701            execution_time_estimator,
1702            commit_info,
1703            user_transactions,
1704        );
1705
1706        let mut settlement = None;
1707        let mut randomness_settlement = None;
1708        if self.epoch_store.accumulators_enabled() {
1709            let checkpoint_height = self
1710                .epoch_store
1711                .calculate_pending_checkpoint_height(commit_info.round);
1712
1713            settlement = Some(Schedulable::AccumulatorSettlement(epoch, checkpoint_height));
1714
1715            if state.randomness_round.is_some() || !randomness_transactions_to_schedule.is_empty() {
1716                randomness_settlement = Some(Schedulable::AccumulatorSettlement(
1717                    epoch,
1718                    checkpoint_height + 1,
1719                ));
1720            }
1721        }
1722
1723        let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
1724            .then_some(Schedulable::ConsensusCommitPrologue(
1725                epoch,
1726                commit_info.round,
1727                commit_info.consensus_commit_ref.index,
1728            ));
1729
1730        let schedulables: Vec<_> = itertools::chain!(
1731            consensus_commit_prologue.into_iter(),
1732            authenticator_state_update_transaction
1733                .into_iter()
1734                .map(Schedulable::Transaction),
1735            transactions_to_schedule
1736                .into_iter()
1737                .map(Schedulable::Transaction),
1738            settlement,
1739        )
1740        .collect();
1741
1742        let randomness_schedulables: Vec<_> = randomness_state_update_transaction
1743            .into_iter()
1744            .chain(
1745                randomness_transactions_to_schedule
1746                    .into_iter()
1747                    .map(Schedulable::Transaction),
1748            )
1749            .chain(randomness_settlement)
1750            .collect();
1751
1752        let assigned_versions = self
1753            .epoch_store
1754            .process_consensus_transaction_shared_object_versions(
1755                self.cache_reader.as_ref(),
1756                schedulables.iter(),
1757                randomness_schedulables.iter(),
1758                &cancelled_txns,
1759                &mut state.output,
1760            )
1761            .expect("failed to assign shared object versions");
1762
1763        let consensus_commit_prologue =
1764            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);
1765
1766        let mut schedulables = schedulables;
1767        let mut assigned_versions = assigned_versions;
1768        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
1769            assert!(matches!(
1770                schedulables[0],
1771                Schedulable::ConsensusCommitPrologue(..)
1772            ));
1773            assert!(matches!(
1774                assigned_versions.0[0].0,
1775                TransactionKey::ConsensusCommitPrologue(..)
1776            ));
1777            assigned_versions.0[0].0 =
1778                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
1779            schedulables[0] = Schedulable::Transaction(consensus_commit_prologue);
1780        }
1781
1782        self.epoch_store
1783            .process_user_signatures(schedulables.iter().chain(randomness_schedulables.iter()));
1784
1785        // After this point we can throw away alias version information.
1786        let schedulables: Vec<Schedulable> = schedulables.into_iter().map(|s| s.into()).collect();
1787        let randomness_schedulables: Vec<Schedulable> = randomness_schedulables
1788            .into_iter()
1789            .map(|s| s.into())
1790            .collect();
1791
1792        (schedulables, randomness_schedulables, assigned_versions)
1793    }
1794
1795    #[allow(clippy::type_complexity)]
1796    fn create_pending_checkpoints_v2(
1797        &self,
1798        state: &mut CommitHandlerState,
1799        commit_info: &ConsensusCommitInfo,
1800        schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1801        randomness_schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1802        cancelled_txns: &BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
1803        final_round: bool,
1804    ) -> CheckpointHeight {
1805        let protocol_config = self.epoch_store.protocol_config();
1806        assert!(protocol_config.split_checkpoints_in_consensus_handler());
1807
1808        let epoch = self.epoch_store.epoch();
1809        let accumulators_enabled = self.epoch_store.accumulators_enabled();
1810        let max_transactions_per_checkpoint =
1811            protocol_config.max_transactions_per_checkpoint() as usize;
1812
1813        let should_write_random_checkpoint = state.randomness_round.is_some()
1814            || (state.dkg_failed && !randomness_schedulables.is_empty());
1815
1816        let mut checkpoint_queue = self.checkpoint_queue.lock().unwrap();
1817
1818        let build_chunks =
1819            |schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
1820             queue: &mut CheckpointQueue|
1821             -> Vec<Chunk<VerifiedExecutableTransactionWithAliases>> {
1822                schedulables
1823                    .chunks(max_transactions_per_checkpoint)
1824                    .map(|chunk| {
1825                        let height = queue.next_height();
1826                        let schedulables: Vec<_> = chunk.to_vec();
1827                        let settlement = if accumulators_enabled {
1828                            Some(Schedulable::AccumulatorSettlement(epoch, height))
1829                        } else {
1830                            None
1831                        };
1832                        Chunk {
1833                            schedulables,
1834                            settlement,
1835                            height,
1836                        }
1837                    })
1838                    .collect()
1839            };
1840
1841        let num_schedulables = schedulables.len();
1842        let chunked_schedulables = build_chunks(schedulables, &mut checkpoint_queue);
1843        if chunked_schedulables.len() > 1 {
1844            info!(
1845                "Splitting transactions into {} checkpoint chunks (num_schedulables={}, max_tx={})",
1846                chunked_schedulables.len(),
1847                num_schedulables,
1848                max_transactions_per_checkpoint
1849            );
1850            assert_reachable!("checkpoint split due to transaction limit");
1851        }
1852        let chunked_randomness_schedulables = if should_write_random_checkpoint {
1853            build_chunks(randomness_schedulables, &mut checkpoint_queue)
1854        } else {
1855            vec![]
1856        };
1857
1858        let schedulables_for_version_assignment =
1859            Chunk::all_schedulables_from(&chunked_schedulables);
1860        let randomness_schedulables_for_version_assignment =
1861            Chunk::all_schedulables_from(&chunked_randomness_schedulables);
1862
1863        let assigned_versions = self
1864            .epoch_store
1865            .process_consensus_transaction_shared_object_versions(
1866                self.cache_reader.as_ref(),
1867                schedulables_for_version_assignment,
1868                randomness_schedulables_for_version_assignment,
1869                cancelled_txns,
1870                &mut state.output,
1871            )
1872            .expect("failed to assign shared object versions");
1873
1874        let consensus_commit_prologue =
1875            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);
1876
1877        let mut chunked_schedulables = chunked_schedulables;
1878        let mut assigned_versions = assigned_versions;
1879        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
1880            assert!(matches!(
1881                chunked_schedulables[0].schedulables[0],
1882                Schedulable::ConsensusCommitPrologue(..)
1883            ));
1884            assert!(matches!(
1885                assigned_versions.0[0].0,
1886                TransactionKey::ConsensusCommitPrologue(..)
1887            ));
1888            assigned_versions.0[0].0 =
1889                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
1890            chunked_schedulables[0].schedulables[0] =
1891                Schedulable::Transaction(consensus_commit_prologue);
1892        }
1893
1894        let assigned_versions = assigned_versions.into_map();
1895
1896        self.epoch_store.process_user_signatures(
1897            chunked_schedulables
1898                .iter()
1899                .flat_map(|c| c.all_schedulables())
1900                .chain(
1901                    chunked_randomness_schedulables
1902                        .iter()
1903                        .flat_map(|c| c.all_schedulables()),
1904                ),
1905        );
1906
1907        let commit_height = chunked_randomness_schedulables
1908            .last()
1909            .or(chunked_schedulables.last())
1910            .map(|c| c.height)
1911            .expect("at least one checkpoint root must be created per commit");
1912
1913        let mut pending_checkpoints = Vec::new();
1914        for chunk in chunked_schedulables {
1915            pending_checkpoints.extend(checkpoint_queue.push_chunk(
1916                chunk.into(),
1917                &assigned_versions,
1918                commit_info.timestamp,
1919                commit_info.consensus_commit_ref,
1920                commit_info.rejected_transactions_digest,
1921            ));
1922        }
1923
1924        if protocol_config.merge_randomness_into_checkpoint() {
1925            // We don't want to block checkpoint formation for non-randomness schedulables
1926            // on randomness state update. Therefore, we include randomness chunks in the
1927            // subsequent checkpoint. First we flush the queue, then enqueue randomness
1928            // for merging into the subsequent commit.
1929            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, final_round));
1930
1931            if should_write_random_checkpoint {
1932                for chunk in chunked_randomness_schedulables {
1933                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
1934                        chunk.into(),
1935                        &assigned_versions,
1936                        commit_info.timestamp,
1937                        commit_info.consensus_commit_ref,
1938                        commit_info.rejected_transactions_digest,
1939                    ));
1940                }
1941                if final_round {
1942                    pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
1943                }
1944            }
1945        } else {
1946            let force = final_round || should_write_random_checkpoint;
1947            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, force));
1948
1949            if should_write_random_checkpoint {
1950                for chunk in chunked_randomness_schedulables {
1951                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
1952                        chunk.into(),
1953                        &assigned_versions,
1954                        commit_info.timestamp,
1955                        commit_info.consensus_commit_ref,
1956                        commit_info.rejected_transactions_digest,
1957                    ));
1958                }
1959                pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
1960            }
1961        }
1962
1963        if final_round && let Some(last) = pending_checkpoints.last_mut() {
1964            last.details.last_of_epoch = true;
1965        }
1966
1967        let queue_drained = checkpoint_queue.is_empty();
1968        drop(checkpoint_queue);
1969
1970        for pending_checkpoint in pending_checkpoints {
1971            debug!(
1972                checkpoint_height = pending_checkpoint.details.checkpoint_height,
1973                roots_count = pending_checkpoint.num_roots(),
1974                "Writing pending checkpoint",
1975            );
1976            self.epoch_store
1977                .write_pending_checkpoint_v2(&mut state.output, &pending_checkpoint)
1978                .expect("failed to write pending checkpoint");
1979        }
1980
1981        state.output.set_checkpoint_queue_drained(queue_drained);
1982
1983        commit_height
1984    }
1985
1986    // Adds the consensus commit prologue transaction to the beginning of input `transactions` to update
1987    // the system clock used in all transactions in the current consensus commit.
1988    // Returns the root of the consensus commit prologue transaction if it was added to the input.
1989    fn add_consensus_commit_prologue_transaction<'a>(
1990        &'a self,
1991        state: &'a mut CommitHandlerState,
1992        commit_info: &'a ConsensusCommitInfo,
1993        assigned_versions: &AssignedTxAndVersions,
1994    ) -> Option<VerifiedExecutableTransactionWithAliases> {
1995        {
1996            if commit_info.skip_consensus_commit_prologue_in_test {
1997                return None;
1998            }
1999        }
2000
2001        let mut cancelled_txn_version_assignment = Vec::new();
2002
2003        let protocol_config = self.epoch_store.protocol_config();
2004
2005        for (txn_key, assigned_versions) in assigned_versions.0.iter() {
2006            let Some(d) = txn_key.as_digest() else {
2007                continue;
2008            };
2009
2010            if !protocol_config.include_cancelled_randomness_txns_in_prologue()
2011                && assigned_versions
2012                    .shared_object_versions
2013                    .iter()
2014                    .any(|((id, _), _)| *id == SUI_RANDOMNESS_STATE_OBJECT_ID)
2015            {
2016                continue;
2017            }
2018
2019            if assigned_versions
2020                .shared_object_versions
2021                .iter()
2022                .any(|(_, version)| version.is_cancelled())
2023            {
2024                assert_reachable!("cancelled transactions");
2025                cancelled_txn_version_assignment
2026                    .push((*d, assigned_versions.shared_object_versions.clone()));
2027            }
2028        }
2029
2030        fail_point_arg!(
2031            "additional_cancelled_txns_for_tests",
2032            |additional_cancelled_txns: Vec<(
2033                TransactionDigest,
2034                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
2035            )>| {
2036                cancelled_txn_version_assignment.extend(additional_cancelled_txns);
2037            }
2038        );
2039
2040        let transaction = commit_info.create_consensus_commit_prologue_transaction(
2041            self.epoch_store.epoch(),
2042            self.epoch_store.protocol_config(),
2043            cancelled_txn_version_assignment,
2044            commit_info,
2045            state.indirect_state_observer.take().unwrap(),
2046        );
2047        Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2048            transaction,
2049        ))
2050    }
2051
2052    fn handle_deferral_and_cancellation(
2053        &self,
2054        state: &mut CommitHandlerState,
2055        cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
2056        deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>,
2057        scheduled_txns: &mut Vec<VerifiedExecutableTransactionWithAliases>,
2058        protocol_config: &ProtocolConfig,
2059        commit_info: &ConsensusCommitInfo,
2060        transaction: VerifiedExecutableTransactionWithAliases,
2061        shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
2062        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
2063        execution_time_estimator: &ExecutionTimeEstimator,
2064    ) {
2065        // Check for unpaid amplification before other deferral checks.
2066        // SIP-45: Paid amplification allows (gas_price / RGP + 1) submissions.
2067        // Transactions with more duplicates than paid for are deferred.
2068        if protocol_config.defer_unpaid_amplification() {
2069            let occurrence_count = state
2070                .occurrence_counts
2071                .get(transaction.tx().digest())
2072                .copied()
2073                .unwrap_or(0);
2074
2075            let rgp = self.epoch_store.reference_gas_price();
2076            let gas_price = transaction.tx().transaction_data().gas_price();
2077            let allowed_count = (gas_price / rgp.max(1)) + 1;
2078
2079            if occurrence_count as u64 > allowed_count {
2080                self.metrics
2081                    .consensus_handler_unpaid_amplification_deferrals
2082                    .inc();
2083
2084                let deferred_from_round = previously_deferred_tx_digests
2085                    .get(transaction.tx().digest())
2086                    .map(|k| k.deferred_from_round())
2087                    .unwrap_or(commit_info.round);
2088
2089                let deferral_key = DeferralKey::new_for_consensus_round(
2090                    commit_info.round + 1,
2091                    deferred_from_round,
2092                );
2093
2094                if transaction_deferral_within_limit(
2095                    &deferral_key,
2096                    protocol_config.max_deferral_rounds_for_congestion_control(),
2097                ) {
2098                    assert_reachable!("unpaid amplification deferral");
2099                    debug!(
2100                        "Deferring transaction {:?} due to unpaid amplification (count={}, allowed={})",
2101                        transaction.tx().digest(),
2102                        occurrence_count,
2103                        allowed_count
2104                    );
2105                    deferred_txns
2106                        .entry(deferral_key)
2107                        .or_default()
2108                        .push(transaction);
2109                    return;
2110                }
2111            }
2112        }
2113
2114        let tx_cost = shared_object_congestion_tracker.get_tx_cost(
2115            execution_time_estimator,
2116            transaction.tx(),
2117            state.indirect_state_observer.as_mut().unwrap(),
2118        );
2119
2120        let deferral_info = self.epoch_store.should_defer(
2121            transaction.tx(),
2122            commit_info,
2123            state.dkg_failed,
2124            state.randomness_round.is_some(),
2125            previously_deferred_tx_digests,
2126            shared_object_congestion_tracker,
2127        );
2128
2129        if let Some((deferral_key, deferral_reason)) = deferral_info {
2130            debug!(
2131                "Deferring consensus certificate for transaction {:?} until {:?}",
2132                transaction.tx().digest(),
2133                deferral_key
2134            );
2135
2136            match deferral_reason {
2137                DeferralReason::RandomnessNotReady => {
2138                    deferred_txns
2139                        .entry(deferral_key)
2140                        .or_default()
2141                        .push(transaction);
2142                }
2143                DeferralReason::SharedObjectCongestion(congested_objects) => {
2144                    self.metrics.consensus_handler_congested_transactions.inc();
2145                    if transaction_deferral_within_limit(
2146                        &deferral_key,
2147                        protocol_config.max_deferral_rounds_for_congestion_control(),
2148                    ) {
2149                        deferred_txns
2150                            .entry(deferral_key)
2151                            .or_default()
2152                            .push(transaction);
2153                    } else {
2154                        assert_sometimes!(
2155                            transaction.tx().data().transaction_data().uses_randomness(),
2156                            "cancelled randomness-using transaction"
2157                        );
2158                        assert_sometimes!(
2159                            !transaction.tx().data().transaction_data().uses_randomness(),
2160                            "cancelled non-randomness-using transaction"
2161                        );
2162
2163                        // Cancel the transaction that has been deferred for too long.
2164                        debug!(
2165                            "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
2166                            transaction.tx().digest(),
2167                            deferral_key,
2168                            congested_objects
2169                        );
2170                        cancelled_txns.insert(
2171                            *transaction.tx().digest(),
2172                            CancelConsensusCertificateReason::CongestionOnObjects(
2173                                congested_objects,
2174                            ),
2175                        );
2176                        scheduled_txns.push(transaction);
2177                    }
2178                }
2179            }
2180        } else {
2181            // Update object execution cost for all scheduled transactions
2182            shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, transaction.tx());
2183            scheduled_txns.push(transaction);
2184        }
2185    }
2186
2187    fn merge_and_reorder_transactions(
2188        &self,
2189        state: &mut CommitHandlerState,
2190        commit_info: &ConsensusCommitInfo,
2191        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
2192    ) -> (
2193        Vec<VerifiedExecutableTransactionWithAliases>,
2194        Vec<VerifiedExecutableTransactionWithAliases>,
2195        HashMap<TransactionDigest, DeferralKey>,
2196    ) {
2197        let protocol_config = self.epoch_store.protocol_config();
2198
2199        let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
2200            self.load_deferred_transactions(state, commit_info);
2201
2202        txns.reserve(user_transactions.len());
2203        randomness_txns.reserve(user_transactions.len());
2204
2205        // There may be randomness transactions in `txns`, which were deferred due to congestion.
2206        // They must be placed back into `randomness_txns`.
2207        let mut txns: Vec<_> = txns
2208            .into_iter()
2209            .filter_map(|tx| {
2210                if tx.tx().transaction_data().uses_randomness() {
2211                    randomness_txns.push(tx);
2212                    None
2213                } else {
2214                    Some(tx)
2215                }
2216            })
2217            .collect();
2218
2219        for txn in user_transactions {
2220            if txn.tx().transaction_data().uses_randomness() {
2221                randomness_txns.push(txn);
2222            } else {
2223                txns.push(txn);
2224            }
2225        }
2226
2227        PostConsensusTxReorder::reorder(
2228            &mut txns,
2229            protocol_config.consensus_transaction_ordering(),
2230        );
2231        PostConsensusTxReorder::reorder(
2232            &mut randomness_txns,
2233            protocol_config.consensus_transaction_ordering(),
2234        );
2235
2236        (txns, randomness_txns, previously_deferred_tx_digests)
2237    }
2238
2239    fn load_deferred_transactions(
2240        &self,
2241        state: &mut CommitHandlerState,
2242        commit_info: &ConsensusCommitInfo,
2243    ) -> (
2244        Vec<VerifiedExecutableTransactionWithAliases>,
2245        Vec<VerifiedExecutableTransactionWithAliases>,
2246        HashMap<TransactionDigest, DeferralKey>,
2247    ) {
2248        let mut previously_deferred_tx_digests = HashMap::new();
2249
2250        let deferred_txs: Vec<_> = self
2251            .epoch_store
2252            .load_deferred_transactions_for_up_to_consensus_round_v2(
2253                &mut state.output,
2254                commit_info.round,
2255            )
2256            .expect("db error")
2257            .into_iter()
2258            .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2259            .map(|(key, tx)| {
2260                previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2261                tx
2262            })
2263            .collect();
2264        trace!(
2265            "loading deferred transactions: {:?}",
2266            deferred_txs.iter().map(|tx| tx.tx().digest())
2267        );
2268
2269        let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
2270            let txns: Vec<_> = self
2271                .epoch_store
2272                .load_deferred_transactions_for_randomness_v2(&mut state.output)
2273                .expect("db error")
2274                .into_iter()
2275                .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2276                .map(|(key, tx)| {
2277                    previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2278                    tx
2279                })
2280                .collect();
2281            trace!(
2282                "loading deferred randomness transactions: {:?}",
2283                txns.iter().map(|tx| tx.tx().digest())
2284            );
2285            txns
2286        } else {
2287            vec![]
2288        };
2289
2290        (
2291            deferred_txs,
2292            deferred_randomness_txs,
2293            previously_deferred_tx_digests,
2294        )
2295    }
2296
2297    fn init_congestion_tracker(
2298        &self,
2299        commit_info: &ConsensusCommitInfo,
2300        for_randomness: bool,
2301        txns: &[VerifiedExecutableTransactionWithAliases],
2302    ) -> SharedObjectCongestionTracker {
2303        #[allow(unused_mut)]
2304        let mut ret = SharedObjectCongestionTracker::from_protocol_config(
2305            self.epoch_store
2306                .consensus_quarantine
2307                .read()
2308                .load_initial_object_debts(
2309                    &self.epoch_store,
2310                    commit_info.round,
2311                    for_randomness,
2312                    txns,
2313                )
2314                .expect("db error"),
2315            self.epoch_store.protocol_config(),
2316            for_randomness,
2317            self.congestion_logger.is_some(),
2318        );
2319
2320        fail_point_arg!(
2321            "initial_congestion_tracker",
2322            |tracker: SharedObjectCongestionTracker| {
2323                info!(
2324                    "Initialize shared_object_congestion_tracker to  {:?}",
2325                    tracker
2326                );
2327                ret = tracker;
2328            }
2329        );
2330
2331        ret
2332    }
2333
2334    fn process_gasless_transactions(
2335        &self,
2336        commit_info: &ConsensusCommitInfo,
2337        user_transactions: &[VerifiedExecutableTransactionWithAliases],
2338    ) {
2339        let gasless_count = user_transactions
2340            .iter()
2341            .filter(|txn| txn.tx().transaction_data().is_gasless_transaction())
2342            .count() as u64;
2343        self.consensus_gasless_counter
2344            .record_commit(commit_info.timestamp, gasless_count);
2345    }
2346
2347    fn process_jwks(
2348        &self,
2349        state: &mut CommitHandlerState,
2350        commit_info: &ConsensusCommitInfo,
2351        new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
2352    ) {
2353        for (authority_name, jwk_id, jwk) in new_jwks {
2354            self.epoch_store.record_jwk_vote(
2355                &mut state.output,
2356                commit_info.round,
2357                authority_name,
2358                &jwk_id,
2359                &jwk,
2360            );
2361        }
2362    }
2363
2364    fn process_capability_notifications(
2365        &self,
2366        capability_notifications: Vec<AuthorityCapabilitiesV2>,
2367    ) {
2368        for capabilities in capability_notifications {
2369            self.epoch_store
2370                .record_capabilities_v2(&capabilities)
2371                .expect("db error");
2372        }
2373    }
2374
2375    fn process_execution_time_observations(
2376        &self,
2377        state: &mut CommitHandlerState,
2378        execution_time_observations: Vec<ExecutionTimeObservation>,
2379    ) {
2380        let mut execution_time_estimator = self
2381            .epoch_store
2382            .execution_time_estimator
2383            .try_lock()
2384            .expect("should only ever be called from the commit handler thread");
2385
2386        for ExecutionTimeObservation {
2387            authority,
2388            generation,
2389            estimates,
2390        } in execution_time_observations
2391        {
2392            let authority_index = self
2393                .epoch_store
2394                .committee()
2395                .authority_index(&authority)
2396                .unwrap();
2397            execution_time_estimator.process_observations_from_consensus(
2398                authority_index,
2399                Some(generation),
2400                &estimates,
2401            );
2402            state
2403                .output
2404                .insert_execution_time_observation(authority_index, generation, estimates);
2405        }
2406    }
2407
2408    fn process_checkpoint_signature_messages(
2409        &self,
2410        checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
2411    ) {
2412        for checkpoint_signature_message in checkpoint_signature_messages {
2413            self.checkpoint_service
2414                .notify_checkpoint_signature(&checkpoint_signature_message)
2415                .expect("db error");
2416        }
2417    }
2418
2419    async fn process_dkg_updates(
2420        &self,
2421        state: &mut CommitHandlerState,
2422        commit_info: &ConsensusCommitInfo,
2423        randomness_manager: Option<&mut RandomnessManager>,
2424        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2425        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2426    ) {
2427        if !self.epoch_store.randomness_state_enabled() {
2428            let num_dkg_messages = randomness_dkg_messages.len();
2429            let num_dkg_confirmations = randomness_dkg_confirmations.len();
2430            if num_dkg_messages + num_dkg_confirmations > 0 {
2431                debug_fatal!(
2432                    "received {} RandomnessDkgMessage and {} RandomnessDkgConfirmation messages when randomness is not enabled",
2433                    num_dkg_messages,
2434                    num_dkg_confirmations
2435                );
2436            }
2437            return;
2438        }
2439
2440        let randomness_manager =
2441            randomness_manager.expect("randomness manager should exist if randomness is enabled");
2442
2443        let randomness_dkg_updates =
2444            self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);
2445
2446        let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
2447            state,
2448            randomness_manager,
2449            randomness_dkg_confirmations,
2450        );
2451
2452        // Keep advancing the DKG state machine until it is resolved, regardless of
2453        // whether new messages/confirmations were processed this commit. Preserve
2454        // the mainnet epoch fallback until the protocol flag is active everywhere.
2455        let always_advance_dkg_to_resolution = (self
2456            .epoch_store
2457            .protocol_config()
2458            .always_advance_dkg_to_resolution()
2459            || (self.epoch_store.get_chain() == Chain::Mainnet
2460                && self.epoch_store.epoch() >= 1143))
2461            && randomness_manager.dkg_status() == DkgStatus::Pending;
2462
2463        if randomness_dkg_updates
2464            || randomness_dkg_confirmation_updates
2465            || always_advance_dkg_to_resolution
2466        {
2467            randomness_manager
2468                .advance_dkg(&mut state.output, commit_info.round)
2469                .await
2470                .expect("epoch ended");
2471        }
2472    }
2473
2474    fn process_randomness_dkg_messages(
2475        &self,
2476        randomness_manager: &mut RandomnessManager,
2477        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2478    ) -> bool /* randomness state updated */ {
2479        if randomness_dkg_messages.is_empty() {
2480            return false;
2481        }
2482
2483        let mut randomness_state_updated = false;
2484        for (authority, bytes) in randomness_dkg_messages {
2485            match bcs::from_bytes(&bytes) {
2486                Ok(message) => {
2487                    randomness_manager
2488                        .add_message(&authority, message)
2489                        // TODO: make infallible
2490                        .expect("epoch ended");
2491                    randomness_state_updated = true;
2492                }
2493
2494                Err(e) => {
2495                    warn!(
2496                        "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
2497                        authority.concise(),
2498                    );
2499                }
2500            }
2501        }
2502
2503        randomness_state_updated
2504    }
2505
2506    fn process_randomness_dkg_confirmations(
2507        &self,
2508        state: &mut CommitHandlerState,
2509        randomness_manager: &mut RandomnessManager,
2510        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2511    ) -> bool /* randomness state updated */ {
2512        if randomness_dkg_confirmations.is_empty() {
2513            return false;
2514        }
2515
2516        let mut randomness_state_updated = false;
2517        for (authority, bytes) in randomness_dkg_confirmations {
2518            match bcs::from_bytes(&bytes) {
2519                Ok(message) => {
2520                    randomness_manager
2521                        .add_confirmation(&mut state.output, &authority, message)
2522                        // TODO: make infallible
2523                        .expect("epoch ended");
2524                    randomness_state_updated = true;
2525                }
2526                Err(e) => {
2527                    warn!(
2528                        "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
2529                        authority.concise(),
2530                    );
2531                }
2532            }
2533        }
2534
2535        randomness_state_updated
2536    }
2537
2538    /// Returns true if we have collected a quorum of end of publish messages (either in this round or a previous round).
2539    fn process_end_of_publish_transactions(
2540        &self,
2541        state: &mut CommitHandlerState,
2542        end_of_publish_transactions: Vec<AuthorityName>,
2543    ) -> bool {
2544        let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
2545            "No contention on end_of_publish as it is only accessed from consensus handler",
2546        );
2547
2548        if eop_aggregator.has_quorum() {
2549            return true;
2550        }
2551
2552        if end_of_publish_transactions.is_empty() {
2553            return false;
2554        }
2555
2556        for authority in end_of_publish_transactions {
2557            info!("Received EndOfPublish from {:?}", authority.concise());
2558
2559            // It is ok to just release lock here as this function is the only place that transition into RejectAllCerts state
2560            // And this function itself is always executed from consensus task
2561            state.output.insert_end_of_publish(authority);
2562            if eop_aggregator
2563                .insert_generic(authority, ())
2564                .is_quorum_reached()
2565            {
2566                debug!(
2567                    "Collected enough end_of_publish messages with last message from validator {:?}",
2568                    authority.concise(),
2569                );
2570                return true;
2571            }
2572        }
2573
2574        false
2575    }
2576
2577    /// After we have collected 2f+1 EndOfPublish messages, we call this function every round until the epoch
2578    /// ends.
2579    fn advance_eop_state_machine(
2580        &self,
2581        state: &mut CommitHandlerState,
2582    ) -> (
2583        RwLockWriteGuard<'_, ReconfigState>,
2584        bool, // true if final round
2585    ) {
2586        let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
2587        let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
2588
2589        reconfig_state.close_all_certs();
2590
2591        let commit_has_deferred_txns = state.output.has_deferred_transactions();
2592        let previous_commits_have_deferred_txns = !self.epoch_store.deferred_transactions_empty();
2593
2594        if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
2595            if !start_state_is_reject_all_tx {
2596                info!("Transitioning to RejectAllTx");
2597            }
2598            reconfig_state.close_all_tx();
2599        } else {
2600            debug!(
2601                "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
2602                previous_commits_have_deferred_txns, commit_has_deferred_txns,
2603            );
2604        }
2605
2606        state.output.store_reconfig_state(reconfig_state.clone());
2607
2608        if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
2609            (reconfig_state, true)
2610        } else {
2611            (reconfig_state, false)
2612        }
2613    }
2614
2615    fn gather_commit_metadata(
2616        &self,
2617        consensus_commit: &impl ConsensusCommitAPI,
2618    ) -> (u64, AuthorityIndex, u64) {
2619        let timestamp = consensus_commit.commit_timestamp_ms();
2620        let leader_author = consensus_commit.leader_author_index();
2621        let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
2622
2623        let system_time_ms = SystemTime::now()
2624            .duration_since(UNIX_EPOCH)
2625            .unwrap()
2626            .as_millis() as i64;
2627
2628        let consensus_timestamp_bias_ms = system_time_ms - (timestamp as i64);
2629        let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
2630        self.metrics
2631            .consensus_timestamp_bias
2632            .observe(consensus_timestamp_bias_seconds);
2633
2634        let epoch_start = self
2635            .epoch_store
2636            .epoch_start_config()
2637            .epoch_start_timestamp_ms();
2638        let timestamp = if timestamp < epoch_start {
2639            error!(
2640                "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}"
2641            );
2642            epoch_start
2643        } else {
2644            timestamp
2645        };
2646
2647        (timestamp, leader_author, commit_sub_dag_index)
2648    }
2649
2650    fn create_authenticator_state_update(
2651        &self,
2652        last_committed_round: u64,
2653        commit_info: &ConsensusCommitInfo,
2654    ) -> Option<VerifiedExecutableTransactionWithAliases> {
2655        // Load all jwks that became active in the previous round, and commit them in this round.
2656        // We want to delay one round because none of the transactions in the previous round could
2657        // have been authenticated with the jwks that became active in that round.
2658        //
2659        // Because of this delay, jwks that become active in the last round of the epoch will
2660        // never be committed. That is ok, because in the new epoch, the validators should
2661        // immediately re-submit these jwks, and they can become active then.
2662        let new_jwks = self
2663            .epoch_store
2664            .get_new_jwks(last_committed_round)
2665            .expect("Unrecoverable error in consensus handler");
2666
2667        if !new_jwks.is_empty() {
2668            let authenticator_state_update_transaction = authenticator_state_update_transaction(
2669                &self.epoch_store,
2670                commit_info.round,
2671                new_jwks,
2672            );
2673            debug!(
2674                "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
2675                authenticator_state_update_transaction.digest(),
2676                authenticator_state_update_transaction,
2677            );
2678
2679            Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2680                authenticator_state_update_transaction,
2681            ))
2682        } else {
2683            None
2684        }
2685    }
2686
2687    // Filters out rejected or deprecated transactions.
2688    // Returns FilteredConsensusOutput containing transactions and owned_object_locks.
2689    #[instrument(level = "trace", skip_all)]
2690    fn filter_consensus_txns(
2691        &mut self,
2692        initial_reconfig_state: ReconfigState,
2693        commit_info: &ConsensusCommitInfo,
2694        consensus_commit: &impl ConsensusCommitAPI,
2695    ) -> FilteredConsensusOutput {
2696        let mut transactions = Vec::new();
2697        let mut owned_object_locks = HashMap::new();
2698        let epoch = self.epoch_store.epoch();
2699        let mut num_finalized_user_transactions = vec![0; self.committee.size()];
2700        let mut num_rejected_user_transactions = vec![0; self.committee.size()];
2701        for (block, parsed_transactions) in consensus_commit.transactions() {
2702            let author = block.author.value();
2703            // TODO: consider only messages within 1~3 rounds of the leader?
2704            self.last_consensus_stats.stats.inc_num_messages(author);
2705
2706            // Set the "ping" transaction status for this block. This is necessary as there might be some ping requests waiting for the ping transaction to be certified.
2707            self.epoch_store.set_consensus_tx_status(
2708                ConsensusPosition::ping(epoch, block),
2709                ConsensusTxStatus::Finalized,
2710            );
2711
2712            for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
2713                let position = ConsensusPosition {
2714                    epoch,
2715                    block,
2716                    index: tx_index as TransactionIndex,
2717                };
2718
2719                // Transaction has appeared in consensus output, we can increment the submission count
2720                // for this tx for DoS protection.
2721                if let Some(tx) = parsed.transaction.kind.as_user_transaction() {
2722                    let digest = tx.digest();
2723                    if let Some((spam_weight, submitter_client_addrs)) = self
2724                        .epoch_store
2725                        .submitted_transaction_cache
2726                        .increment_submission_count(digest)
2727                    {
2728                        if let Some(ref traffic_controller) = self.traffic_controller {
2729                            debug!(
2730                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
2731                                submitter_client_addrs.len()
2732                            );
2733
2734                            // Apply spam weight to all client addresses that submitted this transaction
2735                            for addr in submitter_client_addrs {
2736                                traffic_controller.tally(TrafficTally::new(
2737                                    Some(addr),
2738                                    None,
2739                                    None,
2740                                    spam_weight.clone(),
2741                                ));
2742                            }
2743                        } else {
2744                            warn!(
2745                                "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
2746                                submitter_client_addrs.len()
2747                            );
2748                        }
2749                    }
2750                }
2751
2752                if parsed.rejected {
2753                    // TODO(fastpath): Add metrics for rejected transactions.
2754                    if parsed.transaction.is_user_transaction() {
2755                        self.epoch_store
2756                            .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
2757                        num_rejected_user_transactions[author] += 1;
2758                    }
2759                    // Skip processing rejected transactions.
2760                    continue;
2761                }
2762
2763                let kind = classify(&parsed.transaction);
2764                self.metrics
2765                    .consensus_handler_processed
2766                    .with_label_values(&[kind])
2767                    .inc();
2768                self.metrics
2769                    .consensus_handler_transaction_sizes
2770                    .with_label_values(&[kind])
2771                    .observe(parsed.serialized_len as f64);
2772                if parsed.transaction.is_user_transaction() {
2773                    self.last_consensus_stats
2774                        .stats
2775                        .inc_num_user_transactions(author);
2776                }
2777
2778                if !initial_reconfig_state.should_accept_consensus_certs() {
2779                    // (Note: we no longer need to worry about the previously deferred condition, since we are only
2780                    // processing newly-received transactions at this time).
2781                    match &parsed.transaction.kind {
2782                        ConsensusTransactionKind::UserTransactionV2(_)
2783                        // deprecated and ignore later, but added for exhaustive match
2784                        | ConsensusTransactionKind::UserTransaction(_)
2785                        | ConsensusTransactionKind::CertifiedTransaction(_)
2786                        | ConsensusTransactionKind::CapabilityNotification(_)
2787                        | ConsensusTransactionKind::CapabilityNotificationV2(_)
2788                        | ConsensusTransactionKind::EndOfPublish(_)
2789                        // Note: we no longer have to check protocol_config.ignore_execution_time_observations_after_certs_closed()
2790                        | ConsensusTransactionKind::ExecutionTimeObservation(_)
2791                        | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
2792                            debug!(
2793                                "Ignoring consensus transaction {:?} because of end of epoch",
2794                                parsed.transaction.key()
2795                            );
2796                            continue;
2797                        }
2798
2799                        // These are the message types that are still processed even if !should_accept_consensus_certs()
2800                        ConsensusTransactionKind::CheckpointSignature(_)
2801                        | ConsensusTransactionKind::CheckpointSignatureV2(_)
2802                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2803                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
2804                        | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
2805                    }
2806                }
2807
2808                if !initial_reconfig_state.should_accept_tx() {
2809                    match &parsed.transaction.kind {
2810                        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
2811                        | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
2812                        _ => {}
2813                    }
2814                }
2815
2816                // Handle deprecated messages
2817                match &parsed.transaction.kind {
2818                    ConsensusTransactionKind::CapabilityNotification(_)
2819                    | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2820                    | ConsensusTransactionKind::CheckpointSignature(_) => {
2821                        debug_fatal!(
2822                            "BUG: saw deprecated tx {:?}for commit round {}",
2823                            parsed.transaction.key(),
2824                            commit_info.round
2825                        );
2826                        continue;
2827                    }
2828                    _ => {}
2829                }
2830
2831                if parsed.transaction.is_user_transaction() {
2832                    let author_name = self
2833                        .epoch_store
2834                        .committee()
2835                        .authority_by_index(author as u32)
2836                        .unwrap();
2837                    if self
2838                        .epoch_store
2839                        .has_received_end_of_publish_from(author_name)
2840                    {
2841                        // In some edge cases, consensus might resend previously seen certificate after EndOfPublish
2842                        // An honest validator should not send a new transaction after EndOfPublish. Whether the
2843                        // transaction is duplicate or not, we filter it out here.
2844                        warn!(
2845                            "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
2846                            author_name.concise(),
2847                            parsed.transaction.key(),
2848                        );
2849                        continue;
2850                    }
2851                }
2852
2853                // Perform post-consensus owned object conflict detection. If lock acquisition
2854                // fails, the transaction has invalid/conflicting owned inputs and should be dropped.
2855                // This must happen AFTER all filtering checks above to avoid acquiring locks
2856                // for transactions that will be dropped (e.g., during epoch change).
2857                // Only applies to UserTransactionV2 - other transaction types don't need lock acquisition.
2858                if let ConsensusTransactionKind::UserTransactionV2(tx_with_claims) =
2859                    &parsed.transaction.kind
2860                {
2861                    let immutable_object_ids: HashSet<ObjectID> =
2862                        tx_with_claims.get_immutable_objects().into_iter().collect();
2863                    let tx = tx_with_claims.tx();
2864
2865                    let Ok(input_objects) = tx.transaction_data().input_objects() else {
2866                        debug_fatal!("Invalid input objects for transaction {}", tx.digest());
2867                        continue;
2868                    };
2869
2870                    // Filter ImmOrOwnedMoveObject inputs, excluding those claimed to be immutable.
2871                    // Immutable objects don't need lock acquisition as they can be used concurrently.
2872                    let owned_object_refs: Vec<_> = input_objects
2873                        .iter()
2874                        .filter_map(|obj| match obj {
2875                            InputObjectKind::ImmOrOwnedMoveObject(obj_ref)
2876                                if !immutable_object_ids.contains(&obj_ref.0) =>
2877                            {
2878                                Some(*obj_ref)
2879                            }
2880                            _ => None,
2881                        })
2882                        .collect();
2883
2884                    match self
2885                        .epoch_store
2886                        .try_acquire_owned_object_locks_post_consensus(
2887                            &owned_object_refs,
2888                            *tx.digest(),
2889                            &owned_object_locks,
2890                        ) {
2891                        Ok(new_locks) => {
2892                            owned_object_locks.extend(new_locks.into_iter());
2893                            // Lock acquisition succeeded - now set Finalized status
2894                            self.epoch_store
2895                                .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
2896                            num_finalized_user_transactions[author] += 1;
2897                        }
2898                        Err(e) => {
2899                            debug!("Dropping transaction {}: {}", tx.digest(), e);
2900                            self.epoch_store
2901                                .set_consensus_tx_status(position, ConsensusTxStatus::Dropped);
2902                            self.epoch_store.set_rejection_vote_reason(position, &e);
2903                            continue;
2904                        }
2905                    }
2906                }
2907
2908                let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
2909                transactions.push((transaction, author as u32));
2910            }
2911        }
2912
2913        for (i, authority) in self.committee.authorities() {
2914            let hostname = &authority.hostname;
2915            self.metrics
2916                .consensus_committed_messages
2917                .with_label_values(&[hostname])
2918                .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
2919            self.metrics
2920                .consensus_committed_user_transactions
2921                .with_label_values(&[hostname])
2922                .set(
2923                    self.last_consensus_stats
2924                        .stats
2925                        .get_num_user_transactions(i.value()) as i64,
2926                );
2927            self.metrics
2928                .consensus_finalized_user_transactions
2929                .with_label_values(&[hostname])
2930                .add(num_finalized_user_transactions[i.value()] as i64);
2931            self.metrics
2932                .consensus_rejected_user_transactions
2933                .with_label_values(&[hostname])
2934                .add(num_rejected_user_transactions[i.value()] as i64);
2935        }
2936
2937        FilteredConsensusOutput {
2938            transactions,
2939            owned_object_locks,
2940        }
2941    }
2942
    /// Assigns execution indices to one commit's transactions, verifies them, and drops
    /// duplicates, returning the transactions that should go on to be processed.
    ///
    /// A transaction is skipped when:
    /// - `verify_consensus_transaction` rejects it;
    /// - its key already appeared earlier in this same commit;
    /// - its key is already present in `self.processed_cache`;
    /// - the epoch store reports its key as already processed.
    ///
    /// Every surviving key is recorded via `state.output.record_consensus_message_processed`.
    ///
    /// Other side effects:
    /// - `self.last_consensus_stats.index` ends at the index of the last transaction examined
    ///   (including ones that were then skipped);
    /// - user-transaction digests are passed to `cache_recently_finalized_transaction`, even for
    ///   duplicates;
    /// - `state.occurrence_counts`, which must be empty on entry, receives per-digest occurrence
    ///   counts for user transactions.
    ///
    /// # Panics
    /// If `cert_origin` is not a valid committee index, if the DB lookup in
    /// `is_consensus_message_processed` fails, or if `state.occurrence_counts` is non-empty on
    /// entry.
    fn deduplicate_consensus_txns(
        &mut self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
    ) -> Vec<VerifiedSequencedConsensusTransaction> {
        let mut all_transactions = Vec::new();

        // Occurrence count of every verified transaction key within this commit. This doubles as
        // the within-commit deduplication set: a count above 1 marks a repeat.
        let mut occurrence_counts: HashMap<SequencedConsensusTransactionKey, u32> = HashMap::new();
        // Keys that survived every duplicate check, i.e. are processed for the first time here.
        let mut first_commit_keys: HashSet<SequencedConsensusTransactionKey> = HashSet::new();

        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
            // In process_consensus_transactions_and_commit_boundary(), we will add a system consensus commit
            // prologue transaction, which will be the first transaction in this consensus commit batch.
            // Therefore, the transaction sequence number starts from 1 here.
            let current_tx_index = ExecutionIndices {
                last_committed_round: commit_info.round,
                sub_dag_index: commit_info.consensus_commit_ref.index.into(),
                transaction_index: (seq + 1) as u64,
            };

            // Updated before verification, so skipped transactions still advance the index.
            self.last_consensus_stats.index = current_tx_index;

            // Panics if `cert_origin` is not an index in the current epoch committee.
            let certificate_author = *self
                .epoch_store
                .committee()
                .authority_by_index(cert_origin)
                .unwrap();

            let sequenced_transaction = SequencedConsensusTransaction {
                certificate_author_index: cert_origin,
                certificate_author,
                consensus_index: current_tx_index,
                transaction,
            };

            // Transactions that fail verification are dropped silently and are not counted.
            let Some(verified_transaction) = self
                .epoch_store
                .verify_consensus_transaction(sequenced_transaction)
            else {
                continue;
            };

            let key = verified_transaction.0.key();

            // Done before the duplicate checks, so duplicates are cached as well.
            if let Some(tx_digest) = key.user_transaction_digest() {
                self.epoch_store
                    .cache_recently_finalized_transaction(tx_digest);
            }

            // Increment count and check if this is a duplicate within this commit.
            let count = occurrence_counts.entry(key.clone()).or_insert(0);
            *count += 1;
            let in_commit = *count > 1;

            // `put` returns the previous value, so `Some` means the key was already cached.
            // The key is (re)inserted either way, even when it is a duplicate.
            let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
            if in_commit || in_cache {
                self.metrics.skipped_consensus_txns_cache_hit.inc();
                continue;
            }
            // Fall back to the persisted record for keys that have left the LRU cache.
            if self
                .epoch_store
                .is_consensus_message_processed(&key)
                .expect("db error")
            {
                self.metrics.skipped_consensus_txns.inc();
                continue;
            }

            first_commit_keys.insert(key.clone());

            state.output.record_consensus_message_processed(key);

            all_transactions.push(verified_transaction);
        }

        // For keys processed for the first time in this commit, report how many copies of them
        // the commit contained (only when there was more than one).
        for key in first_commit_keys {
            if let Some(&count) = occurrence_counts.get(&key)
                && count > 1
            {
                self.metrics
                    .consensus_handler_duplicate_tx_count
                    .observe(count as f64);
            }
        }

        // Copy user transaction occurrence counts to state for unpaid amplification detection.
        // The counts cover every verified occurrence, including ones skipped above as already
        // processed; non-user keys are dropped.
        assert!(
            state.occurrence_counts.is_empty(),
            "occurrence_counts should be empty before populating"
        );
        state.occurrence_counts.reserve(occurrence_counts.len());
        state.occurrence_counts.extend(
            occurrence_counts
                .into_iter()
                .filter_map(|(key, count)| key.user_transaction_digest().map(|d| (d, count))),
        );

        all_transactions
    }
3047
3048    fn build_commit_handler_input(
3049        &self,
3050        transactions: Vec<VerifiedSequencedConsensusTransaction>,
3051    ) -> CommitHandlerInput {
3052        let epoch = self.epoch_store.epoch();
3053        let mut commit_handler_input = CommitHandlerInput::default();
3054
3055        for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
3056            match transaction.transaction {
3057                SequencedConsensusTransactionKind::External(consensus_transaction) => {
3058                    match consensus_transaction.kind {
3059                        // === User transactions ===
3060                        ConsensusTransactionKind::UserTransactionV2(tx) => {
3061                            // Extract the aliases claim (required) from the claims
3062                            let used_alias_versions = if self
3063                                .epoch_store
3064                                .protocol_config()
3065                                .fix_checkpoint_signature_mapping()
3066                            {
3067                                tx.aliases()
3068                            } else {
3069                                // Convert V1 to V2 format using dummy signature indices
3070                                // which will be ignored with `fix_checkpoint_signature_mapping`
3071                                // disabled.
3072                                tx.aliases_v1().map(|a| {
3073                                    NonEmpty::from_vec(
3074                                        a.into_iter()
3075                                            .enumerate()
3076                                            .map(|(idx, (_, seq))| (idx as u8, seq))
3077                                            .collect(),
3078                                    )
3079                                    .unwrap()
3080                                })
3081                            };
3082                            let inner_tx = tx.into_tx();
3083                            // Safe because transactions are certified by consensus.
3084                            let tx = VerifiedTransaction::new_unchecked(inner_tx);
3085                            // TODO(fastpath): accept position in consensus, after plumbing consensus round, authority index, and transaction index here.
3086                            let transaction =
3087                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
3088                            if let Some(used_alias_versions) = used_alias_versions {
3089                                commit_handler_input
3090                                    .user_transactions
3091                                    .push(WithAliases::new(transaction, used_alias_versions));
3092                            } else {
3093                                commit_handler_input.user_transactions.push(
3094                                    VerifiedExecutableTransactionWithAliases::no_aliases(
3095                                        transaction,
3096                                    ),
3097                                );
3098                            }
3099                        }
3100
3101                        // === State machines ===
3102                        ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => {
3103                            commit_handler_input
3104                                .end_of_publish_transactions
3105                                .push(authority_public_key_bytes);
3106                        }
3107                        ConsensusTransactionKind::NewJWKFetched(
3108                            authority_public_key_bytes,
3109                            jwk_id,
3110                            jwk,
3111                        ) => {
3112                            commit_handler_input.new_jwks.push((
3113                                authority_public_key_bytes,
3114                                jwk_id,
3115                                jwk,
3116                            ));
3117                        }
3118                        ConsensusTransactionKind::RandomnessDkgMessage(
3119                            authority_public_key_bytes,
3120                            items,
3121                        ) => {
3122                            commit_handler_input
3123                                .randomness_dkg_messages
3124                                .push((authority_public_key_bytes, items));
3125                        }
3126                        ConsensusTransactionKind::RandomnessDkgConfirmation(
3127                            authority_public_key_bytes,
3128                            items,
3129                        ) => {
3130                            commit_handler_input
3131                                .randomness_dkg_confirmations
3132                                .push((authority_public_key_bytes, items));
3133                        }
3134                        ConsensusTransactionKind::CapabilityNotificationV2(
3135                            authority_capabilities_v2,
3136                        ) => {
3137                            commit_handler_input
3138                                .capability_notifications
3139                                .push(authority_capabilities_v2);
3140                        }
3141                        ConsensusTransactionKind::ExecutionTimeObservation(
3142                            execution_time_observation,
3143                        ) => {
3144                            commit_handler_input
3145                                .execution_time_observations
3146                                .push(execution_time_observation);
3147                        }
3148                        ConsensusTransactionKind::CheckpointSignatureV2(
3149                            checkpoint_signature_message,
3150                        ) => {
3151                            commit_handler_input
3152                                .checkpoint_signature_messages
3153                                .push(*checkpoint_signature_message);
3154                        }
3155
3156                        // Deprecated messages, filtered earlier by filter_consensus_txns()
3157                        // or rejected by SuiTxValidator. Kept for exhaustiveness.
3158                        ConsensusTransactionKind::CheckpointSignature(_)
3159                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
3160                        | ConsensusTransactionKind::CapabilityNotification(_)
3161                        | ConsensusTransactionKind::CertifiedTransaction(_)
3162                        | ConsensusTransactionKind::UserTransaction(_) => {
3163                            unreachable!("filtered earlier")
3164                        }
3165                    }
3166                }
3167                // TODO: I think we can delete this, it was only used to inject randomness state update into the tx stream.
3168                SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
3169            }
3170        }
3171
3172        commit_handler_input
3173    }
3174
3175    async fn send_end_of_publish_if_needed(&self) {
3176        if self
3177            .epoch_store
3178            .protocol_config()
3179            .timestamp_based_epoch_close()
3180        {
3181            return;
3182        }
3183        if !self.epoch_store.should_send_end_of_publish() {
3184            return;
3185        }
3186
3187        let end_of_publish = ConsensusTransaction::new_end_of_publish(self.epoch_store.name);
3188        if let Err(err) =
3189            self.consensus_adapter
3190                .submit(end_of_publish, None, &self.epoch_store, None, None)
3191        {
3192            warn!(
3193                "Error when sending EndOfPublish message from ConsensusHandler: {:?}",
3194                err
3195            );
3196        } else {
3197            info!(epoch=?self.epoch_store.epoch(), "Sending EndOfPublish message to consensus");
3198        }
3199    }
3200}
3201
/// A batch handed to the [`ExecutionSchedulerSender`] background task: schedulable
/// transactions paired with their assigned versions, plus optional settlement batch info.
///
/// When settlement info is present the batch is enqueued with
/// `SettlementScheduler::enqueue_v2`, otherwise with `SettlementScheduler::enqueue`.
pub(crate) type SchedulerMessage = (
    Vec<(Schedulable, AssignedVersions)>,
    Option<SettlementBatchInfo>,
);
3208
/// Sends transactions to the execution scheduler in a separate task,
/// to avoid blocking consensus handler.
///
/// Handles are cloneable; presumably all clones feed the same channel (standard mpsc sender
/// semantics — confirm for `monitored_mpsc`).
#[derive(Clone)]
pub(crate) struct ExecutionSchedulerSender {
    /// Producer side of the unbounded channel drained by the task spawned in `start`.
    sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
}
3213
3214impl ExecutionSchedulerSender {
3215    fn start(
3216        settlement_scheduler: SettlementScheduler,
3217        epoch_store: Arc<AuthorityPerEpochStore>,
3218    ) -> Self {
3219        let (sender, recv) = monitored_mpsc::unbounded_channel("execution_scheduler_sender");
3220        spawn_monitored_task!(Self::run(recv, settlement_scheduler, epoch_store));
3221        Self { sender }
3222    }
3223
3224    pub(crate) fn new_for_testing(
3225        sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
3226    ) -> Self {
3227        Self { sender }
3228    }
3229
3230    fn send(
3231        &self,
3232        transactions: Vec<(Schedulable, AssignedVersions)>,
3233        settlement: Option<SettlementBatchInfo>,
3234    ) {
3235        let _ = self.sender.send((transactions, settlement));
3236    }
3237
3238    async fn run(
3239        mut recv: monitored_mpsc::UnboundedReceiver<SchedulerMessage>,
3240        settlement_scheduler: SettlementScheduler,
3241        epoch_store: Arc<AuthorityPerEpochStore>,
3242    ) {
3243        while let Some((transactions, settlement)) = recv.recv().await {
3244            let _guard = monitored_scope("ConsensusHandler::enqueue");
3245            let txns = transactions
3246                .into_iter()
3247                .map(|(txn, versions)| (txn, ExecutionEnv::new().with_assigned_versions(versions)))
3248                .collect();
3249            if let Some(settlement) = settlement {
3250                settlement_scheduler.enqueue_v2(txns, settlement, &epoch_store);
3251            } else {
3252                settlement_scheduler.enqueue(txns, &epoch_store);
3253            }
3254        }
3255    }
3256}
3257
/// Manages the lifetime of tasks handling the commits and transactions output by consensus.
pub(crate) struct MysticetiConsensusHandler {
    /// Background tasks spawned in `new`; shut down by `abort`.
    tasks: JoinSet<()>,
}
3262
3263impl MysticetiConsensusHandler {
3264    pub(crate) fn new(
3265        last_processed_commit_at_startup: CommitIndex,
3266        mut consensus_handler: ConsensusHandler<CheckpointService>,
3267        mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
3268        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
3269        node_role: NodeRole,
3270    ) -> Self {
3271        debug!(
3272            last_processed_commit_at_startup,
3273            "Starting consensus replay"
3274        );
3275        let mut tasks = JoinSet::new();
3276        tasks.spawn(monitored_future!(async move {
3277            // TODO: pause when execution is overloaded, so consensus can detect the backpressure.
3278            while let Some(consensus_commit) = commit_receiver.recv().await {
3279                let commit_index = consensus_commit.commit_ref.index;
3280                if !node_role.process_consensus_commits() {
3281                    debug!(
3282                        commit_index,
3283                        "Observer skipping consensus commit processing"
3284                    );
3285                } else if commit_index <= last_processed_commit_at_startup {
3286                    consensus_handler.handle_prior_consensus_commit(consensus_commit);
3287                } else {
3288                    consensus_handler
3289                        .handle_consensus_commit(consensus_commit)
3290                        .await;
3291                }
3292                commit_consumer_monitor.set_highest_handled_commit(commit_index);
3293            }
3294        }));
3295        Self { tasks }
3296    }
3297
3298    pub(crate) async fn abort(&mut self) {
3299        self.tasks.shutdown().await;
3300    }
3301}
3302
3303fn authenticator_state_update_transaction(
3304    epoch_store: &AuthorityPerEpochStore,
3305    round: u64,
3306    mut new_active_jwks: Vec<ActiveJwk>,
3307) -> VerifiedExecutableTransaction {
3308    let epoch = epoch_store.epoch();
3309    new_active_jwks.sort();
3310
3311    info!("creating authenticator state update transaction");
3312    assert!(epoch_store.authenticator_state_enabled());
3313    let transaction = VerifiedTransaction::new_authenticator_state_update(
3314        epoch,
3315        round,
3316        new_active_jwks,
3317        epoch_store
3318            .epoch_start_config()
3319            .authenticator_obj_initial_shared_version()
3320            .expect("authenticator state obj must exist"),
3321    );
3322    VerifiedExecutableTransaction::new_system(transaction, epoch)
3323}
3324
3325pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
3326    match &transaction.kind {
3327        // Deprecated and rejected by SuiTxValidator; never classified in practice.
3328        ConsensusTransactionKind::CertifiedTransaction(_) => "_deprecated_certificate",
3329        ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
3330        ConsensusTransactionKind::CheckpointSignatureV2(_) => "checkpoint_signature",
3331        ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
3332        ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
3333        ConsensusTransactionKind::CapabilityNotificationV2(_) => "capability_notification_v2",
3334        ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
3335        ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
3336        ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
3337        ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
3338        ConsensusTransactionKind::UserTransaction(_) => "_deprecated_user_transaction",
3339        ConsensusTransactionKind::UserTransactionV2(tx) => {
3340            if tx.tx().is_consensus_tx() {
3341                "shared_user_transaction_v2"
3342            } else {
3343                "owned_user_transaction_v2"
3344            }
3345        }
3346        ConsensusTransactionKind::ExecutionTimeObservation(_) => "execution_time_observation",
3347    }
3348}
3349
/// A consensus transaction together with the authority it is attributed to and its position in
/// the consensus output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequencedConsensusTransaction {
    /// Committee index of the authority the transaction is attributed to (the `cert_origin`
    /// threaded through `deduplicate_consensus_txns`).
    pub certificate_author_index: AuthorityIndex,
    /// Name of the committee member at `certificate_author_index`.
    pub certificate_author: AuthorityName,
    /// Commit round, commit (sub-dag) index, and position within the commit. In
    /// `deduplicate_consensus_txns` positions start at 1 because 0 is reserved for the commit
    /// prologue.
    pub consensus_index: ExecutionIndices,
    /// The transaction payload.
    pub transaction: SequencedConsensusTransactionKind,
}
3357
/// Payload of a [`SequencedConsensusTransaction`].
///
/// Serialized through `SerializableSequencedConsensusTransactionKind`, because
/// `VerifiedExecutableTransaction` is not serializable itself.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SequencedConsensusTransactionKind {
    /// A message that arrived through consensus.
    External(ConsensusTransaction),
    /// An already-verified executable transaction injected directly into the sequenced stream
    /// rather than received as a consensus message.
    System(VerifiedExecutableTransaction),
}
3364
3365impl Serialize for SequencedConsensusTransactionKind {
3366    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
3367        let serializable = SerializableSequencedConsensusTransactionKind::from(self);
3368        serializable.serialize(serializer)
3369    }
3370}
3371
3372impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
3373    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
3374        let serializable =
3375            SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
3376        Ok(serializable.into())
3377    }
3378}
3379
/// Serde mirror of [`SequencedConsensusTransactionKind`].
///
/// We can't serialize `SequencedConsensusTransactionKind` directly because it contains a
/// `VerifiedExecutableTransaction`, which is not serializable (by design). This wrapper stores
/// the `System` variant as a `TrustedExecutableTransaction` instead; the `From` impls convert
/// between the two forms.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
enum SerializableSequencedConsensusTransactionKind {
    /// Same as `SequencedConsensusTransactionKind::External`.
    External(ConsensusTransaction),
    /// Serializable form of `SequencedConsensusTransactionKind::System`.
    System(TrustedExecutableTransaction),
}
3389
3390impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
3391    fn from(kind: &SequencedConsensusTransactionKind) -> Self {
3392        match kind {
3393            SequencedConsensusTransactionKind::External(ext) => {
3394                SerializableSequencedConsensusTransactionKind::External(ext.clone())
3395            }
3396            SequencedConsensusTransactionKind::System(txn) => {
3397                SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
3398            }
3399        }
3400    }
3401}
3402
3403impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
3404    fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
3405        match kind {
3406            SerializableSequencedConsensusTransactionKind::External(ext) => {
3407                SequencedConsensusTransactionKind::External(ext)
3408            }
3409            SerializableSequencedConsensusTransactionKind::System(txn) => {
3410                SequencedConsensusTransactionKind::System(txn.into())
3411            }
3412        }
3413    }
3414}
3415
/// Identity of a sequenced consensus transaction, used to deduplicate transactions within and
/// across commits.
#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
pub enum SequencedConsensusTransactionKey {
    /// Key of an external consensus message (`ConsensusTransaction::key`).
    External(ConsensusTransactionKey),
    /// Digest of a system transaction.
    System(TransactionDigest),
}
3421
3422impl SequencedConsensusTransactionKey {
3423    pub fn user_transaction_digest(&self) -> Option<TransactionDigest> {
3424        match self {
3425            SequencedConsensusTransactionKey::External(key) => match key {
3426                ConsensusTransactionKey::Certificate(digest) => Some(*digest),
3427                _ => None,
3428            },
3429            SequencedConsensusTransactionKey::System(_) => None,
3430        }
3431    }
3432}
3433
3434impl SequencedConsensusTransactionKind {
3435    pub fn key(&self) -> SequencedConsensusTransactionKey {
3436        match self {
3437            SequencedConsensusTransactionKind::External(ext) => {
3438                SequencedConsensusTransactionKey::External(ext.key())
3439            }
3440            SequencedConsensusTransactionKind::System(txn) => {
3441                SequencedConsensusTransactionKey::System(*txn.digest())
3442            }
3443        }
3444    }
3445
3446    pub fn get_tracking_id(&self) -> u64 {
3447        match self {
3448            SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
3449            SequencedConsensusTransactionKind::System(_txn) => 0,
3450        }
3451    }
3452
3453    pub fn is_executable_transaction(&self) -> bool {
3454        match self {
3455            SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
3456            SequencedConsensusTransactionKind::System(_) => true,
3457        }
3458    }
3459
3460    pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
3461        match self {
3462            SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
3463                ConsensusTransactionKind::UserTransactionV2(txn) => Some(*txn.tx().digest()),
3464                _ => None,
3465            },
3466            SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
3467        }
3468    }
3469
3470    pub fn is_end_of_publish(&self) -> bool {
3471        match self {
3472            SequencedConsensusTransactionKind::External(ext) => {
3473                matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
3474            }
3475            SequencedConsensusTransactionKind::System(_) => false,
3476        }
3477    }
3478}
3479
3480impl SequencedConsensusTransaction {
3481    pub fn sender_authority(&self) -> AuthorityName {
3482        self.certificate_author
3483    }
3484
3485    pub fn key(&self) -> SequencedConsensusTransactionKey {
3486        self.transaction.key()
3487    }
3488
3489    pub fn is_end_of_publish(&self) -> bool {
3490        if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
3491            matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
3492        } else {
3493            false
3494        }
3495    }
3496
3497    pub fn try_take_execution_time_observation(&mut self) -> Option<ExecutionTimeObservation> {
3498        if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
3499            kind: ConsensusTransactionKind::ExecutionTimeObservation(observation),
3500            ..
3501        }) = &mut self.transaction
3502        {
3503            Some(std::mem::take(observation))
3504        } else {
3505            None
3506        }
3507    }
3508
3509    pub fn is_system(&self) -> bool {
3510        matches!(
3511            self.transaction,
3512            SequencedConsensusTransactionKind::System(_)
3513        )
3514    }
3515
3516    pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
3517        if !randomness_state_enabled {
3518            // If randomness is disabled, these should be processed same as a tx without randomness,
3519            // which will eventually fail when the randomness state object is not found.
3520            return false;
3521        }
3522        match &self.transaction {
3523            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3524                kind: ConsensusTransactionKind::UserTransactionV2(txn),
3525                ..
3526            }) => txn.tx().transaction_data().uses_randomness(),
3527            _ => false,
3528        }
3529    }
3530
3531    pub fn as_consensus_txn(&self) -> Option<&SenderSignedData> {
3532        match &self.transaction {
3533            SequencedConsensusTransactionKind::External(ConsensusTransaction {
3534                kind: ConsensusTransactionKind::UserTransactionV2(txn),
3535                ..
3536            }) if txn.tx().is_consensus_tx() => Some(txn.tx().data()),
3537            SequencedConsensusTransactionKind::System(txn) if txn.is_consensus_tx() => {
3538                Some(txn.data())
3539            }
3540            _ => None,
3541        }
3542    }
3543}
3544
/// A [`SequencedConsensusTransaction`] that has passed verification.
///
/// In this file such values come from `verify_consensus_transaction` (called in
/// `deduplicate_consensus_txns`), or from the test-only `new_test`, which skips verification.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
3547
#[cfg(test)]
impl VerifiedSequencedConsensusTransaction {
    /// Test-only constructor: wraps [`SequencedConsensusTransaction::new_test`] without running
    /// any verification.
    pub fn new_test(transaction: ConsensusTransaction) -> Self {
        let sequenced = SequencedConsensusTransaction::new_test(transaction);
        Self(sequenced)
    }
}
3554
3555impl SequencedConsensusTransaction {
3556    pub fn new_test(transaction: ConsensusTransaction) -> Self {
3557        Self {
3558            certificate_author_index: 0,
3559            certificate_author: AuthorityName::ZERO,
3560            consensus_index: Default::default(),
3561            transaction: SequencedConsensusTransactionKind::External(transaction),
3562        }
3563    }
3564}
3565
/// Sliding window of recent consensus commit timestamps (milliseconds, as reported by
/// `commit_timestamp_ms`), used to estimate the interval between commits.
///
/// The window size is not stored as a field; it is implied by `ring_buffer`'s capacity.
/// NOTE(review): capacity is not part of the serialized form, so a deserialized observer's
/// window is whatever capacity deserialization allocated — confirm this is acceptable.
#[derive(Serialize, Deserialize)]
pub(crate) struct CommitIntervalObserver {
    // Commit timestamps in observation order: oldest at the front, newest at the back.
    ring_buffer: VecDeque<u64>,
}
3570
3571impl CommitIntervalObserver {
3572    pub fn new(window_size: u32) -> Self {
3573        Self {
3574            ring_buffer: VecDeque::with_capacity(window_size as usize),
3575        }
3576    }
3577
3578    pub fn observe_commit_time(&mut self, consensus_commit: &impl ConsensusCommitAPI) {
3579        let commit_time = consensus_commit.commit_timestamp_ms();
3580        if self.ring_buffer.len() == self.ring_buffer.capacity() {
3581            self.ring_buffer.pop_front();
3582        }
3583        self.ring_buffer.push_back(commit_time);
3584    }
3585
3586    pub fn commit_interval_estimate(&self) -> Option<Duration> {
3587        if self.ring_buffer.len() <= 1 {
3588            None
3589        } else {
3590            let first = self.ring_buffer.front().unwrap();
3591            let last = self.ring_buffer.back().unwrap();
3592            let duration = last.saturating_sub(*first);
3593            let num_commits = self.ring_buffer.len() as u64;
3594            Some(Duration::from_millis(duration.div_ceil(num_commits)))
3595        }
3596    }
3597}
3598
3599#[cfg(test)]
3600mod tests {
3601    use std::collections::HashSet;
3602
3603    use consensus_core::{
3604        BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock,
3605    };
3606    use futures::pin_mut;
3607    use prometheus::Registry;
3608    use sui_protocol_config::{ConsensusTransactionOrdering, ProtocolConfig};
3609    use sui_types::{
3610        base_types::ExecutionDigests,
3611        base_types::{AuthorityName, FullObjectRef, ObjectID, SuiAddress, random_object_ref},
3612        committee::Committee,
3613        crypto::deterministic_random_account_key,
3614        gas::GasCostSummary,
3615        message_envelope::Message,
3616        messages_checkpoint::{
3617            CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
3618            SignedCheckpointSummary,
3619        },
3620        messages_consensus::ConsensusTransaction,
3621        object::Object,
3622        transaction::{
3623            CertifiedTransaction, TransactionData, TransactionDataAPI, VerifiedCertificate,
3624        },
3625    };
3626
3627    use super::*;
3628    use crate::{
3629        authority::{
3630            authority_per_epoch_store::ConsensusStatsAPI,
3631            test_authority_builder::TestAuthorityBuilder,
3632        },
3633        checkpoints::CheckpointServiceNoop,
3634        consensus_adapter::consensus_tests::test_user_transaction,
3635        consensus_test_utils::make_consensus_adapter_for_test,
3636        post_consensus_tx_reorder::PostConsensusTxReorder,
3637    };
3638
3639    #[tokio::test(flavor = "current_thread", start_paused = true)]
3640    async fn test_consensus_commit_handler() {
3641        telemetry_subscribers::init_for_testing();
3642
3643        // GIVEN
3644        // 1 account keypair
3645        let (sender, keypair) = deterministic_random_account_key();
3646        // 12 gas objects.
3647        let gas_objects: Vec<Object> = (0..12)
3648            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3649            .collect();
3650        // 4 owned objects.
3651        let owned_objects: Vec<Object> = (0..4)
3652            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3653            .collect();
3654        // 6 shared objects.
3655        let shared_objects: Vec<Object> = (0..6)
3656            .map(|_| Object::shared_for_testing())
3657            .collect::<Vec<_>>();
3658        let mut all_objects = gas_objects.clone();
3659        all_objects.extend(owned_objects.clone());
3660        all_objects.extend(shared_objects.clone());
3661
3662        let network_config =
3663            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
3664                .with_objects(all_objects.clone())
3665                .build();
3666
3667        let state = TestAuthorityBuilder::new()
3668            .with_network_config(&network_config, 0)
3669            .build()
3670            .await;
3671
3672        let epoch_store = state.epoch_store_for_testing().clone();
3673        let new_epoch_start_state = epoch_store.epoch_start_state();
3674        let consensus_committee = new_epoch_start_state.get_consensus_committee();
3675
3676        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3677
3678        let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
3679
3680        let backpressure_manager = BackpressureManager::new_for_tests();
3681        let consensus_adapter =
3682            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3683        let settlement_scheduler = SettlementScheduler::new(
3684            state.execution_scheduler().as_ref().clone(),
3685            state.get_transaction_cache_reader().clone(),
3686            state.metrics.clone(),
3687        );
3688        let mut consensus_handler = ConsensusHandler::new(
3689            epoch_store,
3690            Arc::new(CheckpointServiceNoop {}),
3691            settlement_scheduler,
3692            consensus_adapter,
3693            state.get_object_cache_reader().clone(),
3694            consensus_committee.clone(),
3695            metrics,
3696            Arc::new(throughput_calculator),
3697            backpressure_manager.subscribe(),
3698            state.traffic_controller.clone(),
3699            None,
3700            state.consensus_gasless_counter.clone(),
3701        );
3702
3703        // AND create test user transactions alternating between owned and shared input.
3704        let mut user_transactions = vec![];
3705        for (i, gas_object) in gas_objects[0..8].iter().enumerate() {
3706            let input_object = if i % 2 == 0 {
3707                owned_objects.get(i / 2).unwrap().clone()
3708            } else {
3709                shared_objects.get(i / 2).unwrap().clone()
3710            };
3711            let transaction = test_user_transaction(
3712                &state,
3713                sender,
3714                &keypair,
3715                gas_object.clone(),
3716                vec![input_object],
3717            )
3718            .await;
3719            user_transactions.push(transaction);
3720        }
3721
3722        // AND create 4 more user transactions with remaining gas objects and 2 shared objects.
3723        // Having more txns on the same shared object may get deferred.
3724        for (i, gas_object) in gas_objects[8..12].iter().enumerate() {
3725            let shared_object = if i < 2 {
3726                shared_objects[4].clone()
3727            } else {
3728                shared_objects[5].clone()
3729            };
3730            let transaction = test_user_transaction(
3731                &state,
3732                sender,
3733                &keypair,
3734                gas_object.clone(),
3735                vec![shared_object],
3736            )
3737            .await;
3738            user_transactions.push(transaction);
3739        }
3740
3741        // AND create block for each user transaction
3742        let mut blocks = Vec::new();
3743        for (i, consensus_transaction) in user_transactions
3744            .iter()
3745            .cloned()
3746            .map(|t| ConsensusTransaction::new_user_transaction_v2_message(&state.name, t.into()))
3747            .enumerate()
3748        {
3749            let transaction_bytes = bcs::to_bytes(&consensus_transaction).unwrap();
3750            let block = VerifiedBlock::new_for_test(
3751                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
3752                    .set_transactions(vec![Transaction::new(transaction_bytes)])
3753                    .build(),
3754            );
3755
3756            blocks.push(block);
3757        }
3758
3759        // AND create the consensus commit
3760        let leader_block = blocks[0].clone();
3761        let committed_sub_dag = CommittedSubDag::new(
3762            leader_block.reference(),
3763            blocks.clone(),
3764            leader_block.timestamp_ms(),
3765            CommitRef::new(10, CommitDigest::MIN),
3766        );
3767
3768        // Test that the consensus handler respects backpressure.
3769        backpressure_manager.set_backpressure(true);
3770        // Default watermarks are 0,0 which will suppress the backpressure.
3771        backpressure_manager.update_highest_certified_checkpoint(1);
3772
3773        // AND process the consensus commit once
3774        {
3775            let waiter = consensus_handler.handle_consensus_commit(committed_sub_dag.clone());
3776            pin_mut!(waiter);
3777
3778            // waiter should not complete within 5 seconds
3779            tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
3780                .await
3781                .unwrap_err();
3782
3783            // lift backpressure
3784            backpressure_manager.set_backpressure(false);
3785
3786            // waiter completes now.
3787            tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
3788                .await
3789                .unwrap();
3790        }
3791
3792        // THEN check the consensus stats
3793        let num_blocks = blocks.len();
3794        let num_transactions = user_transactions.len();
3795        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
3796        assert_eq!(
3797            last_consensus_stats_1.index.transaction_index,
3798            num_transactions as u64
3799        );
3800        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
3801        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
3802        assert_eq!(
3803            last_consensus_stats_1.stats.get_num_messages(0),
3804            num_blocks as u64
3805        );
3806        assert_eq!(
3807            last_consensus_stats_1.stats.get_num_user_transactions(0),
3808            num_transactions as u64
3809        );
3810
3811        // THEN check for execution status of user transactions.
3812        for (i, t) in user_transactions.iter().enumerate() {
3813            let digest = t.tx().digest();
3814            if tokio::time::timeout(
3815                std::time::Duration::from_secs(10),
3816                state.notify_read_effects_for_testing("", *digest),
3817            )
3818            .await
3819            .is_ok()
3820            {
3821                // Effects exist as expected.
3822            } else {
3823                panic!("User transaction {} {} did not execute", i, digest);
3824            }
3825        }
3826
3827        // THEN check for no inflight or suspended transactions.
3828        state.execution_scheduler().check_empty_for_testing().await;
3829    }
3830
3831    fn to_short_strings(txs: Vec<VerifiedExecutableTransactionWithAliases>) -> Vec<String> {
3832        txs.into_iter()
3833            .map(|tx| format!("transaction({})", tx.tx().transaction_data().gas_price()))
3834            .collect()
3835    }
3836
3837    #[test]
3838    fn test_order_by_gas_price() {
3839        let mut v = vec![user_txn(42), user_txn(100)];
3840        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3841        assert_eq!(
3842            to_short_strings(v),
3843            vec![
3844                "transaction(100)".to_string(),
3845                "transaction(42)".to_string(),
3846            ]
3847        );
3848
3849        let mut v = vec![
3850            user_txn(1200),
3851            user_txn(12),
3852            user_txn(1000),
3853            user_txn(42),
3854            user_txn(100),
3855            user_txn(1000),
3856        ];
3857        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3858        assert_eq!(
3859            to_short_strings(v),
3860            vec![
3861                "transaction(1200)".to_string(),
3862                "transaction(1000)".to_string(),
3863                "transaction(1000)".to_string(),
3864                "transaction(100)".to_string(),
3865                "transaction(42)".to_string(),
3866                "transaction(12)".to_string(),
3867            ]
3868        );
3869    }
3870
3871    #[tokio::test(flavor = "current_thread")]
3872    async fn test_checkpoint_signature_dedup() {
3873        telemetry_subscribers::init_for_testing();
3874
3875        let network_config =
3876            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3877        let state = TestAuthorityBuilder::new()
3878            .with_network_config(&network_config, 0)
3879            .build()
3880            .await;
3881
3882        let epoch_store = state.epoch_store_for_testing().clone();
3883        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3884
3885        let make_signed = || {
3886            let epoch = epoch_store.epoch();
3887            let contents =
3888                CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
3889            let summary = CheckpointSummary::new(
3890                &ProtocolConfig::get_for_max_version_UNSAFE(),
3891                epoch,
3892                42, // sequence number
3893                10, // network_total_transactions
3894                &contents,
3895                None, // previous_digest
3896                GasCostSummary::default(),
3897                None,       // end_of_epoch_data
3898                0,          // timestamp
3899                Vec::new(), // randomness_rounds
3900                Vec::new(), // checkpoint_artifact_digests
3901            );
3902            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name)
3903        };
3904
3905        // Prepare V2 pair: same (authority, seq), different digests => different keys
3906        let v2_s1 = make_signed();
3907        let v2_s1_clone = v2_s1.clone();
3908        let v2_digest_a = v2_s1.data().digest();
3909        let v2_a =
3910            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3911                summary: v2_s1,
3912            });
3913
3914        let v2_s2 = make_signed();
3915        let v2_digest_b = v2_s2.data().digest();
3916        let v2_b =
3917            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3918                summary: v2_s2,
3919            });
3920
3921        assert_ne!(v2_digest_a, v2_digest_b);
3922
3923        // Create an exact duplicate with same digest to exercise valid dedup
3924        assert_eq!(v2_s1_clone.data().digest(), v2_digest_a);
3925        let v2_dup =
3926            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3927                summary: v2_s1_clone,
3928            });
3929
3930        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
3931        let block = VerifiedBlock::new_for_test(
3932            TestBlock::new(100, 0)
3933                .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)])
3934                .build(),
3935        );
3936        let commit = CommittedSubDag::new(
3937            block.reference(),
3938            vec![block.clone()],
3939            block.timestamp_ms(),
3940            CommitRef::new(10, CommitDigest::MIN),
3941        );
3942
3943        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3944        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
3945        let backpressure = BackpressureManager::new_for_tests();
3946        let consensus_adapter =
3947            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3948        let settlement_scheduler = SettlementScheduler::new(
3949            state.execution_scheduler().as_ref().clone(),
3950            state.get_transaction_cache_reader().clone(),
3951            state.metrics.clone(),
3952        );
3953        let mut handler = ConsensusHandler::new(
3954            epoch_store.clone(),
3955            Arc::new(CheckpointServiceNoop {}),
3956            settlement_scheduler,
3957            consensus_adapter,
3958            state.get_object_cache_reader().clone(),
3959            consensus_committee.clone(),
3960            metrics,
3961            Arc::new(throughput),
3962            backpressure.subscribe(),
3963            state.traffic_controller.clone(),
3964            None,
3965            state.consensus_gasless_counter.clone(),
3966        );
3967
3968        handler.handle_consensus_commit(commit).await;
3969
3970        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
3971        use sui_types::messages_consensus::ConsensusTransactionKey as CK;
3972
3973        // V2 distinct digests: both must be processed. If these were collapsed to one CheckpointSeq num, only one would process.
3974        let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a));
3975        let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b));
3976        assert!(
3977            epoch_store
3978                .is_consensus_message_processed(&v2_key_a)
3979                .unwrap()
3980        );
3981        assert!(
3982            epoch_store
3983                .is_consensus_message_processed(&v2_key_b)
3984                .unwrap()
3985        );
3986    }
3987
3988    #[tokio::test(flavor = "current_thread")]
3989    async fn test_verify_consensus_transaction_filters_mismatched_authorities() {
3990        telemetry_subscribers::init_for_testing();
3991
3992        let network_config =
3993            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3994        let state = TestAuthorityBuilder::new()
3995            .with_network_config(&network_config, 0)
3996            .build()
3997            .await;
3998
3999        let epoch_store = state.epoch_store_for_testing().clone();
4000        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
4001
4002        // Create a different authority than our test authority
4003        use fastcrypto::traits::KeyPair;
4004        let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair();
4005        let wrong_authority: AuthorityName = wrong_keypair.public().into();
4006
4007        // Create EndOfPublish transaction with mismatched authority
4008        let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority);
4009
4010        // Create valid EndOfPublish transaction with correct authority
4011        let valid_eop = ConsensusTransaction::new_end_of_publish(state.name);
4012
4013        // Create CheckpointSignature with mismatched authority
4014        let epoch = epoch_store.epoch();
4015        let contents =
4016            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
4017        let summary = CheckpointSummary::new(
4018            &ProtocolConfig::get_for_max_version_UNSAFE(),
4019            epoch,
4020            42, // sequence number
4021            10, // network_total_transactions
4022            &contents,
4023            None, // previous_digest
4024            GasCostSummary::default(),
4025            None,       // end_of_epoch_data
4026            0,          // timestamp
4027            Vec::new(), // randomness_rounds
4028            Vec::new(), // checkpoint commitments
4029        );
4030
4031        // Create a signed checkpoint with the wrong authority
4032        let mismatched_checkpoint_signed =
4033            SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority);
4034        let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest();
4035        let mismatched_checkpoint =
4036            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
4037                summary: mismatched_checkpoint_signed,
4038            });
4039
4040        // Create a valid checkpoint signature with correct authority
4041        let valid_checkpoint_signed =
4042            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name);
4043        let valid_checkpoint_digest = valid_checkpoint_signed.data().digest();
4044        let valid_checkpoint =
4045            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
4046                summary: valid_checkpoint_signed,
4047            });
4048
4049        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
4050
4051        // Create a block with both valid and invalid transactions
4052        let block = VerifiedBlock::new_for_test(
4053            TestBlock::new(100, 0)
4054                .set_transactions(vec![
4055                    to_tx(&mismatched_eop),
4056                    to_tx(&valid_eop),
4057                    to_tx(&mismatched_checkpoint),
4058                    to_tx(&valid_checkpoint),
4059                ])
4060                .build(),
4061        );
4062        let commit = CommittedSubDag::new(
4063            block.reference(),
4064            vec![block.clone()],
4065            block.timestamp_ms(),
4066            CommitRef::new(10, CommitDigest::MIN),
4067        );
4068
4069        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
4070        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
4071        let backpressure = BackpressureManager::new_for_tests();
4072        let consensus_adapter =
4073            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
4074        let settlement_scheduler = SettlementScheduler::new(
4075            state.execution_scheduler().as_ref().clone(),
4076            state.get_transaction_cache_reader().clone(),
4077            state.metrics.clone(),
4078        );
4079        let mut handler = ConsensusHandler::new(
4080            epoch_store.clone(),
4081            Arc::new(CheckpointServiceNoop {}),
4082            settlement_scheduler,
4083            consensus_adapter,
4084            state.get_object_cache_reader().clone(),
4085            consensus_committee.clone(),
4086            metrics,
4087            Arc::new(throughput),
4088            backpressure.subscribe(),
4089            state.traffic_controller.clone(),
4090            None,
4091            state.consensus_gasless_counter.clone(),
4092        );
4093
4094        handler.handle_consensus_commit(commit).await;
4095
4096        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
4097        use sui_types::messages_consensus::ConsensusTransactionKey as CK;
4098
4099        // Check that valid transactions were processed
4100        let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
4101        assert!(
4102            epoch_store
4103                .is_consensus_message_processed(&valid_eop_key)
4104                .unwrap(),
4105            "Valid EndOfPublish should have been processed"
4106        );
4107
4108        let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
4109            state.name,
4110            42,
4111            valid_checkpoint_digest,
4112        ));
4113        assert!(
4114            epoch_store
4115                .is_consensus_message_processed(&valid_checkpoint_key)
4116                .unwrap(),
4117            "Valid CheckpointSignature should have been processed"
4118        );
4119
4120        // Check that mismatched authority transactions were NOT processed (filtered out by verify_consensus_transaction)
4121        let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
4122        assert!(
4123            !epoch_store
4124                .is_consensus_message_processed(&mismatched_eop_key)
4125                .unwrap(),
4126            "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
4127        );
4128
4129        let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
4130            wrong_authority,
4131            42,
4132            mismatched_checkpoint_digest,
4133        ));
4134        assert!(
4135            !epoch_store
4136                .is_consensus_message_processed(&mismatched_checkpoint_key)
4137                .unwrap(),
4138            "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
4139        );
4140    }
4141
4142    fn user_txn(gas_price: u64) -> VerifiedExecutableTransactionWithAliases {
4143        let (committee, keypairs) = Committee::new_simple_test_committee();
4144        let (sender, sender_keypair) = deterministic_random_account_key();
4145        let tx = sui_types::transaction::Transaction::from_data_and_signer(
4146            TransactionData::new_transfer(
4147                SuiAddress::default(),
4148                FullObjectRef::from_fastpath_ref(random_object_ref()),
4149                sender,
4150                random_object_ref(),
4151                1000 * gas_price,
4152                gas_price,
4153            ),
4154            vec![&sender_keypair],
4155        );
4156        let tx = VerifiedExecutableTransaction::new_from_certificate(
4157            VerifiedCertificate::new_unchecked(
4158                CertifiedTransaction::new_from_keypairs_for_testing(
4159                    tx.into_data(),
4160                    &keypairs,
4161                    &committee,
4162                ),
4163            ),
4164        );
4165        VerifiedExecutableTransactionWithAliases::no_aliases(tx)
4166    }
4167
4168    mod checkpoint_queue_tests {
4169        use super::*;
4170        use consensus_core::CommitRef;
4171        use sui_types::digests::Digest;
4172
4173        fn make_chunk(tx_count: usize, height: u64) -> Chunk {
4174            Chunk {
4175                schedulables: (0..tx_count)
4176                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4177                    .collect(),
4178                settlement: None,
4179                height,
4180            }
4181        }
4182
4183        fn make_commit_ref(index: u32) -> CommitRef {
4184            CommitRef {
4185                index,
4186                digest: CommitDigest::MIN,
4187            }
4188        }
4189
4190        fn default_versions() -> HashMap<TransactionKey, AssignedVersions> {
4191            HashMap::new()
4192        }
4193
4194        #[test]
4195        fn test_flush_all_checkpoint_roots() {
4196            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4197            let versions = default_versions();
4198
4199            queue.push_chunk(
4200                make_chunk(5, 1),
4201                &versions,
4202                1000,
4203                make_commit_ref(1),
4204                Digest::default(),
4205            );
4206            queue.push_chunk(
4207                make_chunk(3, 2),
4208                &versions,
4209                1000,
4210                make_commit_ref(1),
4211                Digest::default(),
4212            );
4213
4214            let pending = queue.flush(1000, true);
4215
4216            assert!(pending.is_some());
4217            assert!(queue.pending_roots.is_empty());
4218        }
4219
4220        #[test]
4221        fn test_flush_respects_min_checkpoint_interval() {
4222            let min_interval = 200;
4223            let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, 1000, min_interval);
4224            let versions = default_versions();
4225
4226            queue.push_chunk(
4227                make_chunk(5, 1),
4228                &versions,
4229                1000,
4230                make_commit_ref(1),
4231                Digest::default(),
4232            );
4233
4234            let pending = queue.flush(1000 + min_interval - 1, false);
4235            assert!(pending.is_none());
4236            assert_eq!(queue.pending_roots.len(), 1);
4237
4238            let pending = queue.flush(1000 + min_interval, false);
4239            assert!(pending.is_some());
4240            assert!(queue.pending_roots.is_empty());
4241        }
4242
4243        #[test]
4244        fn test_push_chunk_flushes_when_exceeds_max() {
4245            let max_tx = 10;
4246            let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, max_tx, 0);
4247            let versions = default_versions();
4248
4249            queue.push_chunk(
4250                make_chunk(max_tx / 2 + 1, 1),
4251                &versions,
4252                1000,
4253                make_commit_ref(1),
4254                Digest::default(),
4255            );
4256
4257            let flushed = queue.push_chunk(
4258                make_chunk(max_tx / 2 + 1, 2),
4259                &versions,
4260                1000,
4261                make_commit_ref(2),
4262                Digest::default(),
4263            );
4264
4265            assert_eq!(flushed.len(), 1);
4266            assert_eq!(queue.pending_roots.len(), 1);
4267        }
4268
4269        #[test]
4270        fn test_multiple_chunks_merged_into_one_checkpoint() {
4271            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 200);
4272            let versions = default_versions();
4273
4274            queue.push_chunk(
4275                make_chunk(10, 1),
4276                &versions,
4277                1000,
4278                make_commit_ref(1),
4279                Digest::default(),
4280            );
4281            queue.push_chunk(
4282                make_chunk(10, 2),
4283                &versions,
4284                1000,
4285                make_commit_ref(2),
4286                Digest::default(),
4287            );
4288            queue.push_chunk(
4289                make_chunk(10, 3),
4290                &versions,
4291                1000,
4292                make_commit_ref(3),
4293                Digest::default(),
4294            );
4295
4296            let pending = queue.flush(1000, true).unwrap();
4297
4298            assert_eq!(pending.roots.len(), 3);
4299        }
4300
4301        #[test]
4302        fn test_push_chunk_handles_overflow() {
4303            let max_tx = 10;
4304            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4305            let versions = default_versions();
4306
4307            let flushed1 = queue.push_chunk(
4308                make_chunk(max_tx / 2, 1),
4309                &versions,
4310                1000,
4311                make_commit_ref(1),
4312                Digest::default(),
4313            );
4314            assert!(flushed1.is_empty());
4315
4316            let flushed2 = queue.push_chunk(
4317                make_chunk(max_tx / 2, 2),
4318                &versions,
4319                1000,
4320                make_commit_ref(2),
4321                Digest::default(),
4322            );
4323            assert!(flushed2.is_empty());
4324
4325            let flushed3 = queue.push_chunk(
4326                make_chunk(max_tx / 2, 3),
4327                &versions,
4328                1000,
4329                make_commit_ref(3),
4330                Digest::default(),
4331            );
4332            assert_eq!(flushed3.len(), 1);
4333
4334            let pending = queue.flush(1000, true);
4335
4336            for p in pending.iter().chain(flushed3.iter()) {
4337                let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4338                assert!(tx_count <= max_tx);
4339            }
4340        }
4341
4342        #[test]
4343        fn test_checkpoint_uses_last_chunk_height() {
4344            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4345            let versions = default_versions();
4346
4347            queue.push_chunk(
4348                make_chunk(10, 100),
4349                &versions,
4350                1000,
4351                make_commit_ref(1),
4352                Digest::default(),
4353            );
4354            queue.push_chunk(
4355                make_chunk(10, 200),
4356                &versions,
4357                1000,
4358                make_commit_ref(2),
4359                Digest::default(),
4360            );
4361
4362            let pending = queue.flush(1000, true).unwrap();
4363
4364            assert_eq!(pending.details.checkpoint_height, 200);
4365        }
4366
4367        #[test]
4368        fn test_last_built_timestamp_updated_on_flush() {
4369            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4370            let versions = default_versions();
4371
4372            queue.push_chunk(
4373                make_chunk(10, 1),
4374                &versions,
4375                5000,
4376                make_commit_ref(1),
4377                Digest::default(),
4378            );
4379
4380            assert_eq!(queue.last_built_timestamp, 0);
4381
4382            let _ = queue.flush(5000, true);
4383
4384            assert_eq!(queue.last_built_timestamp, 5000);
4385        }
4386
4387        #[test]
4388        fn test_settlement_info_sent_through_channel() {
4389            let mut queue = CheckpointQueue::new_for_testing(0, 0, 5, 1000, 0);
4390            let versions = default_versions();
4391
4392            let chunk1 = Chunk {
4393                schedulables: vec![
4394                    Schedulable::ConsensusCommitPrologue(0, 1, 0),
4395                    Schedulable::ConsensusCommitPrologue(0, 2, 0),
4396                    Schedulable::ConsensusCommitPrologue(0, 3, 0),
4397                ],
4398                settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
4399                height: 1,
4400            };
4401
4402            let chunk2 = Chunk {
4403                schedulables: vec![
4404                    Schedulable::ConsensusCommitPrologue(0, 4, 0),
4405                    Schedulable::ConsensusCommitPrologue(0, 5, 0),
4406                ],
4407                settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
4408                height: 2,
4409            };
4410
4411            queue.push_chunk(
4412                chunk1,
4413                &versions,
4414                1000,
4415                make_commit_ref(1),
4416                Digest::default(),
4417            );
4418            queue.push_chunk(
4419                chunk2,
4420                &versions,
4421                1000,
4422                make_commit_ref(1),
4423                Digest::default(),
4424            );
4425        }
4426
4427        #[test]
4428        fn test_settlement_checkpoint_seq_correct_after_flush() {
4429            let max_tx = 10;
4430            let initial_seq = 5;
4431            let (sender, mut receiver) = monitored_mpsc::unbounded_channel("test_settlement_seq");
4432            let mut queue =
4433                CheckpointQueue::new_for_testing_with_sender(0, 0, initial_seq, max_tx, 0, sender);
4434            let versions = default_versions();
4435
4436            // Push a chunk that partially fills the queue (no flush).
4437            let chunk1 = Chunk {
4438                schedulables: (0..max_tx / 2 + 1)
4439                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4440                    .collect(),
4441                settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
4442                height: 1,
4443            };
4444            queue.push_chunk(
4445                chunk1,
4446                &versions,
4447                1000,
4448                make_commit_ref(1),
4449                Digest::default(),
4450            );
4451
4452            // Drain the first message from the channel.
4453            let msg1 = receiver.try_recv().unwrap();
4454            let settlement1 = msg1.1.unwrap();
4455            assert_eq!(settlement1.checkpoint_seq, initial_seq);
4456
4457            // Push a second chunk that triggers a flush of chunk1's roots.
4458            let chunk2 = Chunk {
4459                schedulables: (0..max_tx / 2 + 1)
4460                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4461                    .collect(),
4462                settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
4463                height: 2,
4464            };
4465            let flushed = queue.push_chunk(
4466                chunk2,
4467                &versions,
4468                1000,
4469                make_commit_ref(2),
4470                Digest::default(),
4471            );
4472            assert_eq!(flushed.len(), 1);
4473            assert_eq!(flushed[0].details.checkpoint_seq, Some(initial_seq));
4474
4475            // The second settlement must have checkpoint_seq = initial_seq + 1,
4476            // because the flush incremented current_checkpoint_seq.
4477            let msg2 = receiver.try_recv().unwrap();
4478            let settlement2 = msg2.1.unwrap();
4479            assert_eq!(settlement2.checkpoint_seq, initial_seq + 1);
4480
4481            // Flush the remaining roots and verify the PendingCheckpointV2's seq
4482            // matches the settlement's seq.
4483            let pending = queue.flush_forced().unwrap();
4484            assert_eq!(
4485                pending.details.checkpoint_seq,
4486                Some(settlement2.checkpoint_seq)
4487            );
4488        }
4489
4490        #[test]
4491        fn test_checkpoint_seq_increments_on_flush() {
4492            let mut queue = CheckpointQueue::new_for_testing(0, 0, 10, 1000, 0);
4493            let versions = default_versions();
4494
4495            queue.push_chunk(
4496                make_chunk(5, 1),
4497                &versions,
4498                1000,
4499                make_commit_ref(1),
4500                Digest::default(),
4501            );
4502
4503            let pending = queue.flush(1000, true).unwrap();
4504
4505            assert_eq!(pending.details.checkpoint_seq, Some(10));
4506            assert_eq!(queue.current_checkpoint_seq, 11);
4507        }
4508
4509        #[test]
4510        fn test_multiple_chunks_with_overflow() {
4511            let max_tx = 10;
4512            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4513            let versions = default_versions();
4514
4515            let flushed1 = queue.push_chunk(
4516                make_chunk(max_tx / 2 + 1, 1),
4517                &versions,
4518                1000,
4519                make_commit_ref(1),
4520                Digest::default(),
4521            );
4522            let flushed2 = queue.push_chunk(
4523                make_chunk(max_tx / 2 + 1, 2),
4524                &versions,
4525                1000,
4526                make_commit_ref(1),
4527                Digest::default(),
4528            );
4529            let flushed3 = queue.push_chunk(
4530                make_chunk(max_tx / 2 + 1, 3),
4531                &versions,
4532                1000,
4533                make_commit_ref(1),
4534                Digest::default(),
4535            );
4536
4537            let all_flushed: Vec<_> = flushed1
4538                .into_iter()
4539                .chain(flushed2)
4540                .chain(flushed3)
4541                .collect();
4542            assert_eq!(all_flushed.len(), 2);
4543            assert_eq!(queue.pending_roots.len(), 1);
4544
4545            for p in &all_flushed {
4546                let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4547                assert!(tx_count <= max_tx);
4548            }
4549        }
4550    }
4551}