1use std::{
5 collections::{BTreeMap, HashMap, HashSet, VecDeque},
6 hash::Hash,
7 num::NonZeroUsize,
8 sync::{Arc, Mutex},
9 time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use arc_swap::ArcSwap;
13use consensus_config::Committee as ConsensusCommittee;
14use consensus_core::{CommitConsumerMonitor, CommitIndex, CommitRef};
15use consensus_types::block::TransactionIndex;
16use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
17use lru::LruCache;
18use mysten_common::{
19 assert_reachable, assert_sometimes, debug_fatal, random_util::randomize_cache_capacity_in_tests,
20};
21use mysten_metrics::{
22 monitored_future,
23 monitored_mpsc::{self, UnboundedReceiver},
24 monitored_scope, spawn_monitored_task,
25};
26use nonempty::NonEmpty;
27use parking_lot::RwLockWriteGuard;
28use serde::{Deserialize, Serialize};
29use sui_config::node::CongestionLogConfig;
30use sui_macros::{fail_point, fail_point_arg, fail_point_if};
31use sui_protocol_config::{PerObjectCongestionControlMode, ProtocolConfig};
32use sui_types::{
33 SUI_RANDOMNESS_STATE_OBJECT_ID,
34 authenticator_state::ActiveJwk,
35 base_types::{
36 AuthorityName, ConciseableName, ConsensusObjectSequenceKey, ObjectID, ObjectRef,
37 SequenceNumber, TransactionDigest,
38 },
39 crypto::RandomnessRound,
40 digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest, Digest},
41 executable_transaction::{
42 TrustedExecutableTransaction, VerifiedExecutableTransaction,
43 VerifiedExecutableTransactionWithAliases,
44 },
45 messages_checkpoint::{
46 CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointTimestamp,
47 },
48 messages_consensus::{
49 AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments,
50 ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
51 ExecutionTimeObservation,
52 },
53 sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
54 transaction::{
55 InputObjectKind, SenderSignedData, TransactionDataAPI, TransactionKey, VerifiedCertificate,
56 VerifiedTransaction, WithAliases,
57 },
58};
59use tokio::task::JoinSet;
60use tracing::{debug, error, info, instrument, trace, warn};
61
62use crate::{
63 authority::{
64 AuthorityMetrics, AuthorityState, ExecutionEnv,
65 authority_per_epoch_store::{
66 AuthorityPerEpochStore, CancelConsensusCertificateReason, ConsensusStats,
67 ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStatsV2,
68 consensus_quarantine::ConsensusCommitOutput,
69 },
70 backpressure::{BackpressureManager, BackpressureSubscriber},
71 congestion_log::CongestionCommitLogger,
72 consensus_tx_status_cache::ConsensusTxStatus,
73 epoch_start_configuration::EpochStartConfigTrait,
74 execution_time_estimator::ExecutionTimeEstimator,
75 shared_object_congestion_tracker::SharedObjectCongestionTracker,
76 shared_object_version_manager::{AssignedTxAndVersions, AssignedVersions, Schedulable},
77 transaction_deferral::{DeferralKey, DeferralReason, transaction_deferral_within_limit},
78 },
79 checkpoints::{
80 CheckpointHeight, CheckpointRoots, CheckpointService, CheckpointServiceNotify,
81 PendingCheckpoint, PendingCheckpointInfo, PendingCheckpointV2,
82 },
83 consensus_adapter::ConsensusAdapter,
84 consensus_throughput_calculator::ConsensusThroughputCalculator,
85 consensus_types::consensus_output_api::ConsensusCommitAPI,
86 epoch::{
87 randomness::{DkgStatus, RandomnessManager},
88 reconfiguration::ReconfigState,
89 },
90 execution_cache::ObjectCacheRead,
91 execution_scheduler::{SettlementBatchInfo, SettlementScheduler},
92 post_consensus_tx_reorder::PostConsensusTxReorder,
93 scoring_decision::update_low_scoring_authorities,
94 traffic_controller::{TrafficController, policies::TrafficTally},
95};
96
/// Output of a consensus commit after filtering, ready for downstream
/// processing by the commit handler.
struct FilteredConsensusOutput {
    // Sequenced transactions paired with a u32 index — presumably the
    // transaction's position within the commit; TODO confirm at the call site.
    transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
    // Owned-object locks observed in this commit: object ref -> digest of the
    // transaction holding the lock.
    owned_object_locks: HashMap<ObjectRef, TransactionDigest>,
}
103
/// Bundles everything needed to construct a [`ConsensusHandler`] for the
/// current epoch (see [`ConsensusHandlerInitializer::new_consensus_handler`]).
pub struct ConsensusHandlerInitializer {
    state: Arc<AuthorityState>,
    checkpoint_service: Arc<CheckpointService>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    consensus_adapter: Arc<ConsensusAdapter>,
    // Shared, hot-swappable map of authorities with low reputation scores.
    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    throughput_calculator: Arc<ConsensusThroughputCalculator>,
    backpressure_manager: Arc<BackpressureManager>,
    // Present only when congestion logging was configured and the logger could
    // be created (see `new`).
    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
}
114
impl ConsensusHandlerInitializer {
    /// Creates an initializer holding all shared services.
    ///
    /// If `congestion_log_config` is provided, a `CongestionCommitLogger` is
    /// constructed from it; on failure this reports via `debug_fatal!` and
    /// continues with logging disabled rather than aborting startup.
    pub fn new(
        state: Arc<AuthorityState>,
        checkpoint_service: Arc<CheckpointService>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
        throughput_calculator: Arc<ConsensusThroughputCalculator>,
        backpressure_manager: Arc<BackpressureManager>,
        congestion_log_config: Option<CongestionLogConfig>,
    ) -> Self {
        let congestion_logger =
            congestion_log_config.and_then(|config| match CongestionCommitLogger::new(&config) {
                Ok(logger) => Some(Arc::new(Mutex::new(logger))),
                Err(e) => {
                    debug_fatal!("Failed to create congestion logger: {e}");
                    None
                }
            });
        Self {
            state,
            checkpoint_service,
            epoch_store,
            consensus_adapter,
            low_scoring_authorities,
            throughput_calculator,
            backpressure_manager,
            congestion_logger,
        }
    }

    /// Test-only constructor that fabricates a consensus adapter and
    /// backpressure manager, and disables congestion logging.
    #[cfg(test)]
    pub(crate) fn new_for_testing(
        state: Arc<AuthorityState>,
        checkpoint_service: Arc<CheckpointService>,
    ) -> Self {
        use crate::consensus_test_utils::make_consensus_adapter_for_test;
        use std::collections::HashSet;

        let backpressure_manager = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        Self {
            state: state.clone(),
            checkpoint_service,
            epoch_store: state.epoch_store_for_testing().clone(),
            consensus_adapter,
            low_scoring_authorities: Arc::new(Default::default()),
            throughput_calculator: Arc::new(ConsensusThroughputCalculator::new(
                None,
                state.metrics.clone(),
            )),
            backpressure_manager,
            congestion_logger: None,
        }
    }

    /// Builds a `ConsensusHandler` wired to this initializer's services, using
    /// the consensus committee taken from the epoch start state.
    pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
        let new_epoch_start_state = self.epoch_store.epoch_start_state();
        let consensus_committee = new_epoch_start_state.get_consensus_committee();

        let settlement_scheduler = SettlementScheduler::new(
            self.state.execution_scheduler().as_ref().clone(),
            self.state.get_transaction_cache_reader().clone(),
        );
        ConsensusHandler::new(
            self.epoch_store.clone(),
            self.checkpoint_service.clone(),
            settlement_scheduler,
            self.consensus_adapter.clone(),
            self.state.get_object_cache_reader().clone(),
            self.low_scoring_authorities.clone(),
            consensus_committee,
            self.state.metrics.clone(),
            self.throughput_calculator.clone(),
            self.backpressure_manager.subscribe(),
            self.state.traffic_controller.clone(),
            self.congestion_logger.clone(),
        )
    }
}
196
197mod additional_consensus_state {
198 use std::marker::PhantomData;
199
200 use consensus_core::CommitRef;
201 use fastcrypto::hash::HashFunction as _;
202 use sui_types::{crypto::DefaultHash, digests::Digest};
203
204 use super::*;
    /// Extra per-epoch consensus state that is not part of consensus output
    /// itself; its BCS digest is recorded in the commit prologue (see
    /// `AdditionalConsensusState::digest`). Serialization order therefore
    /// affects the digest — change fields with care.
    #[derive(Serialize, Deserialize)]
    pub(super) struct AdditionalConsensusState {
        // Sliding window of recent commit timestamps, used to estimate the
        // inter-commit interval.
        commit_interval_observer: CommitIntervalObserver,
    }
217
218 impl AdditionalConsensusState {
219 pub fn new(additional_consensus_state_window_size: u32) -> Self {
220 Self {
221 commit_interval_observer: CommitIntervalObserver::new(
222 additional_consensus_state_window_size,
223 ),
224 }
225 }
226
227 pub(crate) fn observe_commit(
229 &mut self,
230 protocol_config: &ProtocolConfig,
231 epoch_start_time: u64,
232 consensus_commit: &impl ConsensusCommitAPI,
233 ) -> ConsensusCommitInfo {
234 self.commit_interval_observer
235 .observe_commit_time(consensus_commit);
236
237 let estimated_commit_period = self
238 .commit_interval_observer
239 .commit_interval_estimate()
240 .unwrap_or(Duration::from_millis(
241 protocol_config.min_checkpoint_interval_ms(),
242 ));
243
244 info!("estimated commit rate: {:?}", estimated_commit_period);
245
246 self.commit_info_impl(
247 epoch_start_time,
248 consensus_commit,
249 Some(estimated_commit_period),
250 )
251 }
252
253 fn commit_info_impl(
254 &self,
255 epoch_start_time: u64,
256 consensus_commit: &impl ConsensusCommitAPI,
257 estimated_commit_period: Option<Duration>,
258 ) -> ConsensusCommitInfo {
259 let leader_author = consensus_commit.leader_author_index();
260 let timestamp = consensus_commit.commit_timestamp_ms();
261
262 let timestamp = if timestamp < epoch_start_time {
263 error!(
264 "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start_time}, author {leader_author:?}"
265 );
266 epoch_start_time
267 } else {
268 timestamp
269 };
270
271 ConsensusCommitInfo {
272 _phantom: PhantomData,
273 round: consensus_commit.leader_round(),
274 timestamp,
275 leader_author,
276 consensus_commit_ref: consensus_commit.commit_ref(),
277 rejected_transactions_digest: consensus_commit.rejected_transactions_digest(),
278 additional_state_digest: Some(self.digest()),
279 estimated_commit_period,
280 skip_consensus_commit_prologue_in_test: false,
281 }
282 }
283
284 fn digest(&self) -> AdditionalConsensusStateDigest {
286 let mut hash = DefaultHash::new();
287 bcs::serialize_into(&mut hash, self).unwrap();
288 AdditionalConsensusStateDigest::new(hash.finalize().into())
289 }
290 }
291
    /// Per-commit metadata derived from consensus output, used to build the
    /// consensus commit prologue transaction.
    pub struct ConsensusCommitInfo {
        // Prevents construction outside this module (all fields would
        // otherwise be settable via struct literal in-crate).
        _phantom: PhantomData<()>,

        /// Leader round of the commit.
        pub round: u64,
        /// Commit timestamp in ms, clamped to be >= the epoch start time.
        pub timestamp: u64,
        /// Index of the commit's leader authority.
        pub leader_author: AuthorityIndex,
        /// Reference to the consensus commit this info was derived from.
        pub consensus_commit_ref: CommitRef,
        /// Digest over the commit's rejected transactions.
        pub rejected_transactions_digest: Digest,

        // Digest of AdditionalConsensusState at the time of this commit; None
        // only for some test paths (accessor panics when absent).
        additional_state_digest: Option<AdditionalConsensusStateDigest>,
        // Estimated inter-commit interval (accessor panics when absent).
        estimated_commit_period: Option<Duration>,

        /// Test-only escape hatch to skip generating the commit prologue.
        pub skip_consensus_commit_prologue_in_test: bool,
    }
307
    impl ConsensusCommitInfo {
        /// Test-only constructor: leader 0, default commit ref, zero
        /// additional-state digest.
        pub fn new_for_test(
            commit_round: u64,
            commit_timestamp: u64,
            estimated_commit_period: Option<Duration>,
            skip_consensus_commit_prologue_in_test: bool,
        ) -> Self {
            Self {
                _phantom: PhantomData,
                round: commit_round,
                timestamp: commit_timestamp,
                leader_author: 0,
                consensus_commit_ref: CommitRef::default(),
                rejected_transactions_digest: Digest::default(),
                additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
                estimated_commit_period,
                skip_consensus_commit_prologue_in_test,
            }
        }

        /// Test-only constructor for congestion tests; always skips the
        /// consensus commit prologue.
        pub fn new_for_congestion_test(
            commit_round: u64,
            commit_timestamp: u64,
            estimated_commit_period: Duration,
        ) -> Self {
            Self::new_for_test(
                commit_round,
                commit_timestamp,
                Some(estimated_commit_period),
                true,
            )
        }

        /// Digest of the additional consensus state at this commit.
        ///
        /// # Panics
        /// Panics if the digest was not populated.
        pub fn additional_state_digest(&self) -> AdditionalConsensusStateDigest {
            self.additional_state_digest
                .expect("additional_state_digest is not available")
        }

        /// Estimated interval between consecutive consensus commits.
        ///
        /// # Panics
        /// Panics if no estimate was recorded.
        pub fn estimated_commit_period(&self) -> Duration {
            self.estimated_commit_period
                .expect("estimated commit period is not available")
        }

        // Converts the commit ref's digest into the sui-types digest wrapper.
        fn consensus_commit_digest(&self) -> ConsensusCommitDigest {
            ConsensusCommitDigest::new(self.consensus_commit_ref.digest.into_inner())
        }

        // Prologue v1: records epoch, round, and timestamp only.
        fn consensus_commit_prologue_transaction(
            &self,
            epoch: u64,
        ) -> VerifiedExecutableTransaction {
            let transaction = VerifiedTransaction::new_consensus_commit_prologue(
                epoch,
                self.round,
                self.timestamp,
            );
            VerifiedExecutableTransaction::new_system(transaction, epoch)
        }

        // Prologue v2: additionally records the consensus commit digest.
        fn consensus_commit_prologue_v2_transaction(
            &self,
            epoch: u64,
        ) -> VerifiedExecutableTransaction {
            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v2(
                epoch,
                self.round,
                self.timestamp,
                self.consensus_commit_digest(),
            );
            VerifiedExecutableTransaction::new_system(transaction, epoch)
        }

        // Prologue v3: additionally records consensus-determined version
        // assignments (for cancelled transactions).
        fn consensus_commit_prologue_v3_transaction(
            &self,
            epoch: u64,
            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
        ) -> VerifiedExecutableTransaction {
            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v3(
                epoch,
                self.round,
                self.timestamp,
                self.consensus_commit_digest(),
                consensus_determined_version_assignments,
            );
            VerifiedExecutableTransaction::new_system(transaction, epoch)
        }

        // Prologue v4: additionally records the additional-state digest.
        fn consensus_commit_prologue_v4_transaction(
            &self,
            epoch: u64,
            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
            additional_state_digest: AdditionalConsensusStateDigest,
        ) -> VerifiedExecutableTransaction {
            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v4(
                epoch,
                self.round,
                self.timestamp,
                self.consensus_commit_digest(),
                consensus_determined_version_assignments,
                additional_state_digest,
            );
            VerifiedExecutableTransaction::new_system(transaction, epoch)
        }

        /// Builds the commit prologue transaction, selecting the prologue
        /// version from protocol feature flags: v4 if the additional-state
        /// digest is recorded, else v3 if version assignments are recorded,
        /// else v2 if the commit digest is included, else v1.
        pub fn create_consensus_commit_prologue_transaction(
            &self,
            epoch: u64,
            protocol_config: &ProtocolConfig,
            cancelled_txn_version_assignment: Vec<(
                TransactionDigest,
                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
            )>,
            commit_info: &ConsensusCommitInfo,
            indirect_state_observer: IndirectStateObserver,
        ) -> VerifiedExecutableTransaction {
            // Pick the version-assignment encoding: V2 keys entries by
            // ConsensusObjectSequenceKey; the legacy encoding keys by ObjectID
            // only (dropping the sequence-key's second component).
            let version_assignments = if protocol_config
                .record_consensus_determined_version_assignments_in_prologue_v2()
            {
                Some(
                    ConsensusDeterminedVersionAssignments::CancelledTransactionsV2(
                        cancelled_txn_version_assignment,
                    ),
                )
            } else if protocol_config.record_consensus_determined_version_assignments_in_prologue()
            {
                Some(
                    ConsensusDeterminedVersionAssignments::CancelledTransactions(
                        cancelled_txn_version_assignment
                            .into_iter()
                            .map(|(tx_digest, versions)| {
                                (
                                    tx_digest,
                                    versions.into_iter().map(|(id, v)| (id.0, v)).collect(),
                                )
                            })
                            .collect(),
                    ),
                )
            } else {
                None
            };

            if protocol_config.record_additional_state_digest_in_prologue() {
                // Optionally fold indirectly-observed state into the digest.
                let additional_state_digest =
                    if protocol_config.additional_consensus_digest_indirect_state() {
                        let d1 = commit_info.additional_state_digest();
                        indirect_state_observer.fold_with(d1)
                    } else {
                        commit_info.additional_state_digest()
                    };

                // v4 requires version assignments; unwrap asserts the flag
                // combination is consistent.
                self.consensus_commit_prologue_v4_transaction(
                    epoch,
                    version_assignments.unwrap(),
                    additional_state_digest,
                )
            } else if let Some(version_assignments) = version_assignments {
                self.consensus_commit_prologue_v3_transaction(epoch, version_assignments)
            } else if protocol_config.include_consensus_digest_in_prologue() {
                self.consensus_commit_prologue_v2_transaction(epoch)
            } else {
                self.consensus_commit_prologue_transaction(epoch)
            }
        }
    }
475
    /// Accumulates a hash over "indirect" state observed while processing a
    /// commit; folded into the additional-state digest (see `fold_with`).
    #[derive(Default)]
    pub struct IndirectStateObserver {
        // Running hash of all observed state, in observation order.
        hash: DefaultHash,
    }
480
481 impl IndirectStateObserver {
482 pub fn new() -> Self {
483 Self::default()
484 }
485
486 pub fn observe_indirect_state<T: Serialize>(&mut self, state: &T) {
487 bcs::serialize_into(&mut self.hash, state).unwrap();
488 }
489
490 pub fn fold_with(
491 self,
492 d1: AdditionalConsensusStateDigest,
493 ) -> AdditionalConsensusStateDigest {
494 let hash = self.hash.finalize();
495 let d2 = AdditionalConsensusStateDigest::new(hash.into());
496
497 let mut hasher = DefaultHash::new();
498 bcs::serialize_into(&mut hasher, &d1).unwrap();
499 bcs::serialize_into(&mut hasher, &d2).unwrap();
500 AdditionalConsensusStateDigest::new(hasher.finalize().into())
501 }
502 }
503
504 #[test]
505 fn test_additional_consensus_state() {
506 use crate::consensus_test_utils::TestConsensusCommit;
507
508 fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
509 let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
510 state.observe_commit(
511 &protocol_config,
512 100,
513 &TestConsensusCommit::empty(round, timestamp, 0),
514 );
515 }
516
517 let mut s1 = AdditionalConsensusState::new(3);
518 observe(&mut s1, 1, 1000);
519 observe(&mut s1, 2, 2000);
520 observe(&mut s1, 3, 3000);
521 observe(&mut s1, 4, 4000);
522
523 let mut s2 = AdditionalConsensusState::new(3);
524 observe(&mut s2, 2, 2000);
527 observe(&mut s2, 3, 3000);
528 observe(&mut s2, 4, 4000);
529
530 assert_eq!(s1.digest(), s2.digest());
531
532 observe(&mut s1, 5, 5000);
533 observe(&mut s2, 5, 5000);
534
535 assert_eq!(s1.digest(), s2.digest());
536 }
537}
538use additional_consensus_state::AdditionalConsensusState;
539pub(crate) use additional_consensus_state::{ConsensusCommitInfo, IndirectStateObserver};
540
/// Checkpoint roots from one consensus commit, queued until the next pending
/// checkpoint is flushed (see `CheckpointQueue`).
struct QueuedCheckpointRoots {
    // Roots (tx keys, optional settlement key, height) for this commit.
    roots: CheckpointRoots,
    // Commit timestamp; the last queued entry's timestamp becomes the flushed
    // checkpoint's timestamp.
    timestamp: CheckpointTimestamp,
    // Reference to the consensus commit these roots came from.
    consensus_commit_ref: CommitRef,
    // Digest over the commit's rejected transactions.
    rejected_transactions_digest: Digest,
}
547
/// A batch of schedulable transactions cut from a consensus commit, plus an
/// optional settlement transaction, all assigned to one checkpoint height.
struct Chunk<
    T: crate::authority::shared_object_version_manager::AsTx = VerifiedExecutableTransaction,
> {
    // User transactions (and other schedulables) in the chunk.
    schedulables: Vec<Schedulable<T>>,
    // Optional settlement covering the chunk; iterated after `schedulables`
    // by `all_schedulables`.
    settlement: Option<Schedulable<T>>,
    // Checkpoint height this chunk belongs to.
    height: CheckpointHeight,
}
555
556impl<T: crate::authority::shared_object_version_manager::AsTx + Clone> Chunk<T> {
557 fn all_schedulables(&self) -> impl Iterator<Item = &Schedulable<T>> + Clone {
558 self.schedulables.iter().chain(self.settlement.iter())
559 }
560
561 fn all_schedulables_from(chunks: &[Self]) -> impl Iterator<Item = &Schedulable<T>> + Clone {
562 chunks.iter().flat_map(|c| c.all_schedulables())
563 }
564
565 fn to_checkpoint_roots(&self) -> CheckpointRoots {
566 let tx_roots: Vec<_> = self.schedulables.iter().map(|s| s.key()).collect();
567 let settlement_root = self.settlement.as_ref().map(|s| s.key());
568 CheckpointRoots {
569 tx_roots,
570 settlement_root,
571 height: self.height,
572 }
573 }
574}
575
576impl From<Chunk<VerifiedExecutableTransactionWithAliases>> for Chunk {
577 fn from(chunk: Chunk<VerifiedExecutableTransactionWithAliases>) -> Self {
578 Chunk {
579 schedulables: chunk.schedulables.into_iter().map(|s| s.into()).collect(),
580 settlement: chunk.settlement.map(|s| s.into()),
581 height: chunk.height,
582 }
583 }
584}
585
/// Accumulates checkpoint roots from consensus commits and decides when to cut
/// a pending checkpoint, based on a transaction-count cap and a minimum time
/// interval between checkpoints.
pub(crate) struct CheckpointQueue {
    // Timestamp of the most recently flushed checkpoint.
    last_built_timestamp: CheckpointTimestamp,
    // Roots queued since the last flush, one entry per pushed chunk.
    pending_roots: VecDeque<QueuedCheckpointRoots>,
    // Monotonically increasing checkpoint height counter.
    height: u64,
    // Number of user transactions queued since the last flush.
    pending_tx_count: usize,
    // Sequence number that the next flushed checkpoint will receive.
    current_checkpoint_seq: CheckpointSequenceNumber,
    // Max user transactions per checkpoint; exceeding it forces a flush.
    max_tx: usize,
    // Minimum ms between non-forced flushes.
    min_checkpoint_interval_ms: u64,
    // Channel to hand schedulables to the execution scheduler.
    execution_scheduler_sender: ExecutionSchedulerSender,
}
603
impl CheckpointQueue {
    /// Creates a queue resuming from persisted state (`last_built_timestamp`,
    /// `checkpoint_height`, `next_checkpoint_seq`).
    pub(crate) fn new(
        last_built_timestamp: CheckpointTimestamp,
        checkpoint_height: u64,
        next_checkpoint_seq: CheckpointSequenceNumber,
        max_tx: usize,
        min_checkpoint_interval_ms: u64,
        execution_scheduler_sender: ExecutionSchedulerSender,
    ) -> Self {
        Self {
            last_built_timestamp,
            pending_roots: VecDeque::new(),
            height: checkpoint_height,
            pending_tx_count: 0,
            current_checkpoint_seq: next_checkpoint_seq,
            max_tx,
            min_checkpoint_interval_ms,
            execution_scheduler_sender,
        }
    }

    /// Test-only constructor with a throwaway scheduler channel (the receiver
    /// is dropped immediately).
    #[cfg(test)]
    fn new_for_testing(
        last_built_timestamp: CheckpointTimestamp,
        checkpoint_height: u64,
        next_checkpoint_seq: CheckpointSequenceNumber,
        max_tx: usize,
        min_checkpoint_interval_ms: u64,
    ) -> Self {
        let (sender, _receiver) = monitored_mpsc::unbounded_channel("test_checkpoint_queue_sender");
        Self {
            last_built_timestamp,
            pending_roots: VecDeque::new(),
            height: checkpoint_height,
            pending_tx_count: 0,
            current_checkpoint_seq: next_checkpoint_seq,
            max_tx,
            min_checkpoint_interval_ms,
            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
        }
    }

    /// Test-only constructor with a caller-provided scheduler channel so the
    /// test can observe what is sent.
    #[cfg(test)]
    fn new_for_testing_with_sender(
        last_built_timestamp: CheckpointTimestamp,
        checkpoint_height: u64,
        next_checkpoint_seq: CheckpointSequenceNumber,
        max_tx: usize,
        min_checkpoint_interval_ms: u64,
        sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
    ) -> Self {
        Self {
            last_built_timestamp,
            pending_roots: VecDeque::new(),
            height: checkpoint_height,
            pending_tx_count: 0,
            current_checkpoint_seq: next_checkpoint_seq,
            max_tx,
            min_checkpoint_interval_ms,
            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
        }
    }

    /// Timestamp of the last flushed checkpoint.
    pub(crate) fn last_built_timestamp(&self) -> CheckpointTimestamp {
        self.last_built_timestamp
    }

    /// True when no roots are queued.
    pub(crate) fn is_empty(&self) -> bool {
        self.pending_roots.is_empty()
    }

    /// Increments and returns the next checkpoint height.
    fn next_height(&mut self) -> u64 {
        self.height += 1;
        self.height
    }

    /// Queues a chunk's roots and forwards its schedulables (with their
    /// assigned versions, plus settlement batch info if any) to the execution
    /// scheduler.
    ///
    /// If adding the chunk would push the pending user-transaction count over
    /// `max_tx`, the currently queued roots are flushed into a checkpoint
    /// FIRST, so this chunk lands in the next one. Returns any checkpoints
    /// flushed this way.
    fn push_chunk(
        &mut self,
        chunk: Chunk,
        assigned_versions: &HashMap<TransactionKey, AssignedVersions>,
        timestamp: CheckpointTimestamp,
        consensus_commit_ref: CommitRef,
        rejected_transactions_digest: Digest,
    ) -> Vec<PendingCheckpointV2> {
        let max_tx = self.max_tx;
        let user_tx_count = chunk.schedulables.len();

        let roots = chunk.to_checkpoint_roots();

        // Pair each schedulable with its assigned shared-object versions
        // (empty when none were assigned).
        let schedulables: Vec<_> = chunk
            .schedulables
            .into_iter()
            .map(|s| {
                let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
                (s, versions)
            })
            .collect();

        let mut flushed_checkpoints = Vec::new();

        // Flush early only if something is already pending AND this chunk
        // would overflow the per-checkpoint transaction cap.
        if self.pending_tx_count > 0
            && self.pending_tx_count + user_tx_count > max_tx
            && let Some(checkpoint) = self.flush_forced()
        {
            flushed_checkpoints.push(checkpoint);
        }

        // Settlement info must reference the checkpoint seq AFTER any early
        // flush above, since the flush bumps current_checkpoint_seq.
        let settlement_info = chunk.settlement.as_ref().map(|s| {
            let settlement_key = s.key();
            let tx_keys: Vec<_> = schedulables.iter().map(|(s, _)| s.key()).collect();
            SettlementBatchInfo {
                settlement_key,
                tx_keys,
                checkpoint_height: chunk.height,
                checkpoint_seq: self.current_checkpoint_seq,
                assigned_versions: assigned_versions
                    .get(&settlement_key)
                    .cloned()
                    .unwrap_or_default(),
            }
        });

        self.execution_scheduler_sender
            .send(schedulables, settlement_info);

        self.pending_tx_count += user_tx_count;
        self.pending_roots.push_back(QueuedCheckpointRoots {
            roots,
            timestamp,
            consensus_commit_ref,
            rejected_transactions_digest,
        });

        flushed_checkpoints
    }

    /// Flushes the queued roots into a pending checkpoint, unless `force` is
    /// false and the minimum checkpoint interval has not yet elapsed since the
    /// last flush.
    pub(crate) fn flush(
        &mut self,
        current_timestamp: CheckpointTimestamp,
        force: bool,
    ) -> Option<PendingCheckpointV2> {
        if !force && current_timestamp < self.last_built_timestamp + self.min_checkpoint_interval_ms
        {
            return None;
        }
        self.flush_forced()
    }

    /// Drains all queued roots into a single `PendingCheckpointV2`, taking the
    /// timestamp/commit-ref/height details from the LAST queued entry.
    /// Returns None when nothing is queued. Advances the checkpoint sequence.
    fn flush_forced(&mut self) -> Option<PendingCheckpointV2> {
        if self.pending_roots.is_empty() {
            return None;
        }

        let to_flush: Vec<_> = self.pending_roots.drain(..).collect();
        let last_root = to_flush.last().unwrap();

        let checkpoint = PendingCheckpointV2 {
            roots: to_flush.iter().map(|q| q.roots.clone()).collect(),
            details: PendingCheckpointInfo {
                timestamp_ms: last_root.timestamp,
                last_of_epoch: false,
                checkpoint_height: last_root.roots.height,
                consensus_commit_ref: last_root.consensus_commit_ref,
                rejected_transactions_digest: last_root.rejected_transactions_digest,
                checkpoint_seq: Some(self.current_checkpoint_seq),
            },
        };

        self.last_built_timestamp = last_root.timestamp;
        self.pending_tx_count = 0;
        self.current_checkpoint_seq += 1;

        Some(checkpoint)
    }

    /// Sequence number of the most recently flushed checkpoint.
    ///
    /// # Panics
    /// Panics if called before any checkpoint was assigned (seq still 0).
    pub(crate) fn checkpoint_seq(&self) -> CheckpointSequenceNumber {
        self.current_checkpoint_seq
            .checked_sub(1)
            .expect("checkpoint_seq called before any checkpoint was assigned")
    }
}
785
/// Processes sequenced consensus output: filters and schedules transactions,
/// tracks consensus stats, and accumulates pending checkpoints.
pub struct ConsensusHandler<C> {
    /// Epoch-scoped store holding consensus-related persistent state.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Last processed execution indices and stats, restored at startup.
    last_consensus_stats: ExecutionIndicesWithStatsV2,
    /// Notified of newly built pending checkpoints.
    checkpoint_service: Arc<C>,
    /// Read-only access to the object cache.
    cache_reader: Arc<dyn ObjectCacheRead>,
    /// Hot-swappable map of low-reputation authorities.
    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    /// Consensus committee for the current epoch.
    committee: ConsensusCommittee,
    metrics: Arc<AuthorityMetrics>,
    /// LRU de-duplication cache of already-processed consensus transactions.
    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
    /// Channel for handing schedulables to the execution scheduler.
    execution_scheduler_sender: ExecutionSchedulerSender,
    consensus_adapter: Arc<ConsensusAdapter>,

    throughput_calculator: Arc<ConsensusThroughputCalculator>,

    /// Extra state (commit-interval window) whose digest goes into the
    /// commit prologue.
    additional_consensus_state: AdditionalConsensusState,

    /// Awaited before processing each commit to respect backpressure.
    backpressure_subscriber: BackpressureSubscriber,

    traffic_controller: Option<Arc<TrafficController>>,

    /// Optional per-commit congestion logger.
    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,

    /// Queue that cuts pending checkpoints from processed commits.
    checkpoint_queue: Mutex<CheckpointQueue>,
}
824
/// Capacity of the LRU cache used to de-duplicate processed consensus
/// transactions (randomized in tests).
const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
826
impl<C> ConsensusHandler<C> {
    /// Creates a handler, restoring consensus stats from the epoch store and
    /// wiring a `CheckpointQueue` sized from protocol config.
    ///
    /// # Panics
    /// Panics if the congestion control mode is anything other than
    /// `ExecutionTimeEstimate` (other modes were removed), or if the last
    /// consensus stats cannot be read.
    pub(crate) fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_service: Arc<C>,
        settlement_scheduler: SettlementScheduler,
        consensus_adapter: Arc<ConsensusAdapter>,
        cache_reader: Arc<dyn ObjectCacheRead>,
        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
        committee: ConsensusCommittee,
        metrics: Arc<AuthorityMetrics>,
        throughput_calculator: Arc<ConsensusThroughputCalculator>,
        backpressure_subscriber: BackpressureSubscriber,
        traffic_controller: Option<Arc<TrafficController>>,
        congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
    ) -> Self {
        assert!(
            matches!(
                epoch_store
                    .protocol_config()
                    .per_object_congestion_control_mode(),
                PerObjectCongestionControlMode::ExecutionTimeEstimate(_)
            ),
            "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
        );

        let mut last_consensus_stats = epoch_store
            .get_last_consensus_stats()
            .expect("Should be able to read last consensus index");
        // First run in this epoch: size the per-authority stats to the
        // committee and seed the checkpoint seq from the previous epoch's
        // last checkpoint (when checkpoints are split in the handler).
        if !last_consensus_stats.stats.is_initialized() {
            last_consensus_stats.stats = ConsensusStats::new(committee.size());
            if epoch_store
                .protocol_config()
                .split_checkpoints_in_consensus_handler()
            {
                last_consensus_stats.checkpoint_seq = epoch_store.previous_epoch_last_checkpoint();
            }
        }
        let max_tx = epoch_store
            .protocol_config()
            .max_transactions_per_checkpoint() as usize;
        let min_checkpoint_interval_ms = epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let execution_scheduler_sender =
            ExecutionSchedulerSender::start(settlement_scheduler, epoch_store.clone());
        let commit_rate_estimate_window_size = epoch_store
            .protocol_config()
            .get_consensus_commit_rate_estimation_window_size();
        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
        let checkpoint_height = last_consensus_stats.height;
        // Next checkpoint seq resumes after the last recorded one when the
        // handler splits checkpoints; otherwise it is unused and left at 0.
        let next_checkpoint_seq = if epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler()
        {
            last_consensus_stats.checkpoint_seq + 1
        } else {
            0
        };
        Self {
            epoch_store,
            last_consensus_stats,
            checkpoint_service,
            cache_reader,
            low_scoring_authorities,
            committee,
            metrics,
            processed_cache: LruCache::new(
                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
            ),
            execution_scheduler_sender: execution_scheduler_sender.clone(),
            consensus_adapter,
            throughput_calculator,
            additional_consensus_state: AdditionalConsensusState::new(
                commit_rate_estimate_window_size,
            ),
            backpressure_subscriber,
            traffic_controller,
            congestion_logger,
            checkpoint_queue: Mutex::new(CheckpointQueue::new(
                last_built_timestamp,
                checkpoint_height,
                next_checkpoint_seq,
                max_tx,
                min_checkpoint_interval_ms,
                execution_scheduler_sender,
            )),
        }
    }

    /// Sub-DAG index of the last processed consensus commit.
    pub(crate) fn last_processed_subdag_index(&self) -> u64 {
        self.last_consensus_stats.index.sub_dag_index
    }

    /// Test-only constructor: takes the scheduler sender and stats directly,
    /// starts the checkpoint seq at 0, and disables congestion logging.
    pub(crate) fn new_for_testing(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_service: Arc<C>,
        execution_scheduler_sender: ExecutionSchedulerSender,
        consensus_adapter: Arc<ConsensusAdapter>,
        cache_reader: Arc<dyn ObjectCacheRead>,
        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
        committee: ConsensusCommittee,
        metrics: Arc<AuthorityMetrics>,
        throughput_calculator: Arc<ConsensusThroughputCalculator>,
        backpressure_subscriber: BackpressureSubscriber,
        traffic_controller: Option<Arc<TrafficController>>,
        last_consensus_stats: ExecutionIndicesWithStatsV2,
    ) -> Self {
        let commit_rate_estimate_window_size = epoch_store
            .protocol_config()
            .get_consensus_commit_rate_estimation_window_size();
        let max_tx = epoch_store
            .protocol_config()
            .max_transactions_per_checkpoint() as usize;
        let min_checkpoint_interval_ms = epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
        let checkpoint_height = last_consensus_stats.height;
        Self {
            epoch_store,
            last_consensus_stats,
            checkpoint_service,
            cache_reader,
            low_scoring_authorities,
            committee,
            metrics,
            processed_cache: LruCache::new(
                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
            ),
            execution_scheduler_sender: execution_scheduler_sender.clone(),
            consensus_adapter,
            throughput_calculator,
            additional_consensus_state: AdditionalConsensusState::new(
                commit_rate_estimate_window_size,
            ),
            backpressure_subscriber,
            traffic_controller,
            congestion_logger: None,
            checkpoint_queue: Mutex::new(CheckpointQueue::new(
                last_built_timestamp,
                checkpoint_height,
                0,
                max_tx,
                min_checkpoint_interval_ms,
                execution_scheduler_sender,
            )),
        }
    }
}
981
/// Consensus transactions from a single commit, bucketed by kind for the
/// commit handler.
#[derive(Default)]
struct CommitHandlerInput {
    // User-submitted transactions (with address aliases) to be scheduled.
    user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    // Authority capability announcements.
    capability_notifications: Vec<AuthorityCapabilitiesV2>,
    // Execution-time observations reported by validators.
    execution_time_observations: Vec<ExecutionTimeObservation>,
    // Checkpoint signature shares from validators.
    checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
    // Randomness DKG messages as (sender, serialized payload).
    randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
    // Randomness DKG confirmations as (sender, serialized payload).
    randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
    // Authorities that announced end-of-publish for the epoch.
    end_of_publish_transactions: Vec<AuthorityName>,
    // Newly observed zklogin JWKs, with the submitting authority.
    new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
}
993
/// Mutable state threaded through the handling of a single consensus commit.
struct CommitHandlerState {
    // True when DKG has failed for this epoch (set by `init_randomness`).
    dkg_failed: bool,
    // Randomness round reserved for this commit, if any.
    randomness_round: Option<RandomnessRound>,
    // Pending writes accumulated while processing the commit.
    output: ConsensusCommitOutput,
    // Observer folded into the additional-state digest; Option so it can be
    // taken/consumed during processing.
    indirect_state_observer: Option<IndirectStateObserver>,
    // Reconfig state captured at the start of commit handling.
    initial_reconfig_state: ReconfigState,
    // Per-digest occurrence counts for transactions seen in this commit.
    occurrence_counts: HashMap<TransactionDigest, u32>,
}
1003
impl CommitHandlerState {
    /// Returns the keys of all consensus messages recorded as processed in
    /// the pending commit output.
    fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
        self.output
            .get_consensus_messages_processed()
            .cloned()
            .collect()
    }

    /// Determines the randomness round for this commit (if any), recording
    /// the round and DKG-failure flag on `self`, and returns the locked
    /// randomness manager (None when randomness is not configured).
    fn init_randomness<'a, 'epoch>(
        &'a mut self,
        epoch_store: &'epoch AuthorityPerEpochStore,
        commit_info: &'a ConsensusCommitInfo,
    ) -> Option<tokio::sync::MutexGuard<'epoch, RandomnessManager>> {
        // Only the commit handler thread takes this lock, so try_lock must
        // succeed.
        let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
            rm.try_lock()
                .expect("should only ever be called from the commit handler thread")
        });

        let mut dkg_failed = false;
        let randomness_round = if epoch_store.randomness_state_enabled() {
            let randomness_manager = randomness_manager
                .as_mut()
                .expect("randomness manager should exist if randomness is enabled");
            match randomness_manager.dkg_status() {
                // DKG not finished yet: no randomness this commit.
                DkgStatus::Pending => None,
                DkgStatus::Failed => {
                    dkg_failed = true;
                    None
                }
                DkgStatus::Successful => {
                    // Reserve a round only while the epoch still accepts txs.
                    if self.initial_reconfig_state.should_accept_tx() {
                        randomness_manager
                            .reserve_next_randomness(commit_info.timestamp, &mut self.output)
                            .expect("epoch ended")
                    } else {
                        None
                    }
                }
            }
        } else {
            None
        };

        if randomness_round.is_some() {
            // A round can only be reserved when DKG succeeded.
            assert!(!dkg_failed);
        }

        self.randomness_round = randomness_round;
        self.dkg_failed = dkg_failed;

        randomness_manager
    }
}
1060
1061impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
1062 fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
1066 assert!(
1067 self.epoch_store
1068 .protocol_config()
1069 .record_additional_state_digest_in_prologue()
1070 );
1071 let protocol_config = self.epoch_store.protocol_config();
1072 let epoch_start_time = self
1073 .epoch_store
1074 .epoch_start_config()
1075 .epoch_start_timestamp_ms();
1076
1077 self.additional_consensus_state.observe_commit(
1078 protocol_config,
1079 epoch_start_time,
1080 &consensus_commit,
1081 );
1082 }
1083
    /// Test-only entry point that forwards directly to
    /// [`Self::handle_consensus_commit`].
    #[cfg(test)]
    pub(crate) async fn handle_consensus_commit_for_test(
        &mut self,
        consensus_commit: impl ConsensusCommitAPI,
    ) {
        self.handle_consensus_commit(consensus_commit).await;
    }
1091
    /// Main entry point for processing one consensus commit: filters and
    /// deduplicates the commit's transactions, processes system messages
    /// (JWKs, capabilities, execution-time observations, checkpoint
    /// signatures, DKG), schedules user transactions, creates pending
    /// checkpoints, and durably pushes the resulting output.
    #[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))]
    pub(crate) async fn handle_consensus_commit(
        &mut self,
        consensus_commit: impl ConsensusCommitAPI,
    ) {
        // All protocol features this handler relies on must be enabled.
        {
            let protocol_config = self.epoch_store.protocol_config();

            assert!(protocol_config.ignore_execution_time_observations_after_certs_closed());
            assert!(protocol_config.record_time_estimate_processed());
            assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints());
            assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest());
            assert!(protocol_config.authority_capabilities_v2());
            assert!(protocol_config.cancel_for_failed_dkg_early());
        }

        // Block until any backpressure condition has cleared before doing work.
        self.backpressure_subscriber.await_no_backpressure().await;

        let epoch = self.epoch_store.epoch();

        let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit");

        let last_committed_round = self.last_consensus_stats.index.last_committed_round;

        // Propagate the last committed leader round to the status/reject caches.
        if let Some(consensus_tx_status_cache) = self.epoch_store.consensus_tx_status_cache.as_ref()
        {
            consensus_tx_status_cache
                .update_last_committed_leader_round(last_committed_round as u32)
                .await;
        }
        if let Some(tx_reject_reason_cache) = self.epoch_store.tx_reject_reason_cache.as_ref() {
            tx_reject_reason_cache.set_last_committed_leader_round(last_committed_round as u32);
        }

        let commit_info = self.additional_consensus_state.observe_commit(
            self.epoch_store.protocol_config(),
            self.epoch_store
                .epoch_start_config()
                .epoch_start_timestamp_ms(),
            &consensus_commit,
        );
        // Commits must arrive in strictly increasing round order.
        assert!(commit_info.round > last_committed_round);

        let (timestamp, leader_author, commit_sub_dag_index) =
            self.gather_commit_metadata(&consensus_commit);

        info!(
            %consensus_commit,
            "Received consensus output {}. Rejected transactions {}",
            consensus_commit.commit_ref(),
            consensus_commit.rejected_transactions_debug_string(),
        );

        // Reset the per-commit transaction index; round/sub-dag advance.
        self.last_consensus_stats.index = ExecutionIndices {
            last_committed_round: commit_info.round,
            sub_dag_index: commit_sub_dag_index,
            transaction_index: 0_u64,
        };

        update_low_scoring_authorities(
            self.low_scoring_authorities.clone(),
            self.epoch_store.committee(),
            &self.committee,
            consensus_commit.reputation_score_sorted_desc(),
            &self.metrics,
            self.epoch_store
                .protocol_config()
                .consensus_bad_nodes_stake_threshold(),
        );

        self.metrics
            .consensus_committed_subdags
            .with_label_values(&[&leader_author.to_string()])
            .inc();

        // Per-commit scratch state; `initial_reconfig_state` is a snapshot
        // taken under the reconfig-state read lock.
        let mut state = CommitHandlerState {
            output: ConsensusCommitOutput::new(commit_info.round),
            dkg_failed: false,
            randomness_round: None,
            indirect_state_observer: Some(IndirectStateObserver::new()),
            initial_reconfig_state: self
                .epoch_store
                .get_reconfig_state_read_lock_guard()
                .clone(),
            occurrence_counts: HashMap::new(),
        };

        // Filter the raw commit stream, collecting owned-object locks for the
        // transactions that survive filtering.
        let FilteredConsensusOutput {
            transactions,
            owned_object_locks,
        } = self.filter_consensus_txns(
            state.initial_reconfig_state.clone(),
            &commit_info,
            &consensus_commit,
        );
        if !owned_object_locks.is_empty() {
            state.output.set_owned_object_locks(owned_object_locks);
        }
        let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions);

        // Resolves DKG status and possibly reserves a randomness round.
        let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info);

        // Partition the deduplicated stream by transaction kind.
        let CommitHandlerInput {
            user_transactions,
            capability_notifications,
            execution_time_observations,
            checkpoint_signature_messages,
            randomness_dkg_messages,
            randomness_dkg_confirmations,
            end_of_publish_transactions,
            new_jwks,
        } = self.build_commit_handler_input(transactions);

        // Process system message types before user transactions.
        self.process_jwks(&mut state, &commit_info, new_jwks);
        self.process_capability_notifications(capability_notifications);
        self.process_execution_time_observations(&mut state, execution_time_observations);
        self.process_checkpoint_signature_messages(checkpoint_signature_messages);

        self.process_dkg_updates(
            &mut state,
            &commit_info,
            randomness_manager.as_deref_mut(),
            randomness_dkg_messages,
            randomness_dkg_confirmations,
        )
        .await;

        // Sound for the same reason as the randomness manager lock: only the
        // commit handler thread takes this mutex.
        let mut execution_time_estimator = self
            .epoch_store
            .execution_time_estimator
            .try_lock()
            .expect("should only ever be called from the commit handler thread");

        let authenticator_state_update_transaction =
            self.create_authenticator_state_update(last_committed_round, &commit_info);

        let split_checkpoints = self
            .epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler();

        // Two checkpoint-construction paths selected by protocol config:
        // legacy (at most one regular + one randomness checkpoint per commit)
        // vs. split (chunked checkpoints via the checkpoint queue, v2).
        let (lock, final_round, num_schedulables, checkpoint_height) = if !split_checkpoints {
            let (schedulables, randomness_schedulables, assigned_versions) = self
                .process_transactions(
                    &mut state,
                    &mut execution_time_estimator,
                    &commit_info,
                    authenticator_state_update_transaction,
                    user_transactions,
                );

            let (should_accept_tx, lock, final_round) =
                self.handle_eop(&mut state, end_of_publish_transactions);

            // Once certs are closed and this is not the final round, nothing
            // further is checkpointed for this commit.
            let make_checkpoint = should_accept_tx || final_round;
            if !make_checkpoint {
                return;
            }

            if final_round {
                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
            }

            let checkpoint_height = self
                .epoch_store
                .calculate_pending_checkpoint_height(commit_info.round);

            self.create_pending_checkpoints(
                &mut state,
                &commit_info,
                &schedulables,
                &randomness_schedulables,
                final_round,
            );

            // Pair each schedulable with its assigned shared-object versions
            // and hand everything to the execution scheduler.
            let num_schedulables = schedulables.len();
            let mut all_schedulables = schedulables;
            all_schedulables.extend(randomness_schedulables);
            let assigned_versions = assigned_versions.into_map();
            let paired: Vec<_> = all_schedulables
                .into_iter()
                .map(|s| {
                    let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
                    (s, versions)
                })
                .collect();
            self.execution_scheduler_sender.send(paired, None);

            (lock, final_round, num_schedulables, checkpoint_height)
        } else {
            let (
                transactions_to_schedule,
                randomness_transactions_to_schedule,
                cancelled_txns,
                randomness_state_update_transaction,
            ) = self.collect_transactions_to_schedule(
                &mut state,
                &mut execution_time_estimator,
                &commit_info,
                user_transactions,
            );

            let (should_accept_tx, lock, final_round) =
                self.handle_eop(&mut state, end_of_publish_transactions);

            let make_checkpoint = should_accept_tx || final_round;
            if !make_checkpoint {
                return;
            }

            if final_round {
                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
            }

            let epoch = self.epoch_store.epoch();
            // Prologue is skipped only under test configuration.
            let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
                .then_some(Schedulable::ConsensusCommitPrologue(
                    epoch,
                    commit_info.round,
                    commit_info.consensus_commit_ref.index,
                ));

            // Order matters: prologue first, then authenticator state update,
            // then user transactions.
            let schedulables: Vec<_> = itertools::chain!(
                consensus_commit_prologue.into_iter(),
                authenticator_state_update_transaction
                    .into_iter()
                    .map(Schedulable::Transaction),
                transactions_to_schedule
                    .into_iter()
                    .map(Schedulable::Transaction),
            )
            .collect();

            let randomness_schedulables: Vec<_> = randomness_state_update_transaction
                .into_iter()
                .chain(
                    randomness_transactions_to_schedule
                        .into_iter()
                        .map(Schedulable::Transaction),
                )
                .collect();

            let num_schedulables = schedulables.len();
            let checkpoint_height = self.create_pending_checkpoints_v2(
                &mut state,
                &commit_info,
                schedulables,
                randomness_schedulables,
                &cancelled_txns,
                final_round,
            );

            (lock, final_round, num_schedulables, checkpoint_height)
        };

        // Snapshot notifications before `state.output` is moved below.
        let notifications = state.get_notifications();

        let mut stats_to_record = self.last_consensus_stats.clone();
        stats_to_record.height = checkpoint_height;
        if split_checkpoints {
            let queue = self.checkpoint_queue.lock().unwrap();
            stats_to_record.last_checkpoint_flush_timestamp = queue.last_built_timestamp();
            stats_to_record.checkpoint_seq = queue.checkpoint_seq();
        }

        state.output.record_consensus_commit_stats(stats_to_record);

        self.record_deferral_deletion(&mut state);

        // Durably push the accumulated output before notifying anyone.
        self.epoch_store
            .consensus_quarantine
            .write()
            .push_consensus_output(state.output, &self.epoch_store)
            .expect("push_consensus_output should not fail");

        debug!(
            ?commit_info.round,
            "Notifying checkpoint service about new pending checkpoint(s)",
        );
        self.checkpoint_service
            .notify_checkpoint()
            .expect("failed to notify checkpoint service");

        // Kick off randomness generation for the round reserved earlier.
        if let Some(randomness_round) = state.randomness_round {
            randomness_manager
                .as_ref()
                .expect("randomness manager should exist if randomness round is provided")
                .generate_randomness(epoch, randomness_round);
        }

        self.epoch_store.process_notifications(notifications.iter());

        self.log_final_round(lock, final_round);

        self.throughput_calculator
            .add_transactions(timestamp, num_schedulables as u64);

        // Simulator-only fault injection at the commit boundary.
        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
            let key = [commit_sub_dag_index, epoch];
            if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
                sui_simulator::task::kill_current_node(None);
            }
        });

        fail_point!("crash");

        self.send_end_of_publish_if_needed().await;
    }
1414
1415 fn handle_eop(
1416 &self,
1417 state: &mut CommitHandlerState,
1418 end_of_publish_transactions: Vec<AuthorityName>,
1419 ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
1420 let collected_eop =
1421 self.process_end_of_publish_transactions(state, end_of_publish_transactions);
1422 if collected_eop {
1423 let (lock, final_round) = self.advance_eop_state_machine(state);
1424 (lock.should_accept_tx(), Some(lock), final_round)
1425 } else {
1426 (true, None, false)
1427 }
1428 }
1429
1430 fn record_end_of_epoch_execution_time_observations(
1431 &self,
1432 estimator: &mut ExecutionTimeEstimator,
1433 ) {
1434 self.epoch_store
1435 .end_of_epoch_execution_time_observations
1436 .set(estimator.take_observations())
1437 .expect("`stored_execution_time_observations` should only be set once at end of epoch");
1438 }
1439
1440 fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
1441 let mut deferred_transactions = self
1442 .epoch_store
1443 .consensus_output_cache
1444 .deferred_transactions
1445 .lock();
1446 for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
1447 deferred_transactions.remove(&deleted_deferred_key);
1448 }
1449 }
1450
1451 fn log_final_round(&self, lock: Option<RwLockWriteGuard<ReconfigState>>, final_round: bool) {
1452 if final_round {
1453 let epoch = self.epoch_store.epoch();
1454 info!(
1455 ?epoch,
1456 lock=?lock.as_ref(),
1457 final_round=?final_round,
1458 "Notified last checkpoint"
1459 );
1460 self.epoch_store.record_end_of_message_quorum_time_metric();
1461 }
1462 }
1463
    /// Legacy (non-split) checkpoint construction: writes one pending
    /// checkpoint for the regular schedulables and, when randomness was
    /// produced (or DKG failed with randomness txns pending), a second one at
    /// the next height for the randomness schedulables.
    fn create_pending_checkpoints(
        &self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        schedulables: &[Schedulable],
        randomness_schedulables: &[Schedulable],
        final_round: bool,
    ) {
        // This path must not be used when split checkpoints are enabled.
        assert!(
            !self
                .epoch_store
                .protocol_config()
                .split_checkpoints_in_consensus_handler()
        );

        let checkpoint_height = self
            .epoch_store
            .calculate_pending_checkpoint_height(commit_info.round);

        // A randomness checkpoint is needed either because a round was
        // reserved, or because DKG failed and cancelled randomness txns still
        // need to be checkpointed.
        let should_write_random_checkpoint = state.randomness_round.is_some()
            || (state.dkg_failed && !randomness_schedulables.is_empty());

        let pending_checkpoint = PendingCheckpoint {
            roots: schedulables.iter().map(|s| s.key()).collect(),
            details: PendingCheckpointInfo {
                timestamp_ms: commit_info.timestamp,
                // If a randomness checkpoint follows, it (not this one) is the
                // last checkpoint of the epoch.
                last_of_epoch: final_round && !should_write_random_checkpoint,
                checkpoint_height,
                consensus_commit_ref: commit_info.consensus_commit_ref,
                rejected_transactions_digest: commit_info.rejected_transactions_digest,
                checkpoint_seq: None,
            },
        };
        self.epoch_store
            .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
            .expect("failed to write pending checkpoint");

        info!(
            "Written pending checkpoint: {:?}",
            pending_checkpoint.details,
        );

        if should_write_random_checkpoint {
            // Randomness checkpoint is written at height + 1.
            let pending_checkpoint = PendingCheckpoint {
                roots: randomness_schedulables.iter().map(|s| s.key()).collect(),
                details: PendingCheckpointInfo {
                    timestamp_ms: commit_info.timestamp,
                    last_of_epoch: final_round,
                    checkpoint_height: checkpoint_height + 1,
                    consensus_commit_ref: commit_info.consensus_commit_ref,
                    rejected_transactions_digest: commit_info.rejected_transactions_digest,
                    checkpoint_seq: None,
                },
            };
            self.epoch_store
                .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
                .expect("failed to write pending checkpoint");
        }
    }
1529
    /// Applies congestion control, deferral, and cancellation to the commit's
    /// user transactions.
    ///
    /// Returns (regular txns to schedule, randomness-using txns to schedule,
    /// cancelled txns with reasons, optional randomness state update
    /// schedulable). Deferred transactions are recorded both in the in-memory
    /// cache and in `state.output`; accumulated congestion debts are recorded
    /// in `state.output` and forwarded (best-effort) to the
    /// ExecutionTimeObserver.
    #[allow(clippy::type_complexity)]
    fn collect_transactions_to_schedule(
        &self,
        state: &mut CommitHandlerState,
        execution_time_estimator: &mut ExecutionTimeEstimator,
        commit_info: &ConsensusCommitInfo,
        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    ) -> (
        Vec<VerifiedExecutableTransactionWithAliases>,
        Vec<VerifiedExecutableTransactionWithAliases>,
        BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        Option<Schedulable<VerifiedExecutableTransactionWithAliases>>,
    ) {
        let protocol_config = self.epoch_store.protocol_config();
        let epoch = self.epoch_store.epoch();

        // Merge newly committed txns with previously deferred ones and split
        // them into regular vs. randomness-using streams.
        let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) =
            self.merge_and_reorder_transactions(state, commit_info, user_transactions);

        // Separate congestion trackers for the regular and randomness commits.
        let mut shared_object_congestion_tracker =
            self.init_congestion_tracker(commit_info, false, &ordered_txns);
        let mut shared_object_using_randomness_congestion_tracker =
            self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns);

        let randomness_state_update_transaction = state
            .randomness_round
            .map(|round| Schedulable::RandomnessStateUpdate(epoch, round));
        debug!(
            "Randomness state update transaction: {:?}",
            randomness_state_update_transaction
                .as_ref()
                .map(|t| t.key())
        );

        let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len());
        let mut randomness_transactions_to_schedule =
            Vec::with_capacity(ordered_randomness_txns.len());
        let mut deferred_txns = BTreeMap::new();
        let mut cancelled_txns = BTreeMap::new();

        for transaction in ordered_txns {
            self.handle_deferral_and_cancellation(
                state,
                &mut cancelled_txns,
                &mut deferred_txns,
                &mut transactions_to_schedule,
                protocol_config,
                commit_info,
                transaction,
                &mut shared_object_congestion_tracker,
                &previously_deferred_tx_digests,
                execution_time_estimator,
            );
        }

        for transaction in ordered_randomness_txns {
            // When DKG failed, randomness-using transactions cannot execute
            // normally; they are still scheduled, but marked cancelled.
            if state.dkg_failed {
                debug!(
                    "Canceling randomness-using transaction {:?} because DKG failed",
                    transaction.tx().digest(),
                );
                cancelled_txns.insert(
                    *transaction.tx().digest(),
                    CancelConsensusCertificateReason::DkgFailed,
                );
                randomness_transactions_to_schedule.push(transaction);
                continue;
            }
            self.handle_deferral_and_cancellation(
                state,
                &mut cancelled_txns,
                &mut deferred_txns,
                &mut randomness_transactions_to_schedule,
                protocol_config,
                commit_info,
                transaction,
                &mut shared_object_using_randomness_congestion_tracker,
                &previously_deferred_tx_digests,
                execution_time_estimator,
            );
        }

        // Record deferrals both in the in-memory cache and in the output.
        let mut total_deferred_txns = 0;
        {
            let mut deferred_transactions = self
                .epoch_store
                .consensus_output_cache
                .deferred_transactions
                .lock();
            for (key, txns) in deferred_txns.into_iter() {
                total_deferred_txns += txns.len();
                deferred_transactions.insert(key, txns.clone());
                state.output.defer_transactions(key, txns);
            }
        }

        self.metrics
            .consensus_handler_deferred_transactions
            .inc_by(total_deferred_txns as u64);
        self.metrics
            .consensus_handler_cancelled_transactions
            .inc_by(cancelled_txns.len() as u64);
        self.metrics
            .consensus_handler_max_object_costs
            .with_label_values(&["regular_commit"])
            .set(shared_object_congestion_tracker.max_cost() as i64);
        self.metrics
            .consensus_handler_max_object_costs
            .with_label_values(&["randomness_commit"])
            .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);

        let congestion_commit_data = shared_object_congestion_tracker.finish_commit(commit_info);
        let randomness_congestion_commit_data =
            shared_object_using_randomness_congestion_tracker.finish_commit(commit_info);

        // Optional congestion debug logging.
        if let Some(logger) = &self.congestion_logger {
            let epoch = self.epoch_store.epoch();
            let mut logger = logger.lock().unwrap();
            logger.write_commit_log(epoch, commit_info, false, &congestion_commit_data);
            logger.write_commit_log(epoch, commit_info, true, &randomness_congestion_commit_data);
        }

        // Best-effort notification; a full channel is logged, not fatal.
        if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get()
            && let Err(e) = tx_object_debts.try_send(
                congestion_commit_data
                    .accumulated_debts
                    .iter()
                    .chain(randomness_congestion_commit_data.accumulated_debts.iter())
                    .map(|(id, _)| *id)
                    .collect(),
            )
        {
            info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
        }

        state
            .output
            .set_congestion_control_object_debts(congestion_commit_data.accumulated_debts);
        state.output.set_congestion_control_randomness_object_debts(
            randomness_congestion_commit_data.accumulated_debts,
        );

        (
            transactions_to_schedule,
            randomness_transactions_to_schedule,
            cancelled_txns,
            randomness_state_update_transaction,
        )
    }
1679
    /// Legacy (non-split) transaction-processing path: collects schedulable
    /// transactions, builds the ordered regular and randomness schedulable
    /// lists (prologue / authenticator update / user txns / optional
    /// accumulator settlements), assigns shared-object versions, and swaps in
    /// the concrete consensus commit prologue transaction.
    fn process_transactions(
        &self,
        state: &mut CommitHandlerState,
        execution_time_estimator: &mut ExecutionTimeEstimator,
        commit_info: &ConsensusCommitInfo,
        authenticator_state_update_transaction: Option<VerifiedExecutableTransactionWithAliases>,
        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    ) -> (Vec<Schedulable>, Vec<Schedulable>, AssignedTxAndVersions) {
        let protocol_config = self.epoch_store.protocol_config();
        // This path must not be used when split checkpoints are enabled.
        assert!(!protocol_config.split_checkpoints_in_consensus_handler());
        let epoch = self.epoch_store.epoch();

        let (
            transactions_to_schedule,
            randomness_transactions_to_schedule,
            cancelled_txns,
            randomness_state_update_transaction,
        ) = self.collect_transactions_to_schedule(
            state,
            execution_time_estimator,
            commit_info,
            user_transactions,
        );

        // Optional accumulator settlements: one at the commit's checkpoint
        // height, and one at height + 1 when a randomness checkpoint follows.
        let mut settlement = None;
        let mut randomness_settlement = None;
        if self.epoch_store.accumulators_enabled() {
            let checkpoint_height = self
                .epoch_store
                .calculate_pending_checkpoint_height(commit_info.round);

            settlement = Some(Schedulable::AccumulatorSettlement(epoch, checkpoint_height));

            if state.randomness_round.is_some() || !randomness_transactions_to_schedule.is_empty() {
                randomness_settlement = Some(Schedulable::AccumulatorSettlement(
                    epoch,
                    checkpoint_height + 1,
                ));
            }
        }

        // Prologue is skipped only under test configuration.
        let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
            .then_some(Schedulable::ConsensusCommitPrologue(
                epoch,
                commit_info.round,
                commit_info.consensus_commit_ref.index,
            ));

        // Order matters: prologue, authenticator update, user txns, settlement.
        let schedulables: Vec<_> = itertools::chain!(
            consensus_commit_prologue.into_iter(),
            authenticator_state_update_transaction
                .into_iter()
                .map(Schedulable::Transaction),
            transactions_to_schedule
                .into_iter()
                .map(Schedulable::Transaction),
            settlement,
        )
        .collect();

        let randomness_schedulables: Vec<_> = randomness_state_update_transaction
            .into_iter()
            .chain(
                randomness_transactions_to_schedule
                    .into_iter()
                    .map(Schedulable::Transaction),
            )
            .chain(randomness_settlement)
            .collect();

        let assigned_versions = self
            .epoch_store
            .process_consensus_transaction_shared_object_versions(
                self.cache_reader.as_ref(),
                schedulables.iter(),
                randomness_schedulables.iter(),
                &cancelled_txns,
                &mut state.output,
            )
            .expect("failed to assign shared object versions");

        // Version assignment must complete before the prologue transaction is
        // built, because the prologue embeds cancelled-txn version data.
        let consensus_commit_prologue =
            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);

        // Replace the placeholder prologue entry (always at index 0) with the
        // concrete prologue transaction and re-key its assigned versions.
        let mut schedulables = schedulables;
        let mut assigned_versions = assigned_versions;
        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
            assert!(matches!(
                schedulables[0],
                Schedulable::ConsensusCommitPrologue(..)
            ));
            assert!(matches!(
                assigned_versions.0[0].0,
                TransactionKey::ConsensusCommitPrologue(..)
            ));
            assigned_versions.0[0].0 =
                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
            schedulables[0] = Schedulable::Transaction(consensus_commit_prologue);
        }

        self.epoch_store
            .process_user_signatures(schedulables.iter().chain(randomness_schedulables.iter()));

        // Convert into the plain `Schedulable` type returned to the caller.
        let schedulables: Vec<Schedulable> = schedulables.into_iter().map(|s| s.into()).collect();
        let randomness_schedulables: Vec<Schedulable> = randomness_schedulables
            .into_iter()
            .map(|s| s.into())
            .collect();

        (schedulables, randomness_schedulables, assigned_versions)
    }
1792
    /// Split-checkpoint (v2) construction: chunks the schedulables by
    /// `max_transactions_per_checkpoint`, assigns shared-object versions,
    /// swaps in the concrete prologue transaction, pushes the chunks through
    /// the checkpoint queue, and writes the resulting pending checkpoints.
    ///
    /// Returns the commit height (the height of the last chunk created for
    /// this commit).
    #[allow(clippy::type_complexity)]
    fn create_pending_checkpoints_v2(
        &self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
        randomness_schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
        cancelled_txns: &BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        final_round: bool,
    ) -> CheckpointHeight {
        let protocol_config = self.epoch_store.protocol_config();
        // This path requires split checkpoints to be enabled.
        assert!(protocol_config.split_checkpoints_in_consensus_handler());

        let epoch = self.epoch_store.epoch();
        let accumulators_enabled = self.epoch_store.accumulators_enabled();
        let max_transactions_per_checkpoint =
            protocol_config.max_transactions_per_checkpoint() as usize;

        // A randomness checkpoint is needed either because a round was
        // reserved, or because DKG failed and cancelled randomness txns still
        // need to be checkpointed.
        let should_write_random_checkpoint = state.randomness_round.is_some()
            || (state.dkg_failed && !randomness_schedulables.is_empty());

        // Held until all chunks for this commit are pushed and flushed.
        let mut checkpoint_queue = self.checkpoint_queue.lock().unwrap();

        // Splits schedulables into fixed-size chunks, giving each chunk its
        // own queue height and (optionally) an accumulator settlement.
        let build_chunks =
            |schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
             queue: &mut CheckpointQueue|
             -> Vec<Chunk<VerifiedExecutableTransactionWithAliases>> {
                schedulables
                    .chunks(max_transactions_per_checkpoint)
                    .map(|chunk| {
                        let height = queue.next_height();
                        let schedulables: Vec<_> = chunk.to_vec();
                        let settlement = if accumulators_enabled {
                            Some(Schedulable::AccumulatorSettlement(epoch, height))
                        } else {
                            None
                        };
                        Chunk {
                            schedulables,
                            settlement,
                            height,
                        }
                    })
                    .collect()
            };

        let num_schedulables = schedulables.len();
        let chunked_schedulables = build_chunks(schedulables, &mut checkpoint_queue);
        if chunked_schedulables.len() > 1 {
            info!(
                "Splitting transactions into {} checkpoint chunks (num_schedulables={}, max_tx={})",
                chunked_schedulables.len(),
                num_schedulables,
                max_transactions_per_checkpoint
            );
            assert_reachable!("checkpoint split due to transaction limit");
        }
        let chunked_randomness_schedulables = if should_write_random_checkpoint {
            build_chunks(randomness_schedulables, &mut checkpoint_queue)
        } else {
            vec![]
        };

        // Version assignment runs over the full (un-chunked) schedulable sets.
        let schedulables_for_version_assignment =
            Chunk::all_schedulables_from(&chunked_schedulables);
        let randomness_schedulables_for_version_assignment =
            Chunk::all_schedulables_from(&chunked_randomness_schedulables);

        let assigned_versions = self
            .epoch_store
            .process_consensus_transaction_shared_object_versions(
                self.cache_reader.as_ref(),
                schedulables_for_version_assignment,
                randomness_schedulables_for_version_assignment,
                cancelled_txns,
                &mut state.output,
            )
            .expect("failed to assign shared object versions");

        // Version assignment must complete first: the prologue embeds
        // cancelled-txn version data.
        let consensus_commit_prologue =
            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);

        // Replace the placeholder prologue entry (first entry of the first
        // chunk) with the concrete transaction and re-key its versions.
        let mut chunked_schedulables = chunked_schedulables;
        let mut assigned_versions = assigned_versions;
        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
            assert!(matches!(
                chunked_schedulables[0].schedulables[0],
                Schedulable::ConsensusCommitPrologue(..)
            ));
            assert!(matches!(
                assigned_versions.0[0].0,
                TransactionKey::ConsensusCommitPrologue(..)
            ));
            assigned_versions.0[0].0 =
                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
            chunked_schedulables[0].schedulables[0] =
                Schedulable::Transaction(consensus_commit_prologue);
        }

        let assigned_versions = assigned_versions.into_map();

        self.epoch_store.process_user_signatures(
            chunked_schedulables
                .iter()
                .flat_map(|c| c.all_schedulables())
                .chain(
                    chunked_randomness_schedulables
                        .iter()
                        .flat_map(|c| c.all_schedulables()),
                ),
        );

        // The commit height is the height of the last chunk produced
        // (randomness chunks, when present, come after regular ones).
        let commit_height = chunked_randomness_schedulables
            .last()
            .or(chunked_schedulables.last())
            .map(|c| c.height)
            .expect("at least one checkpoint root must be created per commit");

        let mut pending_checkpoints = Vec::new();
        for chunk in chunked_schedulables {
            pending_checkpoints.extend(checkpoint_queue.push_chunk(
                chunk.into(),
                &assigned_versions,
                commit_info.timestamp,
                commit_info.consensus_commit_ref,
                commit_info.rejected_transactions_digest,
            ));
        }

        // Force a flush at epoch end or before a randomness checkpoint.
        let force = final_round || should_write_random_checkpoint;
        pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, force));

        if should_write_random_checkpoint {
            for chunk in chunked_randomness_schedulables {
                pending_checkpoints.extend(checkpoint_queue.push_chunk(
                    chunk.into(),
                    &assigned_versions,
                    commit_info.timestamp,
                    commit_info.consensus_commit_ref,
                    commit_info.rejected_transactions_digest,
                ));
            }

            // Randomness checkpoints are always flushed immediately.
            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
        }

        // Mark the last checkpoint of the epoch.
        if final_round && let Some(last) = pending_checkpoints.last_mut() {
            last.details.last_of_epoch = true;
        }

        let queue_drained = checkpoint_queue.is_empty();
        drop(checkpoint_queue);

        for pending_checkpoint in pending_checkpoints {
            debug!(
                checkpoint_height = pending_checkpoint.details.checkpoint_height,
                roots_count = pending_checkpoint.num_roots(),
                "Writing pending checkpoint",
            );
            self.epoch_store
                .write_pending_checkpoint_v2(&mut state.output, &pending_checkpoint)
                .expect("failed to write pending checkpoint");
        }

        state.output.set_checkpoint_queue_drained(queue_drained);

        commit_height
    }
1961
    /// Builds the concrete consensus commit prologue transaction for this
    /// commit, embedding the version assignments of cancelled transactions.
    ///
    /// Returns `None` only when the test configuration skips the prologue.
    /// Consumes `state.indirect_state_observer` (it is `take`n), so this must
    /// be called at most once per commit.
    fn add_consensus_commit_prologue_transaction<'a>(
        &'a self,
        state: &'a mut CommitHandlerState,
        commit_info: &'a ConsensusCommitInfo,
        assigned_versions: &AssignedTxAndVersions,
    ) -> Option<VerifiedExecutableTransactionWithAliases> {
        {
            if commit_info.skip_consensus_commit_prologue_in_test {
                return None;
            }
        }

        let mut cancelled_txn_version_assignment = Vec::new();

        let protocol_config = self.epoch_store.protocol_config();

        // Collect version assignments for every cancelled transaction so the
        // prologue can carry them.
        for (txn_key, assigned_versions) in assigned_versions.0.iter() {
            // Only digest-keyed entries can appear in the prologue.
            let Some(d) = txn_key.as_digest() else {
                continue;
            };

            // Older protocol versions exclude cancelled randomness txns
            // (those touching the randomness state object) from the prologue.
            if !protocol_config.include_cancelled_randomness_txns_in_prologue()
                && assigned_versions
                    .shared_object_versions
                    .iter()
                    .any(|((id, _), _)| *id == SUI_RANDOMNESS_STATE_OBJECT_ID)
            {
                continue;
            }

            // Any cancelled version marks the whole transaction as cancelled.
            if assigned_versions
                .shared_object_versions
                .iter()
                .any(|(_, version)| version.is_cancelled())
            {
                assert_reachable!("cancelled transactions");
                cancelled_txn_version_assignment
                    .push((*d, assigned_versions.shared_object_versions.clone()));
            }
        }

        // Test hook: inject extra cancelled transactions.
        fail_point_arg!(
            "additional_cancelled_txns_for_tests",
            |additional_cancelled_txns: Vec<(
                TransactionDigest,
                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
            )>| {
                cancelled_txn_version_assignment.extend(additional_cancelled_txns);
            }
        );

        let transaction = commit_info.create_consensus_commit_prologue_transaction(
            self.epoch_store.epoch(),
            self.epoch_store.protocol_config(),
            cancelled_txn_version_assignment,
            commit_info,
            // Moved out here; `unwrap` holds because this runs once per commit.
            state.indirect_state_observer.take().unwrap(),
        );
        Some(VerifiedExecutableTransactionWithAliases::no_aliases(
            transaction,
        ))
    }
2027
    /// Applies deferral and cancellation policy to a single user transaction from the
    /// current commit, routing it into exactly one of `deferred_txns`, `cancelled_txns`
    /// (also pushed to `scheduled_txns`), or `scheduled_txns`.
    fn handle_deferral_and_cancellation(
        &self,
        state: &mut CommitHandlerState,
        cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>,
        scheduled_txns: &mut Vec<VerifiedExecutableTransactionWithAliases>,
        protocol_config: &ProtocolConfig,
        commit_info: &ConsensusCommitInfo,
        transaction: VerifiedExecutableTransactionWithAliases,
        shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
        execution_time_estimator: &ExecutionTimeEstimator,
    ) {
        if protocol_config.defer_unpaid_amplification() {
            // Number of times this digest appeared in the current commit
            // (populated earlier by the dedup pass); 0 if not recorded.
            let occurrence_count = state
                .occurrence_counts
                .get(transaction.tx().digest())
                .copied()
                .unwrap_or(0);

            // A sender "pays" for amplification through gas price: each multiple
            // of the reference gas price allows one extra occurrence.
            // `.max(1)` guards against division by a zero reference gas price.
            let rgp = self.epoch_store.reference_gas_price();
            let gas_price = transaction.tx().transaction_data().gas_price();
            let allowed_count = (gas_price / rgp.max(1)) + 1;

            if occurrence_count as u64 > allowed_count {
                self.metrics
                    .consensus_handler_unpaid_amplification_deferrals
                    .inc();

                // Keep the original deferral round for already-deferred txns so
                // the max-deferral-rounds limit is cumulative across commits.
                let deferred_from_round = previously_deferred_tx_digests
                    .get(transaction.tx().digest())
                    .map(|k| k.deferred_from_round())
                    .unwrap_or(commit_info.round);

                let deferral_key = DeferralKey::new_for_consensus_round(
                    commit_info.round + 1,
                    deferred_from_round,
                );

                if transaction_deferral_within_limit(
                    &deferral_key,
                    protocol_config.max_deferral_rounds_for_congestion_control(),
                ) {
                    assert_reachable!("unpaid amplification deferral");
                    debug!(
                        "Deferring transaction {:?} due to unpaid amplification (count={}, allowed={})",
                        transaction.tx().digest(),
                        occurrence_count,
                        allowed_count
                    );
                    deferred_txns
                        .entry(deferral_key)
                        .or_default()
                        .push(transaction);
                    return;
                }
                // Past the deferral limit: fall through to normal handling below.
            }
        }

        // NOTE(review): get_tx_cost takes the indirect state observer mutably and
        // may update it even if the txn is later deferred — keep the call order.
        let tx_cost = shared_object_congestion_tracker.get_tx_cost(
            execution_time_estimator,
            transaction.tx(),
            state.indirect_state_observer.as_mut().unwrap(),
        );

        let deferral_info = self.epoch_store.should_defer(
            transaction.tx(),
            commit_info,
            state.dkg_failed,
            state.randomness_round.is_some(),
            previously_deferred_tx_digests,
            shared_object_congestion_tracker,
        );

        if let Some((deferral_key, deferral_reason)) = deferral_info {
            debug!(
                "Deferring consensus certificate for transaction {:?} until {:?}",
                transaction.tx().digest(),
                deferral_key
            );

            match deferral_reason {
                // Randomness not yet available for a randomness-using txn:
                // always defer (no round limit applied on this path).
                DeferralReason::RandomnessNotReady => {
                    deferred_txns
                        .entry(deferral_key)
                        .or_default()
                        .push(transaction);
                }
                DeferralReason::SharedObjectCongestion(congested_objects) => {
                    self.metrics.consensus_handler_congested_transactions.inc();
                    if transaction_deferral_within_limit(
                        &deferral_key,
                        protocol_config.max_deferral_rounds_for_congestion_control(),
                    ) {
                        deferred_txns
                            .entry(deferral_key)
                            .or_default()
                            .push(transaction);
                    } else {
                        // Deferred too many rounds: cancel instead of deferring
                        // again. The txn is still scheduled so it executes as a
                        // cancelled transaction.
                        assert_sometimes!(
                            transaction.tx().data().transaction_data().uses_randomness(),
                            "cancelled randomness-using transaction"
                        );
                        assert_sometimes!(
                            !transaction.tx().data().transaction_data().uses_randomness(),
                            "cancelled non-randomness-using transaction"
                        );

                        debug!(
                            "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
                            transaction.tx().digest(),
                            deferral_key,
                            congested_objects
                        );
                        cancelled_txns.insert(
                            *transaction.tx().digest(),
                            CancelConsensusCertificateReason::CongestionOnObjects(
                                congested_objects,
                            ),
                        );
                        scheduled_txns.push(transaction);
                    }
                }
            }
        } else {
            // Not deferred: charge the estimated cost to the congestion tracker
            // and schedule for execution. (tx_cost is unused on deferral paths.)
            shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, transaction.tx());
            scheduled_txns.push(transaction);
        }
    }
2162
2163 fn merge_and_reorder_transactions(
2164 &self,
2165 state: &mut CommitHandlerState,
2166 commit_info: &ConsensusCommitInfo,
2167 user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
2168 ) -> (
2169 Vec<VerifiedExecutableTransactionWithAliases>,
2170 Vec<VerifiedExecutableTransactionWithAliases>,
2171 HashMap<TransactionDigest, DeferralKey>,
2172 ) {
2173 let protocol_config = self.epoch_store.protocol_config();
2174
2175 let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
2176 self.load_deferred_transactions(state, commit_info);
2177
2178 txns.reserve(user_transactions.len());
2179 randomness_txns.reserve(user_transactions.len());
2180
2181 let mut txns: Vec<_> = txns
2184 .into_iter()
2185 .filter_map(|tx| {
2186 if tx.tx().transaction_data().uses_randomness() {
2187 randomness_txns.push(tx);
2188 None
2189 } else {
2190 Some(tx)
2191 }
2192 })
2193 .collect();
2194
2195 for txn in user_transactions {
2196 if txn.tx().transaction_data().uses_randomness() {
2197 randomness_txns.push(txn);
2198 } else {
2199 txns.push(txn);
2200 }
2201 }
2202
2203 PostConsensusTxReorder::reorder(
2204 &mut txns,
2205 protocol_config.consensus_transaction_ordering(),
2206 );
2207 PostConsensusTxReorder::reorder(
2208 &mut randomness_txns,
2209 protocol_config.consensus_transaction_ordering(),
2210 );
2211
2212 (txns, randomness_txns, previously_deferred_tx_digests)
2213 }
2214
2215 fn load_deferred_transactions(
2216 &self,
2217 state: &mut CommitHandlerState,
2218 commit_info: &ConsensusCommitInfo,
2219 ) -> (
2220 Vec<VerifiedExecutableTransactionWithAliases>,
2221 Vec<VerifiedExecutableTransactionWithAliases>,
2222 HashMap<TransactionDigest, DeferralKey>,
2223 ) {
2224 let mut previously_deferred_tx_digests = HashMap::new();
2225
2226 let deferred_txs: Vec<_> = self
2227 .epoch_store
2228 .load_deferred_transactions_for_up_to_consensus_round_v2(
2229 &mut state.output,
2230 commit_info.round,
2231 )
2232 .expect("db error")
2233 .into_iter()
2234 .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2235 .map(|(key, tx)| {
2236 previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2237 tx
2238 })
2239 .collect();
2240 trace!(
2241 "loading deferred transactions: {:?}",
2242 deferred_txs.iter().map(|tx| tx.tx().digest())
2243 );
2244
2245 let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
2246 let txns: Vec<_> = self
2247 .epoch_store
2248 .load_deferred_transactions_for_randomness_v2(&mut state.output)
2249 .expect("db error")
2250 .into_iter()
2251 .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2252 .map(|(key, tx)| {
2253 previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2254 tx
2255 })
2256 .collect();
2257 trace!(
2258 "loading deferred randomness transactions: {:?}",
2259 txns.iter().map(|tx| tx.tx().digest())
2260 );
2261 txns
2262 } else {
2263 vec![]
2264 };
2265
2266 (
2267 deferred_txs,
2268 deferred_randomness_txs,
2269 previously_deferred_tx_digests,
2270 )
2271 }
2272
    /// Builds the shared-object congestion tracker for this commit, seeded with
    /// object debts carried over from earlier commits.
    ///
    /// `for_randomness` selects the randomness-batch tracker vs. the regular one.
    fn init_congestion_tracker(
        &self,
        commit_info: &ConsensusCommitInfo,
        for_randomness: bool,
        txns: &[VerifiedExecutableTransactionWithAliases],
    ) -> SharedObjectCongestionTracker {
        // `mut` is only exercised when the fail point below replaces `ret`.
        #[allow(unused_mut)]
        let mut ret = SharedObjectCongestionTracker::from_protocol_config(
            self.epoch_store
                .consensus_quarantine
                .read()
                .load_initial_object_debts(
                    &self.epoch_store,
                    commit_info.round,
                    for_randomness,
                    txns,
                )
                .expect("db error"),
            self.epoch_store.protocol_config(),
            for_randomness,
            self.congestion_logger.is_some(),
        );

        // Test-only hook: lets simtests inject a pre-populated tracker.
        fail_point_arg!(
            "initial_congestion_tracker",
            |tracker: SharedObjectCongestionTracker| {
                info!(
                    "Initialize shared_object_congestion_tracker to {:?}",
                    tracker
                );
                ret = tracker;
            }
        );

        ret
    }
2309
2310 fn process_jwks(
2311 &self,
2312 state: &mut CommitHandlerState,
2313 commit_info: &ConsensusCommitInfo,
2314 new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
2315 ) {
2316 for (authority_name, jwk_id, jwk) in new_jwks {
2317 self.epoch_store.record_jwk_vote(
2318 &mut state.output,
2319 commit_info.round,
2320 authority_name,
2321 &jwk_id,
2322 &jwk,
2323 );
2324 }
2325 }
2326
2327 fn process_capability_notifications(
2328 &self,
2329 capability_notifications: Vec<AuthorityCapabilitiesV2>,
2330 ) {
2331 for capabilities in capability_notifications {
2332 self.epoch_store
2333 .record_capabilities_v2(&capabilities)
2334 .expect("db error");
2335 }
2336 }
2337
2338 fn process_execution_time_observations(
2339 &self,
2340 state: &mut CommitHandlerState,
2341 execution_time_observations: Vec<ExecutionTimeObservation>,
2342 ) {
2343 let mut execution_time_estimator = self
2344 .epoch_store
2345 .execution_time_estimator
2346 .try_lock()
2347 .expect("should only ever be called from the commit handler thread");
2348
2349 for ExecutionTimeObservation {
2350 authority,
2351 generation,
2352 estimates,
2353 } in execution_time_observations
2354 {
2355 let authority_index = self
2356 .epoch_store
2357 .committee()
2358 .authority_index(&authority)
2359 .unwrap();
2360 execution_time_estimator.process_observations_from_consensus(
2361 authority_index,
2362 Some(generation),
2363 &estimates,
2364 );
2365 state
2366 .output
2367 .insert_execution_time_observation(authority_index, generation, estimates);
2368 }
2369 }
2370
2371 fn process_checkpoint_signature_messages(
2372 &self,
2373 checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
2374 ) {
2375 for checkpoint_signature_message in checkpoint_signature_messages {
2376 self.checkpoint_service
2377 .notify_checkpoint_signature(&checkpoint_signature_message)
2378 .expect("db error");
2379 }
2380 }
2381
2382 async fn process_dkg_updates(
2383 &self,
2384 state: &mut CommitHandlerState,
2385 commit_info: &ConsensusCommitInfo,
2386 randomness_manager: Option<&mut RandomnessManager>,
2387 randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2388 randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2389 ) {
2390 if !self.epoch_store.randomness_state_enabled() {
2391 let num_dkg_messages = randomness_dkg_messages.len();
2392 let num_dkg_confirmations = randomness_dkg_confirmations.len();
2393 if num_dkg_messages + num_dkg_confirmations > 0 {
2394 debug_fatal!(
2395 "received {} RandomnessDkgMessage and {} RandomnessDkgConfirmation messages when randomness is not enabled",
2396 num_dkg_messages,
2397 num_dkg_confirmations
2398 );
2399 }
2400 return;
2401 }
2402
2403 let randomness_manager =
2404 randomness_manager.expect("randomness manager should exist if randomness is enabled");
2405
2406 let randomness_dkg_updates =
2407 self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);
2408
2409 let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
2410 state,
2411 randomness_manager,
2412 randomness_dkg_confirmations,
2413 );
2414
2415 if randomness_dkg_updates || randomness_dkg_confirmation_updates {
2416 randomness_manager
2417 .advance_dkg(&mut state.output, commit_info.round)
2418 .await
2419 .expect("epoch ended");
2420 }
2421 }
2422
2423 fn process_randomness_dkg_messages(
2424 &self,
2425 randomness_manager: &mut RandomnessManager,
2426 randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2427 ) -> bool {
2428 if randomness_dkg_messages.is_empty() {
2429 return false;
2430 }
2431
2432 let mut randomness_state_updated = false;
2433 for (authority, bytes) in randomness_dkg_messages {
2434 match bcs::from_bytes(&bytes) {
2435 Ok(message) => {
2436 randomness_manager
2437 .add_message(&authority, message)
2438 .expect("epoch ended");
2440 randomness_state_updated = true;
2441 }
2442
2443 Err(e) => {
2444 warn!(
2445 "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
2446 authority.concise(),
2447 );
2448 }
2449 }
2450 }
2451
2452 randomness_state_updated
2453 }
2454
2455 fn process_randomness_dkg_confirmations(
2456 &self,
2457 state: &mut CommitHandlerState,
2458 randomness_manager: &mut RandomnessManager,
2459 randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2460 ) -> bool {
2461 if randomness_dkg_confirmations.is_empty() {
2462 return false;
2463 }
2464
2465 let mut randomness_state_updated = false;
2466 for (authority, bytes) in randomness_dkg_confirmations {
2467 match bcs::from_bytes(&bytes) {
2468 Ok(message) => {
2469 randomness_manager
2470 .add_confirmation(&mut state.output, &authority, message)
2471 .expect("epoch ended");
2473 randomness_state_updated = true;
2474 }
2475 Err(e) => {
2476 warn!(
2477 "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
2478 authority.concise(),
2479 );
2480 }
2481 }
2482 }
2483
2484 randomness_state_updated
2485 }
2486
    /// Records EndOfPublish messages from this commit and reports whether a quorum
    /// of authorities has sent EndOfPublish.
    ///
    /// Returns true both when quorum was already reached by a prior commit and when
    /// a message in this commit completes it.
    fn process_end_of_publish_transactions(
        &self,
        state: &mut CommitHandlerState,
        end_of_publish_transactions: Vec<AuthorityName>,
    ) -> bool {
        let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
            "No contention on end_of_publish as it is only accessed from consensus handler",
        );

        // Quorum already reached in an earlier commit.
        // NOTE(review): messages arriving after quorum are intentionally not written
        // to `state.output` on this path — confirm this is the desired behavior.
        if eop_aggregator.has_quorum() {
            return true;
        }

        if end_of_publish_transactions.is_empty() {
            return false;
        }

        for authority in end_of_publish_transactions {
            info!("Received EndOfPublish from {:?}", authority.concise());

            // Persist the message before updating the in-memory aggregator so the
            // commit output records every message counted toward quorum.
            state.output.insert_end_of_publish(authority);
            if eop_aggregator
                .insert_generic(authority, ())
                .is_quorum_reached()
            {
                debug!(
                    "Collected enough end_of_publish messages with last message from validator {:?}",
                    authority.concise(),
                );
                return true;
            }
        }

        false
    }
2525
2526 fn advance_eop_state_machine(
2529 &self,
2530 state: &mut CommitHandlerState,
2531 ) -> (
2532 RwLockWriteGuard<'_, ReconfigState>,
2533 bool, ) {
2535 let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
2536 let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
2537
2538 reconfig_state.close_all_certs();
2539
2540 let commit_has_deferred_txns = state.output.has_deferred_transactions();
2541 let previous_commits_have_deferred_txns = !self.epoch_store.deferred_transactions_empty();
2542
2543 if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
2544 if !start_state_is_reject_all_tx {
2545 info!("Transitioning to RejectAllTx");
2546 }
2547 reconfig_state.close_all_tx();
2548 } else {
2549 debug!(
2550 "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
2551 previous_commits_have_deferred_txns, commit_has_deferred_txns,
2552 );
2553 }
2554
2555 state.output.store_reconfig_state(reconfig_state.clone());
2556
2557 if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
2558 (reconfig_state, true)
2559 } else {
2560 (reconfig_state, false)
2561 }
2562 }
2563
2564 fn gather_commit_metadata(
2565 &self,
2566 consensus_commit: &impl ConsensusCommitAPI,
2567 ) -> (u64, AuthorityIndex, u64) {
2568 let timestamp = consensus_commit.commit_timestamp_ms();
2569 let leader_author = consensus_commit.leader_author_index();
2570 let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
2571
2572 let system_time_ms = SystemTime::now()
2573 .duration_since(UNIX_EPOCH)
2574 .unwrap()
2575 .as_millis() as i64;
2576
2577 let consensus_timestamp_bias_ms = system_time_ms - (timestamp as i64);
2578 let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
2579 self.metrics
2580 .consensus_timestamp_bias
2581 .observe(consensus_timestamp_bias_seconds);
2582
2583 let epoch_start = self
2584 .epoch_store
2585 .epoch_start_config()
2586 .epoch_start_timestamp_ms();
2587 let timestamp = if timestamp < epoch_start {
2588 error!(
2589 "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}"
2590 );
2591 epoch_start
2592 } else {
2593 timestamp
2594 };
2595
2596 (timestamp, leader_author, commit_sub_dag_index)
2597 }
2598
2599 fn create_authenticator_state_update(
2600 &self,
2601 last_committed_round: u64,
2602 commit_info: &ConsensusCommitInfo,
2603 ) -> Option<VerifiedExecutableTransactionWithAliases> {
2604 let new_jwks = self
2612 .epoch_store
2613 .get_new_jwks(last_committed_round)
2614 .expect("Unrecoverable error in consensus handler");
2615
2616 if !new_jwks.is_empty() {
2617 let authenticator_state_update_transaction = authenticator_state_update_transaction(
2618 &self.epoch_store,
2619 commit_info.round,
2620 new_jwks,
2621 );
2622 debug!(
2623 "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
2624 authenticator_state_update_transaction.digest(),
2625 authenticator_state_update_transaction,
2626 );
2627
2628 Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2629 authenticator_state_update_transaction,
2630 ))
2631 } else {
2632 None
2633 }
2634 }
2635
2636 #[instrument(level = "trace", skip_all)]
2640 fn filter_consensus_txns(
2641 &mut self,
2642 initial_reconfig_state: ReconfigState,
2643 commit_info: &ConsensusCommitInfo,
2644 consensus_commit: &impl ConsensusCommitAPI,
2645 ) -> FilteredConsensusOutput {
2646 let mut transactions = Vec::new();
2647 let mut owned_object_locks = HashMap::new();
2648 let epoch = self.epoch_store.epoch();
2649 let mut num_finalized_user_transactions = vec![0; self.committee.size()];
2650 let mut num_rejected_user_transactions = vec![0; self.committee.size()];
2651 for (block, parsed_transactions) in consensus_commit.transactions() {
2652 let author = block.author.value();
2653 self.last_consensus_stats.stats.inc_num_messages(author);
2655
2656 self.epoch_store.set_consensus_tx_status(
2658 ConsensusPosition::ping(epoch, block),
2659 ConsensusTxStatus::Finalized,
2660 );
2661
2662 for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
2663 let position = ConsensusPosition {
2664 epoch,
2665 block,
2666 index: tx_index as TransactionIndex,
2667 };
2668
2669 if self.epoch_store.protocol_config().mysticeti_fastpath()
2672 && let Some(tx) = parsed.transaction.kind.as_user_transaction()
2673 {
2674 let digest = tx.digest();
2675 if let Some((spam_weight, submitter_client_addrs)) = self
2676 .epoch_store
2677 .submitted_transaction_cache
2678 .increment_submission_count(digest)
2679 {
2680 if let Some(ref traffic_controller) = self.traffic_controller {
2681 debug!(
2682 "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
2683 submitter_client_addrs.len()
2684 );
2685
2686 for addr in submitter_client_addrs {
2688 traffic_controller.tally(TrafficTally::new(
2689 Some(addr),
2690 None,
2691 None,
2692 spam_weight.clone(),
2693 ));
2694 }
2695 } else {
2696 warn!(
2697 "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
2698 submitter_client_addrs.len()
2699 );
2700 }
2701 }
2702 }
2703
2704 if parsed.rejected {
2705 if matches!(
2707 parsed.transaction.kind,
2708 ConsensusTransactionKind::UserTransaction(_)
2709 | ConsensusTransactionKind::UserTransactionV2(_)
2710 ) {
2711 self.epoch_store
2712 .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
2713 num_rejected_user_transactions[author] += 1;
2714 }
2715 continue;
2717 }
2718
2719 let kind = classify(&parsed.transaction);
2720 self.metrics
2721 .consensus_handler_processed
2722 .with_label_values(&[kind])
2723 .inc();
2724 self.metrics
2725 .consensus_handler_transaction_sizes
2726 .with_label_values(&[kind])
2727 .observe(parsed.serialized_len as f64);
2728 if matches!(
2730 &parsed.transaction.kind,
2731 ConsensusTransactionKind::CertifiedTransaction(_)
2732 | ConsensusTransactionKind::UserTransaction(_)
2733 | ConsensusTransactionKind::UserTransactionV2(_)
2734 ) {
2735 self.last_consensus_stats
2736 .stats
2737 .inc_num_user_transactions(author);
2738 }
2739
2740 if !initial_reconfig_state.should_accept_consensus_certs() {
2741 match &parsed.transaction.kind {
2744 ConsensusTransactionKind::UserTransaction(_)
2745 | ConsensusTransactionKind::UserTransactionV2(_)
2746 | ConsensusTransactionKind::CertifiedTransaction(_)
2747 | ConsensusTransactionKind::CapabilityNotification(_)
2749 | ConsensusTransactionKind::CapabilityNotificationV2(_)
2750 | ConsensusTransactionKind::EndOfPublish(_)
2751 | ConsensusTransactionKind::ExecutionTimeObservation(_)
2753 | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
2754 debug!(
2755 "Ignoring consensus transaction {:?} because of end of epoch",
2756 parsed.transaction.key()
2757 );
2758 continue;
2759 }
2760
2761 ConsensusTransactionKind::CheckpointSignature(_)
2763 | ConsensusTransactionKind::CheckpointSignatureV2(_)
2764 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2765 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
2766 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
2767 }
2768 }
2769
2770 if !initial_reconfig_state.should_accept_tx() {
2771 match &parsed.transaction.kind {
2772 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
2773 | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
2774 _ => {}
2775 }
2776 }
2777
2778 if parsed.transaction.is_user_transaction()
2779 && !self.epoch_store.protocol_config().mysticeti_fastpath()
2780 {
2781 debug!(
2782 "Ignoring MFP transaction {:?} because MFP is disabled",
2783 parsed.transaction.key()
2784 );
2785 continue;
2786 }
2787
2788 if let ConsensusTransactionKind::CertifiedTransaction(certificate) =
2789 &parsed.transaction.kind
2790 && certificate.epoch() != epoch
2791 {
2792 debug!(
2793 "Certificate epoch ({:?}) doesn't match the current epoch ({:?})",
2794 certificate.epoch(),
2795 epoch
2796 );
2797 continue;
2798 }
2799
2800 match &parsed.transaction.kind {
2802 ConsensusTransactionKind::CapabilityNotification(_)
2803 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2804 | ConsensusTransactionKind::CheckpointSignature(_) => {
2805 debug_fatal!(
2806 "BUG: saw deprecated tx {:?}for commit round {}",
2807 parsed.transaction.key(),
2808 commit_info.round
2809 );
2810 continue;
2811 }
2812 _ => {}
2813 }
2814
2815 if matches!(
2816 &parsed.transaction.kind,
2817 ConsensusTransactionKind::UserTransaction(_)
2818 | ConsensusTransactionKind::UserTransactionV2(_)
2819 | ConsensusTransactionKind::CertifiedTransaction(_)
2820 ) {
2821 let author_name = self
2822 .epoch_store
2823 .committee()
2824 .authority_by_index(author as u32)
2825 .unwrap();
2826 if self
2827 .epoch_store
2828 .has_received_end_of_publish_from(author_name)
2829 {
2830 warn!(
2834 "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
2835 author_name.concise(),
2836 parsed.transaction.key(),
2837 );
2838 continue;
2839 }
2840 }
2841
2842 if let ConsensusTransactionKind::UserTransactionV2(tx_with_claims) =
2849 &parsed.transaction.kind
2850 {
2851 let immutable_object_ids: HashSet<ObjectID> =
2852 tx_with_claims.get_immutable_objects().into_iter().collect();
2853 let tx = tx_with_claims.tx();
2854
2855 let Ok(input_objects) = tx.transaction_data().input_objects() else {
2856 debug_fatal!("Invalid input objects for transaction {}", tx.digest());
2857 continue;
2858 };
2859
2860 let owned_object_refs: Vec<_> = input_objects
2863 .iter()
2864 .filter_map(|obj| match obj {
2865 InputObjectKind::ImmOrOwnedMoveObject(obj_ref)
2866 if !immutable_object_ids.contains(&obj_ref.0) =>
2867 {
2868 Some(*obj_ref)
2869 }
2870 _ => None,
2871 })
2872 .collect();
2873
2874 match self
2875 .epoch_store
2876 .try_acquire_owned_object_locks_post_consensus(
2877 &owned_object_refs,
2878 *tx.digest(),
2879 &owned_object_locks,
2880 ) {
2881 Ok(new_locks) => {
2882 owned_object_locks.extend(new_locks.into_iter());
2883 self.epoch_store
2885 .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
2886 num_finalized_user_transactions[author] += 1;
2887 }
2888 Err(e) => {
2889 debug!("Dropping transaction {}: {}", tx.digest(), e);
2890 self.epoch_store
2891 .set_consensus_tx_status(position, ConsensusTxStatus::Dropped);
2892 self.epoch_store.set_rejection_vote_reason(position, &e);
2893 continue;
2894 }
2895 }
2896 }
2897
2898 let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
2899 transactions.push((transaction, author as u32));
2900 }
2901 }
2902
2903 for (i, authority) in self.committee.authorities() {
2904 let hostname = &authority.hostname;
2905 self.metrics
2906 .consensus_committed_messages
2907 .with_label_values(&[hostname])
2908 .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
2909 self.metrics
2910 .consensus_committed_user_transactions
2911 .with_label_values(&[hostname])
2912 .set(
2913 self.last_consensus_stats
2914 .stats
2915 .get_num_user_transactions(i.value()) as i64,
2916 );
2917 self.metrics
2918 .consensus_finalized_user_transactions
2919 .with_label_values(&[hostname])
2920 .add(num_finalized_user_transactions[i.value()] as i64);
2921 self.metrics
2922 .consensus_rejected_user_transactions
2923 .with_label_values(&[hostname])
2924 .add(num_rejected_user_transactions[i.value()] as i64);
2925 }
2926
2927 FilteredConsensusOutput {
2928 transactions,
2929 owned_object_locks,
2930 }
2931 }
2932
    /// Verifies and deduplicates the commit's consensus transactions, both within
    /// the commit and against previously processed messages. Also populates
    /// `state.occurrence_counts` with per-digest in-commit counts for the
    /// unpaid-amplification check downstream.
    fn deduplicate_consensus_txns(
        &mut self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
    ) -> Vec<VerifiedSequencedConsensusTransaction> {
        let mut all_transactions = Vec::new();

        // How many times each key appears within this commit.
        let mut occurrence_counts: HashMap<SequencedConsensusTransactionKey, u32> = HashMap::new();
        // Keys whose first occurrence was accepted in this commit.
        let mut first_commit_keys: HashSet<SequencedConsensusTransactionKey> = HashSet::new();

        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
            // transaction_index is 1-based within the commit.
            let current_tx_index = ExecutionIndices {
                last_committed_round: commit_info.round,
                sub_dag_index: commit_info.consensus_commit_ref.index.into(),
                transaction_index: (seq + 1) as u64,
            };

            self.last_consensus_stats.index = current_tx_index;

            let certificate_author = *self
                .epoch_store
                .committee()
                .authority_by_index(cert_origin)
                .unwrap();

            let sequenced_transaction = SequencedConsensusTransaction {
                certificate_author_index: cert_origin,
                certificate_author,
                consensus_index: current_tx_index,
                transaction,
            };

            // Transactions failing verification are silently skipped.
            let Some(verified_transaction) = self
                .epoch_store
                .verify_consensus_transaction(sequenced_transaction)
            else {
                continue;
            };

            let key = verified_transaction.0.key();

            if let Some(tx_digest) = key.user_transaction_digest() {
                self.epoch_store
                    .cache_recently_finalized_transaction(tx_digest);
            }

            // Count every occurrence (even ones skipped below) so downstream
            // amplification checks see the true in-commit count.
            let count = occurrence_counts.entry(key.clone()).or_insert(0);
            *count += 1;
            let in_commit = *count > 1;

            // LRU cache dedups across recent commits; `put` returns Some if the
            // key was already present.
            let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
            if in_commit || in_cache {
                self.metrics.skipped_consensus_txns_cache_hit.inc();
                continue;
            }
            // Final authority: the durable processed-message record.
            if self
                .epoch_store
                .is_consensus_message_processed(&key)
                .expect("db error")
            {
                self.metrics.skipped_consensus_txns.inc();
                continue;
            }

            first_commit_keys.insert(key.clone());

            state.output.record_consensus_message_processed(key);

            all_transactions.push(verified_transaction);
        }

        // Observe duplicate counts only for keys first accepted in this commit.
        for key in first_commit_keys {
            if let Some(&count) = occurrence_counts.get(&key)
                && count > 1
            {
                self.metrics
                    .consensus_handler_duplicate_tx_count
                    .observe(count as f64);
            }
        }

        // Hand the per-digest counts to the commit state (user txns only).
        assert!(
            state.occurrence_counts.is_empty(),
            "occurrence_counts should be empty before populating"
        );
        state.occurrence_counts.reserve(occurrence_counts.len());
        state.occurrence_counts.extend(
            occurrence_counts
                .into_iter()
                .filter_map(|(key, count)| key.user_transaction_digest().map(|d| (d, count))),
        );

        all_transactions
    }
3037
    /// Sorts verified consensus transactions into the typed buckets of
    /// `CommitHandlerInput` (user transactions, EndOfPublish, JWKs, DKG traffic,
    /// capabilities, execution-time observations, checkpoint signatures).
    fn build_commit_handler_input(
        &self,
        transactions: Vec<VerifiedSequencedConsensusTransaction>,
    ) -> CommitHandlerInput {
        let epoch = self.epoch_store.epoch();
        let mut commit_handler_input = CommitHandlerInput::default();

        for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
            match transaction.transaction {
                SequencedConsensusTransactionKind::External(consensus_transaction) => {
                    match consensus_transaction.kind {
                        // Certified transaction: wrap without re-verifying
                        // (verification already happened upstream).
                        ConsensusTransactionKind::CertifiedTransaction(cert) => {
                            let cert = VerifiedCertificate::new_unchecked(*cert);
                            let transaction =
                                VerifiedExecutableTransaction::new_from_certificate(cert);
                            commit_handler_input.user_transactions.push(
                                VerifiedExecutableTransactionWithAliases::no_aliases(transaction),
                            );
                        }
                        ConsensusTransactionKind::UserTransaction(tx) => {
                            let tx = VerifiedTransaction::new_unchecked(*tx);
                            let transaction =
                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
                            commit_handler_input
                                .user_transactions
                                .push(VerifiedExecutableTransactionWithAliases::no_aliases(
                                    transaction,
                                ));
                        }
                        // V2 carries alias claims; the legacy path re-keys v1
                        // aliases by position before the protocol fix.
                        ConsensusTransactionKind::UserTransactionV2(tx) => {
                            let used_alias_versions = if self
                                .epoch_store
                                .protocol_config()
                                .fix_checkpoint_signature_mapping()
                            {
                                tx.aliases()
                            } else {
                                tx.aliases_v1().map(|a| {
                                    NonEmpty::from_vec(
                                        a.into_iter()
                                            .enumerate()
                                            .map(|(idx, (_, seq))| (idx as u8, seq))
                                            .collect(),
                                    )
                                    .unwrap()
                                })
                            };
                            let inner_tx = tx.into_tx();
                            let tx = VerifiedTransaction::new_unchecked(inner_tx);
                            let transaction =
                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
                            if let Some(used_alias_versions) = used_alias_versions {
                                commit_handler_input
                                    .user_transactions
                                    .push(WithAliases::new(transaction, used_alias_versions));
                            } else {
                                commit_handler_input.user_transactions.push(
                                    VerifiedExecutableTransactionWithAliases::no_aliases(
                                        transaction,
                                    ),
                                );
                            }
                        }

                        ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => {
                            commit_handler_input
                                .end_of_publish_transactions
                                .push(authority_public_key_bytes);
                        }
                        ConsensusTransactionKind::NewJWKFetched(
                            authority_public_key_bytes,
                            jwk_id,
                            jwk,
                        ) => {
                            commit_handler_input.new_jwks.push((
                                authority_public_key_bytes,
                                jwk_id,
                                jwk,
                            ));
                        }
                        ConsensusTransactionKind::RandomnessDkgMessage(
                            authority_public_key_bytes,
                            items,
                        ) => {
                            commit_handler_input
                                .randomness_dkg_messages
                                .push((authority_public_key_bytes, items));
                        }
                        ConsensusTransactionKind::RandomnessDkgConfirmation(
                            authority_public_key_bytes,
                            items,
                        ) => {
                            commit_handler_input
                                .randomness_dkg_confirmations
                                .push((authority_public_key_bytes, items));
                        }
                        ConsensusTransactionKind::CapabilityNotificationV2(
                            authority_capabilities_v2,
                        ) => {
                            commit_handler_input
                                .capability_notifications
                                .push(authority_capabilities_v2);
                        }
                        ConsensusTransactionKind::ExecutionTimeObservation(
                            execution_time_observation,
                        ) => {
                            commit_handler_input
                                .execution_time_observations
                                .push(execution_time_observation);
                        }
                        ConsensusTransactionKind::CheckpointSignatureV2(
                            checkpoint_signature_message,
                        ) => {
                            commit_handler_input
                                .checkpoint_signature_messages
                                .push(*checkpoint_signature_message);
                        }

                        // Deprecated variants were dropped by filter_consensus_txns.
                        ConsensusTransactionKind::CheckpointSignature(_)
                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
                        | ConsensusTransactionKind::CapabilityNotification(_) => {
                            unreachable!("filtered earlier")
                        }
                    }
                }
                // System transactions never reach this stage.
                SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
            }
        }

        commit_handler_input
    }
3183
3184 async fn send_end_of_publish_if_needed(&self) {
3185 if !self.epoch_store.should_send_end_of_publish() {
3186 return;
3187 }
3188
3189 let end_of_publish = ConsensusTransaction::new_end_of_publish(self.epoch_store.name);
3190 if let Err(err) =
3191 self.consensus_adapter
3192 .submit(end_of_publish, None, &self.epoch_store, None, None)
3193 {
3194 warn!(
3195 "Error when sending EndOfPublish message from ConsensusHandler: {:?}",
3196 err
3197 );
3198 } else {
3199 info!(epoch=?self.epoch_store.epoch(), "Sending EndOfPublish message to consensus");
3200 }
3201 }
3202}
3203
/// Payload sent from consensus handling to the execution scheduler task: a
/// batch of schedulables with their assigned object versions, plus an optional
/// settlement batch descriptor (routes to `enqueue_v2` when present — see
/// `ExecutionSchedulerSender::run`).
pub(crate) type SchedulerMessage = (
    Vec<(Schedulable, AssignedVersions)>,
    Option<SettlementBatchInfo>,
);
3210
/// Cloneable handle that forwards scheduled transactions to a background task
/// (spawned in `ExecutionSchedulerSender::start`) over an unbounded channel.
#[derive(Clone)]
pub(crate) struct ExecutionSchedulerSender {
    // Sending half of the channel drained by `ExecutionSchedulerSender::run`.
    sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
}
3215
3216impl ExecutionSchedulerSender {
3217 fn start(
3218 settlement_scheduler: SettlementScheduler,
3219 epoch_store: Arc<AuthorityPerEpochStore>,
3220 ) -> Self {
3221 let (sender, recv) = monitored_mpsc::unbounded_channel("execution_scheduler_sender");
3222 spawn_monitored_task!(Self::run(recv, settlement_scheduler, epoch_store));
3223 Self { sender }
3224 }
3225
3226 pub(crate) fn new_for_testing(
3227 sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
3228 ) -> Self {
3229 Self { sender }
3230 }
3231
3232 fn send(
3233 &self,
3234 transactions: Vec<(Schedulable, AssignedVersions)>,
3235 settlement: Option<SettlementBatchInfo>,
3236 ) {
3237 let _ = self.sender.send((transactions, settlement));
3238 }
3239
3240 async fn run(
3241 mut recv: monitored_mpsc::UnboundedReceiver<SchedulerMessage>,
3242 settlement_scheduler: SettlementScheduler,
3243 epoch_store: Arc<AuthorityPerEpochStore>,
3244 ) {
3245 while let Some((transactions, settlement)) = recv.recv().await {
3246 let _guard = monitored_scope("ConsensusHandler::enqueue");
3247 let txns = transactions
3248 .into_iter()
3249 .map(|(txn, versions)| (txn, ExecutionEnv::new().with_assigned_versions(versions)))
3250 .collect();
3251 if let Some(settlement) = settlement {
3252 settlement_scheduler.enqueue_v2(txns, settlement, &epoch_store);
3253 } else {
3254 settlement_scheduler.enqueue(txns, &epoch_store);
3255 }
3256 }
3257 }
3258}
3259
/// Owns the spawned task(s) that drive consensus commit processing; see
/// `MysticetiConsensusHandler::new` for the task body and `abort` to stop it.
pub(crate) struct MysticetiConsensusHandler {
    // Handler tasks; shut down via `abort`.
    tasks: JoinSet<()>,
}
3264
3265impl MysticetiConsensusHandler {
3266 pub(crate) fn new(
3267 last_processed_commit_at_startup: CommitIndex,
3268 mut consensus_handler: ConsensusHandler<CheckpointService>,
3269 mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
3270 commit_consumer_monitor: Arc<CommitConsumerMonitor>,
3271 ) -> Self {
3272 debug!(
3273 last_processed_commit_at_startup,
3274 "Starting consensus replay"
3275 );
3276 let mut tasks = JoinSet::new();
3277 tasks.spawn(monitored_future!(async move {
3278 while let Some(consensus_commit) = commit_receiver.recv().await {
3280 let commit_index = consensus_commit.commit_ref.index;
3281 if commit_index <= last_processed_commit_at_startup {
3282 consensus_handler.handle_prior_consensus_commit(consensus_commit);
3283 } else {
3284 consensus_handler
3285 .handle_consensus_commit(consensus_commit)
3286 .await;
3287 }
3288 commit_consumer_monitor.set_highest_handled_commit(commit_index);
3289 }
3290 }));
3291 Self { tasks }
3292 }
3293
3294 pub(crate) async fn abort(&mut self) {
3295 self.tasks.shutdown().await;
3296 }
3297}
3298
3299fn authenticator_state_update_transaction(
3300 epoch_store: &AuthorityPerEpochStore,
3301 round: u64,
3302 mut new_active_jwks: Vec<ActiveJwk>,
3303) -> VerifiedExecutableTransaction {
3304 let epoch = epoch_store.epoch();
3305 new_active_jwks.sort();
3306
3307 info!("creating authenticator state update transaction");
3308 assert!(epoch_store.authenticator_state_enabled());
3309 let transaction = VerifiedTransaction::new_authenticator_state_update(
3310 epoch,
3311 round,
3312 new_active_jwks,
3313 epoch_store
3314 .epoch_start_config()
3315 .authenticator_obj_initial_shared_version()
3316 .expect("authenticator state obj must exist"),
3317 );
3318 VerifiedExecutableTransaction::new_system(transaction, epoch)
3319}
3320
3321pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
3322 match &transaction.kind {
3323 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
3324 if certificate.is_consensus_tx() {
3325 "shared_certificate"
3326 } else {
3327 "owned_certificate"
3328 }
3329 }
3330 ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
3331 ConsensusTransactionKind::CheckpointSignatureV2(_) => "checkpoint_signature",
3332 ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
3333 ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
3334 ConsensusTransactionKind::CapabilityNotificationV2(_) => "capability_notification_v2",
3335 ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
3336 ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
3337 ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
3338 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
3339 ConsensusTransactionKind::UserTransaction(tx) => {
3340 if tx.is_consensus_tx() {
3341 "shared_user_transaction"
3342 } else {
3343 "owned_user_transaction"
3344 }
3345 }
3346 ConsensusTransactionKind::UserTransactionV2(tx) => {
3347 if tx.tx().is_consensus_tx() {
3348 "shared_user_transaction_v2"
3349 } else {
3350 "owned_user_transaction_v2"
3351 }
3352 }
3353 ConsensusTransactionKind::ExecutionTimeObservation(_) => "execution_time_observation",
3354 }
3355}
3356
/// A consensus transaction together with its position in the consensus output
/// and the authority that authored the block carrying it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequencedConsensusTransaction {
    // Committee index of the block author.
    pub certificate_author_index: AuthorityIndex,
    // Name of the authority that authored the containing block.
    pub certificate_author: AuthorityName,
    // Position of this transaction within the consensus output stream.
    pub consensus_index: ExecutionIndices,
    pub transaction: SequencedConsensusTransactionKind,
}
3364
/// A transaction observed in consensus output: either one submitted externally
/// through consensus, or a system transaction synthesized internally.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SequencedConsensusTransactionKind {
    External(ConsensusTransaction),
    System(VerifiedExecutableTransaction),
}
3371
3372impl Serialize for SequencedConsensusTransactionKind {
3373 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
3374 let serializable = SerializableSequencedConsensusTransactionKind::from(self);
3375 serializable.serialize(serializer)
3376 }
3377}
3378
3379impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
3380 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
3381 let serializable =
3382 SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
3383 Ok(serializable.into())
3384 }
3385}
3386
/// Serde mirror of `SequencedConsensusTransactionKind`; the `System` variant
/// stores a `TrustedExecutableTransaction` so it can round-trip through
/// serialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
enum SerializableSequencedConsensusTransactionKind {
    External(ConsensusTransaction),
    System(TrustedExecutableTransaction),
}
3396
3397impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
3398 fn from(kind: &SequencedConsensusTransactionKind) -> Self {
3399 match kind {
3400 SequencedConsensusTransactionKind::External(ext) => {
3401 SerializableSequencedConsensusTransactionKind::External(ext.clone())
3402 }
3403 SequencedConsensusTransactionKind::System(txn) => {
3404 SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
3405 }
3406 }
3407 }
3408}
3409
3410impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
3411 fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
3412 match kind {
3413 SerializableSequencedConsensusTransactionKind::External(ext) => {
3414 SequencedConsensusTransactionKind::External(ext)
3415 }
3416 SerializableSequencedConsensusTransactionKind::System(txn) => {
3417 SequencedConsensusTransactionKind::System(txn.into())
3418 }
3419 }
3420 }
3421}
3422
/// Deduplication/processing key for a sequenced consensus transaction.
#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
pub enum SequencedConsensusTransactionKey {
    External(ConsensusTransactionKey),
    System(TransactionDigest),
}
3428
3429impl SequencedConsensusTransactionKey {
3430 pub fn user_transaction_digest(&self) -> Option<TransactionDigest> {
3431 match self {
3432 SequencedConsensusTransactionKey::External(key) => match key {
3433 ConsensusTransactionKey::Certificate(digest) => Some(*digest),
3434 _ => None,
3435 },
3436 SequencedConsensusTransactionKey::System(_) => None,
3437 }
3438 }
3439}
3440
3441impl SequencedConsensusTransactionKind {
3442 pub fn key(&self) -> SequencedConsensusTransactionKey {
3443 match self {
3444 SequencedConsensusTransactionKind::External(ext) => {
3445 SequencedConsensusTransactionKey::External(ext.key())
3446 }
3447 SequencedConsensusTransactionKind::System(txn) => {
3448 SequencedConsensusTransactionKey::System(*txn.digest())
3449 }
3450 }
3451 }
3452
3453 pub fn get_tracking_id(&self) -> u64 {
3454 match self {
3455 SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
3456 SequencedConsensusTransactionKind::System(_txn) => 0,
3457 }
3458 }
3459
3460 pub fn is_executable_transaction(&self) -> bool {
3461 match self {
3462 SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
3463 SequencedConsensusTransactionKind::System(_) => true,
3464 }
3465 }
3466
3467 pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
3468 match self {
3469 SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
3470 ConsensusTransactionKind::CertifiedTransaction(txn) => Some(*txn.digest()),
3471 ConsensusTransactionKind::UserTransaction(txn) => Some(*txn.digest()),
3472 ConsensusTransactionKind::UserTransactionV2(txn) => Some(*txn.tx().digest()),
3473 _ => None,
3474 },
3475 SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
3476 }
3477 }
3478
3479 pub fn is_end_of_publish(&self) -> bool {
3480 match self {
3481 SequencedConsensusTransactionKind::External(ext) => {
3482 matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
3483 }
3484 SequencedConsensusTransactionKind::System(_) => false,
3485 }
3486 }
3487}
3488
3489impl SequencedConsensusTransaction {
3490 pub fn sender_authority(&self) -> AuthorityName {
3491 self.certificate_author
3492 }
3493
3494 pub fn key(&self) -> SequencedConsensusTransactionKey {
3495 self.transaction.key()
3496 }
3497
3498 pub fn is_end_of_publish(&self) -> bool {
3499 if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
3500 matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
3501 } else {
3502 false
3503 }
3504 }
3505
3506 pub fn try_take_execution_time_observation(&mut self) -> Option<ExecutionTimeObservation> {
3507 if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
3508 kind: ConsensusTransactionKind::ExecutionTimeObservation(observation),
3509 ..
3510 }) = &mut self.transaction
3511 {
3512 Some(std::mem::take(observation))
3513 } else {
3514 None
3515 }
3516 }
3517
3518 pub fn is_system(&self) -> bool {
3519 matches!(
3520 self.transaction,
3521 SequencedConsensusTransactionKind::System(_)
3522 )
3523 }
3524
3525 pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
3526 if !randomness_state_enabled {
3527 return false;
3530 }
3531 match &self.transaction {
3532 SequencedConsensusTransactionKind::External(ConsensusTransaction {
3533 kind: ConsensusTransactionKind::CertifiedTransaction(cert),
3534 ..
3535 }) => cert.transaction_data().uses_randomness(),
3536 SequencedConsensusTransactionKind::External(ConsensusTransaction {
3537 kind: ConsensusTransactionKind::UserTransaction(txn),
3538 ..
3539 }) => txn.transaction_data().uses_randomness(),
3540 SequencedConsensusTransactionKind::External(ConsensusTransaction {
3541 kind: ConsensusTransactionKind::UserTransactionV2(txn),
3542 ..
3543 }) => txn.tx().transaction_data().uses_randomness(),
3544 _ => false,
3545 }
3546 }
3547
3548 pub fn as_consensus_txn(&self) -> Option<&SenderSignedData> {
3549 match &self.transaction {
3550 SequencedConsensusTransactionKind::External(ConsensusTransaction {
3551 kind: ConsensusTransactionKind::CertifiedTransaction(certificate),
3552 ..
3553 }) if certificate.is_consensus_tx() => Some(certificate.data()),
3554 SequencedConsensusTransactionKind::External(ConsensusTransaction {
3555 kind: ConsensusTransactionKind::UserTransaction(txn),
3556 ..
3557 }) if txn.is_consensus_tx() => Some(txn.data()),
3558 SequencedConsensusTransactionKind::External(ConsensusTransaction {
3559 kind: ConsensusTransactionKind::UserTransactionV2(txn),
3560 ..
3561 }) if txn.tx().is_consensus_tx() => Some(txn.tx().data()),
3562 SequencedConsensusTransactionKind::System(txn) if txn.is_consensus_tx() => {
3563 Some(txn.data())
3564 }
3565 _ => None,
3566 }
3567 }
3568}
3569
/// Newtype wrapper marking a `SequencedConsensusTransaction` as verified.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
3572
#[cfg(test)]
impl VerifiedSequencedConsensusTransaction {
    /// Test helper: wraps a transaction using default test metadata.
    pub fn new_test(transaction: ConsensusTransaction) -> Self {
        let inner = SequencedConsensusTransaction::new_test(transaction);
        Self(inner)
    }
}
3579
3580impl SequencedConsensusTransaction {
3581 pub fn new_test(transaction: ConsensusTransaction) -> Self {
3582 Self {
3583 certificate_author_index: 0,
3584 certificate_author: AuthorityName::ZERO,
3585 consensus_index: Default::default(),
3586 transaction: SequencedConsensusTransactionKind::External(transaction),
3587 }
3588 }
3589}
3590
/// Tracks recent commit timestamps in a bounded window to estimate the
/// average interval between consensus commits.
#[derive(Serialize, Deserialize)]
pub(crate) struct CommitIntervalObserver {
    // Recent commit timestamps (ms). The window is bounded by the deque's
    // capacity; note `VecDeque::with_capacity` guarantees *at least* the
    // requested size, so the effective window may be slightly larger than
    // the configured window size — see `observe_commit_time`.
    ring_buffer: VecDeque<u64>,
}
3595
3596impl CommitIntervalObserver {
3597 pub fn new(window_size: u32) -> Self {
3598 Self {
3599 ring_buffer: VecDeque::with_capacity(window_size as usize),
3600 }
3601 }
3602
3603 pub fn observe_commit_time(&mut self, consensus_commit: &impl ConsensusCommitAPI) {
3604 let commit_time = consensus_commit.commit_timestamp_ms();
3605 if self.ring_buffer.len() == self.ring_buffer.capacity() {
3606 self.ring_buffer.pop_front();
3607 }
3608 self.ring_buffer.push_back(commit_time);
3609 }
3610
3611 pub fn commit_interval_estimate(&self) -> Option<Duration> {
3612 if self.ring_buffer.len() <= 1 {
3613 None
3614 } else {
3615 let first = self.ring_buffer.front().unwrap();
3616 let last = self.ring_buffer.back().unwrap();
3617 let duration = last.saturating_sub(*first);
3618 let num_commits = self.ring_buffer.len() as u64;
3619 Some(Duration::from_millis(duration.div_ceil(num_commits)))
3620 }
3621 }
3622}
3623
3624#[cfg(test)]
3625mod tests {
3626 use std::collections::HashSet;
3627
3628 use consensus_core::{
3629 BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock,
3630 };
3631 use futures::pin_mut;
3632 use prometheus::Registry;
3633 use sui_protocol_config::{ConsensusTransactionOrdering, ProtocolConfig};
3634 use sui_types::{
3635 base_types::ExecutionDigests,
3636 base_types::{AuthorityName, FullObjectRef, ObjectID, SuiAddress, random_object_ref},
3637 committee::Committee,
3638 crypto::deterministic_random_account_key,
3639 gas::GasCostSummary,
3640 message_envelope::Message,
3641 messages_checkpoint::{
3642 CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
3643 SignedCheckpointSummary,
3644 },
3645 messages_consensus::ConsensusTransaction,
3646 object::Object,
3647 transaction::{
3648 CertifiedTransaction, TransactionData, TransactionDataAPI, VerifiedCertificate,
3649 },
3650 };
3651
3652 use super::*;
3653 use crate::{
3654 authority::{
3655 authority_per_epoch_store::ConsensusStatsAPI,
3656 test_authority_builder::TestAuthorityBuilder,
3657 },
3658 checkpoints::CheckpointServiceNoop,
3659 consensus_adapter::consensus_tests::test_user_transaction,
3660 consensus_test_utils::make_consensus_adapter_for_test,
3661 post_consensus_tx_reorder::PostConsensusTxReorder,
3662 };
3663
    /// End-to-end test of `ConsensusHandler::handle_consensus_commit`: builds
    /// a committed sub-dag of user transactions, verifies commit handling
    /// blocks while backpressure is engaged and resumes once it is lifted,
    /// then checks the recorded consensus stats and transaction execution.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_consensus_commit_handler() {
        telemetry_subscribers::init_for_testing();

        // Gas objects plus exclusively-owned and shared inputs for the user
        // transactions constructed below.
        let (sender, keypair) = deterministic_random_account_key();
        let gas_objects: Vec<Object> = (0..12)
            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
            .collect();
        let owned_objects: Vec<Object> = (0..4)
            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
            .collect();
        let shared_objects: Vec<Object> = (0..6)
            .map(|_| Object::shared_for_testing())
            .collect::<Vec<_>>();
        let mut all_objects = gas_objects.clone();
        all_objects.extend(owned_objects.clone());
        all_objects.extend(shared_objects.clone());

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .with_objects(all_objects.clone())
                .build();

        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let new_epoch_start_state = epoch_store.epoch_start_state();
        let consensus_committee = new_epoch_start_state.get_consensus_committee();

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));

        let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());

        let backpressure_manager = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let settlement_scheduler = SettlementScheduler::new(
            state.execution_scheduler().as_ref().clone(),
            state.get_transaction_cache_reader().clone(),
        );
        let mut consensus_handler = ConsensusHandler::new(
            epoch_store,
            Arc::new(CheckpointServiceNoop {}),
            settlement_scheduler,
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            Arc::new(ArcSwap::default()),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput_calculator),
            backpressure_manager.subscribe(),
            state.traffic_controller.clone(),
            None,
        );

        // First batch: alternate owned-object and shared-object inputs.
        let mut user_transactions = vec![];
        for (i, gas_object) in gas_objects[0..8].iter().enumerate() {
            let input_object = if i % 2 == 0 {
                owned_objects.get(i / 2).unwrap().clone()
            } else {
                shared_objects.get(i / 2).unwrap().clone()
            };
            let transaction = test_user_transaction(
                &state,
                sender,
                &keypair,
                gas_object.clone(),
                vec![input_object],
            )
            .await;
            user_transactions.push(transaction);
        }

        // Second batch: pairs of transactions contending on the same shared
        // object (objects 4 and 5).
        for (i, gas_object) in gas_objects[8..12].iter().enumerate() {
            let shared_object = if i < 2 {
                shared_objects[4].clone()
            } else {
                shared_objects[5].clone()
            };
            let transaction = test_user_transaction(
                &state,
                sender,
                &keypair,
                gas_object.clone(),
                vec![shared_object],
            )
            .await;
            user_transactions.push(transaction);
        }

        // One consensus block per transaction.
        // NOTE(review): assumes TestBlock::new(round, author_index) — rounds
        // start at 100, authorship rotates over the committee; confirm against
        // consensus_core test API.
        let mut blocks = Vec::new();
        for (i, consensus_transaction) in user_transactions
            .iter()
            .cloned()
            .map(|t| ConsensusTransaction::new_user_transaction_v2_message(&state.name, t.into()))
            .enumerate()
        {
            let transaction_bytes = bcs::to_bytes(&consensus_transaction).unwrap();
            let block = VerifiedBlock::new_for_test(
                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
                    .set_transactions(vec![Transaction::new(transaction_bytes)])
                    .build(),
            );

            blocks.push(block);
        }

        let leader_block = blocks[0].clone();
        let committed_sub_dag = CommittedSubDag::new(
            leader_block.reference(),
            blocks.clone(),
            leader_block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
            true,
        );

        // Engage backpressure so commit handling stalls below.
        backpressure_manager.set_backpressure(true);
        backpressure_manager.update_highest_certified_checkpoint(1);

        {
            let waiter = consensus_handler.handle_consensus_commit(committed_sub_dag.clone());
            pin_mut!(waiter);

            // With backpressure engaged the commit must not complete within
            // the (paused-clock) timeout.
            tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
                .await
                .unwrap_err();

            backpressure_manager.set_backpressure(false);

            // After lifting backpressure, the commit completes.
            tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
                .await
                .unwrap();
        }

        // The recorded stats must reflect exactly the commit handled above.
        let num_blocks = blocks.len();
        let num_transactions = user_transactions.len();
        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
        assert_eq!(
            last_consensus_stats_1.index.transaction_index,
            num_transactions as u64
        );
        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
        assert_eq!(
            last_consensus_stats_1.stats.get_num_messages(0),
            num_blocks as u64
        );
        assert_eq!(
            last_consensus_stats_1.stats.get_num_user_transactions(0),
            num_transactions as u64
        );

        // Every submitted user transaction must eventually execute.
        for (i, t) in user_transactions.iter().enumerate() {
            let digest = t.tx().digest();
            if tokio::time::timeout(
                std::time::Duration::from_secs(10),
                state.notify_read_effects_for_testing("", *digest),
            )
            .await
            .is_ok()
            {
            } else {
                panic!("User transaction {} {} did not execute", i, digest);
            }
        }

        state.execution_scheduler().check_empty_for_testing().await;
    }
3855
3856 fn to_short_strings(txs: Vec<VerifiedExecutableTransactionWithAliases>) -> Vec<String> {
3857 txs.into_iter()
3858 .map(|tx| format!("transaction({})", tx.tx().transaction_data().gas_price()))
3859 .collect()
3860 }
3861
    /// `PostConsensusTxReorder::reorder` with `ByGasPrice` must order
    /// transactions by descending gas price.
    #[test]
    fn test_order_by_gas_price() {
        let mut v = vec![user_txn(42), user_txn(100)];
        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
        assert_eq!(
            to_short_strings(v),
            vec![
                "transaction(100)".to_string(),
                "transaction(42)".to_string(),
            ]
        );

        // Larger input including a duplicate price (1000) to exercise ties.
        let mut v = vec![
            user_txn(1200),
            user_txn(12),
            user_txn(1000),
            user_txn(42),
            user_txn(100),
            user_txn(1000),
        ];
        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
        assert_eq!(
            to_short_strings(v),
            vec![
                "transaction(1200)".to_string(),
                "transaction(1000)".to_string(),
                "transaction(1000)".to_string(),
                "transaction(100)".to_string(),
                "transaction(42)".to_string(),
                "transaction(12)".to_string(),
            ]
        );
    }
3895
    /// Commits two distinct CheckpointSignatureV2 messages plus an exact
    /// duplicate of the first in a single block, and verifies both unique
    /// signatures are marked processed (the duplicate dedups by key rather
    /// than causing an error).
    #[tokio::test(flavor = "current_thread")]
    async fn test_checkpoint_signature_dedup() {
        telemetry_subscribers::init_for_testing();

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();

        // Each call signs a summary for checkpoint 42 over fresh random
        // contents, so successive summaries have distinct digests.
        let make_signed = || {
            let epoch = epoch_store.epoch();
            let contents =
                CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
            let summary = CheckpointSummary::new(
                &ProtocolConfig::get_for_max_version_UNSAFE(),
                epoch,
                42, 10, &contents,
                None, GasCostSummary::default(),
                None, 0, Vec::new(), Vec::new(), );
            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name)
        };

        let v2_s1 = make_signed();
        let v2_s1_clone = v2_s1.clone();
        let v2_digest_a = v2_s1.data().digest();
        let v2_a =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: v2_s1,
            });

        let v2_s2 = make_signed();
        let v2_digest_b = v2_s2.data().digest();
        let v2_b =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: v2_s2,
            });

        assert_ne!(v2_digest_a, v2_digest_b);

        // The duplicate carries the same summary (hence same digest/key).
        assert_eq!(v2_s1_clone.data().digest(), v2_digest_a);
        let v2_dup =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: v2_s1_clone,
            });

        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
        let block = VerifiedBlock::new_for_test(
            TestBlock::new(100, 0)
                .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)])
                .build(),
        );
        let commit = CommittedSubDag::new(
            block.reference(),
            vec![block.clone()],
            block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
            true,
        );

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
        let backpressure = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let settlement_scheduler = SettlementScheduler::new(
            state.execution_scheduler().as_ref().clone(),
            state.get_transaction_cache_reader().clone(),
        );
        let mut handler = ConsensusHandler::new(
            epoch_store.clone(),
            Arc::new(CheckpointServiceNoop {}),
            settlement_scheduler,
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            Arc::new(ArcSwap::default()),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput),
            backpressure.subscribe(),
            state.traffic_controller.clone(),
            None,
        );

        handler.handle_consensus_commit(commit).await;

        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
        use sui_types::messages_consensus::ConsensusTransactionKey as CK;

        // Both unique signatures must be recorded as processed.
        let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a));
        let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b));
        assert!(
            epoch_store
                .is_consensus_message_processed(&v2_key_a)
                .unwrap()
        );
        assert!(
            epoch_store
                .is_consensus_message_processed(&v2_key_b)
                .unwrap()
        );
    }
4012
    /// Messages whose embedded authority does not match the block author must
    /// be filtered out before processing, while otherwise identical messages
    /// from the correct authority are processed normally.
    #[tokio::test(flavor = "current_thread")]
    async fn test_verify_consensus_transaction_filters_mismatched_authorities() {
        telemetry_subscribers::init_for_testing();

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();

        // An authority key that is NOT the author of the block below.
        use fastcrypto::traits::KeyPair;
        let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair();
        let wrong_authority: AuthorityName = wrong_keypair.public().into();

        let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority);

        let valid_eop = ConsensusTransaction::new_end_of_publish(state.name);

        let epoch = epoch_store.epoch();
        let contents =
            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
        let summary = CheckpointSummary::new(
            &ProtocolConfig::get_for_max_version_UNSAFE(),
            epoch,
            42, 10, &contents,
            None, GasCostSummary::default(),
            None, 0, Vec::new(), Vec::new(), );

        // Same summary signed by the wrong authority vs. by this validator.
        let mismatched_checkpoint_signed =
            SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority);
        let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest();
        let mismatched_checkpoint =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: mismatched_checkpoint_signed,
            });

        let valid_checkpoint_signed =
            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name);
        let valid_checkpoint_digest = valid_checkpoint_signed.data().digest();
        let valid_checkpoint =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: valid_checkpoint_signed,
            });

        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());

        // All four messages arrive in a single block authored by author 0.
        let block = VerifiedBlock::new_for_test(
            TestBlock::new(100, 0)
                .set_transactions(vec![
                    to_tx(&mismatched_eop),
                    to_tx(&valid_eop),
                    to_tx(&mismatched_checkpoint),
                    to_tx(&valid_checkpoint),
                ])
                .build(),
        );
        let commit = CommittedSubDag::new(
            block.reference(),
            vec![block.clone()],
            block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
            true,
        );

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
        let backpressure = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let settlement_scheduler = SettlementScheduler::new(
            state.execution_scheduler().as_ref().clone(),
            state.get_transaction_cache_reader().clone(),
        );
        let mut handler = ConsensusHandler::new(
            epoch_store.clone(),
            Arc::new(CheckpointServiceNoop {}),
            settlement_scheduler,
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            Arc::new(ArcSwap::default()),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput),
            backpressure.subscribe(),
            state.traffic_controller.clone(),
            None,
        );

        handler.handle_consensus_commit(commit).await;

        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
        use sui_types::messages_consensus::ConsensusTransactionKey as CK;

        let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
        assert!(
            epoch_store
                .is_consensus_message_processed(&valid_eop_key)
                .unwrap(),
            "Valid EndOfPublish should have been processed"
        );

        let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
            state.name,
            42,
            valid_checkpoint_digest,
        ));
        assert!(
            epoch_store
                .is_consensus_message_processed(&valid_checkpoint_key)
                .unwrap(),
            "Valid CheckpointSignature should have been processed"
        );

        let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
        assert!(
            !epoch_store
                .is_consensus_message_processed(&mismatched_eop_key)
                .unwrap(),
            "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
        );

        let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
            wrong_authority,
            42,
            mismatched_checkpoint_digest,
        ));
        assert!(
            !epoch_store
                .is_consensus_message_processed(&mismatched_checkpoint_key)
                .unwrap(),
            "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
        );
    }
4166
    /// Builds a certified transfer transaction with the given gas price,
    /// wrapped as an executable transaction with no aliases.
    /// NOTE(review): assumes the `1000 * gas_price` argument to
    /// `TransactionData::new_transfer` is the gas budget — confirm against the
    /// sui_types API.
    fn user_txn(gas_price: u64) -> VerifiedExecutableTransactionWithAliases {
        let (committee, keypairs) = Committee::new_simple_test_committee();
        let (sender, sender_keypair) = deterministic_random_account_key();
        let tx = sui_types::transaction::Transaction::from_data_and_signer(
            TransactionData::new_transfer(
                SuiAddress::default(),
                FullObjectRef::from_fastpath_ref(random_object_ref()),
                sender,
                random_object_ref(),
                1000 * gas_price,
                gas_price,
            ),
            vec![&sender_keypair],
        );
        // Certify with the full test committee, then wrap as executable.
        let tx = VerifiedExecutableTransaction::new_from_certificate(
            VerifiedCertificate::new_unchecked(
                CertifiedTransaction::new_from_keypairs_for_testing(
                    tx.into_data(),
                    &keypairs,
                    &committee,
                ),
            ),
        );
        VerifiedExecutableTransactionWithAliases::no_aliases(tx)
    }
4192
4193 mod checkpoint_queue_tests {
4194 use super::*;
4195 use consensus_core::CommitRef;
4196 use sui_types::digests::Digest;
4197
4198 fn make_chunk(tx_count: usize, height: u64) -> Chunk {
4199 Chunk {
4200 schedulables: (0..tx_count)
4201 .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4202 .collect(),
4203 settlement: None,
4204 height,
4205 }
4206 }
4207
4208 fn make_commit_ref(index: u32) -> CommitRef {
4209 CommitRef {
4210 index,
4211 digest: CommitDigest::MIN,
4212 }
4213 }
4214
4215 fn default_versions() -> HashMap<TransactionKey, AssignedVersions> {
4216 HashMap::new()
4217 }
4218
4219 #[test]
4220 fn test_flush_all_checkpoint_roots() {
4221 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4222 let versions = default_versions();
4223
4224 queue.push_chunk(
4225 make_chunk(5, 1),
4226 &versions,
4227 1000,
4228 make_commit_ref(1),
4229 Digest::default(),
4230 );
4231 queue.push_chunk(
4232 make_chunk(3, 2),
4233 &versions,
4234 1000,
4235 make_commit_ref(1),
4236 Digest::default(),
4237 );
4238
4239 let pending = queue.flush(1000, true);
4240
4241 assert!(pending.is_some());
4242 assert!(queue.pending_roots.is_empty());
4243 }
4244
4245 #[test]
4246 fn test_flush_respects_min_checkpoint_interval() {
4247 let min_interval = 200;
4248 let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, 1000, min_interval);
4249 let versions = default_versions();
4250
4251 queue.push_chunk(
4252 make_chunk(5, 1),
4253 &versions,
4254 1000,
4255 make_commit_ref(1),
4256 Digest::default(),
4257 );
4258
4259 let pending = queue.flush(1000 + min_interval - 1, false);
4260 assert!(pending.is_none());
4261 assert_eq!(queue.pending_roots.len(), 1);
4262
4263 let pending = queue.flush(1000 + min_interval, false);
4264 assert!(pending.is_some());
4265 assert!(queue.pending_roots.is_empty());
4266 }
4267
4268 #[test]
4269 fn test_push_chunk_flushes_when_exceeds_max() {
4270 let max_tx = 10;
4271 let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, max_tx, 0);
4272 let versions = default_versions();
4273
4274 queue.push_chunk(
4275 make_chunk(max_tx / 2 + 1, 1),
4276 &versions,
4277 1000,
4278 make_commit_ref(1),
4279 Digest::default(),
4280 );
4281
4282 let flushed = queue.push_chunk(
4283 make_chunk(max_tx / 2 + 1, 2),
4284 &versions,
4285 1000,
4286 make_commit_ref(2),
4287 Digest::default(),
4288 );
4289
4290 assert_eq!(flushed.len(), 1);
4291 assert_eq!(queue.pending_roots.len(), 1);
4292 }
4293
4294 #[test]
4295 fn test_multiple_chunks_merged_into_one_checkpoint() {
4296 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 200);
4297 let versions = default_versions();
4298
4299 queue.push_chunk(
4300 make_chunk(10, 1),
4301 &versions,
4302 1000,
4303 make_commit_ref(1),
4304 Digest::default(),
4305 );
4306 queue.push_chunk(
4307 make_chunk(10, 2),
4308 &versions,
4309 1000,
4310 make_commit_ref(2),
4311 Digest::default(),
4312 );
4313 queue.push_chunk(
4314 make_chunk(10, 3),
4315 &versions,
4316 1000,
4317 make_commit_ref(3),
4318 Digest::default(),
4319 );
4320
4321 let pending = queue.flush(1000, true).unwrap();
4322
4323 assert_eq!(pending.roots.len(), 3);
4324 }
4325
4326 #[test]
4327 fn test_push_chunk_handles_overflow() {
4328 let max_tx = 10;
4329 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4330 let versions = default_versions();
4331
4332 let flushed1 = queue.push_chunk(
4333 make_chunk(max_tx / 2, 1),
4334 &versions,
4335 1000,
4336 make_commit_ref(1),
4337 Digest::default(),
4338 );
4339 assert!(flushed1.is_empty());
4340
4341 let flushed2 = queue.push_chunk(
4342 make_chunk(max_tx / 2, 2),
4343 &versions,
4344 1000,
4345 make_commit_ref(2),
4346 Digest::default(),
4347 );
4348 assert!(flushed2.is_empty());
4349
4350 let flushed3 = queue.push_chunk(
4351 make_chunk(max_tx / 2, 3),
4352 &versions,
4353 1000,
4354 make_commit_ref(3),
4355 Digest::default(),
4356 );
4357 assert_eq!(flushed3.len(), 1);
4358
4359 let pending = queue.flush(1000, true);
4360
4361 for p in pending.iter().chain(flushed3.iter()) {
4362 let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4363 assert!(tx_count <= max_tx);
4364 }
4365 }
4366
4367 #[test]
4368 fn test_checkpoint_uses_last_chunk_height() {
4369 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4370 let versions = default_versions();
4371
4372 queue.push_chunk(
4373 make_chunk(10, 100),
4374 &versions,
4375 1000,
4376 make_commit_ref(1),
4377 Digest::default(),
4378 );
4379 queue.push_chunk(
4380 make_chunk(10, 200),
4381 &versions,
4382 1000,
4383 make_commit_ref(2),
4384 Digest::default(),
4385 );
4386
4387 let pending = queue.flush(1000, true).unwrap();
4388
4389 assert_eq!(pending.details.checkpoint_height, 200);
4390 }
4391
4392 #[test]
4393 fn test_last_built_timestamp_updated_on_flush() {
4394 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4395 let versions = default_versions();
4396
4397 queue.push_chunk(
4398 make_chunk(10, 1),
4399 &versions,
4400 5000,
4401 make_commit_ref(1),
4402 Digest::default(),
4403 );
4404
4405 assert_eq!(queue.last_built_timestamp, 0);
4406
4407 let _ = queue.flush(5000, true);
4408
4409 assert_eq!(queue.last_built_timestamp, 5000);
4410 }
4411
4412 #[test]
4413 fn test_settlement_info_sent_through_channel() {
4414 let mut queue = CheckpointQueue::new_for_testing(0, 0, 5, 1000, 0);
4415 let versions = default_versions();
4416
4417 let chunk1 = Chunk {
4418 schedulables: vec![
4419 Schedulable::ConsensusCommitPrologue(0, 1, 0),
4420 Schedulable::ConsensusCommitPrologue(0, 2, 0),
4421 Schedulable::ConsensusCommitPrologue(0, 3, 0),
4422 ],
4423 settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
4424 height: 1,
4425 };
4426
4427 let chunk2 = Chunk {
4428 schedulables: vec![
4429 Schedulable::ConsensusCommitPrologue(0, 4, 0),
4430 Schedulable::ConsensusCommitPrologue(0, 5, 0),
4431 ],
4432 settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
4433 height: 2,
4434 };
4435
4436 queue.push_chunk(
4437 chunk1,
4438 &versions,
4439 1000,
4440 make_commit_ref(1),
4441 Digest::default(),
4442 );
4443 queue.push_chunk(
4444 chunk2,
4445 &versions,
4446 1000,
4447 make_commit_ref(1),
4448 Digest::default(),
4449 );
4450 }
4451
4452 #[test]
4453 fn test_settlement_checkpoint_seq_correct_after_flush() {
4454 let max_tx = 10;
4455 let initial_seq = 5;
4456 let (sender, mut receiver) = monitored_mpsc::unbounded_channel("test_settlement_seq");
4457 let mut queue =
4458 CheckpointQueue::new_for_testing_with_sender(0, 0, initial_seq, max_tx, 0, sender);
4459 let versions = default_versions();
4460
4461 let chunk1 = Chunk {
4463 schedulables: (0..max_tx / 2 + 1)
4464 .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4465 .collect(),
4466 settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
4467 height: 1,
4468 };
4469 queue.push_chunk(
4470 chunk1,
4471 &versions,
4472 1000,
4473 make_commit_ref(1),
4474 Digest::default(),
4475 );
4476
4477 let msg1 = receiver.try_recv().unwrap();
4479 let settlement1 = msg1.1.unwrap();
4480 assert_eq!(settlement1.checkpoint_seq, initial_seq);
4481
4482 let chunk2 = Chunk {
4484 schedulables: (0..max_tx / 2 + 1)
4485 .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4486 .collect(),
4487 settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
4488 height: 2,
4489 };
4490 let flushed = queue.push_chunk(
4491 chunk2,
4492 &versions,
4493 1000,
4494 make_commit_ref(2),
4495 Digest::default(),
4496 );
4497 assert_eq!(flushed.len(), 1);
4498 assert_eq!(flushed[0].details.checkpoint_seq, Some(initial_seq));
4499
4500 let msg2 = receiver.try_recv().unwrap();
4503 let settlement2 = msg2.1.unwrap();
4504 assert_eq!(settlement2.checkpoint_seq, initial_seq + 1);
4505
4506 let pending = queue.flush_forced().unwrap();
4509 assert_eq!(
4510 pending.details.checkpoint_seq,
4511 Some(settlement2.checkpoint_seq)
4512 );
4513 }
4514
4515 #[test]
4516 fn test_checkpoint_seq_increments_on_flush() {
4517 let mut queue = CheckpointQueue::new_for_testing(0, 0, 10, 1000, 0);
4518 let versions = default_versions();
4519
4520 queue.push_chunk(
4521 make_chunk(5, 1),
4522 &versions,
4523 1000,
4524 make_commit_ref(1),
4525 Digest::default(),
4526 );
4527
4528 let pending = queue.flush(1000, true).unwrap();
4529
4530 assert_eq!(pending.details.checkpoint_seq, Some(10));
4531 assert_eq!(queue.current_checkpoint_seq, 11);
4532 }
4533
4534 #[test]
4535 fn test_multiple_chunks_with_overflow() {
4536 let max_tx = 10;
4537 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4538 let versions = default_versions();
4539
4540 let flushed1 = queue.push_chunk(
4541 make_chunk(max_tx / 2 + 1, 1),
4542 &versions,
4543 1000,
4544 make_commit_ref(1),
4545 Digest::default(),
4546 );
4547 let flushed2 = queue.push_chunk(
4548 make_chunk(max_tx / 2 + 1, 2),
4549 &versions,
4550 1000,
4551 make_commit_ref(1),
4552 Digest::default(),
4553 );
4554 let flushed3 = queue.push_chunk(
4555 make_chunk(max_tx / 2 + 1, 3),
4556 &versions,
4557 1000,
4558 make_commit_ref(1),
4559 Digest::default(),
4560 );
4561
4562 let all_flushed: Vec<_> = flushed1
4563 .into_iter()
4564 .chain(flushed2)
4565 .chain(flushed3)
4566 .collect();
4567 assert_eq!(all_flushed.len(), 2);
4568 assert_eq!(queue.pending_roots.len(), 1);
4569
4570 for p in &all_flushed {
4571 let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4572 assert!(tx_count <= max_tx);
4573 }
4574 }
4575 }
4576}