1use std::{
5 collections::{BTreeMap, HashMap, HashSet, VecDeque},
6 hash::Hash,
7 num::NonZeroUsize,
8 sync::Arc,
9 time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use arc_swap::ArcSwap;
13use consensus_config::Committee as ConsensusCommittee;
14use consensus_core::{CertifiedBlocksOutput, CommitConsumerMonitor, CommitIndex};
15use consensus_types::block::TransactionIndex;
16use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
17use itertools::Itertools as _;
18use lru::LruCache;
19use mysten_common::{
20 assert_reachable, assert_sometimes, debug_fatal, random_util::randomize_cache_capacity_in_tests,
21};
22use mysten_metrics::{
23 monitored_future,
24 monitored_mpsc::{self, UnboundedReceiver},
25 monitored_scope, spawn_monitored_task,
26};
27use parking_lot::RwLockWriteGuard;
28use serde::{Deserialize, Serialize};
29use sui_macros::{fail_point, fail_point_arg, fail_point_if};
30use sui_protocol_config::ProtocolConfig;
31use sui_types::{
32 SUI_RANDOMNESS_STATE_OBJECT_ID,
33 authenticator_state::ActiveJwk,
34 base_types::{
35 AuthorityName, ConciseableName, ConsensusObjectSequenceKey, SequenceNumber,
36 TransactionDigest,
37 },
38 crypto::RandomnessRound,
39 digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest},
40 executable_transaction::{
41 TrustedExecutableTransaction, VerifiedExecutableTransaction,
42 VerifiedExecutableTransactionWithAliases,
43 },
44 messages_checkpoint::CheckpointSignatureMessage,
45 messages_consensus::{
46 AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments,
47 ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
48 ExecutionTimeObservation,
49 },
50 sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
51 transaction::{
52 SenderSignedData, TransactionKey, VerifiedCertificate, VerifiedTransaction, WithAliases,
53 },
54};
55use tokio::task::JoinSet;
56use tracing::{debug, error, info, instrument, trace, warn};
57
58use crate::{
59 authority::{
60 AuthorityMetrics, AuthorityState, ExecutionEnv,
61 authority_per_epoch_store::{
62 AuthorityPerEpochStore, CancelConsensusCertificateReason, ConsensusStats,
63 ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStats,
64 consensus_quarantine::ConsensusCommitOutput,
65 },
66 backpressure::{BackpressureManager, BackpressureSubscriber},
67 consensus_tx_status_cache::ConsensusTxStatus,
68 epoch_start_configuration::EpochStartConfigTrait,
69 execution_time_estimator::ExecutionTimeEstimator,
70 shared_object_congestion_tracker::SharedObjectCongestionTracker,
71 shared_object_version_manager::{AssignedTxAndVersions, Schedulable},
72 transaction_deferral::{DeferralKey, DeferralReason, transaction_deferral_within_limit},
73 },
74 checkpoints::{
75 CheckpointService, CheckpointServiceNotify, PendingCheckpoint, PendingCheckpointInfo,
76 },
77 consensus_adapter::ConsensusAdapter,
78 consensus_throughput_calculator::ConsensusThroughputCalculator,
79 consensus_types::consensus_output_api::{ConsensusCommitAPI, parse_block_transactions},
80 epoch::{
81 randomness::{DkgStatus, RandomnessManager},
82 reconfiguration::ReconfigState,
83 },
84 execution_cache::ObjectCacheRead,
85 execution_scheduler::{ExecutionScheduler, SchedulingSource},
86 post_consensus_tx_reorder::PostConsensusTxReorder,
87 scoring_decision::update_low_scoring_authorities,
88 traffic_controller::{TrafficController, policies::TrafficTally},
89};
90
/// Bundles the shared handles needed to build a [`ConsensusHandler`].
///
/// Every field is reference-counted, so `new_consensus_handler` can hand
/// clones of them to each handler it creates.
pub struct ConsensusHandlerInitializer {
    /// Authority state; `new_consensus_handler` reads the execution scheduler,
    /// object cache reader, metrics and traffic controller from it.
    state: Arc<AuthorityState>,
    /// Checkpoint service handed to the handler.
    checkpoint_service: Arc<CheckpointService>,
    /// Per-epoch store; also the source of the consensus committee.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Consensus adapter handed to the handler.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Shared, atomically swappable map from authority name to a `u64` value.
    /// NOTE(review): presumably a reputation score per authority — confirm.
    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    /// Throughput calculator handed to the handler.
    throughput_calculator: Arc<ConsensusThroughputCalculator>,
    /// Manager from which backpressure subscriptions are created.
    backpressure_manager: Arc<BackpressureManager>,
}
100
101impl ConsensusHandlerInitializer {
102 pub fn new(
103 state: Arc<AuthorityState>,
104 checkpoint_service: Arc<CheckpointService>,
105 epoch_store: Arc<AuthorityPerEpochStore>,
106 consensus_adapter: Arc<ConsensusAdapter>,
107 low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
108 throughput_calculator: Arc<ConsensusThroughputCalculator>,
109 backpressure_manager: Arc<BackpressureManager>,
110 ) -> Self {
111 Self {
112 state,
113 checkpoint_service,
114 epoch_store,
115 consensus_adapter,
116 low_scoring_authorities,
117 throughput_calculator,
118 backpressure_manager,
119 }
120 }
121
122 #[cfg(test)]
123 pub(crate) fn new_for_testing(
124 state: Arc<AuthorityState>,
125 checkpoint_service: Arc<CheckpointService>,
126 ) -> Self {
127 use crate::consensus_test_utils::make_consensus_adapter_for_test;
128 use std::collections::HashSet;
129
130 let backpressure_manager = BackpressureManager::new_for_tests();
131 let consensus_adapter =
132 make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
133 Self {
134 state: state.clone(),
135 checkpoint_service,
136 epoch_store: state.epoch_store_for_testing().clone(),
137 consensus_adapter,
138 low_scoring_authorities: Arc::new(Default::default()),
139 throughput_calculator: Arc::new(ConsensusThroughputCalculator::new(
140 None,
141 state.metrics.clone(),
142 )),
143 backpressure_manager,
144 }
145 }
146
147 pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
148 let new_epoch_start_state = self.epoch_store.epoch_start_state();
149 let consensus_committee = new_epoch_start_state.get_consensus_committee();
150
151 ConsensusHandler::new(
152 self.epoch_store.clone(),
153 self.checkpoint_service.clone(),
154 self.state.execution_scheduler().clone(),
155 self.consensus_adapter.clone(),
156 self.state.get_object_cache_reader().clone(),
157 self.low_scoring_authorities.clone(),
158 consensus_committee,
159 self.state.metrics.clone(),
160 self.throughput_calculator.clone(),
161 self.backpressure_manager.subscribe(),
162 self.state.traffic_controller.clone(),
163 )
164 }
165
166 pub(crate) fn metrics(&self) -> &Arc<AuthorityMetrics> {
167 &self.state.metrics
168 }
169
170 pub(crate) fn backpressure_subscriber(&self) -> BackpressureSubscriber {
171 self.backpressure_manager.subscribe()
172 }
173}
174
175mod additional_consensus_state {
176 use std::marker::PhantomData;
177
178 use consensus_core::CommitRef;
179 use fastcrypto::hash::HashFunction as _;
180 use sui_types::{crypto::DefaultHash, digests::Digest};
181
182 use super::*;
    /// Serializable state accumulated from observed consensus commits.
    ///
    /// `digest()` hashes the BCS serialization of this struct, so any change
    /// to its fields or their serialized form changes the digest.
    #[derive(Serialize, Deserialize)]
    pub(super) struct AdditionalConsensusState {
        /// Observer fed with every commit; it supplies the commit interval
        /// estimate used by `observe_commit`.
        commit_interval_observer: CommitIntervalObserver,
    }
195
196 impl AdditionalConsensusState {
197 pub fn new(additional_consensus_state_window_size: u32) -> Self {
198 Self {
199 commit_interval_observer: CommitIntervalObserver::new(
200 additional_consensus_state_window_size,
201 ),
202 }
203 }
204
205 pub(crate) fn observe_commit(
207 &mut self,
208 protocol_config: &ProtocolConfig,
209 epoch_start_time: u64,
210 consensus_commit: &impl ConsensusCommitAPI,
211 ) -> ConsensusCommitInfo {
212 self.commit_interval_observer
213 .observe_commit_time(consensus_commit);
214
215 let estimated_commit_period = self
216 .commit_interval_observer
217 .commit_interval_estimate()
218 .unwrap_or(Duration::from_millis(
219 protocol_config.min_checkpoint_interval_ms(),
220 ));
221
222 info!("estimated commit rate: {:?}", estimated_commit_period);
223
224 self.commit_info_impl(
225 epoch_start_time,
226 consensus_commit,
227 Some(estimated_commit_period),
228 )
229 }
230
231 fn commit_info_impl(
232 &self,
233 epoch_start_time: u64,
234 consensus_commit: &impl ConsensusCommitAPI,
235 estimated_commit_period: Option<Duration>,
236 ) -> ConsensusCommitInfo {
237 let leader_author = consensus_commit.leader_author_index();
238 let timestamp = consensus_commit.commit_timestamp_ms();
239
240 let timestamp = if timestamp < epoch_start_time {
241 error!(
242 "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start_time}, author {leader_author:?}"
243 );
244 epoch_start_time
245 } else {
246 timestamp
247 };
248
249 ConsensusCommitInfo {
250 _phantom: PhantomData,
251 round: consensus_commit.leader_round(),
252 timestamp,
253 leader_author,
254 consensus_commit_ref: consensus_commit.commit_ref(),
255 rejected_transactions_digest: consensus_commit.rejected_transactions_digest(),
256 additional_state_digest: Some(self.digest()),
257 estimated_commit_period,
258 skip_consensus_commit_prologue_in_test: false,
259 }
260 }
261
262 fn digest(&self) -> AdditionalConsensusStateDigest {
264 let mut hash = DefaultHash::new();
265 bcs::serialize_into(&mut hash, self).unwrap();
266 AdditionalConsensusStateDigest::new(hash.finalize().into())
267 }
268 }
269
    /// Metadata describing a single consensus commit.
    pub struct ConsensusCommitInfo {
        // Private marker field: because it is not `pub`, code outside this
        // module cannot build the struct with a literal and has to use the
        // constructors defined here.
        _phantom: PhantomData<()>,

        /// Leader round of the commit.
        pub round: u64,
        /// Commit timestamp. When built by `commit_info_impl` it is never
        /// lower than the epoch start time passed to that function.
        pub timestamp: u64,
        /// Index of the commit leader's author.
        pub leader_author: AuthorityIndex,
        /// Reference to the consensus commit.
        pub consensus_commit_ref: CommitRef,
        /// Digest of the commit's rejected transactions.
        pub rejected_transactions_digest: Digest,

        // Read through `additional_state_digest()`, which panics on `None`.
        additional_state_digest: Option<AdditionalConsensusStateDigest>,
        // Read through `estimated_commit_period()`, which panics on `None`.
        estimated_commit_period: Option<Duration>,

        /// Test flag; `false` for infos built by `commit_info_impl`.
        /// NOTE(review): the consumer of this flag is not visible here.
        pub skip_consensus_commit_prologue_in_test: bool,
    }
285
286 impl ConsensusCommitInfo {
287 pub fn new_for_test(
288 commit_round: u64,
289 commit_timestamp: u64,
290 estimated_commit_period: Option<Duration>,
291 skip_consensus_commit_prologue_in_test: bool,
292 ) -> Self {
293 Self {
294 _phantom: PhantomData,
295 round: commit_round,
296 timestamp: commit_timestamp,
297 leader_author: 0,
298 consensus_commit_ref: CommitRef::default(),
299 rejected_transactions_digest: Digest::default(),
300 additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
301 estimated_commit_period,
302 skip_consensus_commit_prologue_in_test,
303 }
304 }
305
306 pub fn new_for_congestion_test(
307 commit_round: u64,
308 commit_timestamp: u64,
309 estimated_commit_period: Duration,
310 ) -> Self {
311 Self::new_for_test(
312 commit_round,
313 commit_timestamp,
314 Some(estimated_commit_period),
315 true,
316 )
317 }
318
319 pub fn additional_state_digest(&self) -> AdditionalConsensusStateDigest {
320 self.additional_state_digest
322 .expect("additional_state_digest is not available")
323 }
324
325 pub fn estimated_commit_period(&self) -> Duration {
326 self.estimated_commit_period
328 .expect("estimated commit period is not available")
329 }
330
331 fn consensus_commit_digest(&self) -> ConsensusCommitDigest {
332 ConsensusCommitDigest::new(self.consensus_commit_ref.digest.into_inner())
333 }
334
335 fn consensus_commit_prologue_transaction(
336 &self,
337 epoch: u64,
338 ) -> VerifiedExecutableTransaction {
339 let transaction = VerifiedTransaction::new_consensus_commit_prologue(
340 epoch,
341 self.round,
342 self.timestamp,
343 );
344 VerifiedExecutableTransaction::new_system(transaction, epoch)
345 }
346
347 fn consensus_commit_prologue_v2_transaction(
348 &self,
349 epoch: u64,
350 ) -> VerifiedExecutableTransaction {
351 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v2(
352 epoch,
353 self.round,
354 self.timestamp,
355 self.consensus_commit_digest(),
356 );
357 VerifiedExecutableTransaction::new_system(transaction, epoch)
358 }
359
360 fn consensus_commit_prologue_v3_transaction(
361 &self,
362 epoch: u64,
363 consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
364 ) -> VerifiedExecutableTransaction {
365 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v3(
366 epoch,
367 self.round,
368 self.timestamp,
369 self.consensus_commit_digest(),
370 consensus_determined_version_assignments,
371 );
372 VerifiedExecutableTransaction::new_system(transaction, epoch)
373 }
374
375 fn consensus_commit_prologue_v4_transaction(
376 &self,
377 epoch: u64,
378 consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
379 additional_state_digest: AdditionalConsensusStateDigest,
380 ) -> VerifiedExecutableTransaction {
381 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v4(
382 epoch,
383 self.round,
384 self.timestamp,
385 self.consensus_commit_digest(),
386 consensus_determined_version_assignments,
387 additional_state_digest,
388 );
389 VerifiedExecutableTransaction::new_system(transaction, epoch)
390 }
391
392 pub fn create_consensus_commit_prologue_transaction(
393 &self,
394 epoch: u64,
395 protocol_config: &ProtocolConfig,
396 cancelled_txn_version_assignment: Vec<(
397 TransactionDigest,
398 Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
399 )>,
400 commit_info: &ConsensusCommitInfo,
401 indirect_state_observer: IndirectStateObserver,
402 ) -> VerifiedExecutableTransaction {
403 let version_assignments = if protocol_config
404 .record_consensus_determined_version_assignments_in_prologue_v2()
405 {
406 Some(
407 ConsensusDeterminedVersionAssignments::CancelledTransactionsV2(
408 cancelled_txn_version_assignment,
409 ),
410 )
411 } else if protocol_config.record_consensus_determined_version_assignments_in_prologue()
412 {
413 Some(
414 ConsensusDeterminedVersionAssignments::CancelledTransactions(
415 cancelled_txn_version_assignment
416 .into_iter()
417 .map(|(tx_digest, versions)| {
418 (
419 tx_digest,
420 versions.into_iter().map(|(id, v)| (id.0, v)).collect(),
421 )
422 })
423 .collect(),
424 ),
425 )
426 } else {
427 None
428 };
429
430 if protocol_config.record_additional_state_digest_in_prologue() {
431 let additional_state_digest =
432 if protocol_config.additional_consensus_digest_indirect_state() {
433 let d1 = commit_info.additional_state_digest();
434 indirect_state_observer.fold_with(d1)
435 } else {
436 commit_info.additional_state_digest()
437 };
438
439 self.consensus_commit_prologue_v4_transaction(
440 epoch,
441 version_assignments.unwrap(),
442 additional_state_digest,
443 )
444 } else if let Some(version_assignments) = version_assignments {
445 self.consensus_commit_prologue_v3_transaction(epoch, version_assignments)
446 } else if protocol_config.include_consensus_digest_in_prologue() {
447 self.consensus_commit_prologue_v2_transaction(epoch)
448 } else {
449 self.consensus_commit_prologue_transaction(epoch)
450 }
451 }
452 }
453
    /// Accumulates BCS-serialized values into a running hash that can later be
    /// folded into an [`AdditionalConsensusStateDigest`].
    #[derive(Default)]
    pub struct IndirectStateObserver {
        /// Running hash of every value passed to `observe_indirect_state`.
        hash: DefaultHash,
    }
458
459 impl IndirectStateObserver {
460 pub fn new() -> Self {
461 Self::default()
462 }
463
464 pub fn observe_indirect_state<T: Serialize>(&mut self, state: &T) {
465 bcs::serialize_into(&mut self.hash, state).unwrap();
466 }
467
468 pub fn fold_with(
469 self,
470 d1: AdditionalConsensusStateDigest,
471 ) -> AdditionalConsensusStateDigest {
472 let hash = self.hash.finalize();
473 let d2 = AdditionalConsensusStateDigest::new(hash.into());
474
475 let mut hasher = DefaultHash::new();
476 bcs::serialize_into(&mut hasher, &d1).unwrap();
477 bcs::serialize_into(&mut hasher, &d2).unwrap();
478 AdditionalConsensusStateDigest::new(hasher.finalize().into())
479 }
480 }
481
482 #[test]
483 fn test_additional_consensus_state() {
484 use crate::consensus_test_utils::TestConsensusCommit;
485
486 fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
487 let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
488 state.observe_commit(
489 &protocol_config,
490 100,
491 &TestConsensusCommit::empty(round, timestamp, 0),
492 );
493 }
494
495 let mut s1 = AdditionalConsensusState::new(3);
496 observe(&mut s1, 1, 1000);
497 observe(&mut s1, 2, 2000);
498 observe(&mut s1, 3, 3000);
499 observe(&mut s1, 4, 4000);
500
501 let mut s2 = AdditionalConsensusState::new(3);
502 observe(&mut s2, 2, 2000);
505 observe(&mut s2, 3, 3000);
506 observe(&mut s2, 4, 4000);
507
508 assert_eq!(s1.digest(), s2.digest());
509
510 observe(&mut s1, 5, 5000);
511 observe(&mut s2, 5, 5000);
512
513 assert_eq!(s1.digest(), s2.digest());
514 }
515}
516use additional_consensus_state::AdditionalConsensusState;
517pub(crate) use additional_consensus_state::{ConsensusCommitInfo, IndirectStateObserver};
518
/// Processes consensus commits for one epoch.
///
/// `C` is the checkpoint service type; the commit-handling methods require it
/// to implement `CheckpointServiceNotify`.
pub struct ConsensusHandler<C> {
    /// Per-epoch store used for protocol config, reconfiguration state and
    /// persisting consensus output.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Indices and stats of the last handled commit. Loaded from the epoch
    /// store in `new` and updated while handling each commit.
    last_consensus_stats: ExecutionIndicesWithStats,
    /// Checkpoint service, notified after a commit's output has been pushed.
    checkpoint_service: Arc<C>,
    /// Object cache reader.
    /// NOTE(review): its use is not visible in this part of the file.
    cache_reader: Arc<dyn ObjectCacheRead>,
    /// Shared map passed to `update_low_scoring_authorities` on every commit.
    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    /// Consensus committee; its size initializes uninitialized stats in `new`.
    committee: ConsensusCommittee,
    /// Authority metrics.
    metrics: Arc<AuthorityMetrics>,
    /// LRU set of transaction keys with unit values, sized from
    /// `PROCESSED_CACHE_CAP`.
    /// NOTE(review): presumably used to skip already-processed transactions — confirm.
    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
    /// Channel-like sender used to hand schedulables to the execution scheduler.
    execution_scheduler_sender: ExecutionSchedulerSender,
    /// Consensus adapter.
    /// NOTE(review): its use is not visible in this part of the file.
    consensus_adapter: Arc<ConsensusAdapter>,

    /// Receives the commit timestamp and transaction count of each commit.
    throughput_calculator: Arc<ConsensusThroughputCalculator>,

    /// State updated by every observed commit; produces `ConsensusCommitInfo`.
    additional_consensus_state: AdditionalConsensusState,

    /// Awaited at the start of `handle_consensus_commit`.
    backpressure_subscriber: BackpressureSubscriber,

    /// Optional traffic controller.
    /// NOTE(review): its use is not visible in this part of the file.
    traffic_controller: Option<Arc<TrafficController>>,
}
553
/// Base capacity, in entries, of `ConsensusHandler::processed_cache`. The
/// constructors pass it through `randomize_cache_capacity_in_tests` first.
const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
555
556impl<C> ConsensusHandler<C> {
557 pub(crate) fn new(
558 epoch_store: Arc<AuthorityPerEpochStore>,
559 checkpoint_service: Arc<C>,
560 execution_scheduler: Arc<ExecutionScheduler>,
561 consensus_adapter: Arc<ConsensusAdapter>,
562 cache_reader: Arc<dyn ObjectCacheRead>,
563 low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
564 committee: ConsensusCommittee,
565 metrics: Arc<AuthorityMetrics>,
566 throughput_calculator: Arc<ConsensusThroughputCalculator>,
567 backpressure_subscriber: BackpressureSubscriber,
568 traffic_controller: Option<Arc<TrafficController>>,
569 ) -> Self {
570 let mut last_consensus_stats = epoch_store
572 .get_last_consensus_stats()
573 .expect("Should be able to read last consensus index");
574 if !last_consensus_stats.stats.is_initialized() {
576 last_consensus_stats.stats = ConsensusStats::new(committee.size());
577 }
578 let execution_scheduler_sender =
579 ExecutionSchedulerSender::start(execution_scheduler, epoch_store.clone());
580 let commit_rate_estimate_window_size = epoch_store
581 .protocol_config()
582 .get_consensus_commit_rate_estimation_window_size();
583 Self {
584 epoch_store,
585 last_consensus_stats,
586 checkpoint_service,
587 cache_reader,
588 low_scoring_authorities,
589 committee,
590 metrics,
591 processed_cache: LruCache::new(
592 NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
593 ),
594 execution_scheduler_sender,
595 consensus_adapter,
596 throughput_calculator,
597 additional_consensus_state: AdditionalConsensusState::new(
598 commit_rate_estimate_window_size,
599 ),
600 backpressure_subscriber,
601 traffic_controller,
602 }
603 }
604
605 pub(crate) fn last_processed_subdag_index(&self) -> u64 {
607 self.last_consensus_stats.index.sub_dag_index
608 }
609
610 pub(crate) fn execution_scheduler_sender(&self) -> &ExecutionSchedulerSender {
611 &self.execution_scheduler_sender
612 }
613
614 pub(crate) fn new_for_testing(
615 epoch_store: Arc<AuthorityPerEpochStore>,
616 checkpoint_service: Arc<C>,
617 execution_scheduler_sender: ExecutionSchedulerSender,
618 consensus_adapter: Arc<ConsensusAdapter>,
619 cache_reader: Arc<dyn ObjectCacheRead>,
620 low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
621 committee: ConsensusCommittee,
622 metrics: Arc<AuthorityMetrics>,
623 throughput_calculator: Arc<ConsensusThroughputCalculator>,
624 backpressure_subscriber: BackpressureSubscriber,
625 traffic_controller: Option<Arc<TrafficController>>,
626 last_consensus_stats: ExecutionIndicesWithStats,
627 ) -> Self {
628 let commit_rate_estimate_window_size = epoch_store
629 .protocol_config()
630 .get_consensus_commit_rate_estimation_window_size();
631 Self {
632 epoch_store,
633 last_consensus_stats,
634 checkpoint_service,
635 cache_reader,
636 low_scoring_authorities,
637 committee,
638 metrics,
639 processed_cache: LruCache::new(
640 NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
641 ),
642 execution_scheduler_sender,
643 consensus_adapter,
644 throughput_calculator,
645 additional_consensus_state: AdditionalConsensusState::new(
646 commit_rate_estimate_window_size,
647 ),
648 backpressure_subscriber,
649 traffic_controller,
650 }
651 }
652}
653
/// Transactions of one consensus commit, grouped by kind.
///
/// `handle_consensus_commit` destructures the value returned by
/// `build_commit_handler_input` and passes each group to its own processing step.
#[derive(Default)]
struct CommitHandlerInput {
    /// User transactions; passed to `process_transactions`.
    user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    /// Capability notifications; passed to `process_capability_notifications`.
    capability_notifications: Vec<AuthorityCapabilitiesV2>,
    /// Execution time observations; passed to `process_execution_time_observations`.
    execution_time_observations: Vec<ExecutionTimeObservation>,
    /// Checkpoint signatures; passed to `process_checkpoint_signature_messages`.
    checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
    /// DKG messages as (authority, raw bytes); passed to `process_dkg_updates`.
    randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
    /// DKG confirmations as (authority, raw bytes); passed to `process_dkg_updates`.
    randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
    /// Authorities that sent an end-of-publish message; passed to `handle_eop`.
    end_of_publish_transactions: Vec<AuthorityName>,
    /// New JWKs as (authority, id, key); passed to `process_jwks`.
    new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
}
665
/// Mutable state threaded through the processing of a single consensus commit.
struct CommitHandlerState {
    /// Set by `init_randomness` when the DKG status is `Failed`.
    dkg_failed: bool,
    /// Randomness round reserved for this commit by `init_randomness`, if any.
    randomness_round: Option<RandomnessRound>,
    /// Output accumulated for this commit; pushed to the consensus quarantine
    /// at the end of `handle_consensus_commit`.
    output: ConsensusCommitOutput,
    /// Observer created with the state.
    /// NOTE(review): the code that takes it out of the `Option` is not visible here.
    indirect_state_observer: Option<IndirectStateObserver>,
    /// Clone of the reconfiguration state taken when the commit started.
    initial_reconfig_state: ReconfigState,
}
673
674impl CommitHandlerState {
675 fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
676 self.output
677 .get_consensus_messages_processed()
678 .cloned()
679 .collect()
680 }
681
682 fn init_randomness<'a, 'epoch>(
683 &'a mut self,
684 epoch_store: &'epoch AuthorityPerEpochStore,
685 commit_info: &'a ConsensusCommitInfo,
686 ) -> Option<tokio::sync::MutexGuard<'epoch, RandomnessManager>> {
687 let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
688 rm.try_lock()
689 .expect("should only ever be called from the commit handler thread")
690 });
691
692 let mut dkg_failed = false;
693 let randomness_round = if epoch_store.randomness_state_enabled() {
694 let randomness_manager = randomness_manager
695 .as_mut()
696 .expect("randomness manager should exist if randomness is enabled");
697 match randomness_manager.dkg_status() {
698 DkgStatus::Pending => None,
699 DkgStatus::Failed => {
700 dkg_failed = true;
701 None
702 }
703 DkgStatus::Successful => {
704 if self.initial_reconfig_state.should_accept_tx() {
707 randomness_manager
708 .reserve_next_randomness(commit_info.timestamp, &mut self.output)
710 .expect("epoch ended")
711 } else {
712 None
713 }
714 }
715 }
716 } else {
717 None
718 };
719
720 if randomness_round.is_some() {
721 assert!(!dkg_failed); }
723
724 self.randomness_round = randomness_round;
725 self.dkg_failed = dkg_failed;
726
727 randomness_manager
728 }
729}
730
731impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
732 fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
736 assert!(
737 self.epoch_store
738 .protocol_config()
739 .record_additional_state_digest_in_prologue()
740 );
741 let protocol_config = self.epoch_store.protocol_config();
742 let epoch_start_time = self
743 .epoch_store
744 .epoch_start_config()
745 .epoch_start_timestamp_ms();
746
747 self.additional_consensus_state.observe_commit(
748 protocol_config,
749 epoch_start_time,
750 &consensus_commit,
751 );
752 }
753
    /// Test-only entry point that forwards to `handle_consensus_commit`.
    #[cfg(test)]
    pub(crate) async fn handle_consensus_commit_for_test(
        &mut self,
        consensus_commit: impl ConsensusCommitAPI,
    ) {
        self.handle_consensus_commit(consensus_commit).await;
    }
761
    /// Processes one consensus commit end to end.
    ///
    /// Steps, in order: wait for backpressure to clear, record commit
    /// metadata, filter and deduplicate the commit's transactions, process
    /// each message kind, build the schedulables, advance the end-of-publish
    /// state, write pending checkpoint(s), push the accumulated output to the
    /// consensus quarantine, notify the checkpoint service, and finally send
    /// the schedulables to the execution scheduler.
    ///
    /// Returns early, before any output is pushed, when the commit should not
    /// produce a checkpoint.
    ///
    /// # Panics
    /// Panics if a required protocol feature flag is disabled, if the commit
    /// round does not advance past the last committed round, or if any of the
    /// `expect`ed operations below fails.
    #[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))]
    pub(crate) async fn handle_consensus_commit(
        &mut self,
        consensus_commit: impl ConsensusCommitAPI,
    ) {
        let protocol_config = self.epoch_store.protocol_config();

        // This handler only implements the behavior gated by these flags, so
        // each of them must be enabled.
        assert!(protocol_config.ignore_execution_time_observations_after_certs_closed());
        assert!(protocol_config.record_time_estimate_processed());
        assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints());
        assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest());
        assert!(protocol_config.authority_capabilities_v2());
        assert!(protocol_config.cancel_for_failed_dkg_early());

        // Suspends until the backpressure subscriber reports no backpressure.
        self.backpressure_subscriber.await_no_backpressure().await;

        let epoch = self.epoch_store.epoch();

        let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit");

        // Round of the previously handled commit; read before the stats are
        // overwritten with this commit's indices further down.
        let last_committed_round = self.last_consensus_stats.index.last_committed_round;

        // Both caches take the round as `u32`; the `as` cast truncates.
        // NOTE(review): assumes rounds always fit in 32 bits — confirm.
        if let Some(consensus_tx_status_cache) = self.epoch_store.consensus_tx_status_cache.as_ref()
        {
            consensus_tx_status_cache
                .update_last_committed_leader_round(last_committed_round as u32)
                .await;
        }
        if let Some(tx_reject_reason_cache) = self.epoch_store.tx_reject_reason_cache.as_ref() {
            tx_reject_reason_cache.set_last_committed_leader_round(last_committed_round as u32);
        }

        // Update the additional consensus state and derive this commit's info.
        let commit_info = self.additional_consensus_state.observe_commit(
            protocol_config,
            self.epoch_store
                .epoch_start_config()
                .epoch_start_timestamp_ms(),
            &consensus_commit,
        );
        // Commits must arrive with strictly increasing leader rounds.
        assert!(commit_info.round > last_committed_round);

        let (timestamp, leader_author, commit_sub_dag_index) =
            self.gather_commit_metadata(&consensus_commit);

        info!(
            %consensus_commit,
            "Received consensus output. Rejected transactions: {}",
            consensus_commit.rejected_transactions_debug_string(),
        );

        // Record this commit as the latest one; the transaction index restarts at 0.
        self.last_consensus_stats.index = ExecutionIndices {
            last_committed_round: commit_info.round,
            sub_dag_index: commit_sub_dag_index,
            transaction_index: 0_u64,
        };

        update_low_scoring_authorities(
            self.low_scoring_authorities.clone(),
            self.epoch_store.committee(),
            &self.committee,
            consensus_commit.reputation_score_sorted_desc(),
            &self.metrics,
            protocol_config.consensus_bad_nodes_stake_threshold(),
        );

        self.metrics
            .consensus_committed_subdags
            .with_label_values(&[&leader_author.to_string()])
            .inc();

        // Per-commit state. The reconfiguration state is cloned once here and
        // that snapshot is used by the steps below.
        let mut state = CommitHandlerState {
            output: ConsensusCommitOutput::new(commit_info.round),
            dkg_failed: false,
            randomness_round: None,
            indirect_state_observer: Some(IndirectStateObserver::new()),
            initial_reconfig_state: self
                .epoch_store
                .get_reconfig_state_read_lock_guard()
                .clone(),
        };

        let transactions = self.filter_consensus_txns(
            state.initial_reconfig_state.clone(),
            &commit_info,
            &consensus_commit,
        );
        let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions);

        // Sets `state.randomness_round` / `state.dkg_failed` and returns the
        // locked randomness manager, if there is one.
        let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info);

        // Split the transactions into one list per message kind.
        let CommitHandlerInput {
            user_transactions,
            capability_notifications,
            execution_time_observations,
            checkpoint_signature_messages,
            randomness_dkg_messages,
            randomness_dkg_confirmations,
            end_of_publish_transactions,
            new_jwks,
        } = self.build_commit_handler_input(transactions);

        self.process_jwks(&mut state, &commit_info, new_jwks);
        self.process_capability_notifications(capability_notifications);
        self.process_execution_time_observations(&mut state, execution_time_observations);
        self.process_checkpoint_signature_messages(checkpoint_signature_messages);

        self.process_dkg_updates(
            &mut state,
            &commit_info,
            randomness_manager.as_deref_mut(),
            randomness_dkg_messages,
            randomness_dkg_confirmations,
        )
        .await;

        // `try_lock` panics here if the estimator is already locked.
        let mut execution_time_estimator = self
            .epoch_store
            .execution_time_estimator
            .try_lock()
            .expect("should only ever be called from the commit handler thread");

        // Uses the round of the previous commit, not this commit's round.
        let authenticator_state_update_transaction =
            self.create_authenticator_state_update(last_committed_round, &commit_info);

        let (schedulables, randomness_schedulables, assigned_versions) = self.process_transactions(
            &mut state,
            &mut execution_time_estimator,
            &commit_info,
            authenticator_state_update_transaction,
            user_transactions,
        );

        // `lock` is `Some` only when end-of-publish collection advanced the
        // reconfiguration state machine.
        let (should_accept_tx, lock, final_round) =
            self.handle_eop(&mut state, end_of_publish_transactions);

        let make_checkpoint = should_accept_tx || final_round;
        if !make_checkpoint {
            // Nothing is checkpointed, pushed or scheduled for this commit.
            return;
        }

        if final_round {
            // Store the estimator's observations once, at the end of the epoch.
            self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
        }

        self.create_pending_checkpoints(
            &mut state,
            &commit_info,
            &schedulables,
            &randomness_schedulables,
            final_round,
        );

        // Collected before `state.output` is moved into the quarantine below.
        let notifications = state.get_notifications();

        state
            .output
            .record_consensus_commit_stats(self.last_consensus_stats.clone());

        self.record_deferral_deletion(&mut state);

        self.epoch_store
            .consensus_quarantine
            .write()
            .push_consensus_output(state.output, &self.epoch_store)
            .expect("push_consensus_output should not fail");

        // The checkpoint service is notified only after the output was pushed.
        debug!(
            ?commit_info.round,
            "Notifying checkpoint service about new pending checkpoint(s)",
        );
        self.checkpoint_service
            .notify_checkpoint()
            .expect("failed to notify checkpoint service");

        if let Some(randomness_round) = state.randomness_round {
            randomness_manager
                .as_ref()
                .expect("randomness manager should exist if randomness round is provided")
                .generate_randomness(epoch, randomness_round);
        }

        self.epoch_store.process_notifications(notifications.iter());

        // `lock` is moved in here, so the guard (if any) is held until this call returns.
        self.log_final_round(lock, final_round);

        // Only the non-randomness schedulables are counted here.
        self.throughput_calculator
            .add_transactions(timestamp, schedulables.len() as u64);

        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
            let key = [commit_sub_dag_index, epoch];
            if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
                sui_simulator::task::kill_current_node(None);
            }
        });

        fail_point!("crash");

        // Randomness schedulables are appended after the regular ones.
        let mut schedulables = schedulables;
        schedulables.extend(randomness_schedulables);
        self.execution_scheduler_sender.send(
            schedulables,
            assigned_versions,
            SchedulingSource::NonFastPath,
        );

        self.send_end_of_publish_if_needed().await;
    }
981
982 fn handle_eop(
983 &self,
984 state: &mut CommitHandlerState,
985 end_of_publish_transactions: Vec<AuthorityName>,
986 ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
987 let collected_eop =
988 self.process_end_of_publish_transactions(state, end_of_publish_transactions);
989 if collected_eop {
990 let (lock, final_round) = self.advance_eop_state_machine(state);
991 (lock.should_accept_tx(), Some(lock), final_round)
992 } else {
993 (true, None, false)
994 }
995 }
996
997 fn record_end_of_epoch_execution_time_observations(
998 &self,
999 estimator: &mut ExecutionTimeEstimator,
1000 ) {
1001 self.epoch_store
1002 .end_of_epoch_execution_time_observations
1003 .set(estimator.take_observations())
1004 .expect("`stored_execution_time_observations` should only be set once at end of epoch");
1005 }
1006
1007 fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
1008 let mut deferred_transactions = self
1009 .epoch_store
1010 .consensus_output_cache
1011 .deferred_transactions_v2
1012 .lock();
1013 for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
1014 deferred_transactions.remove(&deleted_deferred_key);
1015 }
1016 }
1017
1018 fn log_final_round(&self, lock: Option<RwLockWriteGuard<ReconfigState>>, final_round: bool) {
1019 if final_round {
1020 let epoch = self.epoch_store.epoch();
1021 info!(
1022 ?epoch,
1023 lock=?lock.as_ref(),
1024 final_round=?final_round,
1025 "Notified last checkpoint"
1026 );
1027 self.epoch_store.record_end_of_message_quorum_time_metric();
1028 }
1029 }
1030
1031 fn create_pending_checkpoints(
1032 &self,
1033 state: &mut CommitHandlerState,
1034 commit_info: &ConsensusCommitInfo,
1035 schedulables: &[Schedulable],
1036 randomness_schedulables: &[Schedulable],
1037 final_round: bool,
1038 ) {
1039 let checkpoint_height = self
1040 .epoch_store
1041 .calculate_pending_checkpoint_height(commit_info.round);
1042
1043 let should_write_random_checkpoint = state.randomness_round.is_some()
1050 || (state.dkg_failed && !randomness_schedulables.is_empty());
1051
1052 let pending_checkpoint = PendingCheckpoint {
1053 roots: schedulables.iter().map(|s| s.key()).collect(),
1054 details: PendingCheckpointInfo {
1055 timestamp_ms: commit_info.timestamp,
1056 last_of_epoch: final_round && !should_write_random_checkpoint,
1057 checkpoint_height,
1058 consensus_commit_ref: commit_info.consensus_commit_ref,
1059 rejected_transactions_digest: commit_info.rejected_transactions_digest,
1060 },
1061 };
1062 self.epoch_store
1063 .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1064 .expect("failed to write pending checkpoint");
1065
1066 info!(
1067 "Written pending checkpoint: {:?}",
1068 pending_checkpoint.details,
1069 );
1070
1071 if should_write_random_checkpoint {
1072 let pending_checkpoint = PendingCheckpoint {
1073 roots: randomness_schedulables.iter().map(|s| s.key()).collect(),
1074 details: PendingCheckpointInfo {
1075 timestamp_ms: commit_info.timestamp,
1076 last_of_epoch: final_round,
1077 checkpoint_height: checkpoint_height + 1,
1078 consensus_commit_ref: commit_info.consensus_commit_ref,
1079 rejected_transactions_digest: commit_info.rejected_transactions_digest,
1080 },
1081 };
1082 self.epoch_store
1083 .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1084 .expect("failed to write pending checkpoint");
1085 }
1086 }
1087
1088 fn process_transactions(
1089 &self,
1090 state: &mut CommitHandlerState,
1091 execution_time_estimator: &mut ExecutionTimeEstimator,
1092 commit_info: &ConsensusCommitInfo,
1093 authenticator_state_update_transaction: Option<VerifiedExecutableTransactionWithAliases>,
1094 user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1095 ) -> (Vec<Schedulable>, Vec<Schedulable>, AssignedTxAndVersions) {
1096 let protocol_config = self.epoch_store.protocol_config();
1097 let epoch = self.epoch_store.epoch();
1098
1099 let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) =
1102 self.merge_and_reorder_transactions(state, commit_info, user_transactions);
1103
1104 let mut shared_object_congestion_tracker =
1105 self.init_congestion_tracker(commit_info, false, &ordered_txns);
1106 let mut shared_object_using_randomness_congestion_tracker =
1107 self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns);
1108
1109 let randomness_state_update_transaction = state
1110 .randomness_round
1111 .map(|round| Schedulable::RandomnessStateUpdate(epoch, round));
1112 debug!(
1113 "Randomness state update transaction: {:?}",
1114 randomness_state_update_transaction
1115 .as_ref()
1116 .map(|t| t.key())
1117 );
1118
1119 let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len());
1120 let mut randomness_transactions_to_schedule =
1121 Vec::with_capacity(ordered_randomness_txns.len());
1122 let mut deferred_txns = BTreeMap::new();
1123 let mut cancelled_txns = BTreeMap::new();
1124
1125 for transaction in ordered_txns {
1126 self.handle_deferral_and_cancellation(
1127 state,
1128 &mut cancelled_txns,
1129 &mut deferred_txns,
1130 &mut transactions_to_schedule,
1131 protocol_config,
1132 commit_info,
1133 transaction,
1134 &mut shared_object_congestion_tracker,
1135 &previously_deferred_tx_digests,
1136 execution_time_estimator,
1137 );
1138 }
1139
1140 for transaction in ordered_randomness_txns {
1141 if state.dkg_failed {
1142 debug!(
1143 "Canceling randomness-using transaction {:?} because DKG failed",
1144 transaction.tx().digest(),
1145 );
1146 cancelled_txns.insert(
1147 *transaction.tx().digest(),
1148 CancelConsensusCertificateReason::DkgFailed,
1149 );
1150 randomness_transactions_to_schedule.push(transaction);
1151 continue;
1152 }
1153 self.handle_deferral_and_cancellation(
1154 state,
1155 &mut cancelled_txns,
1156 &mut deferred_txns,
1157 &mut randomness_transactions_to_schedule,
1158 protocol_config,
1159 commit_info,
1160 transaction,
1161 &mut shared_object_using_randomness_congestion_tracker,
1162 &previously_deferred_tx_digests,
1163 execution_time_estimator,
1164 );
1165 }
1166
1167 let mut total_deferred_txns = 0;
1168 {
1169 let mut deferred_transactions = self
1170 .epoch_store
1171 .consensus_output_cache
1172 .deferred_transactions_v2
1173 .lock();
1174 for (key, txns) in deferred_txns.into_iter() {
1175 total_deferred_txns += txns.len();
1176 deferred_transactions.insert(key, txns.clone());
1177 state.output.defer_transactions(key, txns);
1178 }
1179 }
1180
1181 self.metrics
1182 .consensus_handler_deferred_transactions
1183 .inc_by(total_deferred_txns as u64);
1184 self.metrics
1185 .consensus_handler_cancelled_transactions
1186 .inc_by(cancelled_txns.len() as u64);
1187 self.metrics
1188 .consensus_handler_max_object_costs
1189 .with_label_values(&["regular_commit"])
1190 .set(shared_object_congestion_tracker.max_cost() as i64);
1191 self.metrics
1192 .consensus_handler_max_object_costs
1193 .with_label_values(&["randomness_commit"])
1194 .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);
1195
1196 let object_debts = shared_object_congestion_tracker.accumulated_debts(commit_info);
1197 let randomness_object_debts =
1198 shared_object_using_randomness_congestion_tracker.accumulated_debts(commit_info);
1199 if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get()
1200 && let Err(e) = tx_object_debts.try_send(
1201 object_debts
1202 .iter()
1203 .chain(randomness_object_debts.iter())
1204 .map(|(id, _)| *id)
1205 .collect(),
1206 )
1207 {
1208 info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
1209 }
1210
1211 state
1212 .output
1213 .set_congestion_control_object_debts(object_debts);
1214 state
1215 .output
1216 .set_congestion_control_randomness_object_debts(randomness_object_debts);
1217
1218 let mut settlement = None;
1219 let mut randomness_settlement = None;
1220 if self.epoch_store.accumulators_enabled() {
1221 let checkpoint_height = self
1222 .epoch_store
1223 .calculate_pending_checkpoint_height(commit_info.round);
1224
1225 settlement = Some(Schedulable::AccumulatorSettlement(epoch, checkpoint_height));
1226
1227 if state.randomness_round.is_some() || !randomness_transactions_to_schedule.is_empty() {
1228 randomness_settlement = Some(Schedulable::AccumulatorSettlement(
1229 epoch,
1230 checkpoint_height + 1,
1231 ));
1232 }
1233 }
1234
1235 let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
1236 .then_some(Schedulable::ConsensusCommitPrologue(
1237 epoch,
1238 commit_info.round,
1239 commit_info.consensus_commit_ref.index,
1240 ));
1241
1242 let schedulables: Vec<_> = itertools::chain!(
1243 consensus_commit_prologue.into_iter(),
1244 authenticator_state_update_transaction
1245 .into_iter()
1246 .map(Schedulable::Transaction),
1247 transactions_to_schedule
1248 .into_iter()
1249 .map(Schedulable::Transaction),
1250 settlement,
1251 )
1252 .collect();
1253
1254 let randomness_schedulables: Vec<_> = randomness_state_update_transaction
1255 .into_iter()
1256 .chain(
1257 randomness_transactions_to_schedule
1258 .into_iter()
1259 .map(Schedulable::Transaction),
1260 )
1261 .chain(randomness_settlement)
1262 .collect();
1263
1264 let assigned_versions = self
1265 .epoch_store
1266 .process_consensus_transaction_shared_object_versions(
1267 self.cache_reader.as_ref(),
1268 schedulables.iter(),
1269 randomness_schedulables.iter(),
1270 &cancelled_txns,
1271 &mut state.output,
1272 )
1273 .expect("failed to assign shared object versions");
1274
1275 let consensus_commit_prologue =
1276 self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);
1277
1278 let mut schedulables = schedulables;
1279 let mut assigned_versions = assigned_versions;
1280 if let Some(consensus_commit_prologue) = consensus_commit_prologue {
1281 assert!(matches!(
1282 schedulables[0],
1283 Schedulable::ConsensusCommitPrologue(..)
1284 ));
1285 assert!(matches!(
1286 assigned_versions.0[0].0,
1287 TransactionKey::ConsensusCommitPrologue(..)
1288 ));
1289 assigned_versions.0[0].0 =
1290 TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
1291 schedulables[0] = Schedulable::Transaction(consensus_commit_prologue);
1292 }
1293
1294 self.epoch_store
1295 .process_user_signatures(schedulables.iter().chain(randomness_schedulables.iter()));
1296
1297 let schedulables: Vec<Schedulable> = schedulables.into_iter().map(|s| s.into()).collect();
1299 let randomness_schedulables: Vec<Schedulable> = randomness_schedulables
1300 .into_iter()
1301 .map(|s| s.into())
1302 .collect();
1303
1304 (schedulables, randomness_schedulables, assigned_versions)
1305 }
1306
1307 fn add_consensus_commit_prologue_transaction<'a>(
1311 &'a self,
1312 state: &'a mut CommitHandlerState,
1313 commit_info: &'a ConsensusCommitInfo,
1314 assigned_versions: &AssignedTxAndVersions,
1315 ) -> Option<VerifiedExecutableTransactionWithAliases> {
1316 {
1317 if commit_info.skip_consensus_commit_prologue_in_test {
1318 return None;
1319 }
1320 }
1321
1322 let mut cancelled_txn_version_assignment = Vec::new();
1323
1324 let protocol_config = self.epoch_store.protocol_config();
1325
1326 for (txn_key, assigned_versions) in assigned_versions.0.iter() {
1327 let Some(d) = txn_key.as_digest() else {
1328 continue;
1329 };
1330
1331 if !protocol_config.include_cancelled_randomness_txns_in_prologue()
1332 && assigned_versions
1333 .shared_object_versions
1334 .iter()
1335 .any(|((id, _), _)| *id == SUI_RANDOMNESS_STATE_OBJECT_ID)
1336 {
1337 continue;
1338 }
1339
1340 if assigned_versions
1341 .shared_object_versions
1342 .iter()
1343 .any(|(_, version)| version.is_cancelled())
1344 {
1345 assert_reachable!("cancelled transactions");
1346 cancelled_txn_version_assignment
1347 .push((*d, assigned_versions.shared_object_versions.clone()));
1348 }
1349 }
1350
1351 fail_point_arg!(
1352 "additional_cancelled_txns_for_tests",
1353 |additional_cancelled_txns: Vec<(
1354 TransactionDigest,
1355 Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
1356 )>| {
1357 cancelled_txn_version_assignment.extend(additional_cancelled_txns);
1358 }
1359 );
1360
1361 let transaction = commit_info.create_consensus_commit_prologue_transaction(
1362 self.epoch_store.epoch(),
1363 self.epoch_store.protocol_config(),
1364 cancelled_txn_version_assignment,
1365 commit_info,
1366 state.indirect_state_observer.take().unwrap(),
1367 );
1368 Some(VerifiedExecutableTransactionWithAliases::no_aliases(
1369 transaction,
1370 ))
1371 }
1372
1373 fn handle_deferral_and_cancellation(
1374 &self,
1375 state: &mut CommitHandlerState,
1376 cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
1377 deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>,
1378 scheduled_txns: &mut Vec<VerifiedExecutableTransactionWithAliases>,
1379 protocol_config: &ProtocolConfig,
1380 commit_info: &ConsensusCommitInfo,
1381 transaction: VerifiedExecutableTransactionWithAliases,
1382 shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
1383 previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
1384 execution_time_estimator: &ExecutionTimeEstimator,
1385 ) {
1386 let tx_cost = shared_object_congestion_tracker.get_tx_cost(
1387 execution_time_estimator,
1388 transaction.tx(),
1389 state.indirect_state_observer.as_mut().unwrap(),
1390 );
1391
1392 let deferral_info = self.epoch_store.should_defer(
1393 transaction.tx(),
1394 commit_info,
1395 state.dkg_failed,
1396 state.randomness_round.is_some(),
1397 previously_deferred_tx_digests,
1398 shared_object_congestion_tracker,
1399 );
1400
1401 if let Some((deferral_key, deferral_reason)) = deferral_info {
1402 debug!(
1403 "Deferring consensus certificate for transaction {:?} until {:?}",
1404 transaction.tx().digest(),
1405 deferral_key
1406 );
1407
1408 match deferral_reason {
1409 DeferralReason::RandomnessNotReady => {
1410 deferred_txns
1411 .entry(deferral_key)
1412 .or_default()
1413 .push(transaction);
1414 }
1415 DeferralReason::SharedObjectCongestion(congested_objects) => {
1416 self.metrics.consensus_handler_congested_transactions.inc();
1417 if transaction_deferral_within_limit(
1418 &deferral_key,
1419 protocol_config.max_deferral_rounds_for_congestion_control(),
1420 ) {
1421 deferred_txns
1422 .entry(deferral_key)
1423 .or_default()
1424 .push(transaction);
1425 } else {
1426 assert_sometimes!(
1427 transaction.tx().data().transaction_data().uses_randomness(),
1428 "cancelled randomness-using transaction"
1429 );
1430 assert_sometimes!(
1431 !transaction.tx().data().transaction_data().uses_randomness(),
1432 "cancelled non-randomness-using transaction"
1433 );
1434
1435 debug!(
1437 "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
1438 transaction.tx().digest(),
1439 deferral_key,
1440 congested_objects
1441 );
1442 cancelled_txns.insert(
1443 *transaction.tx().digest(),
1444 CancelConsensusCertificateReason::CongestionOnObjects(
1445 congested_objects,
1446 ),
1447 );
1448 scheduled_txns.push(transaction);
1449 }
1450 }
1451 }
1452 } else {
1453 shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, transaction.tx());
1455 scheduled_txns.push(transaction);
1456 }
1457 }
1458
1459 fn merge_and_reorder_transactions(
1460 &self,
1461 state: &mut CommitHandlerState,
1462 commit_info: &ConsensusCommitInfo,
1463 user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1464 ) -> (
1465 Vec<VerifiedExecutableTransactionWithAliases>,
1466 Vec<VerifiedExecutableTransactionWithAliases>,
1467 HashMap<TransactionDigest, DeferralKey>,
1468 ) {
1469 let protocol_config = self.epoch_store.protocol_config();
1470
1471 let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
1472 self.load_deferred_transactions(state, commit_info);
1473
1474 txns.reserve(user_transactions.len());
1475 randomness_txns.reserve(user_transactions.len());
1476
1477 let mut txns: Vec<_> = txns
1480 .into_iter()
1481 .filter_map(|tx| {
1482 if tx.tx().transaction_data().uses_randomness() {
1483 randomness_txns.push(tx);
1484 None
1485 } else {
1486 Some(tx)
1487 }
1488 })
1489 .collect();
1490
1491 for txn in user_transactions {
1492 if txn.tx().transaction_data().uses_randomness() {
1493 randomness_txns.push(txn);
1494 } else {
1495 txns.push(txn);
1496 }
1497 }
1498
1499 PostConsensusTxReorder::reorder(
1500 &mut txns,
1501 protocol_config.consensus_transaction_ordering(),
1502 );
1503 PostConsensusTxReorder::reorder(
1504 &mut randomness_txns,
1505 protocol_config.consensus_transaction_ordering(),
1506 );
1507
1508 (txns, randomness_txns, previously_deferred_tx_digests)
1509 }
1510
1511 fn load_deferred_transactions(
1512 &self,
1513 state: &mut CommitHandlerState,
1514 commit_info: &ConsensusCommitInfo,
1515 ) -> (
1516 Vec<VerifiedExecutableTransactionWithAliases>,
1517 Vec<VerifiedExecutableTransactionWithAliases>,
1518 HashMap<TransactionDigest, DeferralKey>,
1519 ) {
1520 let mut previously_deferred_tx_digests = HashMap::new();
1521
1522 let deferred_txs: Vec<_> = self
1523 .epoch_store
1524 .load_deferred_transactions_for_up_to_consensus_round_v2(
1525 &mut state.output,
1526 commit_info.round,
1527 )
1528 .expect("db error")
1529 .into_iter()
1530 .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
1531 .map(|(key, tx)| {
1532 previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
1533 tx
1534 })
1535 .collect();
1536 trace!(
1537 "loading deferred transactions: {:?}",
1538 deferred_txs.iter().map(|tx| tx.tx().digest())
1539 );
1540
1541 let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
1542 let txns: Vec<_> = self
1543 .epoch_store
1544 .load_deferred_transactions_for_randomness_v2(&mut state.output)
1545 .expect("db error")
1546 .into_iter()
1547 .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
1548 .map(|(key, tx)| {
1549 previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
1550 tx
1551 })
1552 .collect();
1553 trace!(
1554 "loading deferred randomness transactions: {:?}",
1555 txns.iter().map(|tx| tx.tx().digest())
1556 );
1557 txns
1558 } else {
1559 vec![]
1560 };
1561
1562 (
1563 deferred_txs,
1564 deferred_randomness_txs,
1565 previously_deferred_tx_digests,
1566 )
1567 }
1568
1569 fn init_congestion_tracker(
1570 &self,
1571 commit_info: &ConsensusCommitInfo,
1572 for_randomness: bool,
1573 txns: &[VerifiedExecutableTransactionWithAliases],
1574 ) -> SharedObjectCongestionTracker {
1575 #[allow(unused_mut)]
1576 let mut ret = SharedObjectCongestionTracker::from_protocol_config(
1577 self.epoch_store
1578 .consensus_quarantine
1579 .read()
1580 .load_initial_object_debts(
1581 &self.epoch_store,
1582 commit_info.round,
1583 for_randomness,
1584 txns,
1585 )
1586 .expect("db error"),
1587 self.epoch_store.protocol_config(),
1588 for_randomness,
1589 );
1590
1591 fail_point_arg!(
1592 "initial_congestion_tracker",
1593 |tracker: SharedObjectCongestionTracker| {
1594 info!(
1595 "Initialize shared_object_congestion_tracker to {:?}",
1596 tracker
1597 );
1598 ret = tracker;
1599 }
1600 );
1601
1602 ret
1603 }
1604
1605 fn process_jwks(
1606 &self,
1607 state: &mut CommitHandlerState,
1608 commit_info: &ConsensusCommitInfo,
1609 new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
1610 ) {
1611 for (authority_name, jwk_id, jwk) in new_jwks {
1612 self.epoch_store.record_jwk_vote(
1613 &mut state.output,
1614 commit_info.round,
1615 authority_name,
1616 &jwk_id,
1617 &jwk,
1618 );
1619 }
1620 }
1621
1622 fn process_capability_notifications(
1623 &self,
1624 capability_notifications: Vec<AuthorityCapabilitiesV2>,
1625 ) {
1626 for capabilities in capability_notifications {
1627 self.epoch_store
1628 .record_capabilities_v2(&capabilities)
1629 .expect("db error");
1630 }
1631 }
1632
1633 fn process_execution_time_observations(
1634 &self,
1635 state: &mut CommitHandlerState,
1636 execution_time_observations: Vec<ExecutionTimeObservation>,
1637 ) {
1638 let mut execution_time_estimator = self
1639 .epoch_store
1640 .execution_time_estimator
1641 .try_lock()
1642 .expect("should only ever be called from the commit handler thread");
1643
1644 for ExecutionTimeObservation {
1645 authority,
1646 generation,
1647 estimates,
1648 } in execution_time_observations
1649 {
1650 let authority_index = self
1651 .epoch_store
1652 .committee()
1653 .authority_index(&authority)
1654 .unwrap();
1655 execution_time_estimator.process_observations_from_consensus(
1656 authority_index,
1657 Some(generation),
1658 &estimates,
1659 );
1660 state
1661 .output
1662 .insert_execution_time_observation(authority_index, generation, estimates);
1663 }
1664 }
1665
1666 fn process_checkpoint_signature_messages(
1667 &self,
1668 checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
1669 ) {
1670 for checkpoint_signature_message in checkpoint_signature_messages {
1671 self.checkpoint_service
1672 .notify_checkpoint_signature(&self.epoch_store, &checkpoint_signature_message)
1673 .expect("db error");
1674 }
1675 }
1676
1677 async fn process_dkg_updates(
1678 &self,
1679 state: &mut CommitHandlerState,
1680 commit_info: &ConsensusCommitInfo,
1681 randomness_manager: Option<&mut RandomnessManager>,
1682 randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
1683 randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
1684 ) {
1685 if !self.epoch_store.randomness_state_enabled() {
1686 let num_dkg_messages = randomness_dkg_messages.len();
1687 let num_dkg_confirmations = randomness_dkg_confirmations.len();
1688 if num_dkg_messages + num_dkg_confirmations > 0 {
1689 debug_fatal!(
1690 "received {} RandomnessDkgMessage and {} RandomnessDkgConfirmation messages when randomness is not enabled",
1691 num_dkg_messages,
1692 num_dkg_confirmations
1693 );
1694 }
1695 return;
1696 }
1697
1698 let randomness_manager =
1699 randomness_manager.expect("randomness manager should exist if randomness is enabled");
1700
1701 let randomness_dkg_updates =
1702 self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);
1703
1704 let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
1705 state,
1706 randomness_manager,
1707 randomness_dkg_confirmations,
1708 );
1709
1710 if randomness_dkg_updates || randomness_dkg_confirmation_updates {
1711 randomness_manager
1712 .advance_dkg(&mut state.output, commit_info.round)
1713 .await
1714 .expect("epoch ended");
1715 }
1716 }
1717
1718 fn process_randomness_dkg_messages(
1719 &self,
1720 randomness_manager: &mut RandomnessManager,
1721 randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
1722 ) -> bool {
1723 if randomness_dkg_messages.is_empty() {
1724 return false;
1725 }
1726
1727 let mut randomness_state_updated = false;
1728 for (authority, bytes) in randomness_dkg_messages {
1729 match bcs::from_bytes(&bytes) {
1730 Ok(message) => {
1731 randomness_manager
1732 .add_message(&authority, message)
1733 .expect("epoch ended");
1735 randomness_state_updated = true;
1736 }
1737
1738 Err(e) => {
1739 warn!(
1740 "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
1741 authority.concise(),
1742 );
1743 }
1744 }
1745 }
1746
1747 randomness_state_updated
1748 }
1749
1750 fn process_randomness_dkg_confirmations(
1751 &self,
1752 state: &mut CommitHandlerState,
1753 randomness_manager: &mut RandomnessManager,
1754 randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
1755 ) -> bool {
1756 if randomness_dkg_confirmations.is_empty() {
1757 return false;
1758 }
1759
1760 let mut randomness_state_updated = false;
1761 for (authority, bytes) in randomness_dkg_confirmations {
1762 match bcs::from_bytes(&bytes) {
1763 Ok(message) => {
1764 randomness_manager
1765 .add_confirmation(&mut state.output, &authority, message)
1766 .expect("epoch ended");
1768 randomness_state_updated = true;
1769 }
1770 Err(e) => {
1771 warn!(
1772 "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
1773 authority.concise(),
1774 );
1775 }
1776 }
1777 }
1778
1779 randomness_state_updated
1780 }
1781
1782 fn process_end_of_publish_transactions(
1784 &self,
1785 state: &mut CommitHandlerState,
1786 end_of_publish_transactions: Vec<AuthorityName>,
1787 ) -> bool {
1788 let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
1789 "No contention on end_of_publish as it is only accessed from consensus handler",
1790 );
1791
1792 if eop_aggregator.has_quorum() {
1793 return true;
1794 }
1795
1796 if end_of_publish_transactions.is_empty() {
1797 return false;
1798 }
1799
1800 for authority in end_of_publish_transactions {
1801 info!("Received EndOfPublish from {:?}", authority.concise());
1802
1803 state.output.insert_end_of_publish(authority);
1806 if eop_aggregator
1807 .insert_generic(authority, ())
1808 .is_quorum_reached()
1809 {
1810 debug!(
1811 "Collected enough end_of_publish messages with last message from validator {:?}",
1812 authority.concise(),
1813 );
1814 return true;
1815 }
1816 }
1817
1818 false
1819 }
1820
1821 fn advance_eop_state_machine(
1824 &self,
1825 state: &mut CommitHandlerState,
1826 ) -> (
1827 RwLockWriteGuard<'_, ReconfigState>,
1828 bool, ) {
1830 let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
1831 let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
1832
1833 reconfig_state.close_all_certs();
1834
1835 let commit_has_deferred_txns = state.output.has_deferred_transactions();
1836 let previous_commits_have_deferred_txns =
1837 !self.epoch_store.deferred_transactions_empty_v2();
1838
1839 if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
1840 if !start_state_is_reject_all_tx {
1841 info!("Transitioning to RejectAllTx");
1842 }
1843 reconfig_state.close_all_tx();
1844 } else {
1845 debug!(
1846 "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
1847 previous_commits_have_deferred_txns, commit_has_deferred_txns,
1848 );
1849 }
1850
1851 state.output.store_reconfig_state(reconfig_state.clone());
1852
1853 if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
1854 (reconfig_state, true)
1855 } else {
1856 (reconfig_state, false)
1857 }
1858 }
1859
1860 fn gather_commit_metadata(
1861 &self,
1862 consensus_commit: &impl ConsensusCommitAPI,
1863 ) -> (u64, AuthorityIndex, u64) {
1864 let timestamp = consensus_commit.commit_timestamp_ms();
1865 let leader_author = consensus_commit.leader_author_index();
1866 let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
1867
1868 let system_time_ms = SystemTime::now()
1869 .duration_since(UNIX_EPOCH)
1870 .unwrap()
1871 .as_millis() as i64;
1872
1873 let consensus_timestamp_bias_ms = system_time_ms - (timestamp as i64);
1874 let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
1875 self.metrics
1876 .consensus_timestamp_bias
1877 .observe(consensus_timestamp_bias_seconds);
1878
1879 let epoch_start = self
1880 .epoch_store
1881 .epoch_start_config()
1882 .epoch_start_timestamp_ms();
1883 let timestamp = if timestamp < epoch_start {
1884 error!(
1885 "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}"
1886 );
1887 epoch_start
1888 } else {
1889 timestamp
1890 };
1891
1892 (timestamp, leader_author, commit_sub_dag_index)
1893 }
1894
1895 fn create_authenticator_state_update(
1896 &self,
1897 last_committed_round: u64,
1898 commit_info: &ConsensusCommitInfo,
1899 ) -> Option<VerifiedExecutableTransactionWithAliases> {
1900 let new_jwks = self
1908 .epoch_store
1909 .get_new_jwks(last_committed_round)
1910 .expect("Unrecoverable error in consensus handler");
1911
1912 if !new_jwks.is_empty() {
1913 let authenticator_state_update_transaction = authenticator_state_update_transaction(
1914 &self.epoch_store,
1915 commit_info.round,
1916 new_jwks,
1917 );
1918 debug!(
1919 "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
1920 authenticator_state_update_transaction.digest(),
1921 authenticator_state_update_transaction,
1922 );
1923
1924 Some(VerifiedExecutableTransactionWithAliases::no_aliases(
1925 authenticator_state_update_transaction,
1926 ))
1927 } else {
1928 None
1929 }
1930 }
1931
1932 #[instrument(level = "trace", skip_all)]
1934 fn filter_consensus_txns(
1935 &mut self,
1936 initial_reconfig_state: ReconfigState,
1937 commit_info: &ConsensusCommitInfo,
1938 consensus_commit: &impl ConsensusCommitAPI,
1939 ) -> Vec<(SequencedConsensusTransactionKind, u32)> {
1940 let mut transactions = Vec::new();
1941 let epoch = self.epoch_store.epoch();
1942 let mut num_finalized_user_transactions = vec![0; self.committee.size()];
1943 let mut num_rejected_user_transactions = vec![0; self.committee.size()];
1944 for (block, parsed_transactions) in consensus_commit.transactions() {
1945 let author = block.author.value();
1946 self.last_consensus_stats.stats.inc_num_messages(author);
1948
1949 self.epoch_store.set_consensus_tx_status(
1951 ConsensusPosition::ping(epoch, block),
1952 ConsensusTxStatus::Finalized,
1953 );
1954
1955 for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
1956 let position = ConsensusPosition {
1957 epoch,
1958 block,
1959 index: tx_index as TransactionIndex,
1960 };
1961
1962 if self.epoch_store.protocol_config().mysticeti_fastpath()
1965 && let Some(tx) = parsed.transaction.kind.as_user_transaction()
1966 {
1967 let digest = tx.digest();
1968 if let Some((spam_weight, submitter_client_addrs)) = self
1969 .epoch_store
1970 .submitted_transaction_cache
1971 .increment_submission_count(digest)
1972 {
1973 if let Some(ref traffic_controller) = self.traffic_controller {
1974 debug!(
1975 "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
1976 submitter_client_addrs.len()
1977 );
1978
1979 for addr in submitter_client_addrs {
1981 traffic_controller.tally(TrafficTally::new(
1982 Some(addr),
1983 None,
1984 None,
1985 spam_weight.clone(),
1986 ));
1987 }
1988 } else {
1989 warn!(
1990 "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
1991 submitter_client_addrs.len()
1992 );
1993 }
1994 }
1995 }
1996
1997 if parsed.rejected {
1998 if matches!(
2000 parsed.transaction.kind,
2001 ConsensusTransactionKind::UserTransaction(_)
2002 | ConsensusTransactionKind::UserTransactionV2(_)
2003 ) {
2004 self.epoch_store
2005 .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
2006 num_rejected_user_transactions[author] += 1;
2007 }
2008 continue;
2011 }
2012 if matches!(
2013 parsed.transaction.kind,
2014 ConsensusTransactionKind::UserTransaction(_)
2015 | ConsensusTransactionKind::UserTransactionV2(_)
2016 ) {
2017 self.epoch_store
2018 .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
2019 num_finalized_user_transactions[author] += 1;
2020 }
2021 let kind = classify(&parsed.transaction);
2022 self.metrics
2023 .consensus_handler_processed
2024 .with_label_values(&[kind])
2025 .inc();
2026 self.metrics
2027 .consensus_handler_transaction_sizes
2028 .with_label_values(&[kind])
2029 .observe(parsed.serialized_len as f64);
2030 if matches!(
2032 &parsed.transaction.kind,
2033 ConsensusTransactionKind::CertifiedTransaction(_)
2034 | ConsensusTransactionKind::UserTransaction(_)
2035 | ConsensusTransactionKind::UserTransactionV2(_)
2036 ) {
2037 self.last_consensus_stats
2038 .stats
2039 .inc_num_user_transactions(author);
2040 }
2041
2042 if !initial_reconfig_state.should_accept_consensus_certs() {
2043 match &parsed.transaction.kind {
2046 ConsensusTransactionKind::UserTransaction(_)
2047 | ConsensusTransactionKind::UserTransactionV2(_)
2048 | ConsensusTransactionKind::CertifiedTransaction(_)
2049 | ConsensusTransactionKind::CapabilityNotification(_)
2051 | ConsensusTransactionKind::CapabilityNotificationV2(_)
2052 | ConsensusTransactionKind::EndOfPublish(_)
2053 | ConsensusTransactionKind::ExecutionTimeObservation(_)
2055 | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
2056 debug!(
2057 "Ignoring consensus transaction {:?} because of end of epoch",
2058 parsed.transaction.key()
2059 );
2060 continue;
2061 }
2062
2063 ConsensusTransactionKind::CheckpointSignature(_)
2065 | ConsensusTransactionKind::CheckpointSignatureV2(_)
2066 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2067 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
2068 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
2069 }
2070 }
2071
2072 if !initial_reconfig_state.should_accept_tx() {
2073 match &parsed.transaction.kind {
2074 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
2075 | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
2076 _ => {}
2077 }
2078 }
2079
2080 if parsed.transaction.is_mfp_transaction()
2081 && !self.epoch_store.protocol_config().mysticeti_fastpath()
2082 {
2083 debug!(
2084 "Ignoring MFP transaction {:?} because MFP is disabled",
2085 parsed.transaction.key()
2086 );
2087 continue;
2088 }
2089
2090 if let ConsensusTransactionKind::CertifiedTransaction(certificate) =
2091 &parsed.transaction.kind
2092 && certificate.epoch() != epoch
2093 {
2094 debug!(
2095 "Certificate epoch ({:?}) doesn't match the current epoch ({:?})",
2096 certificate.epoch(),
2097 epoch
2098 );
2099 continue;
2100 }
2101
2102 match &parsed.transaction.kind {
2104 ConsensusTransactionKind::CapabilityNotification(_)
2105 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2106 | ConsensusTransactionKind::CheckpointSignature(_) => {
2107 debug_fatal!(
2108 "BUG: saw deprecated tx {:?}for commit round {}",
2109 parsed.transaction.key(),
2110 commit_info.round
2111 );
2112 continue;
2113 }
2114 _ => {}
2115 }
2116
2117 if matches!(
2118 &parsed.transaction.kind,
2119 ConsensusTransactionKind::UserTransaction(_)
2120 | ConsensusTransactionKind::UserTransactionV2(_)
2121 | ConsensusTransactionKind::CertifiedTransaction(_)
2122 ) {
2123 let author_name = self
2124 .epoch_store
2125 .committee()
2126 .authority_by_index(author as u32)
2127 .unwrap();
2128 if self
2129 .epoch_store
2130 .has_received_end_of_publish_from(author_name)
2131 {
2132 warn!(
2136 "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
2137 author_name.concise(),
2138 parsed.transaction.key(),
2139 );
2140 continue;
2141 }
2142 }
2143
2144 let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
2145 transactions.push((transaction, author as u32));
2146 }
2147 }
2148
2149 for (i, authority) in self.committee.authorities() {
2150 let hostname = &authority.hostname;
2151 self.metrics
2152 .consensus_committed_messages
2153 .with_label_values(&[hostname])
2154 .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
2155 self.metrics
2156 .consensus_committed_user_transactions
2157 .with_label_values(&[hostname])
2158 .set(
2159 self.last_consensus_stats
2160 .stats
2161 .get_num_user_transactions(i.value()) as i64,
2162 );
2163 self.metrics
2164 .consensus_finalized_user_transactions
2165 .with_label_values(&[hostname])
2166 .add(num_finalized_user_transactions[i.value()] as i64);
2167 self.metrics
2168 .consensus_rejected_user_transactions
2169 .with_label_values(&[hostname])
2170 .add(num_rejected_user_transactions[i.value()] as i64);
2171 }
2172
2173 transactions
2174 }
2175
    /// Wraps each consensus transaction with its position in the commit, verifies it, and
    /// drops every transaction whose key has already been seen.
    ///
    /// A transaction is skipped when any of the following holds:
    /// - `verify_consensus_transaction` returns `None` for it;
    /// - its key already occurred earlier in this call, or was already in `processed_cache`
    ///   (counted in `skipped_consensus_txns_cache_hit`);
    /// - the epoch store reports the key as already processed
    ///   (counted in `skipped_consensus_txns`).
    ///
    /// Every surviving key is recorded through `state.output` as processed, and the surviving
    /// transactions are returned in their input order.
    ///
    /// # Panics
    /// Panics if `cert_origin` does not resolve to an authority in the committee, or if the
    /// processed-message lookup fails (`"db error"`).
    fn deduplicate_consensus_txns(
        &mut self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
    ) -> Vec<VerifiedSequencedConsensusTransaction> {
        // Keys seen during this call, tracked independently of `processed_cache`.
        // NOTE(review): presumably required because `processed_cache` is a bounded LRU cache
        // that may evict entries while this loop is still running — confirm.
        let mut processed_set = HashSet::new();

        let mut all_transactions = Vec::new();

        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
            // Transaction indices are 1-based within the commit.
            // NOTE(review): presumably index 0 is reserved for a system transaction that is
            // added to the commit elsewhere — confirm.
            let current_tx_index = ExecutionIndices {
                last_committed_round: commit_info.round,
                sub_dag_index: commit_info.consensus_commit_ref.index.into(),
                transaction_index: (seq + 1) as u64,
            };

            // Updated for every transaction, including the ones skipped further down.
            self.last_consensus_stats.index = current_tx_index;

            let certificate_author = *self
                .epoch_store
                .committee()
                .authority_by_index(cert_origin)
                .unwrap();

            let sequenced_transaction = SequencedConsensusTransaction {
                certificate_author_index: cert_origin,
                certificate_author,
                consensus_index: current_tx_index,
                transaction,
            };

            // Transactions that fail verification are dropped without being marked processed.
            let Some(verified_transaction) = self
                .epoch_store
                .verify_consensus_transaction(sequenced_transaction)
            else {
                continue;
            };

            let key = verified_transaction.0.key();

            // Done before the duplicate checks, so it also runs for duplicates.
            if let Some(tx_digest) = key.user_transaction_digest() {
                self.epoch_store
                    .cache_recently_finalized_transaction(tx_digest);
            }

            // Both lookups are evaluated unconditionally (no short-circuit) so that the key is
            // always inserted into the per-call set AND the cache.
            let in_set = !processed_set.insert(key.clone());
            let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
            if in_set || in_cache {
                self.metrics.skipped_consensus_txns_cache_hit.inc();
                continue;
            }
            // Fall back to the epoch store for keys that are in neither the set nor the cache.
            if self
                .epoch_store
                .is_consensus_message_processed(&key)
                .expect("db error")
            {
                self.metrics.skipped_consensus_txns.inc();
                continue;
            }

            state.output.record_consensus_message_processed(key);

            all_transactions.push(verified_transaction);
        }

        all_transactions
    }
2254
2255 fn build_commit_handler_input(
2256 &self,
2257 transactions: Vec<VerifiedSequencedConsensusTransaction>,
2258 ) -> CommitHandlerInput {
2259 let epoch = self.epoch_store.epoch();
2260 let mut commit_handler_input = CommitHandlerInput::default();
2261
2262 for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
2263 match transaction.transaction {
2264 SequencedConsensusTransactionKind::External(consensus_transaction) => {
2265 match consensus_transaction.kind {
2266 ConsensusTransactionKind::CertifiedTransaction(cert) => {
2268 let cert = VerifiedCertificate::new_unchecked(*cert);
2270 let transaction =
2271 VerifiedExecutableTransaction::new_from_certificate(cert);
2272 commit_handler_input.user_transactions.push(
2273 VerifiedExecutableTransactionWithAliases::no_aliases(transaction),
2274 );
2275 }
2276 ConsensusTransactionKind::UserTransaction(tx) => {
2277 let tx = VerifiedTransaction::new_unchecked(*tx);
2279 let transaction =
2281 VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
2282 commit_handler_input
2283 .user_transactions
2284 .push(VerifiedExecutableTransactionWithAliases::no_aliases(
2286 transaction,
2287 ));
2288 }
2289 ConsensusTransactionKind::UserTransactionV2(tx) => {
2290 let (tx, used_alias_versions) = tx.into_inner();
2291 let tx = VerifiedTransaction::new_unchecked(tx);
2293 let transaction =
2295 VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
2296 commit_handler_input
2297 .user_transactions
2298 .push(WithAliases::new(transaction, used_alias_versions));
2299 }
2300
2301 ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => {
2303 commit_handler_input
2304 .end_of_publish_transactions
2305 .push(authority_public_key_bytes);
2306 }
2307 ConsensusTransactionKind::NewJWKFetched(
2308 authority_public_key_bytes,
2309 jwk_id,
2310 jwk,
2311 ) => {
2312 commit_handler_input.new_jwks.push((
2313 authority_public_key_bytes,
2314 jwk_id,
2315 jwk,
2316 ));
2317 }
2318 ConsensusTransactionKind::RandomnessDkgMessage(
2319 authority_public_key_bytes,
2320 items,
2321 ) => {
2322 commit_handler_input
2323 .randomness_dkg_messages
2324 .push((authority_public_key_bytes, items));
2325 }
2326 ConsensusTransactionKind::RandomnessDkgConfirmation(
2327 authority_public_key_bytes,
2328 items,
2329 ) => {
2330 commit_handler_input
2331 .randomness_dkg_confirmations
2332 .push((authority_public_key_bytes, items));
2333 }
2334 ConsensusTransactionKind::CapabilityNotificationV2(
2335 authority_capabilities_v2,
2336 ) => {
2337 commit_handler_input
2338 .capability_notifications
2339 .push(authority_capabilities_v2);
2340 }
2341 ConsensusTransactionKind::ExecutionTimeObservation(
2342 execution_time_observation,
2343 ) => {
2344 commit_handler_input
2345 .execution_time_observations
2346 .push(execution_time_observation);
2347 }
2348 ConsensusTransactionKind::CheckpointSignatureV2(
2349 checkpoint_signature_message,
2350 ) => {
2351 commit_handler_input
2352 .checkpoint_signature_messages
2353 .push(*checkpoint_signature_message);
2354 }
2355
2356 ConsensusTransactionKind::CheckpointSignature(_)
2358 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2359 | ConsensusTransactionKind::CapabilityNotification(_) => {
2360 unreachable!("filtered earlier")
2361 }
2362 }
2363 }
2364 SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
2366 }
2367 }
2368
2369 commit_handler_input
2370 }
2371
2372 async fn send_end_of_publish_if_needed(&self) {
2373 if !self.epoch_store.should_send_end_of_publish() {
2374 return;
2375 }
2376
2377 let end_of_publish = ConsensusTransaction::new_end_of_publish(self.epoch_store.name);
2378 if let Err(err) =
2379 self.consensus_adapter
2380 .submit(end_of_publish, None, &self.epoch_store, None, None)
2381 {
2382 warn!(
2383 "Error when sending EndOfPublish message from ConsensusHandler: {:?}",
2384 err
2385 );
2386 } else {
2387 info!(epoch=?self.epoch_store.epoch(), "Sending EndOfPublish message to consensus");
2388 }
2389 }
2390}
2391
/// Cloneable handle for sending batches of schedulable transactions over an unbounded channel.
///
/// Each message is a tuple of the transactions to schedule, the versions assigned to them,
/// and the source that scheduled them.
#[derive(Clone)]
pub(crate) struct ExecutionSchedulerSender {
    /// Sending half of the channel. When the handle is created through
    /// `ExecutionSchedulerSender::start`, the receiving half is drained by a spawned task.
    sender: monitored_mpsc::UnboundedSender<(
        Vec<Schedulable>,
        AssignedTxAndVersions,
        SchedulingSource,
    )>,
}
2403
2404impl ExecutionSchedulerSender {
2405 fn start(
2406 execution_scheduler: Arc<ExecutionScheduler>,
2407 epoch_store: Arc<AuthorityPerEpochStore>,
2408 ) -> Self {
2409 let (sender, recv) = monitored_mpsc::unbounded_channel("execution_scheduler_sender");
2410 spawn_monitored_task!(Self::run(recv, execution_scheduler, epoch_store));
2411 Self { sender }
2412 }
2413
2414 pub(crate) fn new_for_testing(
2415 sender: monitored_mpsc::UnboundedSender<(
2416 Vec<Schedulable>,
2417 AssignedTxAndVersions,
2418 SchedulingSource,
2419 )>,
2420 ) -> Self {
2421 Self { sender }
2422 }
2423
2424 fn send(
2425 &self,
2426 transactions: Vec<Schedulable>,
2427 assigned_versions: AssignedTxAndVersions,
2428 scheduling_source: SchedulingSource,
2429 ) {
2430 let _ = self
2431 .sender
2432 .send((transactions, assigned_versions, scheduling_source));
2433 }
2434
2435 async fn run(
2436 mut recv: monitored_mpsc::UnboundedReceiver<(
2437 Vec<Schedulable>,
2438 AssignedTxAndVersions,
2439 SchedulingSource,
2440 )>,
2441 execution_scheduler: Arc<ExecutionScheduler>,
2442 epoch_store: Arc<AuthorityPerEpochStore>,
2443 ) {
2444 while let Some((transactions, assigned_versions, scheduling_source)) = recv.recv().await {
2445 let _guard = monitored_scope("ConsensusHandler::enqueue");
2446 let assigned_versions = assigned_versions.into_map();
2447 let txns = transactions
2448 .into_iter()
2449 .map(|txn| {
2450 let key = txn.key();
2451 (
2452 txn,
2453 ExecutionEnv::new()
2454 .with_scheduling_source(scheduling_source)
2455 .with_assigned_versions(
2456 assigned_versions.get(&key).cloned().unwrap_or_default(),
2457 ),
2458 )
2459 })
2460 .collect();
2461 execution_scheduler.enqueue(txns, &epoch_store);
2462 }
2463 }
2464}
2465
/// Owns the background tasks that consume consensus output: committed sub-dags and, when the
/// block handler is enabled, certified blocks.
pub(crate) struct MysticetiConsensusHandler {
    /// The spawned consumer tasks; shut down through `abort`.
    tasks: JoinSet<()>,
}
2470
2471impl MysticetiConsensusHandler {
2472 pub(crate) fn new(
2473 last_processed_commit_at_startup: CommitIndex,
2474 mut consensus_handler: ConsensusHandler<CheckpointService>,
2475 consensus_block_handler: ConsensusBlockHandler,
2476 mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
2477 mut block_receiver: UnboundedReceiver<consensus_core::CertifiedBlocksOutput>,
2478 commit_consumer_monitor: Arc<CommitConsumerMonitor>,
2479 ) -> Self {
2480 let mut tasks = JoinSet::new();
2481 tasks.spawn(monitored_future!(async move {
2482 while let Some(consensus_commit) = commit_receiver.recv().await {
2484 let commit_index = consensus_commit.commit_ref.index;
2485 if commit_index <= last_processed_commit_at_startup {
2486 consensus_handler.handle_prior_consensus_commit(consensus_commit);
2487 } else {
2488 consensus_handler
2489 .handle_consensus_commit(consensus_commit)
2490 .await;
2491 }
2492 commit_consumer_monitor.set_highest_handled_commit(commit_index);
2493 }
2494 }));
2495 if consensus_block_handler.enabled() {
2496 tasks.spawn(monitored_future!(async move {
2497 while let Some(blocks) = block_receiver.recv().await {
2498 consensus_block_handler
2499 .handle_certified_blocks(blocks)
2500 .await;
2501 }
2502 }));
2503 }
2504 Self { tasks }
2505 }
2506
2507 pub(crate) async fn abort(&mut self) {
2508 self.tasks.shutdown().await;
2509 }
2510}
2511
2512fn authenticator_state_update_transaction(
2513 epoch_store: &AuthorityPerEpochStore,
2514 round: u64,
2515 mut new_active_jwks: Vec<ActiveJwk>,
2516) -> VerifiedExecutableTransaction {
2517 let epoch = epoch_store.epoch();
2518 new_active_jwks.sort();
2519
2520 info!("creating authenticator state update transaction");
2521 assert!(epoch_store.authenticator_state_enabled());
2522 let transaction = VerifiedTransaction::new_authenticator_state_update(
2523 epoch,
2524 round,
2525 new_active_jwks,
2526 epoch_store
2527 .epoch_start_config()
2528 .authenticator_obj_initial_shared_version()
2529 .expect("authenticator state obj must exist"),
2530 );
2531 VerifiedExecutableTransaction::new_system(transaction, epoch)
2532}
2533
2534pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
2535 match &transaction.kind {
2536 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
2537 if certificate.is_consensus_tx() {
2538 "shared_certificate"
2539 } else {
2540 "owned_certificate"
2541 }
2542 }
2543 ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
2544 ConsensusTransactionKind::CheckpointSignatureV2(_) => "checkpoint_signature",
2545 ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
2546 ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
2547 ConsensusTransactionKind::CapabilityNotificationV2(_) => "capability_notification_v2",
2548 ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
2549 ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
2550 ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
2551 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
2552 ConsensusTransactionKind::UserTransaction(tx) => {
2553 if tx.is_consensus_tx() {
2554 "shared_user_transaction"
2555 } else {
2556 "owned_user_transaction"
2557 }
2558 }
2559 ConsensusTransactionKind::UserTransactionV2(tx) => {
2560 if tx.tx().is_consensus_tx() {
2561 "shared_user_transaction_v2"
2562 } else {
2563 "owned_user_transaction_v2"
2564 }
2565 }
2566 ConsensusTransactionKind::ExecutionTimeObservation(_) => "execution_time_observation",
2567 }
2568}
2569
/// A transaction together with the position consensus assigned to it and the authority it
/// is attributed to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequencedConsensusTransaction {
    /// Committee index of the authority the transaction is attributed to.
    pub certificate_author_index: AuthorityIndex,
    /// Name of the authority the transaction is attributed to.
    pub certificate_author: AuthorityName,
    /// Position of the transaction in the consensus output (round, sub-dag, index).
    pub consensus_index: ExecutionIndices,
    /// The transaction payload itself.
    pub transaction: SequencedConsensusTransactionKind,
}
2577
/// Payload of a [`SequencedConsensusTransaction`].
///
/// Serialization goes through `SerializableSequencedConsensusTransactionKind` (see the manual
/// `Serialize`/`Deserialize` impls) instead of being derived.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SequencedConsensusTransactionKind {
    /// A consensus transaction received from consensus output.
    External(ConsensusTransaction),
    /// An already-verified executable transaction.
    System(VerifiedExecutableTransaction),
}
2584
2585impl Serialize for SequencedConsensusTransactionKind {
2586 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2587 let serializable = SerializableSequencedConsensusTransactionKind::from(self);
2588 serializable.serialize(serializer)
2589 }
2590}
2591
2592impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
2593 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
2594 let serializable =
2595 SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
2596 Ok(serializable.into())
2597 }
2598}
2599
/// Serde-friendly mirror of [`SequencedConsensusTransactionKind`].
///
/// It differs only in the `System` variant, which holds a `TrustedExecutableTransaction`
/// (the serializable form of the verified transaction).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
enum SerializableSequencedConsensusTransactionKind {
    /// Mirrors `SequencedConsensusTransactionKind::External`.
    External(ConsensusTransaction),
    /// Mirrors `SequencedConsensusTransactionKind::System`.
    System(TrustedExecutableTransaction),
}
2609
2610impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
2611 fn from(kind: &SequencedConsensusTransactionKind) -> Self {
2612 match kind {
2613 SequencedConsensusTransactionKind::External(ext) => {
2614 SerializableSequencedConsensusTransactionKind::External(ext.clone())
2615 }
2616 SequencedConsensusTransactionKind::System(txn) => {
2617 SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
2618 }
2619 }
2620 }
2621}
2622
2623impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
2624 fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
2625 match kind {
2626 SerializableSequencedConsensusTransactionKind::External(ext) => {
2627 SequencedConsensusTransactionKind::External(ext)
2628 }
2629 SerializableSequencedConsensusTransactionKind::System(txn) => {
2630 SequencedConsensusTransactionKind::System(txn.into())
2631 }
2632 }
2633 }
2634}
2635
/// Key identifying a [`SequencedConsensusTransactionKind`], as returned by its `key()` method.
#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
pub enum SequencedConsensusTransactionKey {
    /// Key of an external consensus transaction.
    External(ConsensusTransactionKey),
    /// Digest of a system transaction.
    System(TransactionDigest),
}
2641
2642impl SequencedConsensusTransactionKey {
2643 pub fn user_transaction_digest(&self) -> Option<TransactionDigest> {
2644 match self {
2645 SequencedConsensusTransactionKey::External(key) => match key {
2646 ConsensusTransactionKey::Certificate(digest) => Some(*digest),
2647 _ => None,
2648 },
2649 SequencedConsensusTransactionKey::System(_) => None,
2650 }
2651 }
2652}
2653
2654impl SequencedConsensusTransactionKind {
2655 pub fn key(&self) -> SequencedConsensusTransactionKey {
2656 match self {
2657 SequencedConsensusTransactionKind::External(ext) => {
2658 SequencedConsensusTransactionKey::External(ext.key())
2659 }
2660 SequencedConsensusTransactionKind::System(txn) => {
2661 SequencedConsensusTransactionKey::System(*txn.digest())
2662 }
2663 }
2664 }
2665
2666 pub fn get_tracking_id(&self) -> u64 {
2667 match self {
2668 SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
2669 SequencedConsensusTransactionKind::System(_txn) => 0,
2670 }
2671 }
2672
2673 pub fn is_executable_transaction(&self) -> bool {
2674 match self {
2675 SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
2676 SequencedConsensusTransactionKind::System(_) => true,
2677 }
2678 }
2679
2680 pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
2681 match self {
2682 SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
2683 ConsensusTransactionKind::CertifiedTransaction(txn) => Some(*txn.digest()),
2684 ConsensusTransactionKind::UserTransaction(txn) => Some(*txn.digest()),
2685 ConsensusTransactionKind::UserTransactionV2(txn) => Some(*txn.tx().digest()),
2686 _ => None,
2687 },
2688 SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
2689 }
2690 }
2691
2692 pub fn is_end_of_publish(&self) -> bool {
2693 match self {
2694 SequencedConsensusTransactionKind::External(ext) => {
2695 matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
2696 }
2697 SequencedConsensusTransactionKind::System(_) => false,
2698 }
2699 }
2700}
2701
2702impl SequencedConsensusTransaction {
2703 pub fn sender_authority(&self) -> AuthorityName {
2704 self.certificate_author
2705 }
2706
2707 pub fn key(&self) -> SequencedConsensusTransactionKey {
2708 self.transaction.key()
2709 }
2710
2711 pub fn is_end_of_publish(&self) -> bool {
2712 if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
2713 matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
2714 } else {
2715 false
2716 }
2717 }
2718
2719 pub fn try_take_execution_time_observation(&mut self) -> Option<ExecutionTimeObservation> {
2720 if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
2721 kind: ConsensusTransactionKind::ExecutionTimeObservation(observation),
2722 ..
2723 }) = &mut self.transaction
2724 {
2725 Some(std::mem::take(observation))
2726 } else {
2727 None
2728 }
2729 }
2730
2731 pub fn is_system(&self) -> bool {
2732 matches!(
2733 self.transaction,
2734 SequencedConsensusTransactionKind::System(_)
2735 )
2736 }
2737
2738 pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
2739 if !randomness_state_enabled {
2740 return false;
2743 }
2744 match &self.transaction {
2745 SequencedConsensusTransactionKind::External(ConsensusTransaction {
2746 kind: ConsensusTransactionKind::CertifiedTransaction(cert),
2747 ..
2748 }) => cert.transaction_data().uses_randomness(),
2749 SequencedConsensusTransactionKind::External(ConsensusTransaction {
2750 kind: ConsensusTransactionKind::UserTransaction(txn),
2751 ..
2752 }) => txn.transaction_data().uses_randomness(),
2753 SequencedConsensusTransactionKind::External(ConsensusTransaction {
2754 kind: ConsensusTransactionKind::UserTransactionV2(txn),
2755 ..
2756 }) => txn.tx().transaction_data().uses_randomness(),
2757 _ => false,
2758 }
2759 }
2760
2761 pub fn as_consensus_txn(&self) -> Option<&SenderSignedData> {
2762 match &self.transaction {
2763 SequencedConsensusTransactionKind::External(ConsensusTransaction {
2764 kind: ConsensusTransactionKind::CertifiedTransaction(certificate),
2765 ..
2766 }) if certificate.is_consensus_tx() => Some(certificate.data()),
2767 SequencedConsensusTransactionKind::External(ConsensusTransaction {
2768 kind: ConsensusTransactionKind::UserTransaction(txn),
2769 ..
2770 }) if txn.is_consensus_tx() => Some(txn.data()),
2771 SequencedConsensusTransactionKind::External(ConsensusTransaction {
2772 kind: ConsensusTransactionKind::UserTransactionV2(txn),
2773 ..
2774 }) if txn.tx().is_consensus_tx() => Some(txn.tx().data()),
2775 SequencedConsensusTransactionKind::System(txn) if txn.is_consensus_tx() => {
2776 Some(txn.data())
2777 }
2778 _ => None,
2779 }
2780 }
2781}
2782
/// Newtype wrapper around a [`SequencedConsensusTransaction`].
// NOTE(review): presumably only constructed after the epoch store has verified the inner
// transaction — confirm against `verify_consensus_transaction`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
2785
#[cfg(test)]
impl VerifiedSequencedConsensusTransaction {
    /// Test-only constructor: wraps `transaction` using
    /// `SequencedConsensusTransaction::new_test`, without running any verification.
    pub fn new_test(transaction: ConsensusTransaction) -> Self {
        let sequenced = SequencedConsensusTransaction::new_test(transaction);
        Self(sequenced)
    }
}
2792
2793impl SequencedConsensusTransaction {
2794 pub fn new_test(transaction: ConsensusTransaction) -> Self {
2795 Self {
2796 certificate_author_index: 0,
2797 certificate_author: AuthorityName::ZERO,
2798 consensus_index: Default::default(),
2799 transaction: SequencedConsensusTransactionKind::External(transaction),
2800 }
2801 }
2802}
2803
/// Handles certified blocks from consensus: records per-transaction status and forwards
/// eligible user transactions to the execution scheduler with the `MysticetiFastPath` source.
pub(crate) struct ConsensusBlockHandler {
    /// Whether the handler is active; taken from the protocol config's
    /// `mysticeti_fastpath()` flag at construction.
    enabled: bool,
    /// Per-epoch store, used for the reconfiguration state and for transaction statuses.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Channel through which executable transactions are handed to the execution scheduler.
    execution_scheduler_sender: ExecutionSchedulerSender,
    /// Awaited before each batch of certified blocks is processed.
    backpressure_subscriber: BackpressureSubscriber,
    /// Authority metrics updated while handling blocks.
    metrics: Arc<AuthorityMetrics>,
}
2817
impl ConsensusBlockHandler {
    /// Creates a handler. `enabled` is read once from the epoch's protocol config
    /// (`mysticeti_fastpath()`).
    pub fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        execution_scheduler_sender: ExecutionSchedulerSender,
        backpressure_subscriber: BackpressureSubscriber,
        metrics: Arc<AuthorityMetrics>,
    ) -> Self {
        Self {
            enabled: epoch_store.protocol_config().mysticeti_fastpath(),
            epoch_store,
            execution_scheduler_sender,
            backpressure_subscriber,
            metrics,
        }
    }

    /// Returns whether this handler should be run at all.
    pub fn enabled(&self) -> bool {
        self.enabled
    }

    /// Processes one batch of certified blocks.
    ///
    /// After waiting for backpressure to clear, the batch is skipped entirely if the epoch no
    /// longer accepts user certificates. Otherwise every transaction in every block gets a
    /// consensus status (`Rejected` or, for non-consensus user transactions,
    /// `FastpathCertified`), and the non-rejected, non-consensus user transactions are sent
    /// to the execution scheduler as a single batch.
    #[instrument(level = "debug", skip_all)]
    async fn handle_certified_blocks(&self, blocks_output: CertifiedBlocksOutput) {
        self.backpressure_subscriber.await_no_backpressure().await;

        let _scope = monitored_scope("ConsensusBlockHandler::handle_certified_blocks");

        // The guard stays alive until the end of the function, so the reconfiguration state
        // checked here is held for the whole batch.
        let reconfiguration_lock = self.epoch_store.get_reconfig_state_read_lock_guard();
        if !reconfiguration_lock.should_accept_user_certs() {
            debug!(
                "Skipping fastpath execution because epoch {} is closing user transactions: {}",
                self.epoch_store.epoch(),
                blocks_output
                    .blocks
                    .iter()
                    .map(|b| b.block.reference().to_string())
                    .join(", "),
            );
            return;
        }

        self.metrics.consensus_block_handler_block_processed.inc();
        let epoch = self.epoch_store.epoch();
        // Parse every block up front, keeping its reference next to its transactions.
        let parsed_transactions = blocks_output
            .blocks
            .into_iter()
            .map(|certified_block| {
                let block_ref = certified_block.block.reference();
                let transactions =
                    parse_block_transactions(&certified_block.block, &certified_block.rejected);
                (block_ref, transactions)
            })
            .collect::<Vec<_>>();
        let mut executable_transactions = vec![];
        for (block, transactions) in parsed_transactions.into_iter() {
            // Mark the block's "ping" position as fastpath certified, independently of the
            // transactions the block contains.
            self.epoch_store.set_consensus_tx_status(
                ConsensusPosition::ping(epoch, block),
                ConsensusTxStatus::FastpathCertified,
            );

            for (txn_idx, parsed) in transactions.into_iter().enumerate() {
                let position = ConsensusPosition {
                    epoch,
                    block,
                    index: txn_idx as TransactionIndex,
                };

                // Used for the debug logs below only.
                let status_str = if parsed.rejected {
                    "rejected"
                } else {
                    "certified"
                };
                if let Some(tx) = parsed.transaction.kind.as_user_transaction() {
                    debug!(
                        "User Transaction in position: {:} with digest {:} is {:}",
                        position,
                        tx.digest(),
                        status_str
                    );
                } else {
                    debug!(
                        "System Transaction in position: {:} is {:}",
                        position, status_str
                    );
                }

                // Rejected transactions only get their status and metric recorded.
                if parsed.rejected {
                    self.epoch_store
                        .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
                    self.metrics
                        .consensus_block_handler_txn_processed
                        .with_label_values(&["rejected"])
                        .inc();
                    continue;
                }

                self.metrics
                    .consensus_block_handler_txn_processed
                    .with_label_values(&["certified"])
                    .inc();

                if let Some(tx) = parsed.transaction.kind.into_user_transaction() {
                    // Transactions reporting `is_consensus_tx()` are neither marked fastpath
                    // certified nor scheduled here.
                    // NOTE(review): presumably they are executed through the commit path
                    // instead — confirm.
                    if tx.is_consensus_tx() {
                        continue;
                    }
                    self.epoch_store
                        .set_consensus_tx_status(position, ConsensusTxStatus::FastpathCertified);
                    let tx = VerifiedTransaction::new_unchecked(tx);
                    executable_transactions.push(Schedulable::Transaction(
                        VerifiedExecutableTransaction::new_from_consensus(
                            tx,
                            self.epoch_store.epoch(),
                        ),
                    ));
                }
            }
        }

        // Nothing to schedule: skip the metric update and the channel send.
        if executable_transactions.is_empty() {
            return;
        }
        self.metrics
            .consensus_block_handler_fastpath_executions
            .inc_by(executable_transactions.len() as u64);

        // No assigned versions are passed for fastpath transactions (default value).
        self.execution_scheduler_sender.send(
            executable_transactions,
            Default::default(),
            SchedulingSource::MysticetiFastPath,
        );
    }
}
2953
/// Keeps a window of recent commit timestamps and derives an estimate of the time between
/// commits from it.
#[derive(Serialize, Deserialize)]
pub(crate) struct CommitIntervalObserver {
    /// Commit timestamps in milliseconds, oldest at the front and newest at the back.
    ring_buffer: VecDeque<u64>,
}
2958
2959impl CommitIntervalObserver {
2960 pub fn new(window_size: u32) -> Self {
2961 Self {
2962 ring_buffer: VecDeque::with_capacity(window_size as usize),
2963 }
2964 }
2965
2966 pub fn observe_commit_time(&mut self, consensus_commit: &impl ConsensusCommitAPI) {
2967 let commit_time = consensus_commit.commit_timestamp_ms();
2968 if self.ring_buffer.len() == self.ring_buffer.capacity() {
2969 self.ring_buffer.pop_front();
2970 }
2971 self.ring_buffer.push_back(commit_time);
2972 }
2973
2974 pub fn commit_interval_estimate(&self) -> Option<Duration> {
2975 if self.ring_buffer.len() <= 1 {
2976 None
2977 } else {
2978 let first = self.ring_buffer.front().unwrap();
2979 let last = self.ring_buffer.back().unwrap();
2980 let duration = last.saturating_sub(*first);
2981 let num_commits = self.ring_buffer.len() as u64;
2982 Some(Duration::from_millis(duration.div_ceil(num_commits)))
2983 }
2984 }
2985}
2986
2987#[cfg(test)]
2988mod tests {
2989 use std::collections::HashSet;
2990
2991 use consensus_core::{
2992 BlockAPI, CertifiedBlock, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction,
2993 VerifiedBlock,
2994 };
2995 use consensus_types::block::TransactionIndex;
2996 use futures::pin_mut;
2997 use prometheus::Registry;
2998 use sui_protocol_config::{ConsensusTransactionOrdering, ProtocolConfig};
2999 use sui_types::{
3000 base_types::ExecutionDigests,
3001 base_types::{AuthorityName, FullObjectRef, ObjectID, SuiAddress, random_object_ref},
3002 committee::Committee,
3003 crypto::deterministic_random_account_key,
3004 gas::GasCostSummary,
3005 message_envelope::Message,
3006 messages_checkpoint::{
3007 CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
3008 SignedCheckpointSummary,
3009 },
3010 messages_consensus::ConsensusTransaction,
3011 object::Object,
3012 transaction::{
3013 CertifiedTransaction, SenderSignedData, TransactionData, TransactionDataAPI,
3014 },
3015 };
3016
3017 use super::*;
3018 use crate::{
3019 authority::{
3020 authority_per_epoch_store::ConsensusStatsAPI,
3021 test_authority_builder::TestAuthorityBuilder,
3022 },
3023 checkpoints::CheckpointServiceNoop,
3024 consensus_adapter::consensus_tests::{
3025 test_certificates_with_gas_objects, test_user_transaction,
3026 },
3027 consensus_test_utils::make_consensus_adapter_for_test,
3028 post_consensus_tx_reorder::PostConsensusTxReorder,
3029 };
3030
    /// End-to-end test of `ConsensusHandler::handle_consensus_commit`: a commit is blocked
    /// while backpressure is on, goes through once it is released, updates the consensus
    /// stats, and leads to the execution of every transaction it contains.
    ///
    /// Runs with paused tokio time, so the timeouts below do not wait in real time.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_consensus_commit_handler() {
        telemetry_subscribers::init_for_testing();

        // GIVEN
        // 12 gas objects, 4 owned objects and 6 shared objects.
        let (sender, keypair) = deterministic_random_account_key();
        let gas_objects: Vec<Object> = (0..12)
            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
            .collect();
        let owned_objects: Vec<Object> = (0..4)
            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
            .collect();
        let shared_objects: Vec<Object> = (0..6)
            .map(|_| Object::shared_for_testing())
            .collect::<Vec<_>>();
        let mut all_objects = gas_objects.clone();
        all_objects.extend(owned_objects.clone());
        all_objects.extend(shared_objects.clone());

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .with_objects(all_objects.clone())
                .build();

        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let new_epoch_start_state = epoch_store.epoch_start_state();
        let consensus_committee = new_epoch_start_state.get_consensus_committee();

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));

        let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());

        let backpressure_manager = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let mut consensus_handler = ConsensusHandler::new(
            epoch_store,
            Arc::new(CheckpointServiceNoop {}),
            state.execution_scheduler().clone(),
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            Arc::new(ArcSwap::default()),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput_calculator),
            backpressure_manager.subscribe(),
            state.traffic_controller.clone(),
        );

        // 8 user transactions using gas objects 0..8: even `i` takes an owned input object,
        // odd `i` takes a shared one (owned/shared objects 0..4 each).
        let mut user_transactions = vec![];
        for (i, gas_object) in gas_objects[0..8].iter().enumerate() {
            let input_object = if i % 2 == 0 {
                owned_objects.get(i / 2).unwrap().clone()
            } else {
                shared_objects.get(i / 2).unwrap().clone()
            };
            let transaction = test_user_transaction(
                &state,
                sender,
                &keypair,
                gas_object.clone(),
                vec![input_object],
            )
            .await;
            user_transactions.push(transaction);
        }

        // Certified transactions built from gas objects 8..12 and shared objects 4 and 5.
        let certified_transactions = [
            test_certificates_with_gas_objects(
                &state,
                &gas_objects[8..10],
                shared_objects[4].clone(),
            )
            .await,
            test_certificates_with_gas_objects(
                &state,
                &gas_objects[10..12],
                shared_objects[5].clone(),
            )
            .await,
        ]
        .concat();

        // One block per transaction: round `100 + i`, author `i % committee size`.
        let mut blocks = Vec::new();
        for (i, consensus_transaction) in user_transactions
            .iter()
            .cloned()
            .map(|t| ConsensusTransaction::new_user_transaction_v2_message(&state.name, t.into()))
            .chain(
                certified_transactions
                    .iter()
                    .map(|t| ConsensusTransaction::new_certificate_message(&state.name, t.clone())),
            )
            .enumerate()
        {
            let transaction_bytes = bcs::to_bytes(&consensus_transaction).unwrap();
            let block = VerifiedBlock::new_for_test(
                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
                    .set_transactions(vec![Transaction::new(transaction_bytes)])
                    .build(),
            );

            blocks.push(block);
        }

        // A committed sub-dag with the first block (round 100) as leader and commit index 10.
        let leader_block = blocks[0].clone();
        let committed_sub_dag = CommittedSubDag::new(
            leader_block.reference(),
            blocks.clone(),
            leader_block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
        );

        // Turn backpressure on before handling the commit.
        backpressure_manager.set_backpressure(true);
        backpressure_manager.update_highest_certified_checkpoint(1);

        // WHEN
        {
            let waiter = consensus_handler.handle_consensus_commit(committed_sub_dag.clone());
            pin_mut!(waiter);

            // While backpressure is on, handling must not complete: the timeout must fire.
            tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
                .await
                .unwrap_err();

            // Release backpressure.
            backpressure_manager.set_backpressure(false);

            // Now the same future must run to completion.
            tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
                .await
                .unwrap();
        }

        // THEN the consensus stats reflect the commit.
        let num_blocks = blocks.len();
        let num_transactions = user_transactions.len() + certified_transactions.len();
        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
        assert_eq!(
            last_consensus_stats_1.index.transaction_index,
            num_transactions as u64
        );
        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
        assert_eq!(last_consensus_stats_1.hash, 0);
        assert_eq!(
            last_consensus_stats_1.stats.get_num_messages(0),
            num_blocks as u64
        );
        assert_eq!(
            last_consensus_stats_1.stats.get_num_user_transactions(0),
            num_transactions as u64
        );

        // THEN every user transaction produces effects within the timeout.
        for (i, t) in user_transactions.iter().enumerate() {
            let digest = t.tx().digest();
            if let Ok(Ok(_)) = tokio::time::timeout(
                std::time::Duration::from_secs(10),
                state.notify_read_effects("", *digest),
            )
            .await
            {
                // Effects exist, as expected.
            } else {
                panic!("User transaction {} {} did not execute", i, digest);
            }
        }

        // THEN every certified transaction produces effects within the timeout.
        for (i, t) in certified_transactions.iter().enumerate() {
            let digest = t.digest();
            if let Ok(Ok(_)) = tokio::time::timeout(
                std::time::Duration::from_secs(10),
                state.notify_read_effects("", *digest),
            )
            .await
            {
                // Effects exist, as expected.
            } else {
                panic!("Certified transaction {} {} did not execute", i, digest);
            }
        }

        // THEN nothing is left pending in the execution scheduler.
        state.execution_scheduler().check_empty_for_testing();
    }
3235
3236 #[tokio::test]
3237 async fn test_consensus_block_handler() {
3238 let (sender, keypair) = deterministic_random_account_key();
3241 let gas_objects: Vec<Object> = (0..8)
3243 .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3244 .collect();
3245 let owned_objects: Vec<Object> = (0..4)
3247 .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3248 .collect();
3249 let shared_objects: Vec<Object> = (0..4)
3251 .map(|_| Object::shared_for_testing())
3252 .collect::<Vec<_>>();
3253 let mut all_objects = gas_objects.clone();
3254 all_objects.extend(owned_objects.clone());
3255 all_objects.extend(shared_objects.clone());
3256
3257 let network_config =
3258 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
3259 .with_objects(all_objects.clone())
3260 .build();
3261
3262 let state = TestAuthorityBuilder::new()
3263 .with_network_config(&network_config, 0)
3264 .build()
3265 .await;
3266 let epoch_store = state.epoch_store_for_testing().clone();
3267 let execution_scheduler_sender = ExecutionSchedulerSender::start(
3268 state.execution_scheduler().clone(),
3269 epoch_store.clone(),
3270 );
3271
3272 let backpressure_manager = BackpressureManager::new_for_tests();
3273 let block_handler = ConsensusBlockHandler::new(
3274 epoch_store.clone(),
3275 execution_scheduler_sender,
3276 backpressure_manager.subscribe(),
3277 state.metrics.clone(),
3278 );
3279
3280 let mut transactions = vec![];
3282 for (i, gas_object) in gas_objects.iter().enumerate() {
3283 let input_object = if i % 2 == 0 {
3284 owned_objects.get(i / 2).unwrap().clone()
3285 } else {
3286 shared_objects.get(i / 2).unwrap().clone()
3287 };
3288 let transaction = test_user_transaction(
3289 &state,
3290 sender,
3291 &keypair,
3292 gas_object.clone(),
3293 vec![input_object],
3294 )
3295 .await;
3296 transactions.push(transaction);
3297 }
3298
3299 let serialized_transactions: Vec<_> = transactions
3300 .iter()
3301 .cloned()
3302 .map(|t| {
3303 Transaction::new(
3304 bcs::to_bytes(&ConsensusTransaction::new_user_transaction_v2_message(
3305 &state.name,
3306 t.into(),
3307 ))
3308 .unwrap(),
3309 )
3310 })
3311 .collect();
3312
3313 let block = VerifiedBlock::new_for_test(
3315 TestBlock::new(100, 1)
3316 .set_transactions(serialized_transactions.clone())
3317 .build(),
3318 );
3319
3320 let rejected_transactions = vec![0, 3, 4];
3322
3323 block_handler
3325 .handle_certified_blocks(CertifiedBlocksOutput {
3326 blocks: vec![CertifiedBlock {
3327 block: block.clone(),
3328 rejected: rejected_transactions.clone(),
3329 }],
3330 })
3331 .await;
3332
3333 let consensus_tx_status_cache = epoch_store.consensus_tx_status_cache.as_ref().unwrap();
3335 for txn_idx in 0..transactions.len() {
3336 let position = ConsensusPosition {
3337 epoch: epoch_store.epoch(),
3338 block: block.reference(),
3339 index: txn_idx as TransactionIndex,
3340 };
3341 if rejected_transactions.contains(&(txn_idx as TransactionIndex)) {
3342 assert_eq!(
3344 consensus_tx_status_cache.get_transaction_status(&position),
3345 Some(ConsensusTxStatus::Rejected)
3346 );
3347 } else if txn_idx % 2 == 0 {
3348 assert_eq!(
3350 consensus_tx_status_cache.get_transaction_status(&position),
3351 Some(ConsensusTxStatus::FastpathCertified),
3352 );
3353 } else {
3354 assert_eq!(
3356 consensus_tx_status_cache.get_transaction_status(&position),
3357 None,
3358 );
3359 }
3360 }
3361
3362 for (i, t) in transactions.iter().enumerate() {
3364 if i % 2 == 1 || rejected_transactions.contains(&(i as TransactionIndex)) {
3366 continue;
3367 }
3368 let digest = t.tx().digest();
3369 if tokio::time::timeout(
3370 std::time::Duration::from_secs(10),
3371 state
3372 .get_transaction_cache_reader()
3373 .notify_read_fastpath_transaction_outputs(&[*digest]),
3374 )
3375 .await
3376 .is_err()
3377 {
3378 panic!("Transaction {} {} did not execute", i, digest);
3379 }
3380 }
3381
3382 state.execution_scheduler().check_empty_for_testing();
3384
3385 for (i, t) in transactions.iter().enumerate() {
3387 if i % 2 == 0 && !rejected_transactions.contains(&(i as TransactionIndex)) {
3389 continue;
3390 }
3391 let digest = t.tx().digest();
3392 assert!(
3393 !state.is_tx_already_executed(digest),
3394 "Rejected transaction {} {} should not have been executed",
3395 i,
3396 digest
3397 );
3398 }
3399 }
3400
3401 fn to_short_strings(txs: Vec<VerifiedExecutableTransactionWithAliases>) -> Vec<String> {
3402 txs.into_iter()
3403 .map(|tx| format!("transaction({})", tx.tx().transaction_data().gas_price()))
3404 .collect()
3405 }
3406
/// Checks that reordering with `ConsensusTransactionOrdering::ByGasPrice` yields
/// transactions in descending gas-price order, including when a price appears twice.
#[test]
fn test_order_by_gas_price() {
    // Two transactions that start out in ascending price order.
    let mut pair: Vec<_> = [42, 100].into_iter().map(user_txn).collect();
    PostConsensusTxReorder::reorder(&mut pair, ConsensusTransactionOrdering::ByGasPrice);
    assert_eq!(
        to_short_strings(pair),
        ["transaction(100)", "transaction(42)"]
    );

    // A longer, unsorted batch that contains the price 1000 twice.
    let mut batch: Vec<_> = [1200, 12, 1000, 42, 100, 1000]
        .into_iter()
        .map(user_txn)
        .collect();
    PostConsensusTxReorder::reorder(&mut batch, ConsensusTransactionOrdering::ByGasPrice);
    assert_eq!(
        to_short_strings(batch),
        [
            "transaction(1200)",
            "transaction(1000)",
            "transaction(1000)",
            "transaction(100)",
            "transaction(42)",
            "transaction(12)",
        ]
    );
}
3440
3441 #[tokio::test(flavor = "current_thread")]
3442 async fn test_checkpoint_signature_dedup() {
3443 telemetry_subscribers::init_for_testing();
3444
3445 let network_config =
3446 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3447 let state = TestAuthorityBuilder::new()
3448 .with_network_config(&network_config, 0)
3449 .build()
3450 .await;
3451
3452 let epoch_store = state.epoch_store_for_testing().clone();
3453 let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3454
3455 let make_signed = || {
3456 let epoch = epoch_store.epoch();
3457 let contents =
3458 CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
3459 let summary = CheckpointSummary::new(
3460 &ProtocolConfig::get_for_max_version_UNSAFE(),
3461 epoch,
3462 42, 10, &contents,
3465 None, GasCostSummary::default(),
3467 None, 0, Vec::new(), Vec::new(), );
3472 SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name)
3473 };
3474
3475 let v2_s1 = make_signed();
3477 let v2_s1_clone = v2_s1.clone();
3478 let v2_digest_a = v2_s1.data().digest();
3479 let v2_a =
3480 ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3481 summary: v2_s1,
3482 });
3483
3484 let v2_s2 = make_signed();
3485 let v2_digest_b = v2_s2.data().digest();
3486 let v2_b =
3487 ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3488 summary: v2_s2,
3489 });
3490
3491 assert_ne!(v2_digest_a, v2_digest_b);
3492
3493 assert_eq!(v2_s1_clone.data().digest(), v2_digest_a);
3495 let v2_dup =
3496 ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3497 summary: v2_s1_clone,
3498 });
3499
3500 let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
3501 let block = VerifiedBlock::new_for_test(
3502 TestBlock::new(100, 0)
3503 .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)])
3504 .build(),
3505 );
3506 let commit = CommittedSubDag::new(
3507 block.reference(),
3508 vec![block.clone()],
3509 block.timestamp_ms(),
3510 CommitRef::new(10, CommitDigest::MIN),
3511 );
3512
3513 let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3514 let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
3515 let backpressure = BackpressureManager::new_for_tests();
3516 let consensus_adapter =
3517 make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3518 let mut handler = ConsensusHandler::new(
3519 epoch_store.clone(),
3520 Arc::new(CheckpointServiceNoop {}),
3521 state.execution_scheduler().clone(),
3522 consensus_adapter,
3523 state.get_object_cache_reader().clone(),
3524 Arc::new(ArcSwap::default()),
3525 consensus_committee.clone(),
3526 metrics,
3527 Arc::new(throughput),
3528 backpressure.subscribe(),
3529 state.traffic_controller.clone(),
3530 );
3531
3532 handler.handle_consensus_commit(commit).await;
3533
3534 use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
3535 use sui_types::messages_consensus::ConsensusTransactionKey as CK;
3536
3537 let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a));
3539 let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b));
3540 assert!(
3541 epoch_store
3542 .is_consensus_message_processed(&v2_key_a)
3543 .unwrap()
3544 );
3545 assert!(
3546 epoch_store
3547 .is_consensus_message_processed(&v2_key_b)
3548 .unwrap()
3549 );
3550 }
3551
3552 #[tokio::test(flavor = "current_thread")]
3553 async fn test_verify_consensus_transaction_filters_mismatched_authorities() {
3554 telemetry_subscribers::init_for_testing();
3555
3556 let network_config =
3557 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3558 let state = TestAuthorityBuilder::new()
3559 .with_network_config(&network_config, 0)
3560 .build()
3561 .await;
3562
3563 let epoch_store = state.epoch_store_for_testing().clone();
3564 let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3565
3566 use fastcrypto::traits::KeyPair;
3568 let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair();
3569 let wrong_authority: AuthorityName = wrong_keypair.public().into();
3570
3571 let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority);
3573
3574 let valid_eop = ConsensusTransaction::new_end_of_publish(state.name);
3576
3577 let epoch = epoch_store.epoch();
3579 let contents =
3580 CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
3581 let summary = CheckpointSummary::new(
3582 &ProtocolConfig::get_for_max_version_UNSAFE(),
3583 epoch,
3584 42, 10, &contents,
3587 None, GasCostSummary::default(),
3589 None, 0, Vec::new(), Vec::new(), );
3594
3595 let mismatched_checkpoint_signed =
3597 SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority);
3598 let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest();
3599 let mismatched_checkpoint =
3600 ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3601 summary: mismatched_checkpoint_signed,
3602 });
3603
3604 let valid_checkpoint_signed =
3606 SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name);
3607 let valid_checkpoint_digest = valid_checkpoint_signed.data().digest();
3608 let valid_checkpoint =
3609 ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3610 summary: valid_checkpoint_signed,
3611 });
3612
3613 let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
3614
3615 let block = VerifiedBlock::new_for_test(
3617 TestBlock::new(100, 0)
3618 .set_transactions(vec![
3619 to_tx(&mismatched_eop),
3620 to_tx(&valid_eop),
3621 to_tx(&mismatched_checkpoint),
3622 to_tx(&valid_checkpoint),
3623 ])
3624 .build(),
3625 );
3626 let commit = CommittedSubDag::new(
3627 block.reference(),
3628 vec![block.clone()],
3629 block.timestamp_ms(),
3630 CommitRef::new(10, CommitDigest::MIN),
3631 );
3632
3633 let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3634 let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
3635 let backpressure = BackpressureManager::new_for_tests();
3636 let consensus_adapter =
3637 make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3638 let mut handler = ConsensusHandler::new(
3639 epoch_store.clone(),
3640 Arc::new(CheckpointServiceNoop {}),
3641 state.execution_scheduler().clone(),
3642 consensus_adapter,
3643 state.get_object_cache_reader().clone(),
3644 Arc::new(ArcSwap::default()),
3645 consensus_committee.clone(),
3646 metrics,
3647 Arc::new(throughput),
3648 backpressure.subscribe(),
3649 state.traffic_controller.clone(),
3650 );
3651
3652 handler.handle_consensus_commit(commit).await;
3653
3654 use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
3655 use sui_types::messages_consensus::ConsensusTransactionKey as CK;
3656
3657 let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
3659 assert!(
3660 epoch_store
3661 .is_consensus_message_processed(&valid_eop_key)
3662 .unwrap(),
3663 "Valid EndOfPublish should have been processed"
3664 );
3665
3666 let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
3667 state.name,
3668 42,
3669 valid_checkpoint_digest,
3670 ));
3671 assert!(
3672 epoch_store
3673 .is_consensus_message_processed(&valid_checkpoint_key)
3674 .unwrap(),
3675 "Valid CheckpointSignature should have been processed"
3676 );
3677
3678 let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
3680 assert!(
3681 !epoch_store
3682 .is_consensus_message_processed(&mismatched_eop_key)
3683 .unwrap(),
3684 "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
3685 );
3686
3687 let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
3688 wrong_authority,
3689 42,
3690 mismatched_checkpoint_digest,
3691 ));
3692 assert!(
3693 !epoch_store
3694 .is_consensus_message_processed(&mismatched_checkpoint_key)
3695 .unwrap(),
3696 "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
3697 );
3698 }
3699
3700 fn user_txn(gas_price: u64) -> VerifiedExecutableTransactionWithAliases {
3701 let (committee, keypairs) = Committee::new_simple_test_committee();
3702 let data = SenderSignedData::new(
3703 TransactionData::new_transfer(
3704 SuiAddress::default(),
3705 FullObjectRef::from_fastpath_ref(random_object_ref()),
3706 SuiAddress::default(),
3707 random_object_ref(),
3708 1000 * gas_price,
3709 gas_price,
3710 ),
3711 vec![],
3712 );
3713 let tx = VerifiedExecutableTransaction::new_from_certificate(
3714 VerifiedCertificate::new_unchecked(
3715 CertifiedTransaction::new_from_keypairs_for_testing(data, &keypairs, &committee),
3716 ),
3717 );
3718 VerifiedExecutableTransactionWithAliases::no_aliases(tx)
3719 }
3720}