1use std::{
5 collections::{BTreeMap, HashMap, HashSet, VecDeque},
6 hash::Hash,
7 num::NonZeroUsize,
8 sync::Arc,
9 time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use arc_swap::ArcSwap;
13use consensus_config::Committee as ConsensusCommittee;
14use consensus_core::{CertifiedBlocksOutput, CommitConsumerMonitor, CommitIndex};
15use consensus_types::block::TransactionIndex;
16use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
17use itertools::Itertools as _;
18use lru::LruCache;
19use mysten_common::{
20 assert_reachable, assert_sometimes, debug_fatal, random_util::randomize_cache_capacity_in_tests,
21};
22use mysten_metrics::{
23 monitored_future,
24 monitored_mpsc::{self, UnboundedReceiver},
25 monitored_scope, spawn_monitored_task,
26};
27use parking_lot::RwLockWriteGuard;
28use serde::{Deserialize, Serialize};
29use sui_macros::{fail_point, fail_point_arg, fail_point_if};
30use sui_protocol_config::{PerObjectCongestionControlMode, ProtocolConfig};
31use sui_types::{
32 SUI_RANDOMNESS_STATE_OBJECT_ID,
33 authenticator_state::ActiveJwk,
34 base_types::{
35 AuthorityName, ConciseableName, ConsensusObjectSequenceKey, ObjectID, ObjectRef,
36 SequenceNumber, TransactionDigest,
37 },
38 crypto::RandomnessRound,
39 digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest},
40 executable_transaction::{
41 TrustedExecutableTransaction, VerifiedExecutableTransaction,
42 VerifiedExecutableTransactionWithAliases,
43 },
44 messages_checkpoint::CheckpointSignatureMessage,
45 messages_consensus::{
46 AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments,
47 ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
48 ExecutionTimeObservation,
49 },
50 sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
51 transaction::{
52 InputObjectKind, SenderSignedData, TransactionDataAPI, TransactionKey, VerifiedCertificate,
53 VerifiedTransaction, WithAliases,
54 },
55};
56use tokio::task::JoinSet;
57use tracing::{debug, error, info, instrument, trace, warn};
58
59use crate::{
60 authority::{
61 AuthorityMetrics, AuthorityState, ExecutionEnv,
62 authority_per_epoch_store::{
63 AuthorityPerEpochStore, CancelConsensusCertificateReason, ConsensusStats,
64 ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStats,
65 consensus_quarantine::ConsensusCommitOutput,
66 },
67 backpressure::{BackpressureManager, BackpressureSubscriber},
68 consensus_tx_status_cache::ConsensusTxStatus,
69 epoch_start_configuration::EpochStartConfigTrait,
70 execution_time_estimator::ExecutionTimeEstimator,
71 shared_object_congestion_tracker::SharedObjectCongestionTracker,
72 shared_object_version_manager::{AssignedTxAndVersions, Schedulable},
73 transaction_deferral::{DeferralKey, DeferralReason, transaction_deferral_within_limit},
74 },
75 checkpoints::{
76 CheckpointService, CheckpointServiceNotify, PendingCheckpoint, PendingCheckpointInfo,
77 },
78 consensus_adapter::ConsensusAdapter,
79 consensus_throughput_calculator::ConsensusThroughputCalculator,
80 consensus_types::consensus_output_api::{ConsensusCommitAPI, parse_block_transactions},
81 epoch::{
82 randomness::{DkgStatus, RandomnessManager},
83 reconfiguration::ReconfigState,
84 },
85 execution_cache::ObjectCacheRead,
86 execution_scheduler::{ExecutionScheduler, SchedulingSource},
87 post_consensus_tx_reorder::PostConsensusTxReorder,
88 scoring_decision::update_low_scoring_authorities,
89 traffic_controller::{TrafficController, policies::TrafficTally},
90};
91
92struct FilteredConsensusOutput {
95 transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
96 owned_object_locks: HashMap<ObjectRef, TransactionDigest>,
97}
98
99pub struct ConsensusHandlerInitializer {
100 state: Arc<AuthorityState>,
101 checkpoint_service: Arc<CheckpointService>,
102 epoch_store: Arc<AuthorityPerEpochStore>,
103 consensus_adapter: Arc<ConsensusAdapter>,
104 low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
105 throughput_calculator: Arc<ConsensusThroughputCalculator>,
106 backpressure_manager: Arc<BackpressureManager>,
107}
108
109impl ConsensusHandlerInitializer {
110 pub fn new(
111 state: Arc<AuthorityState>,
112 checkpoint_service: Arc<CheckpointService>,
113 epoch_store: Arc<AuthorityPerEpochStore>,
114 consensus_adapter: Arc<ConsensusAdapter>,
115 low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
116 throughput_calculator: Arc<ConsensusThroughputCalculator>,
117 backpressure_manager: Arc<BackpressureManager>,
118 ) -> Self {
119 Self {
120 state,
121 checkpoint_service,
122 epoch_store,
123 consensus_adapter,
124 low_scoring_authorities,
125 throughput_calculator,
126 backpressure_manager,
127 }
128 }
129
130 #[cfg(test)]
131 pub(crate) fn new_for_testing(
132 state: Arc<AuthorityState>,
133 checkpoint_service: Arc<CheckpointService>,
134 ) -> Self {
135 use crate::consensus_test_utils::make_consensus_adapter_for_test;
136 use std::collections::HashSet;
137
138 let backpressure_manager = BackpressureManager::new_for_tests();
139 let consensus_adapter =
140 make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
141 Self {
142 state: state.clone(),
143 checkpoint_service,
144 epoch_store: state.epoch_store_for_testing().clone(),
145 consensus_adapter,
146 low_scoring_authorities: Arc::new(Default::default()),
147 throughput_calculator: Arc::new(ConsensusThroughputCalculator::new(
148 None,
149 state.metrics.clone(),
150 )),
151 backpressure_manager,
152 }
153 }
154
155 pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
156 let new_epoch_start_state = self.epoch_store.epoch_start_state();
157 let consensus_committee = new_epoch_start_state.get_consensus_committee();
158
159 ConsensusHandler::new(
160 self.epoch_store.clone(),
161 self.checkpoint_service.clone(),
162 self.state.execution_scheduler().clone(),
163 self.consensus_adapter.clone(),
164 self.state.get_object_cache_reader().clone(),
165 self.low_scoring_authorities.clone(),
166 consensus_committee,
167 self.state.metrics.clone(),
168 self.throughput_calculator.clone(),
169 self.backpressure_manager.subscribe(),
170 self.state.traffic_controller.clone(),
171 )
172 }
173
174 pub(crate) fn metrics(&self) -> &Arc<AuthorityMetrics> {
175 &self.state.metrics
176 }
177
178 pub(crate) fn backpressure_subscriber(&self) -> BackpressureSubscriber {
179 self.backpressure_manager.subscribe()
180 }
181}
182
183mod additional_consensus_state {
184 use std::marker::PhantomData;
185
186 use consensus_core::CommitRef;
187 use fastcrypto::hash::HashFunction as _;
188 use sui_types::{crypto::DefaultHash, digests::Digest};
189
190 use super::*;
191 #[derive(Serialize, Deserialize)]
200 pub(super) struct AdditionalConsensusState {
201 commit_interval_observer: CommitIntervalObserver,
202 }
203
204 impl AdditionalConsensusState {
205 pub fn new(additional_consensus_state_window_size: u32) -> Self {
206 Self {
207 commit_interval_observer: CommitIntervalObserver::new(
208 additional_consensus_state_window_size,
209 ),
210 }
211 }
212
213 pub(crate) fn observe_commit(
215 &mut self,
216 protocol_config: &ProtocolConfig,
217 epoch_start_time: u64,
218 consensus_commit: &impl ConsensusCommitAPI,
219 ) -> ConsensusCommitInfo {
220 self.commit_interval_observer
221 .observe_commit_time(consensus_commit);
222
223 let estimated_commit_period = self
224 .commit_interval_observer
225 .commit_interval_estimate()
226 .unwrap_or(Duration::from_millis(
227 protocol_config.min_checkpoint_interval_ms(),
228 ));
229
230 info!("estimated commit rate: {:?}", estimated_commit_period);
231
232 self.commit_info_impl(
233 epoch_start_time,
234 consensus_commit,
235 Some(estimated_commit_period),
236 )
237 }
238
239 fn commit_info_impl(
240 &self,
241 epoch_start_time: u64,
242 consensus_commit: &impl ConsensusCommitAPI,
243 estimated_commit_period: Option<Duration>,
244 ) -> ConsensusCommitInfo {
245 let leader_author = consensus_commit.leader_author_index();
246 let timestamp = consensus_commit.commit_timestamp_ms();
247
248 let timestamp = if timestamp < epoch_start_time {
249 error!(
250 "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start_time}, author {leader_author:?}"
251 );
252 epoch_start_time
253 } else {
254 timestamp
255 };
256
257 ConsensusCommitInfo {
258 _phantom: PhantomData,
259 round: consensus_commit.leader_round(),
260 timestamp,
261 leader_author,
262 consensus_commit_ref: consensus_commit.commit_ref(),
263 rejected_transactions_digest: consensus_commit.rejected_transactions_digest(),
264 additional_state_digest: Some(self.digest()),
265 estimated_commit_period,
266 skip_consensus_commit_prologue_in_test: false,
267 }
268 }
269
270 fn digest(&self) -> AdditionalConsensusStateDigest {
272 let mut hash = DefaultHash::new();
273 bcs::serialize_into(&mut hash, self).unwrap();
274 AdditionalConsensusStateDigest::new(hash.finalize().into())
275 }
276 }
277
278 pub struct ConsensusCommitInfo {
279 _phantom: PhantomData<()>,
281
282 pub round: u64,
283 pub timestamp: u64,
284 pub leader_author: AuthorityIndex,
285 pub consensus_commit_ref: CommitRef,
286 pub rejected_transactions_digest: Digest,
287
288 additional_state_digest: Option<AdditionalConsensusStateDigest>,
289 estimated_commit_period: Option<Duration>,
290
291 pub skip_consensus_commit_prologue_in_test: bool,
292 }
293
294 impl ConsensusCommitInfo {
295 pub fn new_for_test(
296 commit_round: u64,
297 commit_timestamp: u64,
298 estimated_commit_period: Option<Duration>,
299 skip_consensus_commit_prologue_in_test: bool,
300 ) -> Self {
301 Self {
302 _phantom: PhantomData,
303 round: commit_round,
304 timestamp: commit_timestamp,
305 leader_author: 0,
306 consensus_commit_ref: CommitRef::default(),
307 rejected_transactions_digest: Digest::default(),
308 additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
309 estimated_commit_period,
310 skip_consensus_commit_prologue_in_test,
311 }
312 }
313
314 pub fn new_for_congestion_test(
315 commit_round: u64,
316 commit_timestamp: u64,
317 estimated_commit_period: Duration,
318 ) -> Self {
319 Self::new_for_test(
320 commit_round,
321 commit_timestamp,
322 Some(estimated_commit_period),
323 true,
324 )
325 }
326
327 pub fn additional_state_digest(&self) -> AdditionalConsensusStateDigest {
328 self.additional_state_digest
330 .expect("additional_state_digest is not available")
331 }
332
333 pub fn estimated_commit_period(&self) -> Duration {
334 self.estimated_commit_period
336 .expect("estimated commit period is not available")
337 }
338
339 fn consensus_commit_digest(&self) -> ConsensusCommitDigest {
340 ConsensusCommitDigest::new(self.consensus_commit_ref.digest.into_inner())
341 }
342
343 fn consensus_commit_prologue_transaction(
344 &self,
345 epoch: u64,
346 ) -> VerifiedExecutableTransaction {
347 let transaction = VerifiedTransaction::new_consensus_commit_prologue(
348 epoch,
349 self.round,
350 self.timestamp,
351 );
352 VerifiedExecutableTransaction::new_system(transaction, epoch)
353 }
354
355 fn consensus_commit_prologue_v2_transaction(
356 &self,
357 epoch: u64,
358 ) -> VerifiedExecutableTransaction {
359 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v2(
360 epoch,
361 self.round,
362 self.timestamp,
363 self.consensus_commit_digest(),
364 );
365 VerifiedExecutableTransaction::new_system(transaction, epoch)
366 }
367
368 fn consensus_commit_prologue_v3_transaction(
369 &self,
370 epoch: u64,
371 consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
372 ) -> VerifiedExecutableTransaction {
373 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v3(
374 epoch,
375 self.round,
376 self.timestamp,
377 self.consensus_commit_digest(),
378 consensus_determined_version_assignments,
379 );
380 VerifiedExecutableTransaction::new_system(transaction, epoch)
381 }
382
383 fn consensus_commit_prologue_v4_transaction(
384 &self,
385 epoch: u64,
386 consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
387 additional_state_digest: AdditionalConsensusStateDigest,
388 ) -> VerifiedExecutableTransaction {
389 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v4(
390 epoch,
391 self.round,
392 self.timestamp,
393 self.consensus_commit_digest(),
394 consensus_determined_version_assignments,
395 additional_state_digest,
396 );
397 VerifiedExecutableTransaction::new_system(transaction, epoch)
398 }
399
400 pub fn create_consensus_commit_prologue_transaction(
401 &self,
402 epoch: u64,
403 protocol_config: &ProtocolConfig,
404 cancelled_txn_version_assignment: Vec<(
405 TransactionDigest,
406 Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
407 )>,
408 commit_info: &ConsensusCommitInfo,
409 indirect_state_observer: IndirectStateObserver,
410 ) -> VerifiedExecutableTransaction {
411 let version_assignments = if protocol_config
412 .record_consensus_determined_version_assignments_in_prologue_v2()
413 {
414 Some(
415 ConsensusDeterminedVersionAssignments::CancelledTransactionsV2(
416 cancelled_txn_version_assignment,
417 ),
418 )
419 } else if protocol_config.record_consensus_determined_version_assignments_in_prologue()
420 {
421 Some(
422 ConsensusDeterminedVersionAssignments::CancelledTransactions(
423 cancelled_txn_version_assignment
424 .into_iter()
425 .map(|(tx_digest, versions)| {
426 (
427 tx_digest,
428 versions.into_iter().map(|(id, v)| (id.0, v)).collect(),
429 )
430 })
431 .collect(),
432 ),
433 )
434 } else {
435 None
436 };
437
438 if protocol_config.record_additional_state_digest_in_prologue() {
439 let additional_state_digest =
440 if protocol_config.additional_consensus_digest_indirect_state() {
441 let d1 = commit_info.additional_state_digest();
442 indirect_state_observer.fold_with(d1)
443 } else {
444 commit_info.additional_state_digest()
445 };
446
447 self.consensus_commit_prologue_v4_transaction(
448 epoch,
449 version_assignments.unwrap(),
450 additional_state_digest,
451 )
452 } else if let Some(version_assignments) = version_assignments {
453 self.consensus_commit_prologue_v3_transaction(epoch, version_assignments)
454 } else if protocol_config.include_consensus_digest_in_prologue() {
455 self.consensus_commit_prologue_v2_transaction(epoch)
456 } else {
457 self.consensus_commit_prologue_transaction(epoch)
458 }
459 }
460 }
461
462 #[derive(Default)]
463 pub struct IndirectStateObserver {
464 hash: DefaultHash,
465 }
466
467 impl IndirectStateObserver {
468 pub fn new() -> Self {
469 Self::default()
470 }
471
472 pub fn observe_indirect_state<T: Serialize>(&mut self, state: &T) {
473 bcs::serialize_into(&mut self.hash, state).unwrap();
474 }
475
476 pub fn fold_with(
477 self,
478 d1: AdditionalConsensusStateDigest,
479 ) -> AdditionalConsensusStateDigest {
480 let hash = self.hash.finalize();
481 let d2 = AdditionalConsensusStateDigest::new(hash.into());
482
483 let mut hasher = DefaultHash::new();
484 bcs::serialize_into(&mut hasher, &d1).unwrap();
485 bcs::serialize_into(&mut hasher, &d2).unwrap();
486 AdditionalConsensusStateDigest::new(hasher.finalize().into())
487 }
488 }
489
490 #[test]
491 fn test_additional_consensus_state() {
492 use crate::consensus_test_utils::TestConsensusCommit;
493
494 fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
495 let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
496 state.observe_commit(
497 &protocol_config,
498 100,
499 &TestConsensusCommit::empty(round, timestamp, 0),
500 );
501 }
502
503 let mut s1 = AdditionalConsensusState::new(3);
504 observe(&mut s1, 1, 1000);
505 observe(&mut s1, 2, 2000);
506 observe(&mut s1, 3, 3000);
507 observe(&mut s1, 4, 4000);
508
509 let mut s2 = AdditionalConsensusState::new(3);
510 observe(&mut s2, 2, 2000);
513 observe(&mut s2, 3, 3000);
514 observe(&mut s2, 4, 4000);
515
516 assert_eq!(s1.digest(), s2.digest());
517
518 observe(&mut s1, 5, 5000);
519 observe(&mut s2, 5, 5000);
520
521 assert_eq!(s1.digest(), s2.digest());
522 }
523}
524use additional_consensus_state::AdditionalConsensusState;
525pub(crate) use additional_consensus_state::{ConsensusCommitInfo, IndirectStateObserver};
526
527pub struct ConsensusHandler<C> {
528 epoch_store: Arc<AuthorityPerEpochStore>,
531 last_consensus_stats: ExecutionIndicesWithStats,
535 checkpoint_service: Arc<C>,
536 cache_reader: Arc<dyn ObjectCacheRead>,
538 low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
540 committee: ConsensusCommittee,
542 metrics: Arc<AuthorityMetrics>,
545 processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
547 execution_scheduler_sender: ExecutionSchedulerSender,
549 consensus_adapter: Arc<ConsensusAdapter>,
551
552 throughput_calculator: Arc<ConsensusThroughputCalculator>,
554
555 additional_consensus_state: AdditionalConsensusState,
556
557 backpressure_subscriber: BackpressureSubscriber,
558
559 traffic_controller: Option<Arc<TrafficController>>,
560}
561
562const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
563
564impl<C> ConsensusHandler<C> {
565 pub(crate) fn new(
566 epoch_store: Arc<AuthorityPerEpochStore>,
567 checkpoint_service: Arc<C>,
568 execution_scheduler: Arc<ExecutionScheduler>,
569 consensus_adapter: Arc<ConsensusAdapter>,
570 cache_reader: Arc<dyn ObjectCacheRead>,
571 low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
572 committee: ConsensusCommittee,
573 metrics: Arc<AuthorityMetrics>,
574 throughput_calculator: Arc<ConsensusThroughputCalculator>,
575 backpressure_subscriber: BackpressureSubscriber,
576 traffic_controller: Option<Arc<TrafficController>>,
577 ) -> Self {
578 assert!(
579 matches!(
580 epoch_store
581 .protocol_config()
582 .per_object_congestion_control_mode(),
583 PerObjectCongestionControlMode::ExecutionTimeEstimate(_)
584 ),
585 "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
586 );
587
588 let mut last_consensus_stats = epoch_store
590 .get_last_consensus_stats()
591 .expect("Should be able to read last consensus index");
592 if !last_consensus_stats.stats.is_initialized() {
594 last_consensus_stats.stats = ConsensusStats::new(committee.size());
595 }
596 let execution_scheduler_sender =
597 ExecutionSchedulerSender::start(execution_scheduler, epoch_store.clone());
598 let commit_rate_estimate_window_size = epoch_store
599 .protocol_config()
600 .get_consensus_commit_rate_estimation_window_size();
601 Self {
602 epoch_store,
603 last_consensus_stats,
604 checkpoint_service,
605 cache_reader,
606 low_scoring_authorities,
607 committee,
608 metrics,
609 processed_cache: LruCache::new(
610 NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
611 ),
612 execution_scheduler_sender,
613 consensus_adapter,
614 throughput_calculator,
615 additional_consensus_state: AdditionalConsensusState::new(
616 commit_rate_estimate_window_size,
617 ),
618 backpressure_subscriber,
619 traffic_controller,
620 }
621 }
622
623 pub(crate) fn last_processed_subdag_index(&self) -> u64 {
625 self.last_consensus_stats.index.sub_dag_index
626 }
627
628 pub(crate) fn execution_scheduler_sender(&self) -> &ExecutionSchedulerSender {
629 &self.execution_scheduler_sender
630 }
631
632 pub(crate) fn new_for_testing(
633 epoch_store: Arc<AuthorityPerEpochStore>,
634 checkpoint_service: Arc<C>,
635 execution_scheduler_sender: ExecutionSchedulerSender,
636 consensus_adapter: Arc<ConsensusAdapter>,
637 cache_reader: Arc<dyn ObjectCacheRead>,
638 low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
639 committee: ConsensusCommittee,
640 metrics: Arc<AuthorityMetrics>,
641 throughput_calculator: Arc<ConsensusThroughputCalculator>,
642 backpressure_subscriber: BackpressureSubscriber,
643 traffic_controller: Option<Arc<TrafficController>>,
644 last_consensus_stats: ExecutionIndicesWithStats,
645 ) -> Self {
646 let commit_rate_estimate_window_size = epoch_store
647 .protocol_config()
648 .get_consensus_commit_rate_estimation_window_size();
649 Self {
650 epoch_store,
651 last_consensus_stats,
652 checkpoint_service,
653 cache_reader,
654 low_scoring_authorities,
655 committee,
656 metrics,
657 processed_cache: LruCache::new(
658 NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
659 ),
660 execution_scheduler_sender,
661 consensus_adapter,
662 throughput_calculator,
663 additional_consensus_state: AdditionalConsensusState::new(
664 commit_rate_estimate_window_size,
665 ),
666 backpressure_subscriber,
667 traffic_controller,
668 }
669 }
670}
671
672#[derive(Default)]
673struct CommitHandlerInput {
674 user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
675 capability_notifications: Vec<AuthorityCapabilitiesV2>,
676 execution_time_observations: Vec<ExecutionTimeObservation>,
677 checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
678 randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
679 randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
680 end_of_publish_transactions: Vec<AuthorityName>,
681 new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
682}
683
684struct CommitHandlerState {
685 dkg_failed: bool,
686 randomness_round: Option<RandomnessRound>,
687 output: ConsensusCommitOutput,
688 indirect_state_observer: Option<IndirectStateObserver>,
689 initial_reconfig_state: ReconfigState,
690}
691
692impl CommitHandlerState {
693 fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
694 self.output
695 .get_consensus_messages_processed()
696 .cloned()
697 .collect()
698 }
699
700 fn init_randomness<'a, 'epoch>(
701 &'a mut self,
702 epoch_store: &'epoch AuthorityPerEpochStore,
703 commit_info: &'a ConsensusCommitInfo,
704 ) -> Option<tokio::sync::MutexGuard<'epoch, RandomnessManager>> {
705 let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
706 rm.try_lock()
707 .expect("should only ever be called from the commit handler thread")
708 });
709
710 let mut dkg_failed = false;
711 let randomness_round = if epoch_store.randomness_state_enabled() {
712 let randomness_manager = randomness_manager
713 .as_mut()
714 .expect("randomness manager should exist if randomness is enabled");
715 match randomness_manager.dkg_status() {
716 DkgStatus::Pending => None,
717 DkgStatus::Failed => {
718 dkg_failed = true;
719 None
720 }
721 DkgStatus::Successful => {
722 if self.initial_reconfig_state.should_accept_tx() {
725 randomness_manager
726 .reserve_next_randomness(commit_info.timestamp, &mut self.output)
728 .expect("epoch ended")
729 } else {
730 None
731 }
732 }
733 }
734 } else {
735 None
736 };
737
738 if randomness_round.is_some() {
739 assert!(!dkg_failed); }
741
742 self.randomness_round = randomness_round;
743 self.dkg_failed = dkg_failed;
744
745 randomness_manager
746 }
747}
748
749impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
750 fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
754 assert!(
755 self.epoch_store
756 .protocol_config()
757 .record_additional_state_digest_in_prologue()
758 );
759 let protocol_config = self.epoch_store.protocol_config();
760 let epoch_start_time = self
761 .epoch_store
762 .epoch_start_config()
763 .epoch_start_timestamp_ms();
764
765 self.additional_consensus_state.observe_commit(
766 protocol_config,
767 epoch_start_time,
768 &consensus_commit,
769 );
770 }
771
772 #[cfg(test)]
773 pub(crate) async fn handle_consensus_commit_for_test(
774 &mut self,
775 consensus_commit: impl ConsensusCommitAPI,
776 ) {
777 self.handle_consensus_commit(consensus_commit).await;
778 }
779
780 #[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))]
781 pub(crate) async fn handle_consensus_commit(
782 &mut self,
783 consensus_commit: impl ConsensusCommitAPI,
784 ) {
785 let protocol_config = self.epoch_store.protocol_config();
786
787 assert!(protocol_config.ignore_execution_time_observations_after_certs_closed());
789 assert!(protocol_config.record_time_estimate_processed());
790 assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints());
791 assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest());
792 assert!(protocol_config.authority_capabilities_v2());
793 assert!(protocol_config.cancel_for_failed_dkg_early());
794
795 self.backpressure_subscriber.await_no_backpressure().await;
800
801 let epoch = self.epoch_store.epoch();
802
803 let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit");
804
805 let last_committed_round = self.last_consensus_stats.index.last_committed_round;
806
807 if let Some(consensus_tx_status_cache) = self.epoch_store.consensus_tx_status_cache.as_ref()
808 {
809 consensus_tx_status_cache
810 .update_last_committed_leader_round(last_committed_round as u32)
811 .await;
812 }
813 if let Some(tx_reject_reason_cache) = self.epoch_store.tx_reject_reason_cache.as_ref() {
814 tx_reject_reason_cache.set_last_committed_leader_round(last_committed_round as u32);
815 }
816
817 let commit_info = self.additional_consensus_state.observe_commit(
818 protocol_config,
819 self.epoch_store
820 .epoch_start_config()
821 .epoch_start_timestamp_ms(),
822 &consensus_commit,
823 );
824 assert!(commit_info.round > last_committed_round);
825
826 let (timestamp, leader_author, commit_sub_dag_index) =
827 self.gather_commit_metadata(&consensus_commit);
828
829 info!(
830 %consensus_commit,
831 "Received consensus output {}. Rejected transactions {}",
832 consensus_commit.commit_ref(),
833 consensus_commit.rejected_transactions_debug_string(),
834 );
835
836 self.last_consensus_stats.index = ExecutionIndices {
837 last_committed_round: commit_info.round,
838 sub_dag_index: commit_sub_dag_index,
839 transaction_index: 0_u64,
840 };
841
842 update_low_scoring_authorities(
843 self.low_scoring_authorities.clone(),
844 self.epoch_store.committee(),
845 &self.committee,
846 consensus_commit.reputation_score_sorted_desc(),
847 &self.metrics,
848 protocol_config.consensus_bad_nodes_stake_threshold(),
849 );
850
851 self.metrics
852 .consensus_committed_subdags
853 .with_label_values(&[&leader_author.to_string()])
854 .inc();
855
856 let mut state = CommitHandlerState {
857 output: ConsensusCommitOutput::new(commit_info.round),
858 dkg_failed: false,
859 randomness_round: None,
860 indirect_state_observer: Some(IndirectStateObserver::new()),
861 initial_reconfig_state: self
862 .epoch_store
863 .get_reconfig_state_read_lock_guard()
864 .clone(),
865 };
866
867 let FilteredConsensusOutput {
868 transactions,
869 owned_object_locks,
870 } = self.filter_consensus_txns(
871 state.initial_reconfig_state.clone(),
872 &commit_info,
873 &consensus_commit,
874 );
875 if !owned_object_locks.is_empty() {
877 state.output.set_owned_object_locks(owned_object_locks);
878 }
879 let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions);
880
881 let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info);
882
883 let CommitHandlerInput {
884 user_transactions,
885 capability_notifications,
886 execution_time_observations,
887 checkpoint_signature_messages,
888 randomness_dkg_messages,
889 randomness_dkg_confirmations,
890 end_of_publish_transactions,
891 new_jwks,
892 } = self.build_commit_handler_input(transactions);
893
894 self.process_jwks(&mut state, &commit_info, new_jwks);
895 self.process_capability_notifications(capability_notifications);
896 self.process_execution_time_observations(&mut state, execution_time_observations);
897 self.process_checkpoint_signature_messages(checkpoint_signature_messages);
898
899 self.process_dkg_updates(
900 &mut state,
901 &commit_info,
902 randomness_manager.as_deref_mut(),
903 randomness_dkg_messages,
904 randomness_dkg_confirmations,
905 )
906 .await;
907
908 let mut execution_time_estimator = self
909 .epoch_store
910 .execution_time_estimator
911 .try_lock()
912 .expect("should only ever be called from the commit handler thread");
913
914 let authenticator_state_update_transaction =
915 self.create_authenticator_state_update(last_committed_round, &commit_info);
916
917 let (schedulables, randomness_schedulables, assigned_versions) = self.process_transactions(
918 &mut state,
919 &mut execution_time_estimator,
920 &commit_info,
921 authenticator_state_update_transaction,
922 user_transactions,
923 );
924
925 let (should_accept_tx, lock, final_round) =
926 self.handle_eop(&mut state, end_of_publish_transactions);
927
928 let make_checkpoint = should_accept_tx || final_round;
929 if !make_checkpoint {
930 return;
932 }
933
934 if final_round {
937 self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
938 }
939
940 self.create_pending_checkpoints(
941 &mut state,
942 &commit_info,
943 &schedulables,
944 &randomness_schedulables,
945 final_round,
946 );
947
948 let notifications = state.get_notifications();
949
950 state
951 .output
952 .record_consensus_commit_stats(self.last_consensus_stats.clone());
953
954 self.record_deferral_deletion(&mut state);
955
956 self.epoch_store
957 .consensus_quarantine
958 .write()
959 .push_consensus_output(state.output, &self.epoch_store)
960 .expect("push_consensus_output should not fail");
961
962 debug!(
965 ?commit_info.round,
966 "Notifying checkpoint service about new pending checkpoint(s)",
967 );
968 self.checkpoint_service
969 .notify_checkpoint()
970 .expect("failed to notify checkpoint service");
971
972 if let Some(randomness_round) = state.randomness_round {
973 randomness_manager
974 .as_ref()
975 .expect("randomness manager should exist if randomness round is provided")
976 .generate_randomness(epoch, randomness_round);
977 }
978
979 self.epoch_store.process_notifications(notifications.iter());
980
981 self.log_final_round(lock, final_round);
983
984 self.throughput_calculator
986 .add_transactions(timestamp, schedulables.len() as u64);
987
988 fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
989 let key = [commit_sub_dag_index, epoch];
990 if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
991 sui_simulator::task::kill_current_node(None);
992 }
993 });
994
995 fail_point!("crash"); let mut schedulables = schedulables;
998 schedulables.extend(randomness_schedulables);
999 self.execution_scheduler_sender.send(
1000 schedulables,
1001 assigned_versions,
1002 SchedulingSource::NonFastPath,
1003 );
1004
1005 self.send_end_of_publish_if_needed().await;
1006 }
1007
1008 fn handle_eop(
1009 &self,
1010 state: &mut CommitHandlerState,
1011 end_of_publish_transactions: Vec<AuthorityName>,
1012 ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
1013 let collected_eop =
1014 self.process_end_of_publish_transactions(state, end_of_publish_transactions);
1015 if collected_eop {
1016 let (lock, final_round) = self.advance_eop_state_machine(state);
1017 (lock.should_accept_tx(), Some(lock), final_round)
1018 } else {
1019 (true, None, false)
1020 }
1021 }
1022
1023 fn record_end_of_epoch_execution_time_observations(
1024 &self,
1025 estimator: &mut ExecutionTimeEstimator,
1026 ) {
1027 self.epoch_store
1028 .end_of_epoch_execution_time_observations
1029 .set(estimator.take_observations())
1030 .expect("`stored_execution_time_observations` should only be set once at end of epoch");
1031 }
1032
1033 fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
1034 let mut deferred_transactions = self
1035 .epoch_store
1036 .consensus_output_cache
1037 .deferred_transactions
1038 .lock();
1039 for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
1040 deferred_transactions.remove(&deleted_deferred_key);
1041 }
1042 }
1043
1044 fn log_final_round(&self, lock: Option<RwLockWriteGuard<ReconfigState>>, final_round: bool) {
1045 if final_round {
1046 let epoch = self.epoch_store.epoch();
1047 info!(
1048 ?epoch,
1049 lock=?lock.as_ref(),
1050 final_round=?final_round,
1051 "Notified last checkpoint"
1052 );
1053 self.epoch_store.record_end_of_message_quorum_time_metric();
1054 }
1055 }
1056
1057 fn create_pending_checkpoints(
1058 &self,
1059 state: &mut CommitHandlerState,
1060 commit_info: &ConsensusCommitInfo,
1061 schedulables: &[Schedulable],
1062 randomness_schedulables: &[Schedulable],
1063 final_round: bool,
1064 ) {
1065 let checkpoint_height = self
1066 .epoch_store
1067 .calculate_pending_checkpoint_height(commit_info.round);
1068
1069 let should_write_random_checkpoint = state.randomness_round.is_some()
1076 || (state.dkg_failed && !randomness_schedulables.is_empty());
1077
1078 let pending_checkpoint = PendingCheckpoint {
1079 roots: schedulables.iter().map(|s| s.key()).collect(),
1080 details: PendingCheckpointInfo {
1081 timestamp_ms: commit_info.timestamp,
1082 last_of_epoch: final_round && !should_write_random_checkpoint,
1083 checkpoint_height,
1084 consensus_commit_ref: commit_info.consensus_commit_ref,
1085 rejected_transactions_digest: commit_info.rejected_transactions_digest,
1086 },
1087 };
1088 self.epoch_store
1089 .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1090 .expect("failed to write pending checkpoint");
1091
1092 info!(
1093 "Written pending checkpoint: {:?}",
1094 pending_checkpoint.details,
1095 );
1096
1097 if should_write_random_checkpoint {
1098 let pending_checkpoint = PendingCheckpoint {
1099 roots: randomness_schedulables.iter().map(|s| s.key()).collect(),
1100 details: PendingCheckpointInfo {
1101 timestamp_ms: commit_info.timestamp,
1102 last_of_epoch: final_round,
1103 checkpoint_height: checkpoint_height + 1,
1104 consensus_commit_ref: commit_info.consensus_commit_ref,
1105 rejected_transactions_digest: commit_info.rejected_transactions_digest,
1106 },
1107 };
1108 self.epoch_store
1109 .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1110 .expect("failed to write pending checkpoint");
1111 }
1112 }
1113
1114 fn process_transactions(
1115 &self,
1116 state: &mut CommitHandlerState,
1117 execution_time_estimator: &mut ExecutionTimeEstimator,
1118 commit_info: &ConsensusCommitInfo,
1119 authenticator_state_update_transaction: Option<VerifiedExecutableTransactionWithAliases>,
1120 user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1121 ) -> (Vec<Schedulable>, Vec<Schedulable>, AssignedTxAndVersions) {
1122 let protocol_config = self.epoch_store.protocol_config();
1123 let epoch = self.epoch_store.epoch();
1124
1125 let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) =
1128 self.merge_and_reorder_transactions(state, commit_info, user_transactions);
1129
1130 let mut shared_object_congestion_tracker =
1131 self.init_congestion_tracker(commit_info, false, &ordered_txns);
1132 let mut shared_object_using_randomness_congestion_tracker =
1133 self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns);
1134
1135 let randomness_state_update_transaction = state
1136 .randomness_round
1137 .map(|round| Schedulable::RandomnessStateUpdate(epoch, round));
1138 debug!(
1139 "Randomness state update transaction: {:?}",
1140 randomness_state_update_transaction
1141 .as_ref()
1142 .map(|t| t.key())
1143 );
1144
1145 let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len());
1146 let mut randomness_transactions_to_schedule =
1147 Vec::with_capacity(ordered_randomness_txns.len());
1148 let mut deferred_txns = BTreeMap::new();
1149 let mut cancelled_txns = BTreeMap::new();
1150
1151 for transaction in ordered_txns {
1152 self.handle_deferral_and_cancellation(
1153 state,
1154 &mut cancelled_txns,
1155 &mut deferred_txns,
1156 &mut transactions_to_schedule,
1157 protocol_config,
1158 commit_info,
1159 transaction,
1160 &mut shared_object_congestion_tracker,
1161 &previously_deferred_tx_digests,
1162 execution_time_estimator,
1163 );
1164 }
1165
1166 for transaction in ordered_randomness_txns {
1167 if state.dkg_failed {
1168 debug!(
1169 "Canceling randomness-using transaction {:?} because DKG failed",
1170 transaction.tx().digest(),
1171 );
1172 cancelled_txns.insert(
1173 *transaction.tx().digest(),
1174 CancelConsensusCertificateReason::DkgFailed,
1175 );
1176 randomness_transactions_to_schedule.push(transaction);
1177 continue;
1178 }
1179 self.handle_deferral_and_cancellation(
1180 state,
1181 &mut cancelled_txns,
1182 &mut deferred_txns,
1183 &mut randomness_transactions_to_schedule,
1184 protocol_config,
1185 commit_info,
1186 transaction,
1187 &mut shared_object_using_randomness_congestion_tracker,
1188 &previously_deferred_tx_digests,
1189 execution_time_estimator,
1190 );
1191 }
1192
1193 let mut total_deferred_txns = 0;
1194 {
1195 let mut deferred_transactions = self
1196 .epoch_store
1197 .consensus_output_cache
1198 .deferred_transactions
1199 .lock();
1200 for (key, txns) in deferred_txns.into_iter() {
1201 total_deferred_txns += txns.len();
1202 deferred_transactions.insert(key, txns.clone());
1203 state.output.defer_transactions(key, txns);
1204 }
1205 }
1206
1207 self.metrics
1208 .consensus_handler_deferred_transactions
1209 .inc_by(total_deferred_txns as u64);
1210 self.metrics
1211 .consensus_handler_cancelled_transactions
1212 .inc_by(cancelled_txns.len() as u64);
1213 self.metrics
1214 .consensus_handler_max_object_costs
1215 .with_label_values(&["regular_commit"])
1216 .set(shared_object_congestion_tracker.max_cost() as i64);
1217 self.metrics
1218 .consensus_handler_max_object_costs
1219 .with_label_values(&["randomness_commit"])
1220 .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);
1221
1222 let object_debts = shared_object_congestion_tracker.accumulated_debts(commit_info);
1223 let randomness_object_debts =
1224 shared_object_using_randomness_congestion_tracker.accumulated_debts(commit_info);
1225 if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get()
1226 && let Err(e) = tx_object_debts.try_send(
1227 object_debts
1228 .iter()
1229 .chain(randomness_object_debts.iter())
1230 .map(|(id, _)| *id)
1231 .collect(),
1232 )
1233 {
1234 info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
1235 }
1236
1237 state
1238 .output
1239 .set_congestion_control_object_debts(object_debts);
1240 state
1241 .output
1242 .set_congestion_control_randomness_object_debts(randomness_object_debts);
1243
1244 let mut settlement = None;
1245 let mut randomness_settlement = None;
1246 if self.epoch_store.accumulators_enabled() {
1247 let checkpoint_height = self
1248 .epoch_store
1249 .calculate_pending_checkpoint_height(commit_info.round);
1250
1251 settlement = Some(Schedulable::AccumulatorSettlement(epoch, checkpoint_height));
1252
1253 if state.randomness_round.is_some() || !randomness_transactions_to_schedule.is_empty() {
1254 randomness_settlement = Some(Schedulable::AccumulatorSettlement(
1255 epoch,
1256 checkpoint_height + 1,
1257 ));
1258 }
1259 }
1260
1261 let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
1262 .then_some(Schedulable::ConsensusCommitPrologue(
1263 epoch,
1264 commit_info.round,
1265 commit_info.consensus_commit_ref.index,
1266 ));
1267
1268 let schedulables: Vec<_> = itertools::chain!(
1269 consensus_commit_prologue.into_iter(),
1270 authenticator_state_update_transaction
1271 .into_iter()
1272 .map(Schedulable::Transaction),
1273 transactions_to_schedule
1274 .into_iter()
1275 .map(Schedulable::Transaction),
1276 settlement,
1277 )
1278 .collect();
1279
1280 let randomness_schedulables: Vec<_> = randomness_state_update_transaction
1281 .into_iter()
1282 .chain(
1283 randomness_transactions_to_schedule
1284 .into_iter()
1285 .map(Schedulable::Transaction),
1286 )
1287 .chain(randomness_settlement)
1288 .collect();
1289
1290 let assigned_versions = self
1291 .epoch_store
1292 .process_consensus_transaction_shared_object_versions(
1293 self.cache_reader.as_ref(),
1294 schedulables.iter(),
1295 randomness_schedulables.iter(),
1296 &cancelled_txns,
1297 &mut state.output,
1298 )
1299 .expect("failed to assign shared object versions");
1300
1301 let consensus_commit_prologue =
1302 self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);
1303
1304 let mut schedulables = schedulables;
1305 let mut assigned_versions = assigned_versions;
1306 if let Some(consensus_commit_prologue) = consensus_commit_prologue {
1307 assert!(matches!(
1308 schedulables[0],
1309 Schedulable::ConsensusCommitPrologue(..)
1310 ));
1311 assert!(matches!(
1312 assigned_versions.0[0].0,
1313 TransactionKey::ConsensusCommitPrologue(..)
1314 ));
1315 assigned_versions.0[0].0 =
1316 TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
1317 schedulables[0] = Schedulable::Transaction(consensus_commit_prologue);
1318 }
1319
1320 self.epoch_store
1321 .process_user_signatures(schedulables.iter().chain(randomness_schedulables.iter()));
1322
1323 let schedulables: Vec<Schedulable> = schedulables.into_iter().map(|s| s.into()).collect();
1325 let randomness_schedulables: Vec<Schedulable> = randomness_schedulables
1326 .into_iter()
1327 .map(|s| s.into())
1328 .collect();
1329
1330 (schedulables, randomness_schedulables, assigned_versions)
1331 }
1332
1333 fn add_consensus_commit_prologue_transaction<'a>(
1337 &'a self,
1338 state: &'a mut CommitHandlerState,
1339 commit_info: &'a ConsensusCommitInfo,
1340 assigned_versions: &AssignedTxAndVersions,
1341 ) -> Option<VerifiedExecutableTransactionWithAliases> {
1342 {
1343 if commit_info.skip_consensus_commit_prologue_in_test {
1344 return None;
1345 }
1346 }
1347
1348 let mut cancelled_txn_version_assignment = Vec::new();
1349
1350 let protocol_config = self.epoch_store.protocol_config();
1351
1352 for (txn_key, assigned_versions) in assigned_versions.0.iter() {
1353 let Some(d) = txn_key.as_digest() else {
1354 continue;
1355 };
1356
1357 if !protocol_config.include_cancelled_randomness_txns_in_prologue()
1358 && assigned_versions
1359 .shared_object_versions
1360 .iter()
1361 .any(|((id, _), _)| *id == SUI_RANDOMNESS_STATE_OBJECT_ID)
1362 {
1363 continue;
1364 }
1365
1366 if assigned_versions
1367 .shared_object_versions
1368 .iter()
1369 .any(|(_, version)| version.is_cancelled())
1370 {
1371 assert_reachable!("cancelled transactions");
1372 cancelled_txn_version_assignment
1373 .push((*d, assigned_versions.shared_object_versions.clone()));
1374 }
1375 }
1376
1377 fail_point_arg!(
1378 "additional_cancelled_txns_for_tests",
1379 |additional_cancelled_txns: Vec<(
1380 TransactionDigest,
1381 Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
1382 )>| {
1383 cancelled_txn_version_assignment.extend(additional_cancelled_txns);
1384 }
1385 );
1386
1387 let transaction = commit_info.create_consensus_commit_prologue_transaction(
1388 self.epoch_store.epoch(),
1389 self.epoch_store.protocol_config(),
1390 cancelled_txn_version_assignment,
1391 commit_info,
1392 state.indirect_state_observer.take().unwrap(),
1393 );
1394 Some(VerifiedExecutableTransactionWithAliases::no_aliases(
1395 transaction,
1396 ))
1397 }
1398
1399 fn handle_deferral_and_cancellation(
1400 &self,
1401 state: &mut CommitHandlerState,
1402 cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
1403 deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>,
1404 scheduled_txns: &mut Vec<VerifiedExecutableTransactionWithAliases>,
1405 protocol_config: &ProtocolConfig,
1406 commit_info: &ConsensusCommitInfo,
1407 transaction: VerifiedExecutableTransactionWithAliases,
1408 shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
1409 previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
1410 execution_time_estimator: &ExecutionTimeEstimator,
1411 ) {
1412 let tx_cost = shared_object_congestion_tracker.get_tx_cost(
1413 execution_time_estimator,
1414 transaction.tx(),
1415 state.indirect_state_observer.as_mut().unwrap(),
1416 );
1417
1418 let deferral_info = self.epoch_store.should_defer(
1419 transaction.tx(),
1420 commit_info,
1421 state.dkg_failed,
1422 state.randomness_round.is_some(),
1423 previously_deferred_tx_digests,
1424 shared_object_congestion_tracker,
1425 );
1426
1427 if let Some((deferral_key, deferral_reason)) = deferral_info {
1428 debug!(
1429 "Deferring consensus certificate for transaction {:?} until {:?}",
1430 transaction.tx().digest(),
1431 deferral_key
1432 );
1433
1434 match deferral_reason {
1435 DeferralReason::RandomnessNotReady => {
1436 deferred_txns
1437 .entry(deferral_key)
1438 .or_default()
1439 .push(transaction);
1440 }
1441 DeferralReason::SharedObjectCongestion(congested_objects) => {
1442 self.metrics.consensus_handler_congested_transactions.inc();
1443 if transaction_deferral_within_limit(
1444 &deferral_key,
1445 protocol_config.max_deferral_rounds_for_congestion_control(),
1446 ) {
1447 deferred_txns
1448 .entry(deferral_key)
1449 .or_default()
1450 .push(transaction);
1451 } else {
1452 assert_sometimes!(
1453 transaction.tx().data().transaction_data().uses_randomness(),
1454 "cancelled randomness-using transaction"
1455 );
1456 assert_sometimes!(
1457 !transaction.tx().data().transaction_data().uses_randomness(),
1458 "cancelled non-randomness-using transaction"
1459 );
1460
1461 debug!(
1463 "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
1464 transaction.tx().digest(),
1465 deferral_key,
1466 congested_objects
1467 );
1468 cancelled_txns.insert(
1469 *transaction.tx().digest(),
1470 CancelConsensusCertificateReason::CongestionOnObjects(
1471 congested_objects,
1472 ),
1473 );
1474 scheduled_txns.push(transaction);
1475 }
1476 }
1477 }
1478 } else {
1479 shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, transaction.tx());
1481 scheduled_txns.push(transaction);
1482 }
1483 }
1484
1485 fn merge_and_reorder_transactions(
1486 &self,
1487 state: &mut CommitHandlerState,
1488 commit_info: &ConsensusCommitInfo,
1489 user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1490 ) -> (
1491 Vec<VerifiedExecutableTransactionWithAliases>,
1492 Vec<VerifiedExecutableTransactionWithAliases>,
1493 HashMap<TransactionDigest, DeferralKey>,
1494 ) {
1495 let protocol_config = self.epoch_store.protocol_config();
1496
1497 let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
1498 self.load_deferred_transactions(state, commit_info);
1499
1500 txns.reserve(user_transactions.len());
1501 randomness_txns.reserve(user_transactions.len());
1502
1503 let mut txns: Vec<_> = txns
1506 .into_iter()
1507 .filter_map(|tx| {
1508 if tx.tx().transaction_data().uses_randomness() {
1509 randomness_txns.push(tx);
1510 None
1511 } else {
1512 Some(tx)
1513 }
1514 })
1515 .collect();
1516
1517 for txn in user_transactions {
1518 if txn.tx().transaction_data().uses_randomness() {
1519 randomness_txns.push(txn);
1520 } else {
1521 txns.push(txn);
1522 }
1523 }
1524
1525 PostConsensusTxReorder::reorder(
1526 &mut txns,
1527 protocol_config.consensus_transaction_ordering(),
1528 );
1529 PostConsensusTxReorder::reorder(
1530 &mut randomness_txns,
1531 protocol_config.consensus_transaction_ordering(),
1532 );
1533
1534 (txns, randomness_txns, previously_deferred_tx_digests)
1535 }
1536
1537 fn load_deferred_transactions(
1538 &self,
1539 state: &mut CommitHandlerState,
1540 commit_info: &ConsensusCommitInfo,
1541 ) -> (
1542 Vec<VerifiedExecutableTransactionWithAliases>,
1543 Vec<VerifiedExecutableTransactionWithAliases>,
1544 HashMap<TransactionDigest, DeferralKey>,
1545 ) {
1546 let mut previously_deferred_tx_digests = HashMap::new();
1547
1548 let deferred_txs: Vec<_> = self
1549 .epoch_store
1550 .load_deferred_transactions_for_up_to_consensus_round_v2(
1551 &mut state.output,
1552 commit_info.round,
1553 )
1554 .expect("db error")
1555 .into_iter()
1556 .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
1557 .map(|(key, tx)| {
1558 previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
1559 tx
1560 })
1561 .collect();
1562 trace!(
1563 "loading deferred transactions: {:?}",
1564 deferred_txs.iter().map(|tx| tx.tx().digest())
1565 );
1566
1567 let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
1568 let txns: Vec<_> = self
1569 .epoch_store
1570 .load_deferred_transactions_for_randomness_v2(&mut state.output)
1571 .expect("db error")
1572 .into_iter()
1573 .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
1574 .map(|(key, tx)| {
1575 previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
1576 tx
1577 })
1578 .collect();
1579 trace!(
1580 "loading deferred randomness transactions: {:?}",
1581 txns.iter().map(|tx| tx.tx().digest())
1582 );
1583 txns
1584 } else {
1585 vec![]
1586 };
1587
1588 (
1589 deferred_txs,
1590 deferred_randomness_txs,
1591 previously_deferred_tx_digests,
1592 )
1593 }
1594
1595 fn init_congestion_tracker(
1596 &self,
1597 commit_info: &ConsensusCommitInfo,
1598 for_randomness: bool,
1599 txns: &[VerifiedExecutableTransactionWithAliases],
1600 ) -> SharedObjectCongestionTracker {
1601 #[allow(unused_mut)]
1602 let mut ret = SharedObjectCongestionTracker::from_protocol_config(
1603 self.epoch_store
1604 .consensus_quarantine
1605 .read()
1606 .load_initial_object_debts(
1607 &self.epoch_store,
1608 commit_info.round,
1609 for_randomness,
1610 txns,
1611 )
1612 .expect("db error"),
1613 self.epoch_store.protocol_config(),
1614 for_randomness,
1615 );
1616
1617 fail_point_arg!(
1618 "initial_congestion_tracker",
1619 |tracker: SharedObjectCongestionTracker| {
1620 info!(
1621 "Initialize shared_object_congestion_tracker to {:?}",
1622 tracker
1623 );
1624 ret = tracker;
1625 }
1626 );
1627
1628 ret
1629 }
1630
1631 fn process_jwks(
1632 &self,
1633 state: &mut CommitHandlerState,
1634 commit_info: &ConsensusCommitInfo,
1635 new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
1636 ) {
1637 for (authority_name, jwk_id, jwk) in new_jwks {
1638 self.epoch_store.record_jwk_vote(
1639 &mut state.output,
1640 commit_info.round,
1641 authority_name,
1642 &jwk_id,
1643 &jwk,
1644 );
1645 }
1646 }
1647
1648 fn process_capability_notifications(
1649 &self,
1650 capability_notifications: Vec<AuthorityCapabilitiesV2>,
1651 ) {
1652 for capabilities in capability_notifications {
1653 self.epoch_store
1654 .record_capabilities_v2(&capabilities)
1655 .expect("db error");
1656 }
1657 }
1658
1659 fn process_execution_time_observations(
1660 &self,
1661 state: &mut CommitHandlerState,
1662 execution_time_observations: Vec<ExecutionTimeObservation>,
1663 ) {
1664 let mut execution_time_estimator = self
1665 .epoch_store
1666 .execution_time_estimator
1667 .try_lock()
1668 .expect("should only ever be called from the commit handler thread");
1669
1670 for ExecutionTimeObservation {
1671 authority,
1672 generation,
1673 estimates,
1674 } in execution_time_observations
1675 {
1676 let authority_index = self
1677 .epoch_store
1678 .committee()
1679 .authority_index(&authority)
1680 .unwrap();
1681 execution_time_estimator.process_observations_from_consensus(
1682 authority_index,
1683 Some(generation),
1684 &estimates,
1685 );
1686 state
1687 .output
1688 .insert_execution_time_observation(authority_index, generation, estimates);
1689 }
1690 }
1691
1692 fn process_checkpoint_signature_messages(
1693 &self,
1694 checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
1695 ) {
1696 for checkpoint_signature_message in checkpoint_signature_messages {
1697 self.checkpoint_service
1698 .notify_checkpoint_signature(&self.epoch_store, &checkpoint_signature_message)
1699 .expect("db error");
1700 }
1701 }
1702
1703 async fn process_dkg_updates(
1704 &self,
1705 state: &mut CommitHandlerState,
1706 commit_info: &ConsensusCommitInfo,
1707 randomness_manager: Option<&mut RandomnessManager>,
1708 randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
1709 randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
1710 ) {
1711 if !self.epoch_store.randomness_state_enabled() {
1712 let num_dkg_messages = randomness_dkg_messages.len();
1713 let num_dkg_confirmations = randomness_dkg_confirmations.len();
1714 if num_dkg_messages + num_dkg_confirmations > 0 {
1715 debug_fatal!(
1716 "received {} RandomnessDkgMessage and {} RandomnessDkgConfirmation messages when randomness is not enabled",
1717 num_dkg_messages,
1718 num_dkg_confirmations
1719 );
1720 }
1721 return;
1722 }
1723
1724 let randomness_manager =
1725 randomness_manager.expect("randomness manager should exist if randomness is enabled");
1726
1727 let randomness_dkg_updates =
1728 self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);
1729
1730 let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
1731 state,
1732 randomness_manager,
1733 randomness_dkg_confirmations,
1734 );
1735
1736 if randomness_dkg_updates || randomness_dkg_confirmation_updates {
1737 randomness_manager
1738 .advance_dkg(&mut state.output, commit_info.round)
1739 .await
1740 .expect("epoch ended");
1741 }
1742 }
1743
1744 fn process_randomness_dkg_messages(
1745 &self,
1746 randomness_manager: &mut RandomnessManager,
1747 randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
1748 ) -> bool {
1749 if randomness_dkg_messages.is_empty() {
1750 return false;
1751 }
1752
1753 let mut randomness_state_updated = false;
1754 for (authority, bytes) in randomness_dkg_messages {
1755 match bcs::from_bytes(&bytes) {
1756 Ok(message) => {
1757 randomness_manager
1758 .add_message(&authority, message)
1759 .expect("epoch ended");
1761 randomness_state_updated = true;
1762 }
1763
1764 Err(e) => {
1765 warn!(
1766 "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
1767 authority.concise(),
1768 );
1769 }
1770 }
1771 }
1772
1773 randomness_state_updated
1774 }
1775
1776 fn process_randomness_dkg_confirmations(
1777 &self,
1778 state: &mut CommitHandlerState,
1779 randomness_manager: &mut RandomnessManager,
1780 randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
1781 ) -> bool {
1782 if randomness_dkg_confirmations.is_empty() {
1783 return false;
1784 }
1785
1786 let mut randomness_state_updated = false;
1787 for (authority, bytes) in randomness_dkg_confirmations {
1788 match bcs::from_bytes(&bytes) {
1789 Ok(message) => {
1790 randomness_manager
1791 .add_confirmation(&mut state.output, &authority, message)
1792 .expect("epoch ended");
1794 randomness_state_updated = true;
1795 }
1796 Err(e) => {
1797 warn!(
1798 "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
1799 authority.concise(),
1800 );
1801 }
1802 }
1803 }
1804
1805 randomness_state_updated
1806 }
1807
1808 fn process_end_of_publish_transactions(
1810 &self,
1811 state: &mut CommitHandlerState,
1812 end_of_publish_transactions: Vec<AuthorityName>,
1813 ) -> bool {
1814 let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
1815 "No contention on end_of_publish as it is only accessed from consensus handler",
1816 );
1817
1818 if eop_aggregator.has_quorum() {
1819 return true;
1820 }
1821
1822 if end_of_publish_transactions.is_empty() {
1823 return false;
1824 }
1825
1826 for authority in end_of_publish_transactions {
1827 info!("Received EndOfPublish from {:?}", authority.concise());
1828
1829 state.output.insert_end_of_publish(authority);
1832 if eop_aggregator
1833 .insert_generic(authority, ())
1834 .is_quorum_reached()
1835 {
1836 debug!(
1837 "Collected enough end_of_publish messages with last message from validator {:?}",
1838 authority.concise(),
1839 );
1840 return true;
1841 }
1842 }
1843
1844 false
1845 }
1846
1847 fn advance_eop_state_machine(
1850 &self,
1851 state: &mut CommitHandlerState,
1852 ) -> (
1853 RwLockWriteGuard<'_, ReconfigState>,
1854 bool, ) {
1856 let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
1857 let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
1858
1859 reconfig_state.close_all_certs();
1860
1861 let commit_has_deferred_txns = state.output.has_deferred_transactions();
1862 let previous_commits_have_deferred_txns = !self.epoch_store.deferred_transactions_empty();
1863
1864 if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
1865 if !start_state_is_reject_all_tx {
1866 info!("Transitioning to RejectAllTx");
1867 }
1868 reconfig_state.close_all_tx();
1869 } else {
1870 debug!(
1871 "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
1872 previous_commits_have_deferred_txns, commit_has_deferred_txns,
1873 );
1874 }
1875
1876 state.output.store_reconfig_state(reconfig_state.clone());
1877
1878 if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
1879 (reconfig_state, true)
1880 } else {
1881 (reconfig_state, false)
1882 }
1883 }
1884
1885 fn gather_commit_metadata(
1886 &self,
1887 consensus_commit: &impl ConsensusCommitAPI,
1888 ) -> (u64, AuthorityIndex, u64) {
1889 let timestamp = consensus_commit.commit_timestamp_ms();
1890 let leader_author = consensus_commit.leader_author_index();
1891 let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
1892
1893 let system_time_ms = SystemTime::now()
1894 .duration_since(UNIX_EPOCH)
1895 .unwrap()
1896 .as_millis() as i64;
1897
1898 let consensus_timestamp_bias_ms = system_time_ms - (timestamp as i64);
1899 let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
1900 self.metrics
1901 .consensus_timestamp_bias
1902 .observe(consensus_timestamp_bias_seconds);
1903
1904 let epoch_start = self
1905 .epoch_store
1906 .epoch_start_config()
1907 .epoch_start_timestamp_ms();
1908 let timestamp = if timestamp < epoch_start {
1909 error!(
1910 "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}"
1911 );
1912 epoch_start
1913 } else {
1914 timestamp
1915 };
1916
1917 (timestamp, leader_author, commit_sub_dag_index)
1918 }
1919
1920 fn create_authenticator_state_update(
1921 &self,
1922 last_committed_round: u64,
1923 commit_info: &ConsensusCommitInfo,
1924 ) -> Option<VerifiedExecutableTransactionWithAliases> {
1925 let new_jwks = self
1933 .epoch_store
1934 .get_new_jwks(last_committed_round)
1935 .expect("Unrecoverable error in consensus handler");
1936
1937 if !new_jwks.is_empty() {
1938 let authenticator_state_update_transaction = authenticator_state_update_transaction(
1939 &self.epoch_store,
1940 commit_info.round,
1941 new_jwks,
1942 );
1943 debug!(
1944 "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
1945 authenticator_state_update_transaction.digest(),
1946 authenticator_state_update_transaction,
1947 );
1948
1949 Some(VerifiedExecutableTransactionWithAliases::no_aliases(
1950 authenticator_state_update_transaction,
1951 ))
1952 } else {
1953 None
1954 }
1955 }
1956
1957 #[instrument(level = "trace", skip_all)]
1961 fn filter_consensus_txns(
1962 &mut self,
1963 initial_reconfig_state: ReconfigState,
1964 commit_info: &ConsensusCommitInfo,
1965 consensus_commit: &impl ConsensusCommitAPI,
1966 ) -> FilteredConsensusOutput {
1967 let mut transactions = Vec::new();
1968 let mut owned_object_locks = HashMap::new();
1969 let epoch = self.epoch_store.epoch();
1970 let mut num_finalized_user_transactions = vec![0; self.committee.size()];
1971 let mut num_rejected_user_transactions = vec![0; self.committee.size()];
1972 for (block, parsed_transactions) in consensus_commit.transactions() {
1973 let author = block.author.value();
1974 self.last_consensus_stats.stats.inc_num_messages(author);
1976
1977 self.epoch_store.set_consensus_tx_status(
1979 ConsensusPosition::ping(epoch, block),
1980 ConsensusTxStatus::Finalized,
1981 );
1982
1983 for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
1984 let position = ConsensusPosition {
1985 epoch,
1986 block,
1987 index: tx_index as TransactionIndex,
1988 };
1989
1990 if self.epoch_store.protocol_config().mysticeti_fastpath()
1993 && let Some(tx) = parsed.transaction.kind.as_user_transaction()
1994 {
1995 let digest = tx.digest();
1996 if let Some((spam_weight, submitter_client_addrs)) = self
1997 .epoch_store
1998 .submitted_transaction_cache
1999 .increment_submission_count(digest)
2000 {
2001 if let Some(ref traffic_controller) = self.traffic_controller {
2002 debug!(
2003 "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
2004 submitter_client_addrs.len()
2005 );
2006
2007 for addr in submitter_client_addrs {
2009 traffic_controller.tally(TrafficTally::new(
2010 Some(addr),
2011 None,
2012 None,
2013 spam_weight.clone(),
2014 ));
2015 }
2016 } else {
2017 warn!(
2018 "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
2019 submitter_client_addrs.len()
2020 );
2021 }
2022 }
2023 }
2024
2025 if parsed.rejected {
2026 if matches!(
2028 parsed.transaction.kind,
2029 ConsensusTransactionKind::UserTransaction(_)
2030 | ConsensusTransactionKind::UserTransactionV2(_)
2031 ) {
2032 self.epoch_store
2033 .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
2034 num_rejected_user_transactions[author] += 1;
2035 }
2036 continue;
2039 }
2040 let defer_finalized_status = self
2044 .epoch_store
2045 .protocol_config()
2046 .disable_preconsensus_locking()
2047 && matches!(
2048 parsed.transaction.kind,
2049 ConsensusTransactionKind::UserTransactionV2(_)
2050 );
2051 if !defer_finalized_status
2052 && matches!(
2053 parsed.transaction.kind,
2054 ConsensusTransactionKind::UserTransaction(_)
2055 | ConsensusTransactionKind::UserTransactionV2(_)
2056 )
2057 {
2058 self.epoch_store
2059 .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
2060 num_finalized_user_transactions[author] += 1;
2061 }
2062 let kind = classify(&parsed.transaction);
2063 self.metrics
2064 .consensus_handler_processed
2065 .with_label_values(&[kind])
2066 .inc();
2067 self.metrics
2068 .consensus_handler_transaction_sizes
2069 .with_label_values(&[kind])
2070 .observe(parsed.serialized_len as f64);
2071 if matches!(
2073 &parsed.transaction.kind,
2074 ConsensusTransactionKind::CertifiedTransaction(_)
2075 | ConsensusTransactionKind::UserTransaction(_)
2076 | ConsensusTransactionKind::UserTransactionV2(_)
2077 ) {
2078 self.last_consensus_stats
2079 .stats
2080 .inc_num_user_transactions(author);
2081 }
2082
2083 if !initial_reconfig_state.should_accept_consensus_certs() {
2084 match &parsed.transaction.kind {
2087 ConsensusTransactionKind::UserTransaction(_)
2088 | ConsensusTransactionKind::UserTransactionV2(_)
2089 | ConsensusTransactionKind::CertifiedTransaction(_)
2090 | ConsensusTransactionKind::CapabilityNotification(_)
2092 | ConsensusTransactionKind::CapabilityNotificationV2(_)
2093 | ConsensusTransactionKind::EndOfPublish(_)
2094 | ConsensusTransactionKind::ExecutionTimeObservation(_)
2096 | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
2097 debug!(
2098 "Ignoring consensus transaction {:?} because of end of epoch",
2099 parsed.transaction.key()
2100 );
2101 continue;
2102 }
2103
2104 ConsensusTransactionKind::CheckpointSignature(_)
2106 | ConsensusTransactionKind::CheckpointSignatureV2(_)
2107 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2108 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
2109 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
2110 }
2111 }
2112
2113 if !initial_reconfig_state.should_accept_tx() {
2114 match &parsed.transaction.kind {
2115 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
2116 | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
2117 _ => {}
2118 }
2119 }
2120
2121 if parsed.transaction.is_mfp_transaction()
2122 && !self.epoch_store.protocol_config().mysticeti_fastpath()
2123 {
2124 debug!(
2125 "Ignoring MFP transaction {:?} because MFP is disabled",
2126 parsed.transaction.key()
2127 );
2128 continue;
2129 }
2130
2131 if let ConsensusTransactionKind::CertifiedTransaction(certificate) =
2132 &parsed.transaction.kind
2133 && certificate.epoch() != epoch
2134 {
2135 debug!(
2136 "Certificate epoch ({:?}) doesn't match the current epoch ({:?})",
2137 certificate.epoch(),
2138 epoch
2139 );
2140 continue;
2141 }
2142
2143 match &parsed.transaction.kind {
2145 ConsensusTransactionKind::CapabilityNotification(_)
2146 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2147 | ConsensusTransactionKind::CheckpointSignature(_) => {
2148 debug_fatal!(
2149 "BUG: saw deprecated tx {:?}for commit round {}",
2150 parsed.transaction.key(),
2151 commit_info.round
2152 );
2153 continue;
2154 }
2155 _ => {}
2156 }
2157
2158 if matches!(
2159 &parsed.transaction.kind,
2160 ConsensusTransactionKind::UserTransaction(_)
2161 | ConsensusTransactionKind::UserTransactionV2(_)
2162 | ConsensusTransactionKind::CertifiedTransaction(_)
2163 ) {
2164 let author_name = self
2165 .epoch_store
2166 .committee()
2167 .authority_by_index(author as u32)
2168 .unwrap();
2169 if self
2170 .epoch_store
2171 .has_received_end_of_publish_from(author_name)
2172 {
2173 warn!(
2177 "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
2178 author_name.concise(),
2179 parsed.transaction.key(),
2180 );
2181 continue;
2182 }
2183 }
2184
2185 if self
2192 .epoch_store
2193 .protocol_config()
2194 .disable_preconsensus_locking()
2195 && let ConsensusTransactionKind::UserTransactionV2(tx_with_claims) =
2196 &parsed.transaction.kind
2197 {
2198 let immutable_object_ids: HashSet<ObjectID> =
2199 tx_with_claims.get_immutable_objects().into_iter().collect();
2200 let tx = tx_with_claims.tx();
2201
2202 let Ok(input_objects) = tx.transaction_data().input_objects() else {
2203 debug_fatal!("Invalid input objects for transaction {}", tx.digest());
2204 continue;
2205 };
2206
2207 let owned_object_refs: Vec<_> = input_objects
2210 .iter()
2211 .filter_map(|obj| match obj {
2212 InputObjectKind::ImmOrOwnedMoveObject(obj_ref)
2213 if !immutable_object_ids.contains(&obj_ref.0) =>
2214 {
2215 Some(*obj_ref)
2216 }
2217 _ => None,
2218 })
2219 .collect();
2220
2221 match self
2222 .epoch_store
2223 .try_acquire_owned_object_locks_post_consensus(
2224 &owned_object_refs,
2225 *tx.digest(),
2226 &owned_object_locks,
2227 ) {
2228 Ok(new_locks) => {
2229 owned_object_locks.extend(new_locks.into_iter());
2230 self.epoch_store
2232 .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
2233 num_finalized_user_transactions[author] += 1;
2234 }
2235 Err(e) => {
2236 debug!("Dropping transaction {}: {}", tx.digest(), e);
2237 self.epoch_store
2238 .set_consensus_tx_status(position, ConsensusTxStatus::Dropped);
2239 self.epoch_store.set_rejection_vote_reason(position, &e);
2240 continue;
2241 }
2242 }
2243 }
2244
2245 let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
2246 transactions.push((transaction, author as u32));
2247 }
2248 }
2249
2250 for (i, authority) in self.committee.authorities() {
2251 let hostname = &authority.hostname;
2252 self.metrics
2253 .consensus_committed_messages
2254 .with_label_values(&[hostname])
2255 .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
2256 self.metrics
2257 .consensus_committed_user_transactions
2258 .with_label_values(&[hostname])
2259 .set(
2260 self.last_consensus_stats
2261 .stats
2262 .get_num_user_transactions(i.value()) as i64,
2263 );
2264 self.metrics
2265 .consensus_finalized_user_transactions
2266 .with_label_values(&[hostname])
2267 .add(num_finalized_user_transactions[i.value()] as i64);
2268 self.metrics
2269 .consensus_rejected_user_transactions
2270 .with_label_values(&[hostname])
2271 .add(num_rejected_user_transactions[i.value()] as i64);
2272 }
2273
2274 FilteredConsensusOutput {
2275 transactions,
2276 owned_object_locks,
2277 }
2278 }
2279
2280 fn deduplicate_consensus_txns(
2281 &mut self,
2282 state: &mut CommitHandlerState,
2283 commit_info: &ConsensusCommitInfo,
2284 transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
2285 ) -> Vec<VerifiedSequencedConsensusTransaction> {
2286 let mut processed_set = HashSet::new();
2289
2290 let mut all_transactions = Vec::new();
2291
2292 for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
2297 let current_tx_index = ExecutionIndices {
2302 last_committed_round: commit_info.round,
2303 sub_dag_index: commit_info.consensus_commit_ref.index.into(),
2304 transaction_index: (seq + 1) as u64,
2305 };
2306
2307 self.last_consensus_stats.index = current_tx_index;
2308
2309 let certificate_author = *self
2310 .epoch_store
2311 .committee()
2312 .authority_by_index(cert_origin)
2313 .unwrap();
2314
2315 let sequenced_transaction = SequencedConsensusTransaction {
2316 certificate_author_index: cert_origin,
2317 certificate_author,
2318 consensus_index: current_tx_index,
2319 transaction,
2320 };
2321
2322 let Some(verified_transaction) = self
2323 .epoch_store
2324 .verify_consensus_transaction(sequenced_transaction)
2325 else {
2326 continue;
2327 };
2328
2329 let key = verified_transaction.0.key();
2330
2331 if let Some(tx_digest) = key.user_transaction_digest() {
2332 self.epoch_store
2333 .cache_recently_finalized_transaction(tx_digest);
2334 }
2335
2336 let in_set = !processed_set.insert(key.clone());
2337 let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
2338 if in_set || in_cache {
2339 self.metrics.skipped_consensus_txns_cache_hit.inc();
2340 continue;
2341 }
2342 if self
2343 .epoch_store
2344 .is_consensus_message_processed(&key)
2345 .expect("db error")
2346 {
2347 self.metrics.skipped_consensus_txns.inc();
2348 continue;
2349 }
2350
2351 state.output.record_consensus_message_processed(key);
2352
2353 all_transactions.push(verified_transaction);
2354 }
2355
2356 all_transactions
2357 }
2358
2359 fn build_commit_handler_input(
2360 &self,
2361 transactions: Vec<VerifiedSequencedConsensusTransaction>,
2362 ) -> CommitHandlerInput {
2363 let epoch = self.epoch_store.epoch();
2364 let mut commit_handler_input = CommitHandlerInput::default();
2365
2366 for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
2367 match transaction.transaction {
2368 SequencedConsensusTransactionKind::External(consensus_transaction) => {
2369 match consensus_transaction.kind {
2370 ConsensusTransactionKind::CertifiedTransaction(cert) => {
2372 let cert = VerifiedCertificate::new_unchecked(*cert);
2374 let transaction =
2375 VerifiedExecutableTransaction::new_from_certificate(cert);
2376 commit_handler_input.user_transactions.push(
2377 VerifiedExecutableTransactionWithAliases::no_aliases(transaction),
2378 );
2379 }
2380 ConsensusTransactionKind::UserTransaction(tx) => {
2381 let tx = VerifiedTransaction::new_unchecked(*tx);
2383 let transaction =
2385 VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
2386 commit_handler_input
2387 .user_transactions
2388 .push(VerifiedExecutableTransactionWithAliases::no_aliases(
2390 transaction,
2391 ));
2392 }
2393 ConsensusTransactionKind::UserTransactionV2(tx) => {
2394 let used_alias_versions = tx.aliases();
2396 let inner_tx = tx.into_tx();
2397 let tx = VerifiedTransaction::new_unchecked(inner_tx);
2399 let transaction =
2401 VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
2402 if let Some(used_alias_versions) = used_alias_versions {
2403 commit_handler_input
2404 .user_transactions
2405 .push(WithAliases::new(transaction, used_alias_versions));
2406 } else {
2407 commit_handler_input.user_transactions.push(
2408 VerifiedExecutableTransactionWithAliases::no_aliases(
2409 transaction,
2410 ),
2411 );
2412 }
2413 }
2414
2415 ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => {
2417 commit_handler_input
2418 .end_of_publish_transactions
2419 .push(authority_public_key_bytes);
2420 }
2421 ConsensusTransactionKind::NewJWKFetched(
2422 authority_public_key_bytes,
2423 jwk_id,
2424 jwk,
2425 ) => {
2426 commit_handler_input.new_jwks.push((
2427 authority_public_key_bytes,
2428 jwk_id,
2429 jwk,
2430 ));
2431 }
2432 ConsensusTransactionKind::RandomnessDkgMessage(
2433 authority_public_key_bytes,
2434 items,
2435 ) => {
2436 commit_handler_input
2437 .randomness_dkg_messages
2438 .push((authority_public_key_bytes, items));
2439 }
2440 ConsensusTransactionKind::RandomnessDkgConfirmation(
2441 authority_public_key_bytes,
2442 items,
2443 ) => {
2444 commit_handler_input
2445 .randomness_dkg_confirmations
2446 .push((authority_public_key_bytes, items));
2447 }
2448 ConsensusTransactionKind::CapabilityNotificationV2(
2449 authority_capabilities_v2,
2450 ) => {
2451 commit_handler_input
2452 .capability_notifications
2453 .push(authority_capabilities_v2);
2454 }
2455 ConsensusTransactionKind::ExecutionTimeObservation(
2456 execution_time_observation,
2457 ) => {
2458 commit_handler_input
2459 .execution_time_observations
2460 .push(execution_time_observation);
2461 }
2462 ConsensusTransactionKind::CheckpointSignatureV2(
2463 checkpoint_signature_message,
2464 ) => {
2465 commit_handler_input
2466 .checkpoint_signature_messages
2467 .push(*checkpoint_signature_message);
2468 }
2469
2470 ConsensusTransactionKind::CheckpointSignature(_)
2472 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2473 | ConsensusTransactionKind::CapabilityNotification(_) => {
2474 unreachable!("filtered earlier")
2475 }
2476 }
2477 }
2478 SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
2480 }
2481 }
2482
2483 commit_handler_input
2484 }
2485
2486 async fn send_end_of_publish_if_needed(&self) {
2487 if !self.epoch_store.should_send_end_of_publish() {
2488 return;
2489 }
2490
2491 let end_of_publish = ConsensusTransaction::new_end_of_publish(self.epoch_store.name);
2492 if let Err(err) =
2493 self.consensus_adapter
2494 .submit(end_of_publish, None, &self.epoch_store, None, None)
2495 {
2496 warn!(
2497 "Error when sending EndOfPublish message from ConsensusHandler: {:?}",
2498 err
2499 );
2500 } else {
2501 info!(epoch=?self.epoch_store.epoch(), "Sending EndOfPublish message to consensus");
2502 }
2503 }
2504}
2505
2506#[derive(Clone)]
2509pub(crate) struct ExecutionSchedulerSender {
2510 sender: monitored_mpsc::UnboundedSender<(
2512 Vec<Schedulable>,
2513 AssignedTxAndVersions,
2514 SchedulingSource,
2515 )>,
2516}
2517
2518impl ExecutionSchedulerSender {
2519 fn start(
2520 execution_scheduler: Arc<ExecutionScheduler>,
2521 epoch_store: Arc<AuthorityPerEpochStore>,
2522 ) -> Self {
2523 let (sender, recv) = monitored_mpsc::unbounded_channel("execution_scheduler_sender");
2524 spawn_monitored_task!(Self::run(recv, execution_scheduler, epoch_store));
2525 Self { sender }
2526 }
2527
2528 pub(crate) fn new_for_testing(
2529 sender: monitored_mpsc::UnboundedSender<(
2530 Vec<Schedulable>,
2531 AssignedTxAndVersions,
2532 SchedulingSource,
2533 )>,
2534 ) -> Self {
2535 Self { sender }
2536 }
2537
2538 fn send(
2539 &self,
2540 transactions: Vec<Schedulable>,
2541 assigned_versions: AssignedTxAndVersions,
2542 scheduling_source: SchedulingSource,
2543 ) {
2544 let _ = self
2545 .sender
2546 .send((transactions, assigned_versions, scheduling_source));
2547 }
2548
2549 async fn run(
2550 mut recv: monitored_mpsc::UnboundedReceiver<(
2551 Vec<Schedulable>,
2552 AssignedTxAndVersions,
2553 SchedulingSource,
2554 )>,
2555 execution_scheduler: Arc<ExecutionScheduler>,
2556 epoch_store: Arc<AuthorityPerEpochStore>,
2557 ) {
2558 while let Some((transactions, assigned_versions, scheduling_source)) = recv.recv().await {
2559 let _guard = monitored_scope("ConsensusHandler::enqueue");
2560 let assigned_versions = assigned_versions.into_map();
2561 let txns = transactions
2562 .into_iter()
2563 .map(|txn| {
2564 let key = txn.key();
2565 (
2566 txn,
2567 ExecutionEnv::new()
2568 .with_scheduling_source(scheduling_source)
2569 .with_assigned_versions(
2570 assigned_versions.get(&key).cloned().unwrap_or_default(),
2571 ),
2572 )
2573 })
2574 .collect();
2575 execution_scheduler.enqueue(txns, &epoch_store);
2576 }
2577 }
2578}
2579
2580pub(crate) struct MysticetiConsensusHandler {
2582 tasks: JoinSet<()>,
2583}
2584
2585impl MysticetiConsensusHandler {
2586 pub(crate) fn new(
2587 last_processed_commit_at_startup: CommitIndex,
2588 mut consensus_handler: ConsensusHandler<CheckpointService>,
2589 consensus_block_handler: ConsensusBlockHandler,
2590 mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
2591 mut block_receiver: UnboundedReceiver<consensus_core::CertifiedBlocksOutput>,
2592 commit_consumer_monitor: Arc<CommitConsumerMonitor>,
2593 ) -> Self {
2594 let mut tasks = JoinSet::new();
2595 tasks.spawn(monitored_future!(async move {
2596 while let Some(consensus_commit) = commit_receiver.recv().await {
2598 let commit_index = consensus_commit.commit_ref.index;
2599 if commit_index <= last_processed_commit_at_startup {
2600 consensus_handler.handle_prior_consensus_commit(consensus_commit);
2601 } else {
2602 consensus_handler
2603 .handle_consensus_commit(consensus_commit)
2604 .await;
2605 }
2606 commit_consumer_monitor.set_highest_handled_commit(commit_index);
2607 }
2608 }));
2609 if consensus_block_handler.enabled() {
2610 tasks.spawn(monitored_future!(async move {
2611 while let Some(blocks) = block_receiver.recv().await {
2612 consensus_block_handler
2613 .handle_certified_blocks(blocks)
2614 .await;
2615 }
2616 }));
2617 }
2618 Self { tasks }
2619 }
2620
2621 pub(crate) async fn abort(&mut self) {
2622 self.tasks.shutdown().await;
2623 }
2624}
2625
2626fn authenticator_state_update_transaction(
2627 epoch_store: &AuthorityPerEpochStore,
2628 round: u64,
2629 mut new_active_jwks: Vec<ActiveJwk>,
2630) -> VerifiedExecutableTransaction {
2631 let epoch = epoch_store.epoch();
2632 new_active_jwks.sort();
2633
2634 info!("creating authenticator state update transaction");
2635 assert!(epoch_store.authenticator_state_enabled());
2636 let transaction = VerifiedTransaction::new_authenticator_state_update(
2637 epoch,
2638 round,
2639 new_active_jwks,
2640 epoch_store
2641 .epoch_start_config()
2642 .authenticator_obj_initial_shared_version()
2643 .expect("authenticator state obj must exist"),
2644 );
2645 VerifiedExecutableTransaction::new_system(transaction, epoch)
2646}
2647
2648pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
2649 match &transaction.kind {
2650 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
2651 if certificate.is_consensus_tx() {
2652 "shared_certificate"
2653 } else {
2654 "owned_certificate"
2655 }
2656 }
2657 ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
2658 ConsensusTransactionKind::CheckpointSignatureV2(_) => "checkpoint_signature",
2659 ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
2660 ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
2661 ConsensusTransactionKind::CapabilityNotificationV2(_) => "capability_notification_v2",
2662 ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
2663 ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
2664 ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
2665 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
2666 ConsensusTransactionKind::UserTransaction(tx) => {
2667 if tx.is_consensus_tx() {
2668 "shared_user_transaction"
2669 } else {
2670 "owned_user_transaction"
2671 }
2672 }
2673 ConsensusTransactionKind::UserTransactionV2(tx) => {
2674 if tx.tx().is_consensus_tx() {
2675 "shared_user_transaction_v2"
2676 } else {
2677 "owned_user_transaction_v2"
2678 }
2679 }
2680 ConsensusTransactionKind::ExecutionTimeObservation(_) => "execution_time_observation",
2681 }
2682}
2683
2684#[derive(Debug, Clone, Serialize, Deserialize)]
2685pub struct SequencedConsensusTransaction {
2686 pub certificate_author_index: AuthorityIndex,
2687 pub certificate_author: AuthorityName,
2688 pub consensus_index: ExecutionIndices,
2689 pub transaction: SequencedConsensusTransactionKind,
2690}
2691
2692#[derive(Debug, Clone)]
2693#[allow(clippy::large_enum_variant)]
2694pub enum SequencedConsensusTransactionKind {
2695 External(ConsensusTransaction),
2696 System(VerifiedExecutableTransaction),
2697}
2698
2699impl Serialize for SequencedConsensusTransactionKind {
2700 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2701 let serializable = SerializableSequencedConsensusTransactionKind::from(self);
2702 serializable.serialize(serializer)
2703 }
2704}
2705
2706impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
2707 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
2708 let serializable =
2709 SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
2710 Ok(serializable.into())
2711 }
2712}
2713
2714#[derive(Debug, Clone, Serialize, Deserialize)]
2718#[allow(clippy::large_enum_variant)]
2719enum SerializableSequencedConsensusTransactionKind {
2720 External(ConsensusTransaction),
2721 System(TrustedExecutableTransaction),
2722}
2723
2724impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
2725 fn from(kind: &SequencedConsensusTransactionKind) -> Self {
2726 match kind {
2727 SequencedConsensusTransactionKind::External(ext) => {
2728 SerializableSequencedConsensusTransactionKind::External(ext.clone())
2729 }
2730 SequencedConsensusTransactionKind::System(txn) => {
2731 SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
2732 }
2733 }
2734 }
2735}
2736
2737impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
2738 fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
2739 match kind {
2740 SerializableSequencedConsensusTransactionKind::External(ext) => {
2741 SequencedConsensusTransactionKind::External(ext)
2742 }
2743 SerializableSequencedConsensusTransactionKind::System(txn) => {
2744 SequencedConsensusTransactionKind::System(txn.into())
2745 }
2746 }
2747 }
2748}
2749
2750#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
2751pub enum SequencedConsensusTransactionKey {
2752 External(ConsensusTransactionKey),
2753 System(TransactionDigest),
2754}
2755
2756impl SequencedConsensusTransactionKey {
2757 pub fn user_transaction_digest(&self) -> Option<TransactionDigest> {
2758 match self {
2759 SequencedConsensusTransactionKey::External(key) => match key {
2760 ConsensusTransactionKey::Certificate(digest) => Some(*digest),
2761 _ => None,
2762 },
2763 SequencedConsensusTransactionKey::System(_) => None,
2764 }
2765 }
2766}
2767
2768impl SequencedConsensusTransactionKind {
2769 pub fn key(&self) -> SequencedConsensusTransactionKey {
2770 match self {
2771 SequencedConsensusTransactionKind::External(ext) => {
2772 SequencedConsensusTransactionKey::External(ext.key())
2773 }
2774 SequencedConsensusTransactionKind::System(txn) => {
2775 SequencedConsensusTransactionKey::System(*txn.digest())
2776 }
2777 }
2778 }
2779
2780 pub fn get_tracking_id(&self) -> u64 {
2781 match self {
2782 SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
2783 SequencedConsensusTransactionKind::System(_txn) => 0,
2784 }
2785 }
2786
2787 pub fn is_executable_transaction(&self) -> bool {
2788 match self {
2789 SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
2790 SequencedConsensusTransactionKind::System(_) => true,
2791 }
2792 }
2793
2794 pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
2795 match self {
2796 SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
2797 ConsensusTransactionKind::CertifiedTransaction(txn) => Some(*txn.digest()),
2798 ConsensusTransactionKind::UserTransaction(txn) => Some(*txn.digest()),
2799 ConsensusTransactionKind::UserTransactionV2(txn) => Some(*txn.tx().digest()),
2800 _ => None,
2801 },
2802 SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
2803 }
2804 }
2805
2806 pub fn is_end_of_publish(&self) -> bool {
2807 match self {
2808 SequencedConsensusTransactionKind::External(ext) => {
2809 matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
2810 }
2811 SequencedConsensusTransactionKind::System(_) => false,
2812 }
2813 }
2814}
2815
2816impl SequencedConsensusTransaction {
2817 pub fn sender_authority(&self) -> AuthorityName {
2818 self.certificate_author
2819 }
2820
2821 pub fn key(&self) -> SequencedConsensusTransactionKey {
2822 self.transaction.key()
2823 }
2824
2825 pub fn is_end_of_publish(&self) -> bool {
2826 if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
2827 matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
2828 } else {
2829 false
2830 }
2831 }
2832
2833 pub fn try_take_execution_time_observation(&mut self) -> Option<ExecutionTimeObservation> {
2834 if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
2835 kind: ConsensusTransactionKind::ExecutionTimeObservation(observation),
2836 ..
2837 }) = &mut self.transaction
2838 {
2839 Some(std::mem::take(observation))
2840 } else {
2841 None
2842 }
2843 }
2844
2845 pub fn is_system(&self) -> bool {
2846 matches!(
2847 self.transaction,
2848 SequencedConsensusTransactionKind::System(_)
2849 )
2850 }
2851
2852 pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
2853 if !randomness_state_enabled {
2854 return false;
2857 }
2858 match &self.transaction {
2859 SequencedConsensusTransactionKind::External(ConsensusTransaction {
2860 kind: ConsensusTransactionKind::CertifiedTransaction(cert),
2861 ..
2862 }) => cert.transaction_data().uses_randomness(),
2863 SequencedConsensusTransactionKind::External(ConsensusTransaction {
2864 kind: ConsensusTransactionKind::UserTransaction(txn),
2865 ..
2866 }) => txn.transaction_data().uses_randomness(),
2867 SequencedConsensusTransactionKind::External(ConsensusTransaction {
2868 kind: ConsensusTransactionKind::UserTransactionV2(txn),
2869 ..
2870 }) => txn.tx().transaction_data().uses_randomness(),
2871 _ => false,
2872 }
2873 }
2874
2875 pub fn as_consensus_txn(&self) -> Option<&SenderSignedData> {
2876 match &self.transaction {
2877 SequencedConsensusTransactionKind::External(ConsensusTransaction {
2878 kind: ConsensusTransactionKind::CertifiedTransaction(certificate),
2879 ..
2880 }) if certificate.is_consensus_tx() => Some(certificate.data()),
2881 SequencedConsensusTransactionKind::External(ConsensusTransaction {
2882 kind: ConsensusTransactionKind::UserTransaction(txn),
2883 ..
2884 }) if txn.is_consensus_tx() => Some(txn.data()),
2885 SequencedConsensusTransactionKind::External(ConsensusTransaction {
2886 kind: ConsensusTransactionKind::UserTransactionV2(txn),
2887 ..
2888 }) if txn.tx().is_consensus_tx() => Some(txn.tx().data()),
2889 SequencedConsensusTransactionKind::System(txn) if txn.is_consensus_tx() => {
2890 Some(txn.data())
2891 }
2892 _ => None,
2893 }
2894 }
2895}
2896
2897#[derive(Debug, Clone, Serialize, Deserialize)]
2898pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
2899
2900#[cfg(test)]
2901impl VerifiedSequencedConsensusTransaction {
2902 pub fn new_test(transaction: ConsensusTransaction) -> Self {
2903 Self(SequencedConsensusTransaction::new_test(transaction))
2904 }
2905}
2906
2907impl SequencedConsensusTransaction {
2908 pub fn new_test(transaction: ConsensusTransaction) -> Self {
2909 Self {
2910 certificate_author_index: 0,
2911 certificate_author: AuthorityName::ZERO,
2912 consensus_index: Default::default(),
2913 transaction: SequencedConsensusTransactionKind::External(transaction),
2914 }
2915 }
2916}
2917
2918pub(crate) struct ConsensusBlockHandler {
2920 enabled: bool,
2922 epoch_store: Arc<AuthorityPerEpochStore>,
2924 execution_scheduler_sender: ExecutionSchedulerSender,
2926 backpressure_subscriber: BackpressureSubscriber,
2928 metrics: Arc<AuthorityMetrics>,
2930}
2931
2932impl ConsensusBlockHandler {
2933 pub fn new(
2934 epoch_store: Arc<AuthorityPerEpochStore>,
2935 execution_scheduler_sender: ExecutionSchedulerSender,
2936 backpressure_subscriber: BackpressureSubscriber,
2937 metrics: Arc<AuthorityMetrics>,
2938 ) -> Self {
2939 Self {
2940 enabled: epoch_store.protocol_config().mysticeti_fastpath()
2944 && !epoch_store.protocol_config().disable_preconsensus_locking(),
2945 epoch_store,
2946 execution_scheduler_sender,
2947 backpressure_subscriber,
2948 metrics,
2949 }
2950 }
2951
2952 pub fn enabled(&self) -> bool {
2953 self.enabled
2954 }
2955
2956 #[instrument(level = "debug", skip_all)]
2957 async fn handle_certified_blocks(&self, blocks_output: CertifiedBlocksOutput) {
2958 self.backpressure_subscriber.await_no_backpressure().await;
2959
2960 let _scope = monitored_scope("ConsensusBlockHandler::handle_certified_blocks");
2961
2962 let reconfiguration_lock = self.epoch_store.get_reconfig_state_read_lock_guard();
2964 if !reconfiguration_lock.should_accept_user_certs() {
2965 debug!(
2966 "Skipping fastpath execution because epoch {} is closing user transactions: {}",
2967 self.epoch_store.epoch(),
2968 blocks_output
2969 .blocks
2970 .iter()
2971 .map(|b| b.block.reference().to_string())
2972 .join(", "),
2973 );
2974 return;
2975 }
2976
2977 self.metrics.consensus_block_handler_block_processed.inc();
2978 let epoch = self.epoch_store.epoch();
2979 let parsed_transactions = blocks_output
2980 .blocks
2981 .into_iter()
2982 .map(|certified_block| {
2983 let block_ref = certified_block.block.reference();
2984 let transactions =
2985 parse_block_transactions(&certified_block.block, &certified_block.rejected);
2986 (block_ref, transactions)
2987 })
2988 .collect::<Vec<_>>();
2989 let mut executable_transactions = vec![];
2990 for (block, transactions) in parsed_transactions.into_iter() {
2991 self.epoch_store.set_consensus_tx_status(
2993 ConsensusPosition::ping(epoch, block),
2994 ConsensusTxStatus::FastpathCertified,
2995 );
2996
2997 for (txn_idx, parsed) in transactions.into_iter().enumerate() {
2998 let position = ConsensusPosition {
2999 epoch,
3000 block,
3001 index: txn_idx as TransactionIndex,
3002 };
3003
3004 let status_str = if parsed.rejected {
3005 "rejected"
3006 } else {
3007 "certified"
3008 };
3009 if let Some(tx) = parsed.transaction.kind.as_user_transaction() {
3010 debug!(
3011 "User Transaction in position: {:} with digest {:} is {:}",
3012 position,
3013 tx.digest(),
3014 status_str
3015 );
3016 } else {
3017 debug!(
3018 "System Transaction in position: {:} is {:}",
3019 position, status_str
3020 );
3021 }
3022
3023 if parsed.rejected {
3024 self.epoch_store
3026 .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
3027 self.metrics
3028 .consensus_block_handler_txn_processed
3029 .with_label_values(&["rejected"])
3030 .inc();
3031 continue;
3032 }
3033
3034 self.metrics
3035 .consensus_block_handler_txn_processed
3036 .with_label_values(&["certified"])
3037 .inc();
3038
3039 if let Some(tx) = parsed.transaction.kind.into_user_transaction() {
3040 if tx.is_consensus_tx() {
3041 continue;
3042 }
3043 self.epoch_store
3045 .set_consensus_tx_status(position, ConsensusTxStatus::FastpathCertified);
3046 let tx = VerifiedTransaction::new_unchecked(tx);
3047 executable_transactions.push(Schedulable::Transaction(
3048 VerifiedExecutableTransaction::new_from_consensus(
3049 tx,
3050 self.epoch_store.epoch(),
3051 ),
3052 ));
3053 }
3054 }
3055 }
3056
3057 if executable_transactions.is_empty() {
3058 return;
3059 }
3060 self.metrics
3061 .consensus_block_handler_fastpath_executions
3062 .inc_by(executable_transactions.len() as u64);
3063
3064 self.execution_scheduler_sender.send(
3065 executable_transactions,
3066 Default::default(),
3067 SchedulingSource::MysticetiFastPath,
3068 );
3069 }
3070}
3071
3072#[derive(Serialize, Deserialize)]
3073pub(crate) struct CommitIntervalObserver {
3074 ring_buffer: VecDeque<u64>,
3075}
3076
3077impl CommitIntervalObserver {
3078 pub fn new(window_size: u32) -> Self {
3079 Self {
3080 ring_buffer: VecDeque::with_capacity(window_size as usize),
3081 }
3082 }
3083
3084 pub fn observe_commit_time(&mut self, consensus_commit: &impl ConsensusCommitAPI) {
3085 let commit_time = consensus_commit.commit_timestamp_ms();
3086 if self.ring_buffer.len() == self.ring_buffer.capacity() {
3087 self.ring_buffer.pop_front();
3088 }
3089 self.ring_buffer.push_back(commit_time);
3090 }
3091
3092 pub fn commit_interval_estimate(&self) -> Option<Duration> {
3093 if self.ring_buffer.len() <= 1 {
3094 None
3095 } else {
3096 let first = self.ring_buffer.front().unwrap();
3097 let last = self.ring_buffer.back().unwrap();
3098 let duration = last.saturating_sub(*first);
3099 let num_commits = self.ring_buffer.len() as u64;
3100 Some(Duration::from_millis(duration.div_ceil(num_commits)))
3101 }
3102 }
3103}
3104
3105#[cfg(test)]
3106mod tests {
3107 use std::collections::HashSet;
3108
3109 use consensus_core::{
3110 BlockAPI, CertifiedBlock, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction,
3111 VerifiedBlock,
3112 };
3113 use consensus_types::block::TransactionIndex;
3114 use futures::pin_mut;
3115 use prometheus::Registry;
3116 use sui_protocol_config::{ConsensusTransactionOrdering, ProtocolConfig};
3117 use sui_types::{
3118 base_types::ExecutionDigests,
3119 base_types::{AuthorityName, FullObjectRef, ObjectID, SuiAddress, random_object_ref},
3120 committee::Committee,
3121 crypto::deterministic_random_account_key,
3122 gas::GasCostSummary,
3123 message_envelope::Message,
3124 messages_checkpoint::{
3125 CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
3126 SignedCheckpointSummary,
3127 },
3128 messages_consensus::ConsensusTransaction,
3129 object::Object,
3130 transaction::{
3131 CertifiedTransaction, SenderSignedData, TransactionData, TransactionDataAPI,
3132 VerifiedCertificate,
3133 },
3134 };
3135
3136 use super::*;
3137 use crate::{
3138 authority::{
3139 authority_per_epoch_store::ConsensusStatsAPI,
3140 test_authority_builder::TestAuthorityBuilder,
3141 },
3142 checkpoints::CheckpointServiceNoop,
3143 consensus_adapter::consensus_tests::test_user_transaction,
3144 consensus_test_utils::make_consensus_adapter_for_test,
3145 post_consensus_tx_reorder::PostConsensusTxReorder,
3146 };
3147
3148 #[tokio::test(flavor = "current_thread", start_paused = true)]
3149 async fn test_consensus_commit_handler() {
3150 telemetry_subscribers::init_for_testing();
3151
3152 let (sender, keypair) = deterministic_random_account_key();
3155 let gas_objects: Vec<Object> = (0..12)
3157 .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3158 .collect();
3159 let owned_objects: Vec<Object> = (0..4)
3161 .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3162 .collect();
3163 let shared_objects: Vec<Object> = (0..6)
3165 .map(|_| Object::shared_for_testing())
3166 .collect::<Vec<_>>();
3167 let mut all_objects = gas_objects.clone();
3168 all_objects.extend(owned_objects.clone());
3169 all_objects.extend(shared_objects.clone());
3170
3171 let network_config =
3172 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
3173 .with_objects(all_objects.clone())
3174 .build();
3175
3176 let state = TestAuthorityBuilder::new()
3177 .with_network_config(&network_config, 0)
3178 .build()
3179 .await;
3180
3181 let epoch_store = state.epoch_store_for_testing().clone();
3182 let new_epoch_start_state = epoch_store.epoch_start_state();
3183 let consensus_committee = new_epoch_start_state.get_consensus_committee();
3184
3185 let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3186
3187 let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
3188
3189 let backpressure_manager = BackpressureManager::new_for_tests();
3190 let consensus_adapter =
3191 make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3192 let mut consensus_handler = ConsensusHandler::new(
3193 epoch_store,
3194 Arc::new(CheckpointServiceNoop {}),
3195 state.execution_scheduler().clone(),
3196 consensus_adapter,
3197 state.get_object_cache_reader().clone(),
3198 Arc::new(ArcSwap::default()),
3199 consensus_committee.clone(),
3200 metrics,
3201 Arc::new(throughput_calculator),
3202 backpressure_manager.subscribe(),
3203 state.traffic_controller.clone(),
3204 );
3205
3206 let mut user_transactions = vec![];
3208 for (i, gas_object) in gas_objects[0..8].iter().enumerate() {
3209 let input_object = if i % 2 == 0 {
3210 owned_objects.get(i / 2).unwrap().clone()
3211 } else {
3212 shared_objects.get(i / 2).unwrap().clone()
3213 };
3214 let transaction = test_user_transaction(
3215 &state,
3216 sender,
3217 &keypair,
3218 gas_object.clone(),
3219 vec![input_object],
3220 )
3221 .await;
3222 user_transactions.push(transaction);
3223 }
3224
3225 for (i, gas_object) in gas_objects[8..12].iter().enumerate() {
3228 let shared_object = if i < 2 {
3229 shared_objects[4].clone()
3230 } else {
3231 shared_objects[5].clone()
3232 };
3233 let transaction = test_user_transaction(
3234 &state,
3235 sender,
3236 &keypair,
3237 gas_object.clone(),
3238 vec![shared_object],
3239 )
3240 .await;
3241 user_transactions.push(transaction);
3242 }
3243
3244 let mut blocks = Vec::new();
3246 for (i, consensus_transaction) in user_transactions
3247 .iter()
3248 .cloned()
3249 .map(|t| ConsensusTransaction::new_user_transaction_v2_message(&state.name, t.into()))
3250 .enumerate()
3251 {
3252 let transaction_bytes = bcs::to_bytes(&consensus_transaction).unwrap();
3253 let block = VerifiedBlock::new_for_test(
3254 TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
3255 .set_transactions(vec![Transaction::new(transaction_bytes)])
3256 .build(),
3257 );
3258
3259 blocks.push(block);
3260 }
3261
3262 let leader_block = blocks[0].clone();
3264 let committed_sub_dag = CommittedSubDag::new(
3265 leader_block.reference(),
3266 blocks.clone(),
3267 leader_block.timestamp_ms(),
3268 CommitRef::new(10, CommitDigest::MIN),
3269 );
3270
3271 backpressure_manager.set_backpressure(true);
3273 backpressure_manager.update_highest_certified_checkpoint(1);
3275
3276 {
3278 let waiter = consensus_handler.handle_consensus_commit(committed_sub_dag.clone());
3279 pin_mut!(waiter);
3280
3281 tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
3283 .await
3284 .unwrap_err();
3285
3286 backpressure_manager.set_backpressure(false);
3288
3289 tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
3291 .await
3292 .unwrap();
3293 }
3294
3295 let num_blocks = blocks.len();
3297 let num_transactions = user_transactions.len();
3298 let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
3299 assert_eq!(
3300 last_consensus_stats_1.index.transaction_index,
3301 num_transactions as u64
3302 );
3303 assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
3304 assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
3305 assert_eq!(last_consensus_stats_1.hash, 0);
3306 assert_eq!(
3307 last_consensus_stats_1.stats.get_num_messages(0),
3308 num_blocks as u64
3309 );
3310 assert_eq!(
3311 last_consensus_stats_1.stats.get_num_user_transactions(0),
3312 num_transactions as u64
3313 );
3314
3315 for (i, t) in user_transactions.iter().enumerate() {
3317 let digest = t.tx().digest();
3318 if let Ok(Ok(_)) = tokio::time::timeout(
3319 std::time::Duration::from_secs(10),
3320 state.notify_read_effects("", *digest),
3321 )
3322 .await
3323 {
3324 } else {
3326 panic!("User transaction {} {} did not execute", i, digest);
3327 }
3328 }
3329
3330 state.execution_scheduler().check_empty_for_testing();
3332 }
3333
3334 #[tokio::test]
3335 async fn test_consensus_block_handler() {
3336 let (sender, keypair) = deterministic_random_account_key();
3339 let gas_objects: Vec<Object> = (0..8)
3341 .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3342 .collect();
3343 let owned_objects: Vec<Object> = (0..4)
3345 .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3346 .collect();
3347 let shared_objects: Vec<Object> = (0..4)
3349 .map(|_| Object::shared_for_testing())
3350 .collect::<Vec<_>>();
3351 let mut all_objects = gas_objects.clone();
3352 all_objects.extend(owned_objects.clone());
3353 all_objects.extend(shared_objects.clone());
3354
3355 let network_config =
3356 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
3357 .with_objects(all_objects.clone())
3358 .build();
3359
3360 let state = TestAuthorityBuilder::new()
3361 .with_network_config(&network_config, 0)
3362 .build()
3363 .await;
3364 let epoch_store = state.epoch_store_for_testing().clone();
3365 let execution_scheduler_sender = ExecutionSchedulerSender::start(
3366 state.execution_scheduler().clone(),
3367 epoch_store.clone(),
3368 );
3369
3370 let backpressure_manager = BackpressureManager::new_for_tests();
3371 let block_handler = ConsensusBlockHandler::new(
3372 epoch_store.clone(),
3373 execution_scheduler_sender,
3374 backpressure_manager.subscribe(),
3375 state.metrics.clone(),
3376 );
3377
3378 let mut transactions = vec![];
3380 for (i, gas_object) in gas_objects.iter().enumerate() {
3381 let input_object = if i % 2 == 0 {
3382 owned_objects.get(i / 2).unwrap().clone()
3383 } else {
3384 shared_objects.get(i / 2).unwrap().clone()
3385 };
3386 let transaction = test_user_transaction(
3387 &state,
3388 sender,
3389 &keypair,
3390 gas_object.clone(),
3391 vec![input_object],
3392 )
3393 .await;
3394 transactions.push(transaction);
3395 }
3396
3397 let serialized_transactions: Vec<_> = transactions
3398 .iter()
3399 .cloned()
3400 .map(|t| {
3401 Transaction::new(
3402 bcs::to_bytes(&ConsensusTransaction::new_user_transaction_v2_message(
3403 &state.name,
3404 t.into(),
3405 ))
3406 .unwrap(),
3407 )
3408 })
3409 .collect();
3410
3411 let block = VerifiedBlock::new_for_test(
3413 TestBlock::new(100, 1)
3414 .set_transactions(serialized_transactions.clone())
3415 .build(),
3416 );
3417
3418 let rejected_transactions = vec![0, 3, 4];
3420
3421 block_handler
3423 .handle_certified_blocks(CertifiedBlocksOutput {
3424 blocks: vec![CertifiedBlock {
3425 block: block.clone(),
3426 rejected: rejected_transactions.clone(),
3427 }],
3428 })
3429 .await;
3430
3431 let consensus_tx_status_cache = epoch_store.consensus_tx_status_cache.as_ref().unwrap();
3433 for txn_idx in 0..transactions.len() {
3434 let position = ConsensusPosition {
3435 epoch: epoch_store.epoch(),
3436 block: block.reference(),
3437 index: txn_idx as TransactionIndex,
3438 };
3439 if rejected_transactions.contains(&(txn_idx as TransactionIndex)) {
3440 assert_eq!(
3442 consensus_tx_status_cache.get_transaction_status(&position),
3443 Some(ConsensusTxStatus::Rejected)
3444 );
3445 } else if txn_idx % 2 == 0 {
3446 assert_eq!(
3448 consensus_tx_status_cache.get_transaction_status(&position),
3449 Some(ConsensusTxStatus::FastpathCertified),
3450 );
3451 } else {
3452 assert_eq!(
3454 consensus_tx_status_cache.get_transaction_status(&position),
3455 None,
3456 );
3457 }
3458 }
3459
3460 for (i, t) in transactions.iter().enumerate() {
3462 if i % 2 == 1 || rejected_transactions.contains(&(i as TransactionIndex)) {
3464 continue;
3465 }
3466 let digest = t.tx().digest();
3467 if tokio::time::timeout(
3468 std::time::Duration::from_secs(10),
3469 state
3470 .get_transaction_cache_reader()
3471 .notify_read_fastpath_transaction_outputs(&[*digest]),
3472 )
3473 .await
3474 .is_err()
3475 {
3476 panic!("Transaction {} {} did not execute", i, digest);
3477 }
3478 }
3479
3480 state.execution_scheduler().check_empty_for_testing();
3482
3483 for (i, t) in transactions.iter().enumerate() {
3485 if i % 2 == 0 && !rejected_transactions.contains(&(i as TransactionIndex)) {
3487 continue;
3488 }
3489 let digest = t.tx().digest();
3490 assert!(
3491 !state.is_tx_already_executed(digest),
3492 "Rejected transaction {} {} should not have been executed",
3493 i,
3494 digest
3495 );
3496 }
3497 }
3498
3499 fn to_short_strings(txs: Vec<VerifiedExecutableTransactionWithAliases>) -> Vec<String> {
3500 txs.into_iter()
3501 .map(|tx| format!("transaction({})", tx.tx().transaction_data().gas_price()))
3502 .collect()
3503 }
3504
3505 #[test]
3506 fn test_order_by_gas_price() {
3507 let mut v = vec![user_txn(42), user_txn(100)];
3508 PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3509 assert_eq!(
3510 to_short_strings(v),
3511 vec![
3512 "transaction(100)".to_string(),
3513 "transaction(42)".to_string(),
3514 ]
3515 );
3516
3517 let mut v = vec![
3518 user_txn(1200),
3519 user_txn(12),
3520 user_txn(1000),
3521 user_txn(42),
3522 user_txn(100),
3523 user_txn(1000),
3524 ];
3525 PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3526 assert_eq!(
3527 to_short_strings(v),
3528 vec![
3529 "transaction(1200)".to_string(),
3530 "transaction(1000)".to_string(),
3531 "transaction(1000)".to_string(),
3532 "transaction(100)".to_string(),
3533 "transaction(42)".to_string(),
3534 "transaction(12)".to_string(),
3535 ]
3536 );
3537 }
3538
3539 #[tokio::test(flavor = "current_thread")]
3540 async fn test_checkpoint_signature_dedup() {
3541 telemetry_subscribers::init_for_testing();
3542
3543 let network_config =
3544 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3545 let state = TestAuthorityBuilder::new()
3546 .with_network_config(&network_config, 0)
3547 .build()
3548 .await;
3549
3550 let epoch_store = state.epoch_store_for_testing().clone();
3551 let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3552
3553 let make_signed = || {
3554 let epoch = epoch_store.epoch();
3555 let contents =
3556 CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
3557 let summary = CheckpointSummary::new(
3558 &ProtocolConfig::get_for_max_version_UNSAFE(),
3559 epoch,
3560 42, 10, &contents,
3563 None, GasCostSummary::default(),
3565 None, 0, Vec::new(), Vec::new(), );
3570 SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name)
3571 };
3572
3573 let v2_s1 = make_signed();
3575 let v2_s1_clone = v2_s1.clone();
3576 let v2_digest_a = v2_s1.data().digest();
3577 let v2_a =
3578 ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3579 summary: v2_s1,
3580 });
3581
3582 let v2_s2 = make_signed();
3583 let v2_digest_b = v2_s2.data().digest();
3584 let v2_b =
3585 ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3586 summary: v2_s2,
3587 });
3588
3589 assert_ne!(v2_digest_a, v2_digest_b);
3590
3591 assert_eq!(v2_s1_clone.data().digest(), v2_digest_a);
3593 let v2_dup =
3594 ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3595 summary: v2_s1_clone,
3596 });
3597
3598 let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
3599 let block = VerifiedBlock::new_for_test(
3600 TestBlock::new(100, 0)
3601 .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)])
3602 .build(),
3603 );
3604 let commit = CommittedSubDag::new(
3605 block.reference(),
3606 vec![block.clone()],
3607 block.timestamp_ms(),
3608 CommitRef::new(10, CommitDigest::MIN),
3609 );
3610
3611 let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3612 let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
3613 let backpressure = BackpressureManager::new_for_tests();
3614 let consensus_adapter =
3615 make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3616 let mut handler = ConsensusHandler::new(
3617 epoch_store.clone(),
3618 Arc::new(CheckpointServiceNoop {}),
3619 state.execution_scheduler().clone(),
3620 consensus_adapter,
3621 state.get_object_cache_reader().clone(),
3622 Arc::new(ArcSwap::default()),
3623 consensus_committee.clone(),
3624 metrics,
3625 Arc::new(throughput),
3626 backpressure.subscribe(),
3627 state.traffic_controller.clone(),
3628 );
3629
3630 handler.handle_consensus_commit(commit).await;
3631
3632 use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
3633 use sui_types::messages_consensus::ConsensusTransactionKey as CK;
3634
3635 let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a));
3637 let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b));
3638 assert!(
3639 epoch_store
3640 .is_consensus_message_processed(&v2_key_a)
3641 .unwrap()
3642 );
3643 assert!(
3644 epoch_store
3645 .is_consensus_message_processed(&v2_key_b)
3646 .unwrap()
3647 );
3648 }
3649
3650 #[tokio::test(flavor = "current_thread")]
3651 async fn test_verify_consensus_transaction_filters_mismatched_authorities() {
3652 telemetry_subscribers::init_for_testing();
3653
3654 let network_config =
3655 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3656 let state = TestAuthorityBuilder::new()
3657 .with_network_config(&network_config, 0)
3658 .build()
3659 .await;
3660
3661 let epoch_store = state.epoch_store_for_testing().clone();
3662 let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3663
3664 use fastcrypto::traits::KeyPair;
3666 let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair();
3667 let wrong_authority: AuthorityName = wrong_keypair.public().into();
3668
3669 let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority);
3671
3672 let valid_eop = ConsensusTransaction::new_end_of_publish(state.name);
3674
3675 let epoch = epoch_store.epoch();
3677 let contents =
3678 CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
3679 let summary = CheckpointSummary::new(
3680 &ProtocolConfig::get_for_max_version_UNSAFE(),
3681 epoch,
3682 42, 10, &contents,
3685 None, GasCostSummary::default(),
3687 None, 0, Vec::new(), Vec::new(), );
3692
3693 let mismatched_checkpoint_signed =
3695 SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority);
3696 let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest();
3697 let mismatched_checkpoint =
3698 ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3699 summary: mismatched_checkpoint_signed,
3700 });
3701
3702 let valid_checkpoint_signed =
3704 SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name);
3705 let valid_checkpoint_digest = valid_checkpoint_signed.data().digest();
3706 let valid_checkpoint =
3707 ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3708 summary: valid_checkpoint_signed,
3709 });
3710
3711 let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
3712
3713 let block = VerifiedBlock::new_for_test(
3715 TestBlock::new(100, 0)
3716 .set_transactions(vec![
3717 to_tx(&mismatched_eop),
3718 to_tx(&valid_eop),
3719 to_tx(&mismatched_checkpoint),
3720 to_tx(&valid_checkpoint),
3721 ])
3722 .build(),
3723 );
3724 let commit = CommittedSubDag::new(
3725 block.reference(),
3726 vec![block.clone()],
3727 block.timestamp_ms(),
3728 CommitRef::new(10, CommitDigest::MIN),
3729 );
3730
3731 let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3732 let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
3733 let backpressure = BackpressureManager::new_for_tests();
3734 let consensus_adapter =
3735 make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3736 let mut handler = ConsensusHandler::new(
3737 epoch_store.clone(),
3738 Arc::new(CheckpointServiceNoop {}),
3739 state.execution_scheduler().clone(),
3740 consensus_adapter,
3741 state.get_object_cache_reader().clone(),
3742 Arc::new(ArcSwap::default()),
3743 consensus_committee.clone(),
3744 metrics,
3745 Arc::new(throughput),
3746 backpressure.subscribe(),
3747 state.traffic_controller.clone(),
3748 );
3749
3750 handler.handle_consensus_commit(commit).await;
3751
3752 use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
3753 use sui_types::messages_consensus::ConsensusTransactionKey as CK;
3754
3755 let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
3757 assert!(
3758 epoch_store
3759 .is_consensus_message_processed(&valid_eop_key)
3760 .unwrap(),
3761 "Valid EndOfPublish should have been processed"
3762 );
3763
3764 let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
3765 state.name,
3766 42,
3767 valid_checkpoint_digest,
3768 ));
3769 assert!(
3770 epoch_store
3771 .is_consensus_message_processed(&valid_checkpoint_key)
3772 .unwrap(),
3773 "Valid CheckpointSignature should have been processed"
3774 );
3775
3776 let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
3778 assert!(
3779 !epoch_store
3780 .is_consensus_message_processed(&mismatched_eop_key)
3781 .unwrap(),
3782 "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
3783 );
3784
3785 let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
3786 wrong_authority,
3787 42,
3788 mismatched_checkpoint_digest,
3789 ));
3790 assert!(
3791 !epoch_store
3792 .is_consensus_message_processed(&mismatched_checkpoint_key)
3793 .unwrap(),
3794 "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
3795 );
3796 }
3797
3798 fn user_txn(gas_price: u64) -> VerifiedExecutableTransactionWithAliases {
3799 let (committee, keypairs) = Committee::new_simple_test_committee();
3800 let data = SenderSignedData::new(
3801 TransactionData::new_transfer(
3802 SuiAddress::default(),
3803 FullObjectRef::from_fastpath_ref(random_object_ref()),
3804 SuiAddress::default(),
3805 random_object_ref(),
3806 1000 * gas_price,
3807 gas_price,
3808 ),
3809 vec![],
3810 );
3811 let tx = VerifiedExecutableTransaction::new_from_certificate(
3812 VerifiedCertificate::new_unchecked(
3813 CertifiedTransaction::new_from_keypairs_for_testing(data, &keypairs, &committee),
3814 ),
3815 );
3816 VerifiedExecutableTransactionWithAliases::no_aliases(tx)
3817 }
3818}