1use std::{
5 collections::{BTreeMap, HashMap, HashSet, VecDeque},
6 hash::Hash,
7 num::NonZeroUsize,
8 sync::{Arc, Mutex},
9 time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use consensus_config::Committee as ConsensusCommittee;
13use consensus_core::{CommitConsumerMonitor, CommitIndex, CommitRef};
14use consensus_types::block::TransactionIndex;
15use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
16use lru::LruCache;
17use mysten_common::{
18 assert_reachable, assert_sometimes, debug_fatal, random_util::randomize_cache_capacity_in_tests,
19};
20use mysten_metrics::{
21 monitored_future,
22 monitored_mpsc::{self, UnboundedReceiver},
23 monitored_scope, spawn_monitored_task,
24};
25use nonempty::NonEmpty;
26use parking_lot::RwLockWriteGuard;
27use serde::{Deserialize, Serialize};
28use sui_config::node::CongestionLogConfig;
29use sui_macros::{fail_point, fail_point_arg, fail_point_if};
30use sui_protocol_config::{PerObjectCongestionControlMode, ProtocolConfig};
31use sui_types::{
32 SUI_RANDOMNESS_STATE_OBJECT_ID,
33 authenticator_state::ActiveJwk,
34 base_types::{
35 AuthorityName, ConciseableName, ConsensusObjectSequenceKey, ObjectID, ObjectRef,
36 SequenceNumber, TransactionDigest,
37 },
38 crypto::RandomnessRound,
39 digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest, Digest},
40 executable_transaction::{
41 TrustedExecutableTransaction, VerifiedExecutableTransaction,
42 VerifiedExecutableTransactionWithAliases,
43 },
44 messages_checkpoint::{
45 CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointTimestamp,
46 },
47 messages_consensus::{
48 AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments,
49 ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
50 ExecutionTimeObservation,
51 },
52 sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
53 transaction::{
54 InputObjectKind, SenderSignedData, TransactionDataAPI, TransactionKey, VerifiedTransaction,
55 WithAliases,
56 },
57};
58use tokio::task::JoinSet;
59use tracing::{debug, error, info, instrument, trace, warn};
60
61use crate::{
62 authority::{
63 AuthorityMetrics, AuthorityState, ExecutionEnv,
64 authority_per_epoch_store::{
65 AuthorityPerEpochStore, CancelConsensusCertificateReason, ConsensusStats,
66 ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStatsV2,
67 consensus_quarantine::ConsensusCommitOutput,
68 },
69 backpressure::{BackpressureManager, BackpressureSubscriber},
70 congestion_log::CongestionCommitLogger,
71 consensus_tx_status_cache::ConsensusTxStatus,
72 epoch_start_configuration::EpochStartConfigTrait,
73 execution_time_estimator::ExecutionTimeEstimator,
74 shared_object_congestion_tracker::SharedObjectCongestionTracker,
75 shared_object_version_manager::{AssignedTxAndVersions, AssignedVersions, Schedulable},
76 transaction_deferral::{DeferralKey, DeferralReason, transaction_deferral_within_limit},
77 },
78 checkpoints::{
79 CheckpointHeight, CheckpointRoots, CheckpointService, CheckpointServiceNotify,
80 PendingCheckpoint, PendingCheckpointInfo, PendingCheckpointV2,
81 },
82 consensus_adapter::ConsensusAdapter,
83 consensus_throughput_calculator::ConsensusThroughputCalculator,
84 consensus_types::consensus_output_api::ConsensusCommitAPI,
85 epoch::{
86 randomness::{DkgStatus, RandomnessManager},
87 reconfiguration::ReconfigState,
88 },
89 execution_cache::ObjectCacheRead,
90 execution_scheduler::{SettlementBatchInfo, SettlementScheduler},
91 gasless_rate_limiter::ConsensusGaslessCounter,
92 post_consensus_tx_reorder::PostConsensusTxReorder,
93 traffic_controller::{TrafficController, policies::TrafficTally},
94};
95
/// Consensus output after local filtering, in the form consumed by the commit
/// handling path.
struct FilteredConsensusOutput {
    // Transaction kinds paired with a u32 — presumably an index within the
    // commit; NOTE(review): confirm the meaning of the index at the producer.
    transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
    // Maps an owned object reference to the digest of the transaction that
    // holds its lock in this output.
    owned_object_locks: HashMap<ObjectRef, TransactionDigest>,
}
102
/// Factory that captures all per-node dependencies needed to construct a
/// [`ConsensusHandler`] (typically once per epoch).
pub struct ConsensusHandlerInitializer {
    state: Arc<AuthorityState>,
    checkpoint_service: Arc<CheckpointService>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    consensus_adapter: Arc<ConsensusAdapter>,
    throughput_calculator: Arc<ConsensusThroughputCalculator>,
    backpressure_manager: Arc<BackpressureManager>,
    // Present only when congestion logging is configured and the logger
    // initialized successfully; see `new`.
    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
    consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
}
113
impl ConsensusHandlerInitializer {
    /// Captures the given dependencies for later handler construction.
    ///
    /// If `congestion_log_config` is set but the logger fails to initialize,
    /// the failure is reported via `debug_fatal!` and congestion logging is
    /// disabled rather than aborting startup.
    pub fn new(
        state: Arc<AuthorityState>,
        checkpoint_service: Arc<CheckpointService>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
        throughput_calculator: Arc<ConsensusThroughputCalculator>,
        backpressure_manager: Arc<BackpressureManager>,
        congestion_log_config: Option<CongestionLogConfig>,
    ) -> Self {
        let congestion_logger =
            congestion_log_config.and_then(|config| match CongestionCommitLogger::new(&config) {
                Ok(logger) => Some(Arc::new(Mutex::new(logger))),
                Err(e) => {
                    debug_fatal!("Failed to create congestion logger: {e}");
                    None
                }
            });
        let consensus_gasless_counter = state.consensus_gasless_counter.clone();
        Self {
            state,
            checkpoint_service,
            epoch_store,
            consensus_adapter,
            throughput_calculator,
            backpressure_manager,
            congestion_logger,
            consensus_gasless_counter,
        }
    }

    /// Test-only constructor that fabricates the remaining dependencies from
    /// `state` (epoch store, consensus adapter, throughput calculator,
    /// backpressure manager) and disables congestion logging.
    #[cfg(test)]
    pub(crate) fn new_for_testing(
        state: Arc<AuthorityState>,
        checkpoint_service: Arc<CheckpointService>,
    ) -> Self {
        use crate::consensus_test_utils::make_consensus_adapter_for_test;
        use std::collections::HashSet;

        let backpressure_manager = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let consensus_gasless_counter = state.consensus_gasless_counter.clone();
        Self {
            state: state.clone(),
            checkpoint_service,
            epoch_store: state.epoch_store_for_testing().clone(),
            consensus_adapter,
            throughput_calculator: Arc::new(ConsensusThroughputCalculator::new(
                None,
                state.metrics.clone(),
            )),
            backpressure_manager,
            congestion_logger: None,
            consensus_gasless_counter,
        }
    }

    /// Builds a fresh `ConsensusHandler` for the current epoch, wiring in a
    /// new settlement scheduler and a fresh backpressure subscription.
    pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
        let new_epoch_start_state = self.epoch_store.epoch_start_state();
        let consensus_committee = new_epoch_start_state.get_consensus_committee();

        let settlement_scheduler = SettlementScheduler::new(
            self.state.execution_scheduler().as_ref().clone(),
            self.state.get_transaction_cache_reader().clone(),
            self.state.metrics.clone(),
        );
        ConsensusHandler::new(
            self.epoch_store.clone(),
            self.checkpoint_service.clone(),
            settlement_scheduler,
            self.consensus_adapter.clone(),
            self.state.get_object_cache_reader().clone(),
            consensus_committee,
            self.state.metrics.clone(),
            self.throughput_calculator.clone(),
            self.backpressure_manager.subscribe(),
            self.state.traffic_controller.clone(),
            self.congestion_logger.clone(),
            self.consensus_gasless_counter.clone(),
        )
    }
}
197
198mod additional_consensus_state {
199 use std::marker::PhantomData;
200
201 use consensus_core::CommitRef;
202 use fastcrypto::hash::HashFunction as _;
203 use sui_types::{crypto::DefaultHash, digests::Digest};
204
205 use super::*;
    /// Extra per-commit derived state. Its BCS digest is embedded in the
    /// commit prologue (v4) so divergence between validators can be detected.
    #[derive(Serialize, Deserialize)]
    pub(super) struct AdditionalConsensusState {
        // Sliding window of recent commit timestamps used to estimate the
        // interval between consensus commits.
        commit_interval_observer: CommitIntervalObserver,
    }
218
219 impl AdditionalConsensusState {
220 pub fn new(additional_consensus_state_window_size: u32) -> Self {
221 Self {
222 commit_interval_observer: CommitIntervalObserver::new(
223 additional_consensus_state_window_size,
224 ),
225 }
226 }
227
228 pub(crate) fn observe_commit(
230 &mut self,
231 protocol_config: &ProtocolConfig,
232 epoch_start_time: u64,
233 consensus_commit: &impl ConsensusCommitAPI,
234 ) -> ConsensusCommitInfo {
235 self.commit_interval_observer
236 .observe_commit_time(consensus_commit);
237
238 let estimated_commit_period = self
239 .commit_interval_observer
240 .commit_interval_estimate()
241 .unwrap_or(Duration::from_millis(
242 protocol_config.min_checkpoint_interval_ms(),
243 ));
244
245 info!("estimated commit rate: {:?}", estimated_commit_period);
246
247 self.commit_info_impl(
248 epoch_start_time,
249 consensus_commit,
250 Some(estimated_commit_period),
251 )
252 }
253
254 fn commit_info_impl(
255 &self,
256 epoch_start_time: u64,
257 consensus_commit: &impl ConsensusCommitAPI,
258 estimated_commit_period: Option<Duration>,
259 ) -> ConsensusCommitInfo {
260 let leader_author = consensus_commit.leader_author_index();
261 let timestamp = consensus_commit.commit_timestamp_ms();
262
263 let timestamp = if timestamp < epoch_start_time {
264 error!(
265 "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start_time}, author {leader_author:?}"
266 );
267 epoch_start_time
268 } else {
269 timestamp
270 };
271
272 ConsensusCommitInfo {
273 _phantom: PhantomData,
274 round: consensus_commit.leader_round(),
275 timestamp,
276 leader_author,
277 consensus_commit_ref: consensus_commit.commit_ref(),
278 rejected_transactions_digest: consensus_commit.rejected_transactions_digest(),
279 additional_state_digest: Some(self.digest()),
280 estimated_commit_period,
281 skip_consensus_commit_prologue_in_test: false,
282 }
283 }
284
285 fn digest(&self) -> AdditionalConsensusStateDigest {
287 let mut hash = DefaultHash::new();
288 bcs::serialize_into(&mut hash, self).unwrap();
289 AdditionalConsensusStateDigest::new(hash.finalize().into())
290 }
291 }
292
    /// Per-commit metadata extracted from a consensus commit, used to build
    /// the consensus commit prologue system transaction.
    pub struct ConsensusCommitInfo {
        // Prevents construction outside the provided constructors.
        _phantom: PhantomData<()>,

        pub round: u64,
        pub timestamp: u64,
        pub leader_author: AuthorityIndex,
        pub consensus_commit_ref: CommitRef,
        pub rejected_transactions_digest: Digest,

        // Always `Some` in the construction paths visible in this module;
        // the accessor panics if absent.
        additional_state_digest: Option<AdditionalConsensusStateDigest>,
        // Estimated interval between commits; the accessor panics if absent.
        estimated_commit_period: Option<Duration>,

        pub skip_consensus_commit_prologue_in_test: bool,
    }
308
309 impl ConsensusCommitInfo {
310 pub fn new_for_test(
311 commit_round: u64,
312 commit_timestamp: u64,
313 estimated_commit_period: Option<Duration>,
314 skip_consensus_commit_prologue_in_test: bool,
315 ) -> Self {
316 Self {
317 _phantom: PhantomData,
318 round: commit_round,
319 timestamp: commit_timestamp,
320 leader_author: 0,
321 consensus_commit_ref: CommitRef::default(),
322 rejected_transactions_digest: Digest::default(),
323 additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
324 estimated_commit_period,
325 skip_consensus_commit_prologue_in_test,
326 }
327 }
328
329 pub fn new_for_congestion_test(
330 commit_round: u64,
331 commit_timestamp: u64,
332 estimated_commit_period: Duration,
333 ) -> Self {
334 Self::new_for_test(
335 commit_round,
336 commit_timestamp,
337 Some(estimated_commit_period),
338 true,
339 )
340 }
341
342 pub fn additional_state_digest(&self) -> AdditionalConsensusStateDigest {
343 self.additional_state_digest
345 .expect("additional_state_digest is not available")
346 }
347
348 pub fn estimated_commit_period(&self) -> Duration {
349 self.estimated_commit_period
351 .expect("estimated commit period is not available")
352 }
353
354 fn consensus_commit_digest(&self) -> ConsensusCommitDigest {
355 ConsensusCommitDigest::new(self.consensus_commit_ref.digest.into_inner())
356 }
357
358 fn consensus_commit_prologue_transaction(
359 &self,
360 epoch: u64,
361 ) -> VerifiedExecutableTransaction {
362 let transaction = VerifiedTransaction::new_consensus_commit_prologue(
363 epoch,
364 self.round,
365 self.timestamp,
366 );
367 VerifiedExecutableTransaction::new_system(transaction, epoch)
368 }
369
370 fn consensus_commit_prologue_v2_transaction(
371 &self,
372 epoch: u64,
373 ) -> VerifiedExecutableTransaction {
374 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v2(
375 epoch,
376 self.round,
377 self.timestamp,
378 self.consensus_commit_digest(),
379 );
380 VerifiedExecutableTransaction::new_system(transaction, epoch)
381 }
382
383 fn consensus_commit_prologue_v3_transaction(
384 &self,
385 epoch: u64,
386 consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
387 ) -> VerifiedExecutableTransaction {
388 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v3(
389 epoch,
390 self.round,
391 self.timestamp,
392 self.consensus_commit_digest(),
393 consensus_determined_version_assignments,
394 );
395 VerifiedExecutableTransaction::new_system(transaction, epoch)
396 }
397
398 fn consensus_commit_prologue_v4_transaction(
399 &self,
400 epoch: u64,
401 consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
402 additional_state_digest: AdditionalConsensusStateDigest,
403 ) -> VerifiedExecutableTransaction {
404 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v4(
405 epoch,
406 self.round,
407 self.timestamp,
408 self.consensus_commit_digest(),
409 consensus_determined_version_assignments,
410 additional_state_digest,
411 );
412 VerifiedExecutableTransaction::new_system(transaction, epoch)
413 }
414
415 pub fn create_consensus_commit_prologue_transaction(
416 &self,
417 epoch: u64,
418 protocol_config: &ProtocolConfig,
419 cancelled_txn_version_assignment: Vec<(
420 TransactionDigest,
421 Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
422 )>,
423 commit_info: &ConsensusCommitInfo,
424 indirect_state_observer: IndirectStateObserver,
425 ) -> VerifiedExecutableTransaction {
426 let version_assignments = if protocol_config
427 .record_consensus_determined_version_assignments_in_prologue_v2()
428 {
429 Some(
430 ConsensusDeterminedVersionAssignments::CancelledTransactionsV2(
431 cancelled_txn_version_assignment,
432 ),
433 )
434 } else if protocol_config.record_consensus_determined_version_assignments_in_prologue()
435 {
436 Some(
437 ConsensusDeterminedVersionAssignments::CancelledTransactions(
438 cancelled_txn_version_assignment
439 .into_iter()
440 .map(|(tx_digest, versions)| {
441 (
442 tx_digest,
443 versions.into_iter().map(|(id, v)| (id.0, v)).collect(),
444 )
445 })
446 .collect(),
447 ),
448 )
449 } else {
450 None
451 };
452
453 if protocol_config.record_additional_state_digest_in_prologue() {
454 let additional_state_digest =
455 if protocol_config.additional_consensus_digest_indirect_state() {
456 let d1 = commit_info.additional_state_digest();
457 indirect_state_observer.fold_with(d1)
458 } else {
459 commit_info.additional_state_digest()
460 };
461
462 self.consensus_commit_prologue_v4_transaction(
463 epoch,
464 version_assignments.unwrap(),
465 additional_state_digest,
466 )
467 } else if let Some(version_assignments) = version_assignments {
468 self.consensus_commit_prologue_v3_transaction(epoch, version_assignments)
469 } else if protocol_config.include_consensus_digest_in_prologue() {
470 self.consensus_commit_prologue_v2_transaction(epoch)
471 } else {
472 self.consensus_commit_prologue_transaction(epoch)
473 }
474 }
475 }
476
    /// Accumulates a running hash over "indirect" state observed while
    /// processing a commit; folded into the additional state digest when the
    /// protocol enables it.
    #[derive(Default)]
    pub struct IndirectStateObserver {
        hash: DefaultHash,
    }
481
482 impl IndirectStateObserver {
483 pub fn new() -> Self {
484 Self::default()
485 }
486
487 pub fn observe_indirect_state<T: Serialize>(&mut self, state: &T) {
488 bcs::serialize_into(&mut self.hash, state).unwrap();
489 }
490
491 pub fn fold_with(
492 self,
493 d1: AdditionalConsensusStateDigest,
494 ) -> AdditionalConsensusStateDigest {
495 let hash = self.hash.finalize();
496 let d2 = AdditionalConsensusStateDigest::new(hash.into());
497
498 let mut hasher = DefaultHash::new();
499 bcs::serialize_into(&mut hasher, &d1).unwrap();
500 bcs::serialize_into(&mut hasher, &d2).unwrap();
501 AdditionalConsensusStateDigest::new(hasher.finalize().into())
502 }
503 }
504
    /// Verifies that the state digest depends only on the contents of the
    /// sliding window: two observers that saw the same last `window_size`
    /// commits must produce equal digests.
    #[test]
    fn test_additional_consensus_state() {
        use crate::consensus_test_utils::TestConsensusCommit;

        // Helper: feed one synthetic commit (epoch start time 100) into `state`.
        fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
            let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
            state.observe_commit(
                &protocol_config,
                100,
                &TestConsensusCommit::empty(round, timestamp, 0),
            );
        }

        let mut s1 = AdditionalConsensusState::new(3);
        observe(&mut s1, 1, 1000);
        observe(&mut s1, 2, 2000);
        observe(&mut s1, 3, 3000);
        observe(&mut s1, 4, 4000);

        // s2 never saw the first commit, but with a window size of 3 the
        // oldest entry has been evicted from s1, so the digests must match.
        let mut s2 = AdditionalConsensusState::new(3);
        observe(&mut s2, 2, 2000);
        observe(&mut s2, 3, 3000);
        observe(&mut s2, 4, 4000);

        assert_eq!(s1.digest(), s2.digest());

        observe(&mut s1, 5, 5000);
        observe(&mut s2, 5, 5000);

        assert_eq!(s1.digest(), s2.digest());
    }
538}
539use additional_consensus_state::AdditionalConsensusState;
540pub(crate) use additional_consensus_state::{ConsensusCommitInfo, IndirectStateObserver};
541
/// Roots from one consensus commit, queued until the next checkpoint flush.
struct QueuedCheckpointRoots {
    roots: CheckpointRoots,
    // Commit timestamp (ms) of the consensus commit these roots came from.
    timestamp: CheckpointTimestamp,
    consensus_commit_ref: CommitRef,
    rejected_transactions_digest: Digest,
}
548
/// A batch of schedulables derived from consensus output, plus an optional
/// settlement transaction, tagged with a checkpoint height.
struct Chunk<
    T: crate::authority::shared_object_version_manager::AsTx = VerifiedExecutableTransaction,
> {
    schedulables: Vec<Schedulable<T>>,
    settlement: Option<Schedulable<T>>,
    height: CheckpointHeight,
}
556
557impl<T: crate::authority::shared_object_version_manager::AsTx + Clone> Chunk<T> {
558 fn all_schedulables(&self) -> impl Iterator<Item = &Schedulable<T>> + Clone {
559 self.schedulables.iter().chain(self.settlement.iter())
560 }
561
562 fn all_schedulables_from(chunks: &[Self]) -> impl Iterator<Item = &Schedulable<T>> + Clone {
563 chunks.iter().flat_map(|c| c.all_schedulables())
564 }
565
566 fn to_checkpoint_roots(&self) -> CheckpointRoots {
567 let tx_roots: Vec<_> = self.schedulables.iter().map(|s| s.key()).collect();
568 let settlement_root = self.settlement.as_ref().map(|s| s.key());
569 CheckpointRoots {
570 tx_roots,
571 settlement_root,
572 height: self.height,
573 }
574 }
575}
576
577impl From<Chunk<VerifiedExecutableTransactionWithAliases>> for Chunk {
578 fn from(chunk: Chunk<VerifiedExecutableTransactionWithAliases>) -> Self {
579 Chunk {
580 schedulables: chunk.schedulables.into_iter().map(|s| s.into()).collect(),
581 settlement: chunk.settlement.map(|s| s.into()),
582 height: chunk.height,
583 }
584 }
585}
586
/// Accumulates checkpoint roots across consensus commits and flushes them
/// into `PendingCheckpointV2`s, bounding checkpoint size by transaction count
/// and rate-limiting flushes by a minimum interval.
pub(crate) struct CheckpointQueue {
    // Commit timestamp of the last flushed checkpoint; used for interval gating.
    last_built_timestamp: CheckpointTimestamp,
    pending_roots: VecDeque<QueuedCheckpointRoots>,
    // Monotonically increasing checkpoint height counter.
    height: u64,
    // User transactions queued but not yet flushed into a checkpoint.
    pending_tx_count: usize,
    // Sequence number the next flushed checkpoint will receive.
    current_checkpoint_seq: CheckpointSequenceNumber,
    // Max user transactions per checkpoint before a forced flush.
    max_tx: usize,
    min_checkpoint_interval_ms: u64,
    execution_scheduler_sender: ExecutionSchedulerSender,
}
604
impl CheckpointQueue {
    /// Creates a queue resuming from persisted state: `last_built_timestamp`,
    /// `checkpoint_height` and `next_checkpoint_seq` come from the last
    /// recorded consensus stats.
    pub(crate) fn new(
        last_built_timestamp: CheckpointTimestamp,
        checkpoint_height: u64,
        next_checkpoint_seq: CheckpointSequenceNumber,
        max_tx: usize,
        min_checkpoint_interval_ms: u64,
        execution_scheduler_sender: ExecutionSchedulerSender,
    ) -> Self {
        Self {
            last_built_timestamp,
            pending_roots: VecDeque::new(),
            height: checkpoint_height,
            pending_tx_count: 0,
            current_checkpoint_seq: next_checkpoint_seq,
            max_tx,
            min_checkpoint_interval_ms,
            execution_scheduler_sender,
        }
    }

    /// Test constructor with a throwaway scheduler channel (the receiver is
    /// dropped immediately).
    #[cfg(test)]
    fn new_for_testing(
        last_built_timestamp: CheckpointTimestamp,
        checkpoint_height: u64,
        next_checkpoint_seq: CheckpointSequenceNumber,
        max_tx: usize,
        min_checkpoint_interval_ms: u64,
    ) -> Self {
        let (sender, _receiver) = monitored_mpsc::unbounded_channel("test_checkpoint_queue_sender");
        Self {
            last_built_timestamp,
            pending_roots: VecDeque::new(),
            height: checkpoint_height,
            pending_tx_count: 0,
            current_checkpoint_seq: next_checkpoint_seq,
            max_tx,
            min_checkpoint_interval_ms,
            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
        }
    }

    /// Test constructor that lets the caller observe scheduler messages via
    /// the provided `sender`'s receiver side.
    #[cfg(test)]
    fn new_for_testing_with_sender(
        last_built_timestamp: CheckpointTimestamp,
        checkpoint_height: u64,
        next_checkpoint_seq: CheckpointSequenceNumber,
        max_tx: usize,
        min_checkpoint_interval_ms: u64,
        sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
    ) -> Self {
        Self {
            last_built_timestamp,
            pending_roots: VecDeque::new(),
            height: checkpoint_height,
            pending_tx_count: 0,
            current_checkpoint_seq: next_checkpoint_seq,
            max_tx,
            min_checkpoint_interval_ms,
            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
        }
    }

    /// Commit timestamp of the last flushed checkpoint.
    pub(crate) fn last_built_timestamp(&self) -> CheckpointTimestamp {
        self.last_built_timestamp
    }

    /// True when no roots are queued for the next checkpoint.
    pub(crate) fn is_empty(&self) -> bool {
        self.pending_roots.is_empty()
    }

    /// Advances and returns the next checkpoint height.
    fn next_height(&mut self) -> u64 {
        self.height += 1;
        self.height
    }

    /// Queues a chunk's roots and dispatches its schedulables (paired with
    /// their assigned versions) to the execution scheduler.
    ///
    /// If adding the chunk's user transactions would push the pending
    /// checkpoint over `max_tx`, the currently queued roots are force-flushed
    /// first; any checkpoint produced that way is returned.
    fn push_chunk(
        &mut self,
        chunk: Chunk,
        assigned_versions: &HashMap<TransactionKey, AssignedVersions>,
        timestamp: CheckpointTimestamp,
        consensus_commit_ref: CommitRef,
        rejected_transactions_digest: Digest,
    ) -> Vec<PendingCheckpointV2> {
        let max_tx = self.max_tx;
        let user_tx_count = chunk.schedulables.len();

        let roots = chunk.to_checkpoint_roots();

        // Pair each schedulable with its assigned shared-object versions
        // (empty when none were assigned).
        let schedulables: Vec<_> = chunk
            .schedulables
            .into_iter()
            .map(|s| {
                let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
                (s, versions)
            })
            .collect();

        let mut flushed_checkpoints = Vec::new();

        // Force a flush when this chunk would exceed the per-checkpoint
        // transaction limit (never flushes an empty queue).
        if self.pending_tx_count > 0
            && self.pending_tx_count + user_tx_count > max_tx
            && let Some(checkpoint) = self.flush_forced()
        {
            flushed_checkpoints.push(checkpoint);
        }

        // Built after the potential flush above so `current_checkpoint_seq`
        // refers to the checkpoint this chunk will actually land in.
        let settlement_info = chunk.settlement.as_ref().map(|s| {
            let settlement_key = s.key();
            let tx_keys: Vec<_> = schedulables.iter().map(|(s, _)| s.key()).collect();
            SettlementBatchInfo {
                settlement_key,
                tx_keys,
                checkpoint_height: chunk.height,
                checkpoint_seq: self.current_checkpoint_seq,
                assigned_versions: assigned_versions
                    .get(&settlement_key)
                    .cloned()
                    .unwrap_or_default(),
            }
        });

        self.execution_scheduler_sender
            .send(schedulables, settlement_info);

        self.pending_tx_count += user_tx_count;
        self.pending_roots.push_back(QueuedCheckpointRoots {
            roots,
            timestamp,
            consensus_commit_ref,
            rejected_transactions_digest,
        });

        flushed_checkpoints
    }

    /// Flushes queued roots into a pending checkpoint, unless `force` is
    /// false and the minimum checkpoint interval has not elapsed since the
    /// last flush.
    pub(crate) fn flush(
        &mut self,
        current_timestamp: CheckpointTimestamp,
        force: bool,
    ) -> Option<PendingCheckpointV2> {
        if !force && current_timestamp < self.last_built_timestamp + self.min_checkpoint_interval_ms
        {
            return None;
        }
        self.flush_forced()
    }

    /// Drains all queued roots into one pending checkpoint, assigning it the
    /// current sequence number and the timestamp/height/commit-ref of the
    /// last queued commit. Returns `None` when the queue is empty.
    fn flush_forced(&mut self) -> Option<PendingCheckpointV2> {
        if self.pending_roots.is_empty() {
            return None;
        }

        let to_flush: Vec<_> = self.pending_roots.drain(..).collect();
        // Non-empty: checked above.
        let last_root = to_flush.last().unwrap();

        let checkpoint = PendingCheckpointV2 {
            roots: to_flush.iter().map(|q| q.roots.clone()).collect(),
            details: PendingCheckpointInfo {
                timestamp_ms: last_root.timestamp,
                last_of_epoch: false,
                checkpoint_height: last_root.roots.height,
                consensus_commit_ref: last_root.consensus_commit_ref,
                rejected_transactions_digest: last_root.rejected_transactions_digest,
                checkpoint_seq: Some(self.current_checkpoint_seq),
            },
        };

        self.last_built_timestamp = last_root.timestamp;
        self.pending_tx_count = 0;
        self.current_checkpoint_seq += 1;

        Some(checkpoint)
    }

    /// Sequence number of the last checkpoint flushed by this queue.
    ///
    /// # Panics
    /// Panics if called before any checkpoint sequence was assigned (i.e.
    /// while `current_checkpoint_seq` is still 0).
    pub(crate) fn checkpoint_seq(&self) -> CheckpointSequenceNumber {
        self.current_checkpoint_seq
            .checked_sub(1)
            .expect("checkpoint_seq called before any checkpoint was assigned")
    }
}
786
/// Consumes committed consensus output, schedules transactions for execution
/// and feeds checkpoint construction. Generic over the checkpoint service `C`
/// so tests can substitute a notifier.
pub struct ConsensusHandler<C> {
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Indices/stats as of the last processed commit; restored on startup.
    last_consensus_stats: ExecutionIndicesWithStatsV2,
    checkpoint_service: Arc<C>,
    cache_reader: Arc<dyn ObjectCacheRead>,
    committee: ConsensusCommittee,
    metrics: Arc<AuthorityMetrics>,
    // De-duplicates transaction keys seen in recent commits (see
    // PROCESSED_CACHE_CAP).
    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
    execution_scheduler_sender: ExecutionSchedulerSender,
    consensus_adapter: Arc<ConsensusAdapter>,

    throughput_calculator: Arc<ConsensusThroughputCalculator>,

    // Commit-interval window whose digest is recorded in the prologue.
    additional_consensus_state: AdditionalConsensusState,

    backpressure_subscriber: BackpressureSubscriber,

    traffic_controller: Option<Arc<TrafficController>>,

    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,

    consensus_gasless_counter: Arc<ConsensusGaslessCounter>,

    checkpoint_queue: Mutex<CheckpointQueue>,
}
825
// Capacity of `processed_cache` (number of recently processed transaction
// keys kept for de-duplication).
const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
827
impl<C> ConsensusHandler<C> {
    /// Constructs the handler, restoring the last processed consensus stats
    /// and the checkpoint-queue position from the epoch store.
    ///
    /// # Panics
    /// Panics if the protocol's per-object congestion control mode is not
    /// `ExecutionTimeEstimate` (support for other modes has been removed).
    pub(crate) fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_service: Arc<C>,
        settlement_scheduler: SettlementScheduler,
        consensus_adapter: Arc<ConsensusAdapter>,
        cache_reader: Arc<dyn ObjectCacheRead>,
        committee: ConsensusCommittee,
        metrics: Arc<AuthorityMetrics>,
        throughput_calculator: Arc<ConsensusThroughputCalculator>,
        backpressure_subscriber: BackpressureSubscriber,
        traffic_controller: Option<Arc<TrafficController>>,
        congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
        consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
    ) -> Self {
        assert!(
            matches!(
                epoch_store
                    .protocol_config()
                    .per_object_congestion_control_mode(),
                PerObjectCongestionControlMode::ExecutionTimeEstimate(_)
            ),
            "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
        );

        let mut last_consensus_stats = epoch_store
            .get_last_consensus_stats()
            .expect("Should be able to read last consensus index");
        // First run of the epoch: size per-authority stats to the committee
        // and, when checkpoints are split in the consensus handler, seed the
        // checkpoint sequence from the previous epoch's last checkpoint.
        if !last_consensus_stats.stats.is_initialized() {
            last_consensus_stats.stats = ConsensusStats::new(committee.size());
            if epoch_store
                .protocol_config()
                .split_checkpoints_in_consensus_handler()
            {
                last_consensus_stats.checkpoint_seq = epoch_store.previous_epoch_last_checkpoint();
            }
        }
        let max_tx = epoch_store
            .protocol_config()
            .max_transactions_per_checkpoint() as usize;
        let min_checkpoint_interval_ms = epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let execution_scheduler_sender =
            ExecutionSchedulerSender::start(settlement_scheduler, epoch_store.clone());
        let commit_rate_estimate_window_size = epoch_store
            .protocol_config()
            .get_consensus_commit_rate_estimation_window_size();
        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
        let checkpoint_height = last_consensus_stats.height;
        let next_checkpoint_seq = if epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler()
        {
            last_consensus_stats.checkpoint_seq + 1
        } else {
            0
        };
        Self {
            epoch_store,
            last_consensus_stats,
            checkpoint_service,
            cache_reader,
            committee,
            metrics,
            processed_cache: LruCache::new(
                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
            ),
            execution_scheduler_sender: execution_scheduler_sender.clone(),
            consensus_adapter,
            throughput_calculator,
            additional_consensus_state: AdditionalConsensusState::new(
                commit_rate_estimate_window_size,
            ),
            backpressure_subscriber,
            traffic_controller,
            congestion_logger,
            consensus_gasless_counter,
            checkpoint_queue: Mutex::new(CheckpointQueue::new(
                last_built_timestamp,
                checkpoint_height,
                next_checkpoint_seq,
                max_tx,
                min_checkpoint_interval_ms,
                execution_scheduler_sender,
            )),
        }
    }

    /// Sub-dag index of the last consensus commit this handler processed.
    pub(crate) fn last_processed_subdag_index(&self) -> u64 {
        self.last_consensus_stats.index.sub_dag_index
    }

    /// Test constructor that accepts pre-built stats, scheduler sender and
    /// adapter, and disables congestion logging.
    ///
    /// NOTE(review): next_checkpoint_seq is hard-coded to 0 here, unlike
    /// `new` which derives it from persisted stats when checkpoints are
    /// split in the consensus handler — confirm tests do not depend on
    /// resumed checkpoint sequencing.
    pub(crate) fn new_for_testing(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_service: Arc<C>,
        execution_scheduler_sender: ExecutionSchedulerSender,
        consensus_adapter: Arc<ConsensusAdapter>,
        cache_reader: Arc<dyn ObjectCacheRead>,
        committee: ConsensusCommittee,
        metrics: Arc<AuthorityMetrics>,
        throughput_calculator: Arc<ConsensusThroughputCalculator>,
        backpressure_subscriber: BackpressureSubscriber,
        traffic_controller: Option<Arc<TrafficController>>,
        last_consensus_stats: ExecutionIndicesWithStatsV2,
    ) -> Self {
        let commit_rate_estimate_window_size = epoch_store
            .protocol_config()
            .get_consensus_commit_rate_estimation_window_size();
        let max_tx = epoch_store
            .protocol_config()
            .max_transactions_per_checkpoint() as usize;
        let min_checkpoint_interval_ms = epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
        let checkpoint_height = last_consensus_stats.height;
        Self {
            epoch_store,
            last_consensus_stats,
            checkpoint_service,
            cache_reader,
            committee,
            metrics,
            processed_cache: LruCache::new(
                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
            ),
            execution_scheduler_sender: execution_scheduler_sender.clone(),
            consensus_adapter,
            throughput_calculator,
            additional_consensus_state: AdditionalConsensusState::new(
                commit_rate_estimate_window_size,
            ),
            backpressure_subscriber,
            traffic_controller,
            congestion_logger: None,
            consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
            checkpoint_queue: Mutex::new(CheckpointQueue::new(
                last_built_timestamp,
                checkpoint_height,
                0,
                max_tx,
                min_checkpoint_interval_ms,
                execution_scheduler_sender,
            )),
        }
    }
}
981
/// Consensus transactions from a single commit, partitioned by kind for the
/// commit handler.
#[derive(Default)]
struct CommitHandlerInput {
    user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    capability_notifications: Vec<AuthorityCapabilitiesV2>,
    execution_time_observations: Vec<ExecutionTimeObservation>,
    checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
    // DKG messages/confirmations are carried as opaque serialized bytes,
    // keyed by the sending authority.
    randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
    randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
    end_of_publish_transactions: Vec<AuthorityName>,
    new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
}
993
/// Mutable state threaded through the handling of a single consensus commit.
struct CommitHandlerState {
    dkg_failed: bool,
    // Randomness round reserved for this commit, if any.
    randomness_round: Option<RandomnessRound>,
    // Accumulates all writes produced while processing the commit.
    output: ConsensusCommitOutput,
    indirect_state_observer: Option<IndirectStateObserver>,
    // Reconfiguration state captured at the start of commit handling.
    initial_reconfig_state: ReconfigState,
    // Per-digest occurrence counts — NOTE(review): scope (per-commit vs.
    // longer-lived) is not visible here; confirm at the usage sites.
    occurrence_counts: HashMap<TransactionDigest, u32>,
}
1003
impl CommitHandlerState {
    /// Keys of all consensus messages marked processed in the pending output.
    fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
        self.output
            .get_consensus_messages_processed()
            .cloned()
            .collect()
    }

    /// Determines DKG status and, when DKG succeeded and the epoch still
    /// accepts transactions, reserves the next randomness round (recorded in
    /// `self.output`). Updates `self.randomness_round` / `self.dkg_failed`
    /// and returns the locked randomness manager for later use.
    fn init_randomness<'a, 'epoch>(
        &'a mut self,
        epoch_store: &'epoch AuthorityPerEpochStore,
        commit_info: &'a ConsensusCommitInfo,
    ) -> Option<tokio::sync::MutexGuard<'epoch, RandomnessManager>> {
        // try_lock is expected to always succeed: only the commit handler
        // thread takes this lock.
        let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
            rm.try_lock()
                .expect("should only ever be called from the commit handler thread")
        });

        let mut dkg_failed = false;
        let randomness_round = if epoch_store.randomness_state_enabled() {
            let randomness_manager = randomness_manager
                .as_mut()
                .expect("randomness manager should exist if randomness is enabled");
            match randomness_manager.dkg_status() {
                DkgStatus::Pending => None,
                DkgStatus::Failed => {
                    dkg_failed = true;
                    None
                }
                DkgStatus::Successful => {
                    // Do not reserve new randomness once the epoch has
                    // stopped accepting transactions.
                    if self.initial_reconfig_state.should_accept_tx() {
                        randomness_manager
                            .reserve_next_randomness(commit_info.timestamp, &mut self.output)
                            .expect("epoch ended")
                    } else {
                        None
                    }
                }
            }
        } else {
            None
        };

        // A reserved round implies DKG succeeded.
        if randomness_round.is_some() {
            assert!(!dkg_failed);
        }

        self.randomness_round = randomness_round;
        self.dkg_failed = dkg_failed;

        randomness_manager
    }
}
1060
1061impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
1062 fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
1066 assert!(
1067 self.epoch_store
1068 .protocol_config()
1069 .record_additional_state_digest_in_prologue()
1070 );
1071 let protocol_config = self.epoch_store.protocol_config();
1072 let epoch_start_time = self
1073 .epoch_store
1074 .epoch_start_config()
1075 .epoch_start_timestamp_ms();
1076
1077 self.additional_consensus_state.observe_commit(
1078 protocol_config,
1079 epoch_start_time,
1080 &consensus_commit,
1081 );
1082 }
1083
1084 #[cfg(test)]
1085 pub(crate) async fn handle_consensus_commit_for_test(
1086 &mut self,
1087 consensus_commit: impl ConsensusCommitAPI,
1088 ) {
1089 self.handle_consensus_commit(consensus_commit).await;
1090 }
1091
    /// Processes one consensus commit end-to-end: filters and deduplicates the
    /// committed transactions, handles system messages (JWKs, capabilities,
    /// execution-time observations, checkpoint signatures, DKG), schedules
    /// transactions, writes the pending checkpoint(s), and pushes the
    /// accumulated `ConsensusCommitOutput` to the consensus quarantine before
    /// notifying the checkpoint service and kicking off randomness generation.
    #[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))]
    pub(crate) async fn handle_consensus_commit(
        &mut self,
        consensus_commit: impl ConsensusCommitAPI,
    ) {
        {
            // This handler assumes all of these protocol features are enabled;
            // fail fast if run against an unsupported protocol config.
            let protocol_config = self.epoch_store.protocol_config();

            assert!(protocol_config.ignore_execution_time_observations_after_certs_closed());
            assert!(protocol_config.record_time_estimate_processed());
            assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints());
            assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest());
            assert!(protocol_config.authority_capabilities_v2());
            assert!(protocol_config.cancel_for_failed_dkg_early());
        }

        // Do not ingest the commit while execution is backpressured.
        self.backpressure_subscriber.await_no_backpressure().await;

        let epoch = self.epoch_store.epoch();

        let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit");

        let last_committed_round = self.last_consensus_stats.index.last_committed_round;

        // Inform the status / reject-reason caches of the last committed
        // leader round.
        self.epoch_store
            .consensus_tx_status_cache
            .update_last_committed_leader_round(last_committed_round as u32);
        self.epoch_store
            .tx_reject_reason_cache
            .set_last_committed_leader_round(last_committed_round as u32);

        // Fold this commit into the rolling additional consensus state and get
        // the derived per-commit info (round, timestamp, commit ref, ...).
        let commit_info = self.additional_consensus_state.observe_commit(
            self.epoch_store.protocol_config(),
            self.epoch_store
                .epoch_start_config()
                .epoch_start_timestamp_ms(),
            &consensus_commit,
        );
        // Commits must arrive in strictly increasing round order.
        assert!(commit_info.round > last_committed_round);

        let (timestamp, leader_author, commit_sub_dag_index) =
            self.gather_commit_metadata(&consensus_commit);

        info!(
            %consensus_commit,
            "Received consensus output {}. Rejected transactions {}",
            consensus_commit.commit_ref(),
            consensus_commit.rejected_transactions_debug_string(),
        );

        self.last_consensus_stats.index = ExecutionIndices {
            last_committed_round: commit_info.round,
            sub_dag_index: commit_sub_dag_index,
            transaction_index: 0_u64,
        };

        self.metrics
            .consensus_committed_subdags
            .with_label_values(&[&leader_author.to_string()])
            .inc();

        // Per-commit scratch state; `initial_reconfig_state` is a snapshot
        // taken at the start of the commit.
        let mut state = CommitHandlerState {
            output: ConsensusCommitOutput::new(commit_info.round),
            dkg_failed: false,
            randomness_round: None,
            indirect_state_observer: Some(IndirectStateObserver::new()),
            initial_reconfig_state: self
                .epoch_store
                .get_reconfig_state_read_lock_guard()
                .clone(),
            occurrence_counts: HashMap::new(),
        };

        // Filter out transactions that cannot be accepted and collect the
        // owned-object locks for the rest.
        let FilteredConsensusOutput {
            transactions,
            owned_object_locks,
        } = self.filter_consensus_txns(
            state.initial_reconfig_state.clone(),
            &commit_info,
            &consensus_commit,
        );
        if !owned_object_locks.is_empty() {
            state.output.set_owned_object_locks(owned_object_locks);
        }
        let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions);

        // Locks the randomness manager (if enabled) and records DKG status and
        // any reserved randomness round into `state`.
        let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info);

        // Split the commit's transactions by kind.
        let CommitHandlerInput {
            user_transactions,
            capability_notifications,
            execution_time_observations,
            checkpoint_signature_messages,
            randomness_dkg_messages,
            randomness_dkg_confirmations,
            end_of_publish_transactions,
            new_jwks,
        } = self.build_commit_handler_input(transactions);

        // System-message processing.
        self.process_gasless_transactions(&commit_info, &user_transactions);
        self.process_jwks(&mut state, &commit_info, new_jwks);
        self.process_capability_notifications(capability_notifications);
        self.process_execution_time_observations(&mut state, execution_time_observations);
        self.process_checkpoint_signature_messages(checkpoint_signature_messages);

        self.process_dkg_updates(
            &mut state,
            &commit_info,
            randomness_manager.as_deref_mut(),
            randomness_dkg_messages,
            randomness_dkg_confirmations,
        )
        .await;

        let mut execution_time_estimator = self
            .epoch_store
            .execution_time_estimator
            .try_lock()
            .expect("should only ever be called from the commit handler thread");

        let authenticator_state_update_transaction =
            self.create_authenticator_state_update(last_committed_round, &commit_info);

        let split_checkpoints = self
            .epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler();

        // Legacy path: one (plus optionally one randomness) pending checkpoint
        // per commit, scheduled for execution directly from here.
        let (lock, final_round, num_schedulables, checkpoint_height) = if !split_checkpoints {
            let (schedulables, randomness_schedulables, assigned_versions) = self
                .process_transactions(
                    &mut state,
                    &mut execution_time_estimator,
                    &commit_info,
                    authenticator_state_update_transaction,
                    user_transactions,
                );

            let (should_accept_tx, lock, final_round) =
                self.handle_close_epoch(&mut state, &commit_info, end_of_publish_transactions);

            // Skip checkpoint creation when transactions are no longer
            // accepted and this is not the final round.
            let make_checkpoint = should_accept_tx || final_round;
            if !make_checkpoint {
                return;
            }

            if final_round {
                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
            }

            let checkpoint_height = self
                .epoch_store
                .calculate_pending_checkpoint_height(commit_info.round);

            self.create_pending_checkpoints(
                &mut state,
                &commit_info,
                &schedulables,
                &randomness_schedulables,
                final_round,
            );

            // Pair every schedulable with its assigned shared-object versions
            // and hand them to the execution scheduler.
            let num_schedulables = schedulables.len();
            let mut all_schedulables = schedulables;
            all_schedulables.extend(randomness_schedulables);
            let assigned_versions = assigned_versions.into_map();
            let paired: Vec<_> = all_schedulables
                .into_iter()
                .map(|s| {
                    let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
                    (s, versions)
                })
                .collect();
            self.execution_scheduler_sender.send(paired, None);

            (lock, final_round, num_schedulables, checkpoint_height)
        } else {
            // Split-checkpoint path: transactions are chunked into multiple
            // pending checkpoints via the checkpoint queue.
            let (
                transactions_to_schedule,
                randomness_transactions_to_schedule,
                cancelled_txns,
                randomness_state_update_transaction,
            ) = self.collect_transactions_to_schedule(
                &mut state,
                &mut execution_time_estimator,
                &commit_info,
                user_transactions,
            );

            let (should_accept_tx, lock, final_round) =
                self.handle_close_epoch(&mut state, &commit_info, end_of_publish_transactions);

            // Same early-out as the legacy path above.
            let make_checkpoint = should_accept_tx || final_round;
            if !make_checkpoint {
                return;
            }

            if final_round {
                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
            }

            let epoch = self.epoch_store.epoch();
            let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
                .then_some(Schedulable::ConsensusCommitPrologue(
                    epoch,
                    commit_info.round,
                    commit_info.consensus_commit_ref.index,
                ));

            // Prologue first, then authenticator state update, then user txns.
            let schedulables: Vec<_> = itertools::chain!(
                consensus_commit_prologue.into_iter(),
                authenticator_state_update_transaction
                    .into_iter()
                    .map(Schedulable::Transaction),
                transactions_to_schedule
                    .into_iter()
                    .map(Schedulable::Transaction),
            )
            .collect();

            let randomness_schedulables: Vec<_> = randomness_state_update_transaction
                .into_iter()
                .chain(
                    randomness_transactions_to_schedule
                        .into_iter()
                        .map(Schedulable::Transaction),
                )
                .collect();

            let num_schedulables = schedulables.len();
            let checkpoint_height = self.create_pending_checkpoints_v2(
                &mut state,
                &commit_info,
                schedulables,
                randomness_schedulables,
                &cancelled_txns,
                final_round,
            );

            (lock, final_round, num_schedulables, checkpoint_height)
        };

        let notifications = state.get_notifications();

        let mut stats_to_record = self.last_consensus_stats.clone();
        stats_to_record.height = checkpoint_height;
        if split_checkpoints {
            let queue = self.checkpoint_queue.lock().unwrap();
            stats_to_record.last_checkpoint_flush_timestamp = queue.last_built_timestamp();
            stats_to_record.checkpoint_seq = queue.checkpoint_seq();
        }

        state.output.record_consensus_commit_stats(stats_to_record);

        self.record_deferral_deletion(&mut state);

        // Persist everything accumulated for this commit via the consensus
        // quarantine, then notify downstream consumers.
        self.epoch_store
            .consensus_quarantine
            .write()
            .push_consensus_output(state.output, &self.epoch_store)
            .expect("push_consensus_output should not fail");

        debug!(
            ?commit_info.round,
            "Notifying checkpoint service about new pending checkpoint(s)",
        );
        self.checkpoint_service
            .notify_checkpoint()
            .expect("failed to notify checkpoint service");

        // Start randomness generation for the round reserved by this commit.
        if let Some(randomness_round) = state.randomness_round {
            randomness_manager
                .as_ref()
                .expect("randomness manager should exist if randomness round is provided")
                .generate_randomness(epoch, randomness_round);
        }

        self.epoch_store.process_notifications(notifications.iter());

        self.log_final_round(lock, final_round);

        self.throughput_calculator
            .add_transactions(timestamp, num_schedulables as u64);

        // Simulator-only fail points exercised after the commit boundary.
        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
            let key = [commit_sub_dag_index, epoch];
            if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
                sui_simulator::task::kill_current_node(None);
            }
        });

        fail_point!("crash");

        self.send_end_of_publish_if_needed().await;
    }
1401
1402 fn handle_close_epoch(
1403 &self,
1404 state: &mut CommitHandlerState,
1405 commit_info: &ConsensusCommitInfo,
1406 end_of_publish_transactions: Vec<AuthorityName>,
1407 ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
1408 let timestamp_based_epoch_close = self
1409 .epoch_store
1410 .protocol_config()
1411 .timestamp_based_epoch_close();
1412 let timestamp_triggered = timestamp_based_epoch_close
1413 && commit_info.timestamp >= self.epoch_store.next_reconfiguration_timestamp_ms();
1414 if timestamp_triggered {
1415 let reconfig_guard = self.epoch_store.get_reconfig_state_write_lock_guard();
1417 if reconfig_guard.should_accept_user_certs() {
1418 self.epoch_store.close_user_certs(reconfig_guard);
1419 }
1420 }
1421 let collected_eop_quorum =
1422 self.process_end_of_publish_transactions(state, end_of_publish_transactions);
1423 if timestamp_triggered || collected_eop_quorum {
1424 let (lock, final_round) = self.advance_eop_state_machine(state);
1425 (lock.should_accept_tx(), Some(lock), final_round)
1426 } else {
1427 (true, None, false)
1428 }
1429 }
1430
1431 fn record_end_of_epoch_execution_time_observations(
1432 &self,
1433 estimator: &mut ExecutionTimeEstimator,
1434 ) {
1435 self.epoch_store
1436 .end_of_epoch_execution_time_observations
1437 .set(estimator.take_observations())
1438 .expect("`stored_execution_time_observations` should only be set once at end of epoch");
1439 }
1440
1441 fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
1442 let mut deferred_transactions = self
1443 .epoch_store
1444 .consensus_output_cache
1445 .deferred_transactions
1446 .lock();
1447 for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
1448 deferred_transactions.remove(&deleted_deferred_key);
1449 }
1450 }
1451
1452 fn log_final_round(&self, lock: Option<RwLockWriteGuard<ReconfigState>>, final_round: bool) {
1453 if final_round {
1454 let epoch = self.epoch_store.epoch();
1455 info!(
1456 ?epoch,
1457 lock=?lock.as_ref(),
1458 final_round=?final_round,
1459 "Notified last checkpoint"
1460 );
1461 self.epoch_store.record_end_of_message_quorum_time_metric();
1462 }
1463 }
1464
1465 fn create_pending_checkpoints(
1466 &self,
1467 state: &mut CommitHandlerState,
1468 commit_info: &ConsensusCommitInfo,
1469 schedulables: &[Schedulable],
1470 randomness_schedulables: &[Schedulable],
1471 final_round: bool,
1472 ) {
1473 assert!(
1474 !self
1475 .epoch_store
1476 .protocol_config()
1477 .split_checkpoints_in_consensus_handler()
1478 );
1479
1480 let checkpoint_height = self
1481 .epoch_store
1482 .calculate_pending_checkpoint_height(commit_info.round);
1483
1484 let should_write_random_checkpoint = state.randomness_round.is_some()
1491 || (state.dkg_failed && !randomness_schedulables.is_empty());
1492
1493 let pending_checkpoint = PendingCheckpoint {
1494 roots: schedulables.iter().map(|s| s.key()).collect(),
1495 details: PendingCheckpointInfo {
1496 timestamp_ms: commit_info.timestamp,
1497 last_of_epoch: final_round && !should_write_random_checkpoint,
1498 checkpoint_height,
1499 consensus_commit_ref: commit_info.consensus_commit_ref,
1500 rejected_transactions_digest: commit_info.rejected_transactions_digest,
1501 checkpoint_seq: None,
1502 },
1503 };
1504 self.epoch_store
1505 .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1506 .expect("failed to write pending checkpoint");
1507
1508 info!(
1509 "Written pending checkpoint: {:?}",
1510 pending_checkpoint.details,
1511 );
1512
1513 if should_write_random_checkpoint {
1514 let pending_checkpoint = PendingCheckpoint {
1515 roots: randomness_schedulables.iter().map(|s| s.key()).collect(),
1516 details: PendingCheckpointInfo {
1517 timestamp_ms: commit_info.timestamp,
1518 last_of_epoch: final_round,
1519 checkpoint_height: checkpoint_height + 1,
1520 consensus_commit_ref: commit_info.consensus_commit_ref,
1521 rejected_transactions_digest: commit_info.rejected_transactions_digest,
1522 checkpoint_seq: None,
1523 },
1524 };
1525 self.epoch_store
1526 .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1527 .expect("failed to write pending checkpoint");
1528 }
1529 }
1530
    /// Orders the commit's user transactions, applies congestion control with
    /// deferral/cancellation, and returns `(transactions_to_schedule,
    /// randomness_transactions_to_schedule, cancelled_txns,
    /// randomness_state_update_transaction)`.
    #[allow(clippy::type_complexity)]
    fn collect_transactions_to_schedule(
        &self,
        state: &mut CommitHandlerState,
        execution_time_estimator: &mut ExecutionTimeEstimator,
        commit_info: &ConsensusCommitInfo,
        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    ) -> (
        Vec<VerifiedExecutableTransactionWithAliases>,
        Vec<VerifiedExecutableTransactionWithAliases>,
        BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        Option<Schedulable<VerifiedExecutableTransactionWithAliases>>,
    ) {
        let protocol_config = self.epoch_store.protocol_config();
        let epoch = self.epoch_store.epoch();

        // Merge with previously deferred transactions and split into regular
        // vs randomness-using lists, each deterministically ordered.
        let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) =
            self.merge_and_reorder_transactions(state, commit_info, user_transactions);

        // Separate congestion trackers: randomness-using transactions are
        // metered independently from regular ones.
        let mut shared_object_congestion_tracker =
            self.init_congestion_tracker(commit_info, false, &ordered_txns);
        let mut shared_object_using_randomness_congestion_tracker =
            self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns);

        let randomness_state_update_transaction = state
            .randomness_round
            .map(|round| Schedulable::RandomnessStateUpdate(epoch, round));
        debug!(
            "Randomness state update transaction: {:?}",
            randomness_state_update_transaction
                .as_ref()
                .map(|t| t.key())
        );

        let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len());
        let mut randomness_transactions_to_schedule =
            Vec::with_capacity(ordered_randomness_txns.len());
        let mut deferred_txns = BTreeMap::new();
        let mut cancelled_txns = BTreeMap::new();

        for transaction in ordered_txns {
            self.handle_deferral_and_cancellation(
                state,
                &mut cancelled_txns,
                &mut deferred_txns,
                &mut transactions_to_schedule,
                protocol_config,
                commit_info,
                transaction,
                &mut shared_object_congestion_tracker,
                &previously_deferred_tx_digests,
                execution_time_estimator,
            );
        }

        for transaction in ordered_randomness_txns {
            // If DKG failed, mark every randomness-using transaction as
            // cancelled (rather than deferred) and still push it to the
            // schedule list.
            if state.dkg_failed {
                debug!(
                    "Canceling randomness-using transaction {:?} because DKG failed",
                    transaction.tx().digest(),
                );
                cancelled_txns.insert(
                    *transaction.tx().digest(),
                    CancelConsensusCertificateReason::DkgFailed,
                );
                randomness_transactions_to_schedule.push(transaction);
                continue;
            }
            self.handle_deferral_and_cancellation(
                state,
                &mut cancelled_txns,
                &mut deferred_txns,
                &mut randomness_transactions_to_schedule,
                protocol_config,
                commit_info,
                transaction,
                &mut shared_object_using_randomness_congestion_tracker,
                &previously_deferred_tx_digests,
                execution_time_estimator,
            );
        }

        // Record deferred transactions both in the in-memory cache and in the
        // commit output.
        let mut total_deferred_txns = 0;
        {
            let mut deferred_transactions = self
                .epoch_store
                .consensus_output_cache
                .deferred_transactions
                .lock();
            for (key, txns) in deferred_txns.into_iter() {
                total_deferred_txns += txns.len();
                deferred_transactions.insert(key, txns.clone());
                state.output.defer_transactions(key, txns);
            }
        }

        self.metrics
            .consensus_handler_deferred_transactions
            .inc_by(total_deferred_txns as u64);
        self.metrics
            .consensus_handler_cancelled_transactions
            .inc_by(cancelled_txns.len() as u64);
        self.metrics
            .consensus_handler_max_object_costs
            .with_label_values(&["regular_commit"])
            .set(shared_object_congestion_tracker.max_cost() as i64);
        self.metrics
            .consensus_handler_max_object_costs
            .with_label_values(&["randomness_commit"])
            .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);

        let congestion_commit_data = shared_object_congestion_tracker.finish_commit(commit_info);
        let randomness_congestion_commit_data =
            shared_object_using_randomness_congestion_tracker.finish_commit(commit_info);

        // Optional per-commit congestion logging.
        if let Some(logger) = &self.congestion_logger {
            let epoch = self.epoch_store.epoch();
            let mut logger = logger.lock().unwrap();
            logger.write_commit_log(epoch, commit_info, false, &congestion_commit_data);
            logger.write_commit_log(epoch, commit_info, true, &randomness_congestion_commit_data);
        }

        // Best-effort: forward the ids of objects with accumulated debts; a
        // failed send is only logged, never fatal.
        if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get()
            && let Err(e) = tx_object_debts.try_send(
                congestion_commit_data
                    .accumulated_debts
                    .iter()
                    .chain(randomness_congestion_commit_data.accumulated_debts.iter())
                    .map(|(id, _)| *id)
                    .collect(),
            )
        {
            info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
        }

        state
            .output
            .set_congestion_control_object_debts(congestion_commit_data.accumulated_debts);
        state.output.set_congestion_control_randomness_object_debts(
            randomness_congestion_commit_data.accumulated_debts,
        );

        (
            transactions_to_schedule,
            randomness_transactions_to_schedule,
            cancelled_txns,
            randomness_state_update_transaction,
        )
    }
1680
    /// Legacy (non-split-checkpoint) transaction processing: collects the
    /// transactions to schedule, appends settlement/prologue/state-update
    /// system transactions, assigns shared-object versions, and returns
    /// `(schedulables, randomness_schedulables, assigned_versions)`.
    fn process_transactions(
        &self,
        state: &mut CommitHandlerState,
        execution_time_estimator: &mut ExecutionTimeEstimator,
        commit_info: &ConsensusCommitInfo,
        authenticator_state_update_transaction: Option<VerifiedExecutableTransactionWithAliases>,
        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    ) -> (Vec<Schedulable>, Vec<Schedulable>, AssignedTxAndVersions) {
        let protocol_config = self.epoch_store.protocol_config();
        assert!(!protocol_config.split_checkpoints_in_consensus_handler());
        let epoch = self.epoch_store.epoch();

        let (
            transactions_to_schedule,
            randomness_transactions_to_schedule,
            cancelled_txns,
            randomness_state_update_transaction,
        ) = self.collect_transactions_to_schedule(
            state,
            execution_time_estimator,
            commit_info,
            user_transactions,
        );

        // With accumulators enabled, each checkpoint gets a settlement entry;
        // the randomness checkpoint uses `checkpoint_height + 1`.
        let mut settlement = None;
        let mut randomness_settlement = None;
        if self.epoch_store.accumulators_enabled() {
            let checkpoint_height = self
                .epoch_store
                .calculate_pending_checkpoint_height(commit_info.round);

            settlement = Some(Schedulable::AccumulatorSettlement(epoch, checkpoint_height));

            if state.randomness_round.is_some() || !randomness_transactions_to_schedule.is_empty() {
                randomness_settlement = Some(Schedulable::AccumulatorSettlement(
                    epoch,
                    checkpoint_height + 1,
                ));
            }
        }

        // Placeholder prologue entry; replaced below once the fully built
        // prologue transaction is available.
        let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
            .then_some(Schedulable::ConsensusCommitPrologue(
                epoch,
                commit_info.round,
                commit_info.consensus_commit_ref.index,
            ));

        // Order matters: prologue, authenticator state update, user txns,
        // then the settlement.
        let schedulables: Vec<_> = itertools::chain!(
            consensus_commit_prologue.into_iter(),
            authenticator_state_update_transaction
                .into_iter()
                .map(Schedulable::Transaction),
            transactions_to_schedule
                .into_iter()
                .map(Schedulable::Transaction),
            settlement,
        )
        .collect();

        let randomness_schedulables: Vec<_> = randomness_state_update_transaction
            .into_iter()
            .chain(
                randomness_transactions_to_schedule
                    .into_iter()
                    .map(Schedulable::Transaction),
            )
            .chain(randomness_settlement)
            .collect();

        let assigned_versions = self
            .epoch_store
            .process_consensus_transaction_shared_object_versions(
                self.cache_reader.as_ref(),
                schedulables.iter(),
                randomness_schedulables.iter(),
                &cancelled_txns,
                &mut state.output,
            )
            .expect("failed to assign shared object versions");

        // Build the real prologue (it embeds cancelled-transaction version
        // assignments) and splice it into slot 0, rewriting the corresponding
        // version-assignment key to the prologue's digest.
        let consensus_commit_prologue =
            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);

        let mut schedulables = schedulables;
        let mut assigned_versions = assigned_versions;
        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
            assert!(matches!(
                schedulables[0],
                Schedulable::ConsensusCommitPrologue(..)
            ));
            assert!(matches!(
                assigned_versions.0[0].0,
                TransactionKey::ConsensusCommitPrologue(..)
            ));
            assigned_versions.0[0].0 =
                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
            schedulables[0] = Schedulable::Transaction(consensus_commit_prologue);
        }

        self.epoch_store
            .process_user_signatures(schedulables.iter().chain(randomness_schedulables.iter()));

        // Normalize both lists to `Vec<Schedulable>` via `Into`.
        let schedulables: Vec<Schedulable> = schedulables.into_iter().map(|s| s.into()).collect();
        let randomness_schedulables: Vec<Schedulable> = randomness_schedulables
            .into_iter()
            .map(|s| s.into())
            .collect();

        (schedulables, randomness_schedulables, assigned_versions)
    }
1793
    /// Split-checkpoint variant of pending-checkpoint creation: chunks the
    /// schedulables by `max_transactions_per_checkpoint`, pushes the chunks
    /// through the checkpoint queue, assigns shared-object versions, splices
    /// in the prologue transaction, and writes out all resulting pending
    /// checkpoints. Returns this commit's last checkpoint height.
    #[allow(clippy::type_complexity)]
    fn create_pending_checkpoints_v2(
        &self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
        randomness_schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
        cancelled_txns: &BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        final_round: bool,
    ) -> CheckpointHeight {
        let protocol_config = self.epoch_store.protocol_config();
        assert!(protocol_config.split_checkpoints_in_consensus_handler());

        let epoch = self.epoch_store.epoch();
        let accumulators_enabled = self.epoch_store.accumulators_enabled();
        let max_transactions_per_checkpoint =
            protocol_config.max_transactions_per_checkpoint() as usize;

        // A randomness checkpoint is needed when a round was reserved, or when
        // DKG failed and there are randomness transactions to cancel.
        let should_write_random_checkpoint = state.randomness_round.is_some()
            || (state.dkg_failed && !randomness_schedulables.is_empty());

        let mut checkpoint_queue = self.checkpoint_queue.lock().unwrap();

        // Splits a schedulable list into checkpoint-sized chunks; each chunk
        // gets its own queue height and, with accumulators enabled, its own
        // settlement entry.
        let build_chunks =
            |schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
             queue: &mut CheckpointQueue|
             -> Vec<Chunk<VerifiedExecutableTransactionWithAliases>> {
                schedulables
                    .chunks(max_transactions_per_checkpoint)
                    .map(|chunk| {
                        let height = queue.next_height();
                        let schedulables: Vec<_> = chunk.to_vec();
                        let settlement = if accumulators_enabled {
                            Some(Schedulable::AccumulatorSettlement(epoch, height))
                        } else {
                            None
                        };
                        Chunk {
                            schedulables,
                            settlement,
                            height,
                        }
                    })
                    .collect()
            };

        let num_schedulables = schedulables.len();
        let chunked_schedulables = build_chunks(schedulables, &mut checkpoint_queue);
        if chunked_schedulables.len() > 1 {
            info!(
                "Splitting transactions into {} checkpoint chunks (num_schedulables={}, max_tx={})",
                chunked_schedulables.len(),
                num_schedulables,
                max_transactions_per_checkpoint
            );
            assert_reachable!("checkpoint split due to transaction limit");
        }
        let chunked_randomness_schedulables = if should_write_random_checkpoint {
            build_chunks(randomness_schedulables, &mut checkpoint_queue)
        } else {
            vec![]
        };

        let schedulables_for_version_assignment =
            Chunk::all_schedulables_from(&chunked_schedulables);
        let randomness_schedulables_for_version_assignment =
            Chunk::all_schedulables_from(&chunked_randomness_schedulables);

        let assigned_versions = self
            .epoch_store
            .process_consensus_transaction_shared_object_versions(
                self.cache_reader.as_ref(),
                schedulables_for_version_assignment,
                randomness_schedulables_for_version_assignment,
                cancelled_txns,
                &mut state.output,
            )
            .expect("failed to assign shared object versions");

        // Build the real prologue transaction and splice it into the very
        // first slot of the first chunk, rewriting the corresponding
        // version-assignment key to its digest.
        let consensus_commit_prologue =
            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);

        let mut chunked_schedulables = chunked_schedulables;
        let mut assigned_versions = assigned_versions;
        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
            assert!(matches!(
                chunked_schedulables[0].schedulables[0],
                Schedulable::ConsensusCommitPrologue(..)
            ));
            assert!(matches!(
                assigned_versions.0[0].0,
                TransactionKey::ConsensusCommitPrologue(..)
            ));
            assigned_versions.0[0].0 =
                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
            chunked_schedulables[0].schedulables[0] =
                Schedulable::Transaction(consensus_commit_prologue);
        }

        let assigned_versions = assigned_versions.into_map();

        self.epoch_store.process_user_signatures(
            chunked_schedulables
                .iter()
                .flat_map(|c| c.all_schedulables())
                .chain(
                    chunked_randomness_schedulables
                        .iter()
                        .flat_map(|c| c.all_schedulables()),
                ),
        );

        // The reported height is the last chunk's height; randomness chunks,
        // when present, were assigned heights after the regular ones.
        let commit_height = chunked_randomness_schedulables
            .last()
            .or(chunked_schedulables.last())
            .map(|c| c.height)
            .expect("at least one checkpoint root must be created per commit");

        let mut pending_checkpoints = Vec::new();
        for chunk in chunked_schedulables {
            pending_checkpoints.extend(checkpoint_queue.push_chunk(
                chunk.into(),
                &assigned_versions,
                commit_info.timestamp,
                commit_info.consensus_commit_ref,
                commit_info.rejected_transactions_digest,
            ));
        }

        if protocol_config.merge_randomness_into_checkpoint() {
            // Merged mode: flush regular chunks (forcing only on the final
            // round), then push randomness chunks, flushing again only at the
            // final round.
            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, final_round));

            if should_write_random_checkpoint {
                for chunk in chunked_randomness_schedulables {
                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
                        chunk.into(),
                        &assigned_versions,
                        commit_info.timestamp,
                        commit_info.consensus_commit_ref,
                        commit_info.rejected_transactions_digest,
                    ));
                }
                if final_round {
                    pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
                }
            }
        } else {
            // Non-merged mode: force a flush before the randomness chunks
            // (presumably so randomness txns land in separate checkpoints —
            // confirm against `CheckpointQueue::flush` semantics), then always
            // flush them out afterwards.
            let force = final_round || should_write_random_checkpoint;
            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, force));

            if should_write_random_checkpoint {
                for chunk in chunked_randomness_schedulables {
                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
                        chunk.into(),
                        &assigned_versions,
                        commit_info.timestamp,
                        commit_info.consensus_commit_ref,
                        commit_info.rejected_transactions_digest,
                    ));
                }
                pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
            }
        }

        // The last pending checkpoint of the final round closes the epoch.
        if final_round && let Some(last) = pending_checkpoints.last_mut() {
            last.details.last_of_epoch = true;
        }

        let queue_drained = checkpoint_queue.is_empty();
        drop(checkpoint_queue);

        for pending_checkpoint in pending_checkpoints {
            debug!(
                checkpoint_height = pending_checkpoint.details.checkpoint_height,
                roots_count = pending_checkpoint.num_roots(),
                "Writing pending checkpoint",
            );
            self.epoch_store
                .write_pending_checkpoint_v2(&mut state.output, &pending_checkpoint)
                .expect("failed to write pending checkpoint");
        }

        state.output.set_checkpoint_queue_drained(queue_drained);

        commit_height
    }
1984
1985 fn add_consensus_commit_prologue_transaction<'a>(
1989 &'a self,
1990 state: &'a mut CommitHandlerState,
1991 commit_info: &'a ConsensusCommitInfo,
1992 assigned_versions: &AssignedTxAndVersions,
1993 ) -> Option<VerifiedExecutableTransactionWithAliases> {
1994 {
1995 if commit_info.skip_consensus_commit_prologue_in_test {
1996 return None;
1997 }
1998 }
1999
2000 let mut cancelled_txn_version_assignment = Vec::new();
2001
2002 let protocol_config = self.epoch_store.protocol_config();
2003
2004 for (txn_key, assigned_versions) in assigned_versions.0.iter() {
2005 let Some(d) = txn_key.as_digest() else {
2006 continue;
2007 };
2008
2009 if !protocol_config.include_cancelled_randomness_txns_in_prologue()
2010 && assigned_versions
2011 .shared_object_versions
2012 .iter()
2013 .any(|((id, _), _)| *id == SUI_RANDOMNESS_STATE_OBJECT_ID)
2014 {
2015 continue;
2016 }
2017
2018 if assigned_versions
2019 .shared_object_versions
2020 .iter()
2021 .any(|(_, version)| version.is_cancelled())
2022 {
2023 assert_reachable!("cancelled transactions");
2024 cancelled_txn_version_assignment
2025 .push((*d, assigned_versions.shared_object_versions.clone()));
2026 }
2027 }
2028
2029 fail_point_arg!(
2030 "additional_cancelled_txns_for_tests",
2031 |additional_cancelled_txns: Vec<(
2032 TransactionDigest,
2033 Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
2034 )>| {
2035 cancelled_txn_version_assignment.extend(additional_cancelled_txns);
2036 }
2037 );
2038
2039 let transaction = commit_info.create_consensus_commit_prologue_transaction(
2040 self.epoch_store.epoch(),
2041 self.epoch_store.protocol_config(),
2042 cancelled_txn_version_assignment,
2043 commit_info,
2044 state.indirect_state_observer.take().unwrap(),
2045 );
2046 Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2047 transaction,
2048 ))
2049 }
2050
    /// Routes one transaction from this commit to exactly one of three fates:
    /// scheduled for execution (`scheduled_txns`), deferred to a later round
    /// (`deferred_txns`), or cancelled (`cancelled_txns` — also pushed to
    /// `scheduled_txns` so the cancellation is executed).
    fn handle_deferral_and_cancellation(
        &self,
        state: &mut CommitHandlerState,
        cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>,
        scheduled_txns: &mut Vec<VerifiedExecutableTransactionWithAliases>,
        protocol_config: &ProtocolConfig,
        commit_info: &ConsensusCommitInfo,
        transaction: VerifiedExecutableTransactionWithAliases,
        shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
        execution_time_estimator: &ExecutionTimeEstimator,
    ) {
        // First gate: defer transactions that were amplified (appeared multiple
        // times in the commit) more than their gas price pays for.
        if protocol_config.defer_unpaid_amplification() {
            // Number of times this digest appeared in the commit (0 if absent).
            let occurrence_count = state
                .occurrence_counts
                .get(transaction.tx().digest())
                .copied()
                .unwrap_or(0);

            // One occurrence is allowed per multiple of the reference gas price
            // paid, plus one. `max(1)` guards against a zero RGP.
            let rgp = self.epoch_store.reference_gas_price();
            let gas_price = transaction.tx().transaction_data().gas_price();
            let allowed_count = (gas_price / rgp.max(1)) + 1;

            if occurrence_count as u64 > allowed_count {
                self.metrics
                    .consensus_handler_unpaid_amplification_deferrals
                    .inc();

                // Preserve the original deferral round for re-deferred txns so
                // the deferral limit is measured from the first deferral.
                let deferred_from_round = previously_deferred_tx_digests
                    .get(transaction.tx().digest())
                    .map(|k| k.deferred_from_round())
                    .unwrap_or(commit_info.round);

                let deferral_key = DeferralKey::new_for_consensus_round(
                    commit_info.round + 1,
                    deferred_from_round,
                );

                // Only defer while within the allowed number of rounds;
                // otherwise fall through to the normal deferral/cancellation
                // path below.
                if transaction_deferral_within_limit(
                    &deferral_key,
                    protocol_config.max_deferral_rounds_for_congestion_control(),
                ) {
                    assert_reachable!("unpaid amplification deferral");
                    debug!(
                        "Deferring transaction {:?} due to unpaid amplification (count={}, allowed={})",
                        transaction.tx().digest(),
                        occurrence_count,
                        allowed_count
                    );
                    deferred_txns
                        .entry(deferral_key)
                        .or_default()
                        .push(transaction);
                    return;
                }
            }
        }

        // NOTE(review): computed before the deferral decision even though it is
        // only applied when the transaction is scheduled; `get_tx_cost` also
        // receives the indirect state observer — confirm side effects before
        // reordering.
        let tx_cost = shared_object_congestion_tracker.get_tx_cost(
            execution_time_estimator,
            transaction.tx(),
            state.indirect_state_observer.as_mut().unwrap(),
        );

        let deferral_info = self.epoch_store.should_defer(
            transaction.tx(),
            commit_info,
            state.dkg_failed,
            state.randomness_round.is_some(),
            previously_deferred_tx_digests,
            shared_object_congestion_tracker,
        );

        if let Some((deferral_key, deferral_reason)) = deferral_info {
            debug!(
                "Deferring consensus certificate for transaction {:?} until {:?}",
                transaction.tx().digest(),
                deferral_key
            );

            match deferral_reason {
                // Randomness-using txns simply wait for randomness to be ready.
                DeferralReason::RandomnessNotReady => {
                    deferred_txns
                        .entry(deferral_key)
                        .or_default()
                        .push(transaction);
                }
                // Congested txns are deferred up to a limit, then cancelled.
                DeferralReason::SharedObjectCongestion(congested_objects) => {
                    self.metrics.consensus_handler_congested_transactions.inc();
                    if transaction_deferral_within_limit(
                        &deferral_key,
                        protocol_config.max_deferral_rounds_for_congestion_control(),
                    ) {
                        deferred_txns
                            .entry(deferral_key)
                            .or_default()
                            .push(transaction);
                    } else {
                        assert_sometimes!(
                            transaction.tx().data().transaction_data().uses_randomness(),
                            "cancelled randomness-using transaction"
                        );
                        assert_sometimes!(
                            !transaction.tx().data().transaction_data().uses_randomness(),
                            "cancelled non-randomness-using transaction"
                        );

                        debug!(
                            "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
                            transaction.tx().digest(),
                            deferral_key,
                            congested_objects
                        );
                        cancelled_txns.insert(
                            *transaction.tx().digest(),
                            CancelConsensusCertificateReason::CongestionOnObjects(
                                congested_objects,
                            ),
                        );
                        // Cancelled txns are still scheduled so the
                        // cancellation itself is executed.
                        scheduled_txns.push(transaction);
                    }
                }
            }
        } else {
            // Not deferred: charge the congestion tracker and schedule.
            shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, transaction.tx());
            scheduled_txns.push(transaction);
        }
    }
2185
2186 fn merge_and_reorder_transactions(
2187 &self,
2188 state: &mut CommitHandlerState,
2189 commit_info: &ConsensusCommitInfo,
2190 user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
2191 ) -> (
2192 Vec<VerifiedExecutableTransactionWithAliases>,
2193 Vec<VerifiedExecutableTransactionWithAliases>,
2194 HashMap<TransactionDigest, DeferralKey>,
2195 ) {
2196 let protocol_config = self.epoch_store.protocol_config();
2197
2198 let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
2199 self.load_deferred_transactions(state, commit_info);
2200
2201 txns.reserve(user_transactions.len());
2202 randomness_txns.reserve(user_transactions.len());
2203
2204 let mut txns: Vec<_> = txns
2207 .into_iter()
2208 .filter_map(|tx| {
2209 if tx.tx().transaction_data().uses_randomness() {
2210 randomness_txns.push(tx);
2211 None
2212 } else {
2213 Some(tx)
2214 }
2215 })
2216 .collect();
2217
2218 for txn in user_transactions {
2219 if txn.tx().transaction_data().uses_randomness() {
2220 randomness_txns.push(txn);
2221 } else {
2222 txns.push(txn);
2223 }
2224 }
2225
2226 PostConsensusTxReorder::reorder(
2227 &mut txns,
2228 protocol_config.consensus_transaction_ordering(),
2229 );
2230 PostConsensusTxReorder::reorder(
2231 &mut randomness_txns,
2232 protocol_config.consensus_transaction_ordering(),
2233 );
2234
2235 (txns, randomness_txns, previously_deferred_tx_digests)
2236 }
2237
2238 fn load_deferred_transactions(
2239 &self,
2240 state: &mut CommitHandlerState,
2241 commit_info: &ConsensusCommitInfo,
2242 ) -> (
2243 Vec<VerifiedExecutableTransactionWithAliases>,
2244 Vec<VerifiedExecutableTransactionWithAliases>,
2245 HashMap<TransactionDigest, DeferralKey>,
2246 ) {
2247 let mut previously_deferred_tx_digests = HashMap::new();
2248
2249 let deferred_txs: Vec<_> = self
2250 .epoch_store
2251 .load_deferred_transactions_for_up_to_consensus_round_v2(
2252 &mut state.output,
2253 commit_info.round,
2254 )
2255 .expect("db error")
2256 .into_iter()
2257 .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2258 .map(|(key, tx)| {
2259 previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2260 tx
2261 })
2262 .collect();
2263 trace!(
2264 "loading deferred transactions: {:?}",
2265 deferred_txs.iter().map(|tx| tx.tx().digest())
2266 );
2267
2268 let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
2269 let txns: Vec<_> = self
2270 .epoch_store
2271 .load_deferred_transactions_for_randomness_v2(&mut state.output)
2272 .expect("db error")
2273 .into_iter()
2274 .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2275 .map(|(key, tx)| {
2276 previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2277 tx
2278 })
2279 .collect();
2280 trace!(
2281 "loading deferred randomness transactions: {:?}",
2282 txns.iter().map(|tx| tx.tx().digest())
2283 );
2284 txns
2285 } else {
2286 vec![]
2287 };
2288
2289 (
2290 deferred_txs,
2291 deferred_randomness_txs,
2292 previously_deferred_tx_digests,
2293 )
2294 }
2295
    /// Creates the shared-object congestion tracker for this commit, seeded
    /// with the object debts carried over from earlier rounds.
    ///
    /// `for_randomness` selects the randomness or regular congestion lane.
    fn init_congestion_tracker(
        &self,
        commit_info: &ConsensusCommitInfo,
        for_randomness: bool,
        txns: &[VerifiedExecutableTransactionWithAliases],
    ) -> SharedObjectCongestionTracker {
        // `mut` is only needed when the fail point below is compiled in.
        #[allow(unused_mut)]
        let mut ret = SharedObjectCongestionTracker::from_protocol_config(
            self.epoch_store
                .consensus_quarantine
                .read()
                .load_initial_object_debts(
                    &self.epoch_store,
                    commit_info.round,
                    for_randomness,
                    txns,
                )
                .expect("db error"),
            self.epoch_store.protocol_config(),
            for_randomness,
            self.congestion_logger.is_some(),
        );

        // Test-only hook: replace the tracker with an injected one.
        fail_point_arg!(
            "initial_congestion_tracker",
            |tracker: SharedObjectCongestionTracker| {
                info!(
                    "Initialize shared_object_congestion_tracker to {:?}",
                    tracker
                );
                ret = tracker;
            }
        );

        ret
    }
2332
2333 fn process_gasless_transactions(
2334 &self,
2335 commit_info: &ConsensusCommitInfo,
2336 user_transactions: &[VerifiedExecutableTransactionWithAliases],
2337 ) {
2338 let gasless_count = user_transactions
2339 .iter()
2340 .filter(|txn| txn.tx().transaction_data().is_gasless_transaction())
2341 .count() as u64;
2342 self.consensus_gasless_counter
2343 .record_commit(commit_info.timestamp, gasless_count);
2344 }
2345
2346 fn process_jwks(
2347 &self,
2348 state: &mut CommitHandlerState,
2349 commit_info: &ConsensusCommitInfo,
2350 new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
2351 ) {
2352 for (authority_name, jwk_id, jwk) in new_jwks {
2353 self.epoch_store.record_jwk_vote(
2354 &mut state.output,
2355 commit_info.round,
2356 authority_name,
2357 &jwk_id,
2358 &jwk,
2359 );
2360 }
2361 }
2362
2363 fn process_capability_notifications(
2364 &self,
2365 capability_notifications: Vec<AuthorityCapabilitiesV2>,
2366 ) {
2367 for capabilities in capability_notifications {
2368 self.epoch_store
2369 .record_capabilities_v2(&capabilities)
2370 .expect("db error");
2371 }
2372 }
2373
    /// Feeds per-authority execution-time observations from this commit into
    /// the shared estimator and records them in the commit output.
    fn process_execution_time_observations(
        &self,
        state: &mut CommitHandlerState,
        execution_time_observations: Vec<ExecutionTimeObservation>,
    ) {
        // `try_lock` is safe because only the commit handler thread touches
        // this estimator; contention would indicate a bug.
        let mut execution_time_estimator = self
            .epoch_store
            .execution_time_estimator
            .try_lock()
            .expect("should only ever be called from the commit handler thread");

        for ExecutionTimeObservation {
            authority,
            generation,
            estimates,
        } in execution_time_observations
        {
            // The observation's authority must be a committee member; consensus
            // only delivers transactions from committee members.
            let authority_index = self
                .epoch_store
                .committee()
                .authority_index(&authority)
                .unwrap();
            execution_time_estimator.process_observations_from_consensus(
                authority_index,
                Some(generation),
                &estimates,
            );
            // Also persist the observation via the commit output.
            state
                .output
                .insert_execution_time_observation(authority_index, generation, estimates);
        }
    }
2406
2407 fn process_checkpoint_signature_messages(
2408 &self,
2409 checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
2410 ) {
2411 for checkpoint_signature_message in checkpoint_signature_messages {
2412 self.checkpoint_service
2413 .notify_checkpoint_signature(&checkpoint_signature_message)
2414 .expect("db error");
2415 }
2416 }
2417
    /// Processes DKG messages and confirmations from this commit and, if any
    /// state changed, advances the DKG protocol.
    ///
    /// When randomness is disabled, any DKG traffic is a bug (flagged via
    /// `debug_fatal`) and otherwise ignored.
    async fn process_dkg_updates(
        &self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        randomness_manager: Option<&mut RandomnessManager>,
        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
    ) {
        if !self.epoch_store.randomness_state_enabled() {
            let num_dkg_messages = randomness_dkg_messages.len();
            let num_dkg_confirmations = randomness_dkg_confirmations.len();
            if num_dkg_messages + num_dkg_confirmations > 0 {
                debug_fatal!(
                    "received {} RandomnessDkgMessage and {} RandomnessDkgConfirmation messages when randomness is not enabled",
                    num_dkg_messages,
                    num_dkg_confirmations
                );
            }
            return;
        }

        let randomness_manager =
            randomness_manager.expect("randomness manager should exist if randomness is enabled");

        // Each call returns whether it changed randomness state; both run
        // unconditionally (no short-circuit) before deciding to advance DKG.
        let randomness_dkg_updates =
            self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);

        let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
            state,
            randomness_manager,
            randomness_dkg_confirmations,
        );

        if randomness_dkg_updates || randomness_dkg_confirmation_updates {
            randomness_manager
                .advance_dkg(&mut state.output, commit_info.round)
                .await
                .expect("epoch ended");
        }
    }
2458
2459 fn process_randomness_dkg_messages(
2460 &self,
2461 randomness_manager: &mut RandomnessManager,
2462 randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2463 ) -> bool {
2464 if randomness_dkg_messages.is_empty() {
2465 return false;
2466 }
2467
2468 let mut randomness_state_updated = false;
2469 for (authority, bytes) in randomness_dkg_messages {
2470 match bcs::from_bytes(&bytes) {
2471 Ok(message) => {
2472 randomness_manager
2473 .add_message(&authority, message)
2474 .expect("epoch ended");
2476 randomness_state_updated = true;
2477 }
2478
2479 Err(e) => {
2480 warn!(
2481 "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
2482 authority.concise(),
2483 );
2484 }
2485 }
2486 }
2487
2488 randomness_state_updated
2489 }
2490
2491 fn process_randomness_dkg_confirmations(
2492 &self,
2493 state: &mut CommitHandlerState,
2494 randomness_manager: &mut RandomnessManager,
2495 randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2496 ) -> bool {
2497 if randomness_dkg_confirmations.is_empty() {
2498 return false;
2499 }
2500
2501 let mut randomness_state_updated = false;
2502 for (authority, bytes) in randomness_dkg_confirmations {
2503 match bcs::from_bytes(&bytes) {
2504 Ok(message) => {
2505 randomness_manager
2506 .add_confirmation(&mut state.output, &authority, message)
2507 .expect("epoch ended");
2509 randomness_state_updated = true;
2510 }
2511 Err(e) => {
2512 warn!(
2513 "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
2514 authority.concise(),
2515 );
2516 }
2517 }
2518 }
2519
2520 randomness_state_updated
2521 }
2522
    /// Records EndOfPublish messages from this commit and reports whether a
    /// quorum of validators has now sent EndOfPublish (including quorum reached
    /// in an earlier commit).
    fn process_end_of_publish_transactions(
        &self,
        state: &mut CommitHandlerState,
        end_of_publish_transactions: Vec<AuthorityName>,
    ) -> bool {
        // Only the consensus handler touches this aggregator, so try_lock
        // cannot legitimately contend.
        let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
            "No contention on end_of_publish as it is only accessed from consensus handler",
        );

        // Quorum may already have been reached by a previous commit.
        if eop_aggregator.has_quorum() {
            return true;
        }

        if end_of_publish_transactions.is_empty() {
            return false;
        }

        for authority in end_of_publish_transactions {
            info!("Received EndOfPublish from {:?}", authority.concise());

            // Persist before feeding the in-memory aggregator so durable state
            // never lags the quorum decision.
            state.output.insert_end_of_publish(authority);
            if eop_aggregator
                .insert_generic(authority, ())
                .is_quorum_reached()
            {
                debug!(
                    "Collected enough end_of_publish messages with last message from validator {:?}",
                    authority.concise(),
                );
                return true;
            }
        }

        false
    }
2561
2562 fn advance_eop_state_machine(
2565 &self,
2566 state: &mut CommitHandlerState,
2567 ) -> (
2568 RwLockWriteGuard<'_, ReconfigState>,
2569 bool, ) {
2571 let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
2572 let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
2573
2574 reconfig_state.close_all_certs();
2575
2576 let commit_has_deferred_txns = state.output.has_deferred_transactions();
2577 let previous_commits_have_deferred_txns = !self.epoch_store.deferred_transactions_empty();
2578
2579 if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
2580 if !start_state_is_reject_all_tx {
2581 info!("Transitioning to RejectAllTx");
2582 }
2583 reconfig_state.close_all_tx();
2584 } else {
2585 debug!(
2586 "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
2587 previous_commits_have_deferred_txns, commit_has_deferred_txns,
2588 );
2589 }
2590
2591 state.output.store_reconfig_state(reconfig_state.clone());
2592
2593 if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
2594 (reconfig_state, true)
2595 } else {
2596 (reconfig_state, false)
2597 }
2598 }
2599
2600 fn gather_commit_metadata(
2601 &self,
2602 consensus_commit: &impl ConsensusCommitAPI,
2603 ) -> (u64, AuthorityIndex, u64) {
2604 let timestamp = consensus_commit.commit_timestamp_ms();
2605 let leader_author = consensus_commit.leader_author_index();
2606 let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
2607
2608 let system_time_ms = SystemTime::now()
2609 .duration_since(UNIX_EPOCH)
2610 .unwrap()
2611 .as_millis() as i64;
2612
2613 let consensus_timestamp_bias_ms = system_time_ms - (timestamp as i64);
2614 let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
2615 self.metrics
2616 .consensus_timestamp_bias
2617 .observe(consensus_timestamp_bias_seconds);
2618
2619 let epoch_start = self
2620 .epoch_store
2621 .epoch_start_config()
2622 .epoch_start_timestamp_ms();
2623 let timestamp = if timestamp < epoch_start {
2624 error!(
2625 "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}"
2626 );
2627 epoch_start
2628 } else {
2629 timestamp
2630 };
2631
2632 (timestamp, leader_author, commit_sub_dag_index)
2633 }
2634
2635 fn create_authenticator_state_update(
2636 &self,
2637 last_committed_round: u64,
2638 commit_info: &ConsensusCommitInfo,
2639 ) -> Option<VerifiedExecutableTransactionWithAliases> {
2640 let new_jwks = self
2648 .epoch_store
2649 .get_new_jwks(last_committed_round)
2650 .expect("Unrecoverable error in consensus handler");
2651
2652 if !new_jwks.is_empty() {
2653 let authenticator_state_update_transaction = authenticator_state_update_transaction(
2654 &self.epoch_store,
2655 commit_info.round,
2656 new_jwks,
2657 );
2658 debug!(
2659 "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
2660 authenticator_state_update_transaction.digest(),
2661 authenticator_state_update_transaction,
2662 );
2663
2664 Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2665 authenticator_state_update_transaction,
2666 ))
2667 } else {
2668 None
2669 }
2670 }
2671
2672 #[instrument(level = "trace", skip_all)]
2675 fn filter_consensus_txns(
2676 &mut self,
2677 initial_reconfig_state: ReconfigState,
2678 commit_info: &ConsensusCommitInfo,
2679 consensus_commit: &impl ConsensusCommitAPI,
2680 ) -> FilteredConsensusOutput {
2681 let mut transactions = Vec::new();
2682 let mut owned_object_locks = HashMap::new();
2683 let epoch = self.epoch_store.epoch();
2684 let mut num_finalized_user_transactions = vec![0; self.committee.size()];
2685 let mut num_rejected_user_transactions = vec![0; self.committee.size()];
2686 for (block, parsed_transactions) in consensus_commit.transactions() {
2687 let author = block.author.value();
2688 self.last_consensus_stats.stats.inc_num_messages(author);
2690
2691 self.epoch_store.set_consensus_tx_status(
2693 ConsensusPosition::ping(epoch, block),
2694 ConsensusTxStatus::Finalized,
2695 );
2696
2697 for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
2698 let position = ConsensusPosition {
2699 epoch,
2700 block,
2701 index: tx_index as TransactionIndex,
2702 };
2703
2704 if let Some(tx) = parsed.transaction.kind.as_user_transaction() {
2707 let digest = tx.digest();
2708 if let Some((spam_weight, submitter_client_addrs)) = self
2709 .epoch_store
2710 .submitted_transaction_cache
2711 .increment_submission_count(digest)
2712 {
2713 if let Some(ref traffic_controller) = self.traffic_controller {
2714 debug!(
2715 "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
2716 submitter_client_addrs.len()
2717 );
2718
2719 for addr in submitter_client_addrs {
2721 traffic_controller.tally(TrafficTally::new(
2722 Some(addr),
2723 None,
2724 None,
2725 spam_weight.clone(),
2726 ));
2727 }
2728 } else {
2729 warn!(
2730 "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
2731 submitter_client_addrs.len()
2732 );
2733 }
2734 }
2735 }
2736
2737 if parsed.rejected {
2738 if parsed.transaction.is_user_transaction() {
2740 self.epoch_store
2741 .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
2742 num_rejected_user_transactions[author] += 1;
2743 }
2744 continue;
2746 }
2747
2748 let kind = classify(&parsed.transaction);
2749 self.metrics
2750 .consensus_handler_processed
2751 .with_label_values(&[kind])
2752 .inc();
2753 self.metrics
2754 .consensus_handler_transaction_sizes
2755 .with_label_values(&[kind])
2756 .observe(parsed.serialized_len as f64);
2757 if parsed.transaction.is_user_transaction() {
2758 self.last_consensus_stats
2759 .stats
2760 .inc_num_user_transactions(author);
2761 }
2762
2763 if !initial_reconfig_state.should_accept_consensus_certs() {
2764 match &parsed.transaction.kind {
2767 ConsensusTransactionKind::UserTransactionV2(_)
2768 | ConsensusTransactionKind::UserTransaction(_)
2770 | ConsensusTransactionKind::CertifiedTransaction(_)
2771 | ConsensusTransactionKind::CapabilityNotification(_)
2772 | ConsensusTransactionKind::CapabilityNotificationV2(_)
2773 | ConsensusTransactionKind::EndOfPublish(_)
2774 | ConsensusTransactionKind::ExecutionTimeObservation(_)
2776 | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
2777 debug!(
2778 "Ignoring consensus transaction {:?} because of end of epoch",
2779 parsed.transaction.key()
2780 );
2781 continue;
2782 }
2783
2784 ConsensusTransactionKind::CheckpointSignature(_)
2786 | ConsensusTransactionKind::CheckpointSignatureV2(_)
2787 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2788 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
2789 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
2790 }
2791 }
2792
2793 if !initial_reconfig_state.should_accept_tx() {
2794 match &parsed.transaction.kind {
2795 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
2796 | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
2797 _ => {}
2798 }
2799 }
2800
2801 match &parsed.transaction.kind {
2803 ConsensusTransactionKind::CapabilityNotification(_)
2804 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2805 | ConsensusTransactionKind::CheckpointSignature(_) => {
2806 debug_fatal!(
2807 "BUG: saw deprecated tx {:?}for commit round {}",
2808 parsed.transaction.key(),
2809 commit_info.round
2810 );
2811 continue;
2812 }
2813 _ => {}
2814 }
2815
2816 if parsed.transaction.is_user_transaction() {
2817 let author_name = self
2818 .epoch_store
2819 .committee()
2820 .authority_by_index(author as u32)
2821 .unwrap();
2822 if self
2823 .epoch_store
2824 .has_received_end_of_publish_from(author_name)
2825 {
2826 warn!(
2830 "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
2831 author_name.concise(),
2832 parsed.transaction.key(),
2833 );
2834 continue;
2835 }
2836 }
2837
2838 if let ConsensusTransactionKind::UserTransactionV2(tx_with_claims) =
2844 &parsed.transaction.kind
2845 {
2846 let immutable_object_ids: HashSet<ObjectID> =
2847 tx_with_claims.get_immutable_objects().into_iter().collect();
2848 let tx = tx_with_claims.tx();
2849
2850 let Ok(input_objects) = tx.transaction_data().input_objects() else {
2851 debug_fatal!("Invalid input objects for transaction {}", tx.digest());
2852 continue;
2853 };
2854
2855 let owned_object_refs: Vec<_> = input_objects
2858 .iter()
2859 .filter_map(|obj| match obj {
2860 InputObjectKind::ImmOrOwnedMoveObject(obj_ref)
2861 if !immutable_object_ids.contains(&obj_ref.0) =>
2862 {
2863 Some(*obj_ref)
2864 }
2865 _ => None,
2866 })
2867 .collect();
2868
2869 match self
2870 .epoch_store
2871 .try_acquire_owned_object_locks_post_consensus(
2872 &owned_object_refs,
2873 *tx.digest(),
2874 &owned_object_locks,
2875 ) {
2876 Ok(new_locks) => {
2877 owned_object_locks.extend(new_locks.into_iter());
2878 self.epoch_store
2880 .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
2881 num_finalized_user_transactions[author] += 1;
2882 }
2883 Err(e) => {
2884 debug!("Dropping transaction {}: {}", tx.digest(), e);
2885 self.epoch_store
2886 .set_consensus_tx_status(position, ConsensusTxStatus::Dropped);
2887 self.epoch_store.set_rejection_vote_reason(position, &e);
2888 continue;
2889 }
2890 }
2891 }
2892
2893 let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
2894 transactions.push((transaction, author as u32));
2895 }
2896 }
2897
2898 for (i, authority) in self.committee.authorities() {
2899 let hostname = &authority.hostname;
2900 self.metrics
2901 .consensus_committed_messages
2902 .with_label_values(&[hostname])
2903 .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
2904 self.metrics
2905 .consensus_committed_user_transactions
2906 .with_label_values(&[hostname])
2907 .set(
2908 self.last_consensus_stats
2909 .stats
2910 .get_num_user_transactions(i.value()) as i64,
2911 );
2912 self.metrics
2913 .consensus_finalized_user_transactions
2914 .with_label_values(&[hostname])
2915 .add(num_finalized_user_transactions[i.value()] as i64);
2916 self.metrics
2917 .consensus_rejected_user_transactions
2918 .with_label_values(&[hostname])
2919 .add(num_rejected_user_transactions[i.value()] as i64);
2920 }
2921
2922 FilteredConsensusOutput {
2923 transactions,
2924 owned_object_locks,
2925 }
2926 }
2927
    /// Verifies and deduplicates this commit's transactions, keeping only the
    /// first occurrence of each key that has not already been processed (in
    /// this commit, the LRU cache, or the database).
    ///
    /// Also advances `last_consensus_stats.index` per transaction and populates
    /// `state.occurrence_counts` with per-digest occurrence counts for later
    /// amplification checks.
    fn deduplicate_consensus_txns(
        &mut self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
    ) -> Vec<VerifiedSequencedConsensusTransaction> {
        let mut all_transactions = Vec::new();

        // Counts every occurrence within this commit, including duplicates.
        let mut occurrence_counts: HashMap<SequencedConsensusTransactionKey, u32> = HashMap::new();
        // Keys whose first occurrence was accepted in this commit.
        let mut first_commit_keys: HashSet<SequencedConsensusTransactionKey> = HashSet::new();

        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
            // transaction_index is 1-based within the commit.
            let current_tx_index = ExecutionIndices {
                last_committed_round: commit_info.round,
                sub_dag_index: commit_info.consensus_commit_ref.index.into(),
                transaction_index: (seq + 1) as u64,
            };

            self.last_consensus_stats.index = current_tx_index;

            let certificate_author = *self
                .epoch_store
                .committee()
                .authority_by_index(cert_origin)
                .unwrap();

            let sequenced_transaction = SequencedConsensusTransaction {
                certificate_author_index: cert_origin,
                certificate_author,
                consensus_index: current_tx_index,
                transaction,
            };

            // Transactions failing verification are silently dropped here.
            let Some(verified_transaction) = self
                .epoch_store
                .verify_consensus_transaction(sequenced_transaction)
            else {
                continue;
            };

            let key = verified_transaction.0.key();

            if let Some(tx_digest) = key.user_transaction_digest() {
                self.epoch_store
                    .cache_recently_finalized_transaction(tx_digest);
            }

            // Count the occurrence; any count > 1 means a duplicate within this
            // same commit.
            let count = occurrence_counts.entry(key.clone()).or_insert(0);
            *count += 1;
            let in_commit = *count > 1;

            // `put` returns the previous value, i.e. whether the key was
            // already in the LRU cache of recently processed keys.
            let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
            if in_commit || in_cache {
                self.metrics.skipped_consensus_txns_cache_hit.inc();
                continue;
            }
            // Cache miss: fall back to the durable processed-message record.
            if self
                .epoch_store
                .is_consensus_message_processed(&key)
                .expect("db error")
            {
                self.metrics.skipped_consensus_txns.inc();
                continue;
            }

            first_commit_keys.insert(key.clone());

            state.output.record_consensus_message_processed(key);

            all_transactions.push(verified_transaction);
        }

        // Observe duplicate counts only for keys first accepted in this commit.
        for key in first_commit_keys {
            if let Some(&count) = occurrence_counts.get(&key)
                && count > 1
            {
                self.metrics
                    .consensus_handler_duplicate_tx_count
                    .observe(count as f64);
            }
        }

        // Hand the per-digest counts to the commit state for the unpaid
        // amplification check; must not have been populated already.
        assert!(
            state.occurrence_counts.is_empty(),
            "occurrence_counts should be empty before populating"
        );
        state.occurrence_counts.reserve(occurrence_counts.len());
        state.occurrence_counts.extend(
            occurrence_counts
                .into_iter()
                .filter_map(|(key, count)| key.user_transaction_digest().map(|d| (d, count))),
        );

        all_transactions
    }
3032
    /// Sorts the verified, deduplicated consensus transactions into the typed
    /// buckets of a `CommitHandlerInput` (user transactions, EndOfPublish,
    /// JWKs, DKG messages/confirmations, capabilities, execution-time
    /// observations, checkpoint signatures).
    ///
    /// Deprecated kinds were filtered earlier, so reaching them here is a bug.
    fn build_commit_handler_input(
        &self,
        transactions: Vec<VerifiedSequencedConsensusTransaction>,
    ) -> CommitHandlerInput {
        let epoch = self.epoch_store.epoch();
        let mut commit_handler_input = CommitHandlerInput::default();

        for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
            match transaction.transaction {
                SequencedConsensusTransactionKind::External(consensus_transaction) => {
                    match consensus_transaction.kind {
                        ConsensusTransactionKind::UserTransactionV2(tx) => {
                            let used_alias_versions = if self
                                .epoch_store
                                .protocol_config()
                                .fix_checkpoint_signature_mapping()
                            {
                                tx.aliases()
                            } else {
                                // Pre-fix protocol: v1 aliases carry no index,
                                // so synthesize sequential u8 indices.
                                tx.aliases_v1().map(|a| {
                                    NonEmpty::from_vec(
                                        a.into_iter()
                                            .enumerate()
                                            .map(|(idx, (_, seq))| (idx as u8, seq))
                                            .collect(),
                                    )
                                    .unwrap()
                                })
                            };
                            let inner_tx = tx.into_tx();
                            // Verification already happened upstream.
                            let tx = VerifiedTransaction::new_unchecked(inner_tx);
                            let transaction =
                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
                            if let Some(used_alias_versions) = used_alias_versions {
                                commit_handler_input
                                    .user_transactions
                                    .push(WithAliases::new(transaction, used_alias_versions));
                            } else {
                                commit_handler_input.user_transactions.push(
                                    VerifiedExecutableTransactionWithAliases::no_aliases(
                                        transaction,
                                    ),
                                );
                            }
                        }

                        ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => {
                            commit_handler_input
                                .end_of_publish_transactions
                                .push(authority_public_key_bytes);
                        }
                        ConsensusTransactionKind::NewJWKFetched(
                            authority_public_key_bytes,
                            jwk_id,
                            jwk,
                        ) => {
                            commit_handler_input.new_jwks.push((
                                authority_public_key_bytes,
                                jwk_id,
                                jwk,
                            ));
                        }
                        ConsensusTransactionKind::RandomnessDkgMessage(
                            authority_public_key_bytes,
                            items,
                        ) => {
                            commit_handler_input
                                .randomness_dkg_messages
                                .push((authority_public_key_bytes, items));
                        }
                        ConsensusTransactionKind::RandomnessDkgConfirmation(
                            authority_public_key_bytes,
                            items,
                        ) => {
                            commit_handler_input
                                .randomness_dkg_confirmations
                                .push((authority_public_key_bytes, items));
                        }
                        ConsensusTransactionKind::CapabilityNotificationV2(
                            authority_capabilities_v2,
                        ) => {
                            commit_handler_input
                                .capability_notifications
                                .push(authority_capabilities_v2);
                        }
                        ConsensusTransactionKind::ExecutionTimeObservation(
                            execution_time_observation,
                        ) => {
                            commit_handler_input
                                .execution_time_observations
                                .push(execution_time_observation);
                        }
                        ConsensusTransactionKind::CheckpointSignatureV2(
                            checkpoint_signature_message,
                        ) => {
                            commit_handler_input
                                .checkpoint_signature_messages
                                .push(*checkpoint_signature_message);
                        }

                        // Deprecated kinds are dropped in filter_consensus_txns.
                        ConsensusTransactionKind::CheckpointSignature(_)
                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
                        | ConsensusTransactionKind::CapabilityNotification(_)
                        | ConsensusTransactionKind::CertifiedTransaction(_)
                        | ConsensusTransactionKind::UserTransaction(_) => {
                            unreachable!("filtered earlier")
                        }
                    }
                }
                // System transactions are never produced by this path.
                SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
            }
        }

        commit_handler_input
    }
3159
3160 async fn send_end_of_publish_if_needed(&self) {
3161 if self
3162 .epoch_store
3163 .protocol_config()
3164 .timestamp_based_epoch_close()
3165 {
3166 return;
3167 }
3168 if !self.epoch_store.should_send_end_of_publish() {
3169 return;
3170 }
3171
3172 let end_of_publish = ConsensusTransaction::new_end_of_publish(self.epoch_store.name);
3173 if let Err(err) =
3174 self.consensus_adapter
3175 .submit(end_of_publish, None, &self.epoch_store, None, None)
3176 {
3177 warn!(
3178 "Error when sending EndOfPublish message from ConsensusHandler: {:?}",
3179 err
3180 );
3181 } else {
3182 info!(epoch=?self.epoch_store.epoch(), "Sending EndOfPublish message to consensus");
3183 }
3184 }
3185}
3186
/// Message sent to the execution-scheduler forwarding task: a batch of
/// schedulables with their assigned versions, plus optional settlement batch
/// info (when present, the batch goes through `enqueue_v2` — see
/// `ExecutionSchedulerSender::run`).
pub(crate) type SchedulerMessage = (
    Vec<(Schedulable, AssignedVersions)>,
    Option<SettlementBatchInfo>,
);
3193
/// Handle that forwards scheduled transactions from consensus handling to a
/// background task which enqueues them on the settlement scheduler.
#[derive(Clone)]
pub(crate) struct ExecutionSchedulerSender {
    // Unbounded channel: `send` never blocks the consensus-handling path.
    sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
}
3198
3199impl ExecutionSchedulerSender {
3200 fn start(
3201 settlement_scheduler: SettlementScheduler,
3202 epoch_store: Arc<AuthorityPerEpochStore>,
3203 ) -> Self {
3204 let (sender, recv) = monitored_mpsc::unbounded_channel("execution_scheduler_sender");
3205 spawn_monitored_task!(Self::run(recv, settlement_scheduler, epoch_store));
3206 Self { sender }
3207 }
3208
3209 pub(crate) fn new_for_testing(
3210 sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
3211 ) -> Self {
3212 Self { sender }
3213 }
3214
3215 fn send(
3216 &self,
3217 transactions: Vec<(Schedulable, AssignedVersions)>,
3218 settlement: Option<SettlementBatchInfo>,
3219 ) {
3220 let _ = self.sender.send((transactions, settlement));
3221 }
3222
3223 async fn run(
3224 mut recv: monitored_mpsc::UnboundedReceiver<SchedulerMessage>,
3225 settlement_scheduler: SettlementScheduler,
3226 epoch_store: Arc<AuthorityPerEpochStore>,
3227 ) {
3228 while let Some((transactions, settlement)) = recv.recv().await {
3229 let _guard = monitored_scope("ConsensusHandler::enqueue");
3230 let txns = transactions
3231 .into_iter()
3232 .map(|(txn, versions)| (txn, ExecutionEnv::new().with_assigned_versions(versions)))
3233 .collect();
3234 if let Some(settlement) = settlement {
3235 settlement_scheduler.enqueue_v2(txns, settlement, &epoch_store);
3236 } else {
3237 settlement_scheduler.enqueue(txns, &epoch_store);
3238 }
3239 }
3240 }
3241}
3242
/// Owns the background task that drains committed sub-dags from consensus and
/// drives them through the `ConsensusHandler`; torn down via `abort`.
pub(crate) struct MysticetiConsensusHandler {
    // Commit-processing task(s); shut down together on `abort`.
    tasks: JoinSet<()>,
}
3247
3248impl MysticetiConsensusHandler {
3249 pub(crate) fn new(
3250 last_processed_commit_at_startup: CommitIndex,
3251 mut consensus_handler: ConsensusHandler<CheckpointService>,
3252 mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
3253 commit_consumer_monitor: Arc<CommitConsumerMonitor>,
3254 process_consensus_commits: bool,
3255 ) -> Self {
3256 debug!(
3257 last_processed_commit_at_startup,
3258 process_consensus_commits, "Starting consensus replay"
3259 );
3260 let mut tasks = JoinSet::new();
3261 tasks.spawn(monitored_future!(async move {
3262 while let Some(consensus_commit) = commit_receiver.recv().await {
3264 let commit_index = consensus_commit.commit_ref.index;
3265 if !process_consensus_commits {
3266 debug!(
3267 commit_index,
3268 "Observer skipping consensus commit processing"
3269 );
3270 } else if commit_index <= last_processed_commit_at_startup {
3271 consensus_handler.handle_prior_consensus_commit(consensus_commit);
3272 } else {
3273 consensus_handler
3274 .handle_consensus_commit(consensus_commit)
3275 .await;
3276 }
3277 commit_consumer_monitor.set_highest_handled_commit(commit_index);
3278 }
3279 }));
3280 Self { tasks }
3281 }
3282
3283 pub(crate) async fn abort(&mut self) {
3284 self.tasks.shutdown().await;
3285 }
3286}
3287
3288fn authenticator_state_update_transaction(
3289 epoch_store: &AuthorityPerEpochStore,
3290 round: u64,
3291 mut new_active_jwks: Vec<ActiveJwk>,
3292) -> VerifiedExecutableTransaction {
3293 let epoch = epoch_store.epoch();
3294 new_active_jwks.sort();
3295
3296 info!("creating authenticator state update transaction");
3297 assert!(epoch_store.authenticator_state_enabled());
3298 let transaction = VerifiedTransaction::new_authenticator_state_update(
3299 epoch,
3300 round,
3301 new_active_jwks,
3302 epoch_store
3303 .epoch_start_config()
3304 .authenticator_obj_initial_shared_version()
3305 .expect("authenticator state obj must exist"),
3306 );
3307 VerifiedExecutableTransaction::new_system(transaction, epoch)
3308}
3309
3310pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
3311 match &transaction.kind {
3312 ConsensusTransactionKind::CertifiedTransaction(_) => "_deprecated_certificate",
3314 ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
3315 ConsensusTransactionKind::CheckpointSignatureV2(_) => "checkpoint_signature",
3316 ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
3317 ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
3318 ConsensusTransactionKind::CapabilityNotificationV2(_) => "capability_notification_v2",
3319 ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
3320 ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
3321 ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
3322 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
3323 ConsensusTransactionKind::UserTransaction(_) => "_deprecated_user_transaction",
3324 ConsensusTransactionKind::UserTransactionV2(tx) => {
3325 if tx.tx().is_consensus_tx() {
3326 "shared_user_transaction_v2"
3327 } else {
3328 "owned_user_transaction_v2"
3329 }
3330 }
3331 ConsensusTransactionKind::ExecutionTimeObservation(_) => "execution_time_observation",
3332 }
3333}
3334
/// A consensus transaction together with its position in the consensus output
/// stream and the authority it is attributed to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequencedConsensusTransaction {
    /// Index of the attributed authority in the consensus committee.
    pub certificate_author_index: AuthorityIndex,
    /// Name of the attributed authority (returned by `sender_authority`).
    pub certificate_author: AuthorityName,
    /// Position of this transaction within the consensus output.
    pub consensus_index: ExecutionIndices,
    /// The transaction payload (external or system-generated).
    pub transaction: SequencedConsensusTransactionKind,
}
3342
/// Payload of a sequenced consensus transaction: either an externally
/// submitted consensus transaction or an internally generated system
/// transaction.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SequencedConsensusTransactionKind {
    External(ConsensusTransaction),
    System(VerifiedExecutableTransaction),
}
3349
3350impl Serialize for SequencedConsensusTransactionKind {
3351 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
3352 let serializable = SerializableSequencedConsensusTransactionKind::from(self);
3353 serializable.serialize(serializer)
3354 }
3355}
3356
3357impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
3358 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
3359 let serializable =
3360 SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
3361 Ok(serializable.into())
3362 }
3363}
3364
/// Serialization mirror of [`SequencedConsensusTransactionKind`]: the system
/// variant is stored as a `TrustedExecutableTransaction` (produced via
/// `.serializable()` in the `From` impl below).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
enum SerializableSequencedConsensusTransactionKind {
    External(ConsensusTransaction),
    System(TrustedExecutableTransaction),
}
3374
3375impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
3376 fn from(kind: &SequencedConsensusTransactionKind) -> Self {
3377 match kind {
3378 SequencedConsensusTransactionKind::External(ext) => {
3379 SerializableSequencedConsensusTransactionKind::External(ext.clone())
3380 }
3381 SequencedConsensusTransactionKind::System(txn) => {
3382 SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
3383 }
3384 }
3385 }
3386}
3387
3388impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
3389 fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
3390 match kind {
3391 SerializableSequencedConsensusTransactionKind::External(ext) => {
3392 SequencedConsensusTransactionKind::External(ext)
3393 }
3394 SerializableSequencedConsensusTransactionKind::System(txn) => {
3395 SequencedConsensusTransactionKind::System(txn.into())
3396 }
3397 }
3398 }
3399}
3400
/// Deduplication/processing key for a sequenced consensus transaction
/// (see `is_consensus_message_processed` usage in tests below).
#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
pub enum SequencedConsensusTransactionKey {
    External(ConsensusTransactionKey),
    System(TransactionDigest),
}
3406
3407impl SequencedConsensusTransactionKey {
3408 pub fn user_transaction_digest(&self) -> Option<TransactionDigest> {
3409 match self {
3410 SequencedConsensusTransactionKey::External(key) => match key {
3411 ConsensusTransactionKey::Certificate(digest) => Some(*digest),
3412 _ => None,
3413 },
3414 SequencedConsensusTransactionKey::System(_) => None,
3415 }
3416 }
3417}
3418
3419impl SequencedConsensusTransactionKind {
3420 pub fn key(&self) -> SequencedConsensusTransactionKey {
3421 match self {
3422 SequencedConsensusTransactionKind::External(ext) => {
3423 SequencedConsensusTransactionKey::External(ext.key())
3424 }
3425 SequencedConsensusTransactionKind::System(txn) => {
3426 SequencedConsensusTransactionKey::System(*txn.digest())
3427 }
3428 }
3429 }
3430
3431 pub fn get_tracking_id(&self) -> u64 {
3432 match self {
3433 SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
3434 SequencedConsensusTransactionKind::System(_txn) => 0,
3435 }
3436 }
3437
3438 pub fn is_executable_transaction(&self) -> bool {
3439 match self {
3440 SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
3441 SequencedConsensusTransactionKind::System(_) => true,
3442 }
3443 }
3444
3445 pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
3446 match self {
3447 SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
3448 ConsensusTransactionKind::UserTransactionV2(txn) => Some(*txn.tx().digest()),
3449 _ => None,
3450 },
3451 SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
3452 }
3453 }
3454
3455 pub fn is_end_of_publish(&self) -> bool {
3456 match self {
3457 SequencedConsensusTransactionKind::External(ext) => {
3458 matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
3459 }
3460 SequencedConsensusTransactionKind::System(_) => false,
3461 }
3462 }
3463}
3464
3465impl SequencedConsensusTransaction {
3466 pub fn sender_authority(&self) -> AuthorityName {
3467 self.certificate_author
3468 }
3469
3470 pub fn key(&self) -> SequencedConsensusTransactionKey {
3471 self.transaction.key()
3472 }
3473
3474 pub fn is_end_of_publish(&self) -> bool {
3475 if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
3476 matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
3477 } else {
3478 false
3479 }
3480 }
3481
3482 pub fn try_take_execution_time_observation(&mut self) -> Option<ExecutionTimeObservation> {
3483 if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
3484 kind: ConsensusTransactionKind::ExecutionTimeObservation(observation),
3485 ..
3486 }) = &mut self.transaction
3487 {
3488 Some(std::mem::take(observation))
3489 } else {
3490 None
3491 }
3492 }
3493
3494 pub fn is_system(&self) -> bool {
3495 matches!(
3496 self.transaction,
3497 SequencedConsensusTransactionKind::System(_)
3498 )
3499 }
3500
3501 pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
3502 if !randomness_state_enabled {
3503 return false;
3506 }
3507 match &self.transaction {
3508 SequencedConsensusTransactionKind::External(ConsensusTransaction {
3509 kind: ConsensusTransactionKind::UserTransactionV2(txn),
3510 ..
3511 }) => txn.tx().transaction_data().uses_randomness(),
3512 _ => false,
3513 }
3514 }
3515
3516 pub fn as_consensus_txn(&self) -> Option<&SenderSignedData> {
3517 match &self.transaction {
3518 SequencedConsensusTransactionKind::External(ConsensusTransaction {
3519 kind: ConsensusTransactionKind::UserTransactionV2(txn),
3520 ..
3521 }) if txn.tx().is_consensus_tx() => Some(txn.tx().data()),
3522 SequencedConsensusTransactionKind::System(txn) if txn.is_consensus_tx() => {
3523 Some(txn.data())
3524 }
3525 _ => None,
3526 }
3527 }
3528}
3529
/// Newtype marking a [`SequencedConsensusTransaction`] that has passed the
/// consensus handler's verification step.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
3532
#[cfg(test)]
impl VerifiedSequencedConsensusTransaction {
    /// Test-only constructor: wraps the transaction without any verification.
    pub fn new_test(transaction: ConsensusTransaction) -> Self {
        Self(SequencedConsensusTransaction::new_test(transaction))
    }
}
3539
3540impl SequencedConsensusTransaction {
3541 pub fn new_test(transaction: ConsensusTransaction) -> Self {
3542 Self {
3543 certificate_author_index: 0,
3544 certificate_author: AuthorityName::ZERO,
3545 consensus_index: Default::default(),
3546 transaction: SequencedConsensusTransactionKind::External(transaction),
3547 }
3548 }
3549}
3550
/// Sliding window of recent commit timestamps (ms), used to estimate the
/// average interval between consensus commits.
#[derive(Serialize, Deserialize)]
pub(crate) struct CommitIntervalObserver {
    // Recent commit timestamps in arrival order; bounded by the capacity
    // requested in `new` (see `observe_commit_time`).
    ring_buffer: VecDeque<u64>,
}
3555
3556impl CommitIntervalObserver {
3557 pub fn new(window_size: u32) -> Self {
3558 Self {
3559 ring_buffer: VecDeque::with_capacity(window_size as usize),
3560 }
3561 }
3562
3563 pub fn observe_commit_time(&mut self, consensus_commit: &impl ConsensusCommitAPI) {
3564 let commit_time = consensus_commit.commit_timestamp_ms();
3565 if self.ring_buffer.len() == self.ring_buffer.capacity() {
3566 self.ring_buffer.pop_front();
3567 }
3568 self.ring_buffer.push_back(commit_time);
3569 }
3570
3571 pub fn commit_interval_estimate(&self) -> Option<Duration> {
3572 if self.ring_buffer.len() <= 1 {
3573 None
3574 } else {
3575 let first = self.ring_buffer.front().unwrap();
3576 let last = self.ring_buffer.back().unwrap();
3577 let duration = last.saturating_sub(*first);
3578 let num_commits = self.ring_buffer.len() as u64;
3579 Some(Duration::from_millis(duration.div_ceil(num_commits)))
3580 }
3581 }
3582}
3583
3584#[cfg(test)]
3585mod tests {
3586 use std::collections::HashSet;
3587
3588 use consensus_core::{
3589 BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock,
3590 };
3591 use futures::pin_mut;
3592 use prometheus::Registry;
3593 use sui_protocol_config::{ConsensusTransactionOrdering, ProtocolConfig};
3594 use sui_types::{
3595 base_types::ExecutionDigests,
3596 base_types::{AuthorityName, FullObjectRef, ObjectID, SuiAddress, random_object_ref},
3597 committee::Committee,
3598 crypto::deterministic_random_account_key,
3599 gas::GasCostSummary,
3600 message_envelope::Message,
3601 messages_checkpoint::{
3602 CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
3603 SignedCheckpointSummary,
3604 },
3605 messages_consensus::ConsensusTransaction,
3606 object::Object,
3607 transaction::{
3608 CertifiedTransaction, TransactionData, TransactionDataAPI, VerifiedCertificate,
3609 },
3610 };
3611
3612 use super::*;
3613 use crate::{
3614 authority::{
3615 authority_per_epoch_store::ConsensusStatsAPI,
3616 test_authority_builder::TestAuthorityBuilder,
3617 },
3618 checkpoints::CheckpointServiceNoop,
3619 consensus_adapter::consensus_tests::test_user_transaction,
3620 consensus_test_utils::make_consensus_adapter_for_test,
3621 post_consensus_tx_reorder::PostConsensusTxReorder,
3622 };
3623
    /// End-to-end test of `ConsensusHandler::handle_consensus_commit`: a
    /// commit of 12 user transactions (mixed owned/shared inputs, including
    /// contended shared objects) is processed, consensus stats are updated,
    /// and every transaction eventually executes. Also verifies that commit
    /// handling blocks while backpressure is on and resumes once lifted.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_consensus_commit_handler() {
        telemetry_subscribers::init_for_testing();

        // Genesis objects: 12 gas objects (one per transaction), 4 owned
        // inputs, 6 shared inputs.
        let (sender, keypair) = deterministic_random_account_key();
        let gas_objects: Vec<Object> = (0..12)
            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
            .collect();
        let owned_objects: Vec<Object> = (0..4)
            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
            .collect();
        let shared_objects: Vec<Object> = (0..6)
            .map(|_| Object::shared_for_testing())
            .collect::<Vec<_>>();
        let mut all_objects = gas_objects.clone();
        all_objects.extend(owned_objects.clone());
        all_objects.extend(shared_objects.clone());

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .with_objects(all_objects.clone())
                .build();

        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let new_epoch_start_state = epoch_store.epoch_start_state();
        let consensus_committee = new_epoch_start_state.get_consensus_committee();

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));

        let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());

        let backpressure_manager = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let settlement_scheduler = SettlementScheduler::new(
            state.execution_scheduler().as_ref().clone(),
            state.get_transaction_cache_reader().clone(),
            state.metrics.clone(),
        );
        let mut consensus_handler = ConsensusHandler::new(
            epoch_store,
            Arc::new(CheckpointServiceNoop {}),
            settlement_scheduler,
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput_calculator),
            backpressure_manager.subscribe(),
            state.traffic_controller.clone(),
            None,
            state.consensus_gasless_counter.clone(),
        );

        // First 8 transactions alternate between owned- and shared-object
        // inputs.
        let mut user_transactions = vec![];
        for (i, gas_object) in gas_objects[0..8].iter().enumerate() {
            let input_object = if i % 2 == 0 {
                owned_objects.get(i / 2).unwrap().clone()
            } else {
                shared_objects.get(i / 2).unwrap().clone()
            };
            let transaction = test_user_transaction(
                &state,
                sender,
                &keypair,
                gas_object.clone(),
                vec![input_object],
            )
            .await;
            user_transactions.push(transaction);
        }

        // Last 4 transactions contend pairwise on shared_objects[4] and [5],
        // exercising version assignment under contention.
        for (i, gas_object) in gas_objects[8..12].iter().enumerate() {
            let shared_object = if i < 2 {
                shared_objects[4].clone()
            } else {
                shared_objects[5].clone()
            };
            let transaction = test_user_transaction(
                &state,
                sender,
                &keypair,
                gas_object.clone(),
                vec![shared_object],
            )
            .await;
            user_transactions.push(transaction);
        }

        // One block per transaction; rounds 100.., authors round-robin over
        // the committee.
        let mut blocks = Vec::new();
        for (i, consensus_transaction) in user_transactions
            .iter()
            .cloned()
            .map(|t| ConsensusTransaction::new_user_transaction_v2_message(&state.name, t.into()))
            .enumerate()
        {
            let transaction_bytes = bcs::to_bytes(&consensus_transaction).unwrap();
            let block = VerifiedBlock::new_for_test(
                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
                    .set_transactions(vec![Transaction::new(transaction_bytes)])
                    .build(),
            );

            blocks.push(block);
        }

        let leader_block = blocks[0].clone();
        let committed_sub_dag = CommittedSubDag::new(
            leader_block.reference(),
            blocks.clone(),
            leader_block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
        );

        backpressure_manager.set_backpressure(true);
        backpressure_manager.update_highest_certified_checkpoint(1);

        {
            let waiter = consensus_handler.handle_consensus_commit(committed_sub_dag.clone());
            pin_mut!(waiter);

            // With backpressure enabled the commit must not complete.
            tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
                .await
                .unwrap_err();

            backpressure_manager.set_backpressure(false);

            // Once backpressure is lifted, the commit completes.
            tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
                .await
                .unwrap();
        }

        // Consensus stats must reflect the processed commit (sub-dag index 10,
        // leader round 100).
        let num_blocks = blocks.len();
        let num_transactions = user_transactions.len();
        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
        assert_eq!(
            last_consensus_stats_1.index.transaction_index,
            num_transactions as u64
        );
        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
        assert_eq!(
            last_consensus_stats_1.stats.get_num_messages(0),
            num_blocks as u64
        );
        assert_eq!(
            last_consensus_stats_1.stats.get_num_user_transactions(0),
            num_transactions as u64
        );

        // Every submitted transaction must eventually produce effects.
        for (i, t) in user_transactions.iter().enumerate() {
            let digest = t.tx().digest();
            if tokio::time::timeout(
                std::time::Duration::from_secs(10),
                state.notify_read_effects_for_testing("", *digest),
            )
            .await
            .is_ok()
            {
            } else {
                panic!("User transaction {} {} did not execute", i, digest);
            }
        }

        state.execution_scheduler().check_empty_for_testing().await;
    }
3815
3816 fn to_short_strings(txs: Vec<VerifiedExecutableTransactionWithAliases>) -> Vec<String> {
3817 txs.into_iter()
3818 .map(|tx| format!("transaction({})", tx.tx().transaction_data().gas_price()))
3819 .collect()
3820 }
3821
    /// `PostConsensusTxReorder::reorder` with `ByGasPrice` must order
    /// transactions by descending gas price.
    #[test]
    fn test_order_by_gas_price() {
        let mut v = vec![user_txn(42), user_txn(100)];
        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
        assert_eq!(
            to_short_strings(v),
            vec![
                "transaction(100)".to_string(),
                "transaction(42)".to_string(),
            ]
        );

        // Larger input including a duplicate gas price (1000); both duplicates
        // must survive the reorder.
        let mut v = vec![
            user_txn(1200),
            user_txn(12),
            user_txn(1000),
            user_txn(42),
            user_txn(100),
            user_txn(1000),
        ];
        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
        assert_eq!(
            to_short_strings(v),
            vec![
                "transaction(1200)".to_string(),
                "transaction(1000)".to_string(),
                "transaction(1000)".to_string(),
                "transaction(100)".to_string(),
                "transaction(42)".to_string(),
                "transaction(12)".to_string(),
            ]
        );
    }
3855
    /// Checkpoint signature messages are keyed by (authority, sequence,
    /// digest): two signatures over different summaries are both processed,
    /// and an exact duplicate of the first is absorbed without error.
    #[tokio::test(flavor = "current_thread")]
    async fn test_checkpoint_signature_dedup() {
        telemetry_subscribers::init_for_testing();

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();

        // Builds a signed summary for checkpoint sequence 42 over freshly
        // randomized contents, so each call yields a distinct digest.
        let make_signed = || {
            let epoch = epoch_store.epoch();
            let contents =
                CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
            let summary = CheckpointSummary::new(
                &ProtocolConfig::get_for_max_version_UNSAFE(),
                epoch,
                42, // checkpoint sequence number (matches the keys asserted below)
                10,
                &contents,
                None,
                GasCostSummary::default(),
                None,
                0,
                Vec::new(),
                Vec::new(),
            );
            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name)
        };

        let v2_s1 = make_signed();
        let v2_s1_clone = v2_s1.clone();
        let v2_digest_a = v2_s1.data().digest();
        let v2_a =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: v2_s1,
            });

        let v2_s2 = make_signed();
        let v2_digest_b = v2_s2.data().digest();
        let v2_b =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: v2_s2,
            });

        assert_ne!(v2_digest_a, v2_digest_b);

        // Exact duplicate of the first signature message (same digest).
        assert_eq!(v2_s1_clone.data().digest(), v2_digest_a);
        let v2_dup =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: v2_s1_clone,
            });

        // A single block carries both distinct messages plus the duplicate.
        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
        let block = VerifiedBlock::new_for_test(
            TestBlock::new(100, 0)
                .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)])
                .build(),
        );
        let commit = CommittedSubDag::new(
            block.reference(),
            vec![block.clone()],
            block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
        );

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
        let backpressure = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let settlement_scheduler = SettlementScheduler::new(
            state.execution_scheduler().as_ref().clone(),
            state.get_transaction_cache_reader().clone(),
            state.metrics.clone(),
        );
        let mut handler = ConsensusHandler::new(
            epoch_store.clone(),
            Arc::new(CheckpointServiceNoop {}),
            settlement_scheduler,
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput),
            backpressure.subscribe(),
            state.traffic_controller.clone(),
            None,
            state.consensus_gasless_counter.clone(),
        );

        handler.handle_consensus_commit(commit).await;

        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
        use sui_types::messages_consensus::ConsensusTransactionKey as CK;

        // Both distinct signatures must be marked processed; the duplicate
        // maps to the same key as the first and must not cause an error.
        let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a));
        let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b));
        assert!(
            epoch_store
                .is_consensus_message_processed(&v2_key_a)
                .unwrap()
        );
        assert!(
            epoch_store
                .is_consensus_message_processed(&v2_key_b)
                .unwrap()
        );
    }
3972
    /// Messages whose claimed authority does not match the block author must
    /// be filtered out before processing: mismatched EndOfPublish and
    /// checkpoint-signature messages are dropped, while the matching ones from
    /// the block author are processed.
    #[tokio::test(flavor = "current_thread")]
    async fn test_verify_consensus_transaction_filters_mismatched_authorities() {
        telemetry_subscribers::init_for_testing();

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();

        // A freshly generated authority key that is not the block author.
        use fastcrypto::traits::KeyPair;
        let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair();
        let wrong_authority: AuthorityName = wrong_keypair.public().into();

        let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority);

        let valid_eop = ConsensusTransaction::new_end_of_publish(state.name);

        let epoch = epoch_store.epoch();
        let contents =
            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
        let summary = CheckpointSummary::new(
            &ProtocolConfig::get_for_max_version_UNSAFE(),
            epoch,
            42, // checkpoint sequence number (matches the keys asserted below)
            10,
            &contents,
            None,
            GasCostSummary::default(),
            None,
            0,
            Vec::new(),
            Vec::new(),
        );

        let mismatched_checkpoint_signed =
            SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority);
        let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest();
        let mismatched_checkpoint =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: mismatched_checkpoint_signed,
            });

        let valid_checkpoint_signed =
            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name);
        let valid_checkpoint_digest = valid_checkpoint_signed.data().digest();
        let valid_checkpoint =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: valid_checkpoint_signed,
            });

        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());

        // All four messages arrive in one block authored by authority index 0
        // (this validator).
        let block = VerifiedBlock::new_for_test(
            TestBlock::new(100, 0)
                .set_transactions(vec![
                    to_tx(&mismatched_eop),
                    to_tx(&valid_eop),
                    to_tx(&mismatched_checkpoint),
                    to_tx(&valid_checkpoint),
                ])
                .build(),
        );
        let commit = CommittedSubDag::new(
            block.reference(),
            vec![block.clone()],
            block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
        );

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
        let backpressure = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let settlement_scheduler = SettlementScheduler::new(
            state.execution_scheduler().as_ref().clone(),
            state.get_transaction_cache_reader().clone(),
            state.metrics.clone(),
        );
        let mut handler = ConsensusHandler::new(
            epoch_store.clone(),
            Arc::new(CheckpointServiceNoop {}),
            settlement_scheduler,
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput),
            backpressure.subscribe(),
            state.traffic_controller.clone(),
            None,
            state.consensus_gasless_counter.clone(),
        );

        handler.handle_consensus_commit(commit).await;

        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
        use sui_types::messages_consensus::ConsensusTransactionKey as CK;

        let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
        assert!(
            epoch_store
                .is_consensus_message_processed(&valid_eop_key)
                .unwrap(),
            "Valid EndOfPublish should have been processed"
        );

        let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
            state.name,
            42,
            valid_checkpoint_digest,
        ));
        assert!(
            epoch_store
                .is_consensus_message_processed(&valid_checkpoint_key)
                .unwrap(),
            "Valid CheckpointSignature should have been processed"
        );

        let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
        assert!(
            !epoch_store
                .is_consensus_message_processed(&mismatched_eop_key)
                .unwrap(),
            "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
        );

        let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
            wrong_authority,
            42,
            mismatched_checkpoint_digest,
        ));
        assert!(
            !epoch_store
                .is_consensus_message_processed(&mismatched_checkpoint_key)
                .unwrap(),
            "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
        );
    }
4126
4127 fn user_txn(gas_price: u64) -> VerifiedExecutableTransactionWithAliases {
4128 let (committee, keypairs) = Committee::new_simple_test_committee();
4129 let (sender, sender_keypair) = deterministic_random_account_key();
4130 let tx = sui_types::transaction::Transaction::from_data_and_signer(
4131 TransactionData::new_transfer(
4132 SuiAddress::default(),
4133 FullObjectRef::from_fastpath_ref(random_object_ref()),
4134 sender,
4135 random_object_ref(),
4136 1000 * gas_price,
4137 gas_price,
4138 ),
4139 vec![&sender_keypair],
4140 );
4141 let tx = VerifiedExecutableTransaction::new_from_certificate(
4142 VerifiedCertificate::new_unchecked(
4143 CertifiedTransaction::new_from_keypairs_for_testing(
4144 tx.into_data(),
4145 &keypairs,
4146 &committee,
4147 ),
4148 ),
4149 );
4150 VerifiedExecutableTransactionWithAliases::no_aliases(tx)
4151 }
4152
4153 mod checkpoint_queue_tests {
4154 use super::*;
4155 use consensus_core::CommitRef;
4156 use sui_types::digests::Digest;
4157
4158 fn make_chunk(tx_count: usize, height: u64) -> Chunk {
4159 Chunk {
4160 schedulables: (0..tx_count)
4161 .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4162 .collect(),
4163 settlement: None,
4164 height,
4165 }
4166 }
4167
4168 fn make_commit_ref(index: u32) -> CommitRef {
4169 CommitRef {
4170 index,
4171 digest: CommitDigest::MIN,
4172 }
4173 }
4174
4175 fn default_versions() -> HashMap<TransactionKey, AssignedVersions> {
4176 HashMap::new()
4177 }
4178
        /// A forced flush (`flush(_, true)`) drains all pending roots into a
        /// single pending checkpoint, regardless of timing.
        #[test]
        fn test_flush_all_checkpoint_roots() {
            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
            let versions = default_versions();

            // Two chunks at heights 1 and 2, both under the 1000-tx limit.
            queue.push_chunk(
                make_chunk(5, 1),
                &versions,
                1000,
                make_commit_ref(1),
                Digest::default(),
            );
            queue.push_chunk(
                make_chunk(3, 2),
                &versions,
                1000,
                make_commit_ref(1),
                Digest::default(),
            );

            let pending = queue.flush(1000, true);

            assert!(pending.is_some());
            assert!(queue.pending_roots.is_empty());
        }
4204
4205 #[test]
4206 fn test_flush_respects_min_checkpoint_interval() {
4207 let min_interval = 200;
4208 let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, 1000, min_interval);
4209 let versions = default_versions();
4210
4211 queue.push_chunk(
4212 make_chunk(5, 1),
4213 &versions,
4214 1000,
4215 make_commit_ref(1),
4216 Digest::default(),
4217 );
4218
4219 let pending = queue.flush(1000 + min_interval - 1, false);
4220 assert!(pending.is_none());
4221 assert_eq!(queue.pending_roots.len(), 1);
4222
4223 let pending = queue.flush(1000 + min_interval, false);
4224 assert!(pending.is_some());
4225 assert!(queue.pending_roots.is_empty());
4226 }
4227
4228 #[test]
4229 fn test_push_chunk_flushes_when_exceeds_max() {
4230 let max_tx = 10;
4231 let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, max_tx, 0);
4232 let versions = default_versions();
4233
4234 queue.push_chunk(
4235 make_chunk(max_tx / 2 + 1, 1),
4236 &versions,
4237 1000,
4238 make_commit_ref(1),
4239 Digest::default(),
4240 );
4241
4242 let flushed = queue.push_chunk(
4243 make_chunk(max_tx / 2 + 1, 2),
4244 &versions,
4245 1000,
4246 make_commit_ref(2),
4247 Digest::default(),
4248 );
4249
4250 assert_eq!(flushed.len(), 1);
4251 assert_eq!(queue.pending_roots.len(), 1);
4252 }
4253
4254 #[test]
4255 fn test_multiple_chunks_merged_into_one_checkpoint() {
4256 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 200);
4257 let versions = default_versions();
4258
4259 queue.push_chunk(
4260 make_chunk(10, 1),
4261 &versions,
4262 1000,
4263 make_commit_ref(1),
4264 Digest::default(),
4265 );
4266 queue.push_chunk(
4267 make_chunk(10, 2),
4268 &versions,
4269 1000,
4270 make_commit_ref(2),
4271 Digest::default(),
4272 );
4273 queue.push_chunk(
4274 make_chunk(10, 3),
4275 &versions,
4276 1000,
4277 make_commit_ref(3),
4278 Digest::default(),
4279 );
4280
4281 let pending = queue.flush(1000, true).unwrap();
4282
4283 assert_eq!(pending.roots.len(), 3);
4284 }
4285
4286 #[test]
4287 fn test_push_chunk_handles_overflow() {
4288 let max_tx = 10;
4289 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4290 let versions = default_versions();
4291
4292 let flushed1 = queue.push_chunk(
4293 make_chunk(max_tx / 2, 1),
4294 &versions,
4295 1000,
4296 make_commit_ref(1),
4297 Digest::default(),
4298 );
4299 assert!(flushed1.is_empty());
4300
4301 let flushed2 = queue.push_chunk(
4302 make_chunk(max_tx / 2, 2),
4303 &versions,
4304 1000,
4305 make_commit_ref(2),
4306 Digest::default(),
4307 );
4308 assert!(flushed2.is_empty());
4309
4310 let flushed3 = queue.push_chunk(
4311 make_chunk(max_tx / 2, 3),
4312 &versions,
4313 1000,
4314 make_commit_ref(3),
4315 Digest::default(),
4316 );
4317 assert_eq!(flushed3.len(), 1);
4318
4319 let pending = queue.flush(1000, true);
4320
4321 for p in pending.iter().chain(flushed3.iter()) {
4322 let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4323 assert!(tx_count <= max_tx);
4324 }
4325 }
4326
4327 #[test]
4328 fn test_checkpoint_uses_last_chunk_height() {
4329 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4330 let versions = default_versions();
4331
4332 queue.push_chunk(
4333 make_chunk(10, 100),
4334 &versions,
4335 1000,
4336 make_commit_ref(1),
4337 Digest::default(),
4338 );
4339 queue.push_chunk(
4340 make_chunk(10, 200),
4341 &versions,
4342 1000,
4343 make_commit_ref(2),
4344 Digest::default(),
4345 );
4346
4347 let pending = queue.flush(1000, true).unwrap();
4348
4349 assert_eq!(pending.details.checkpoint_height, 200);
4350 }
4351
4352 #[test]
4353 fn test_last_built_timestamp_updated_on_flush() {
4354 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4355 let versions = default_versions();
4356
4357 queue.push_chunk(
4358 make_chunk(10, 1),
4359 &versions,
4360 5000,
4361 make_commit_ref(1),
4362 Digest::default(),
4363 );
4364
4365 assert_eq!(queue.last_built_timestamp, 0);
4366
4367 let _ = queue.flush(5000, true);
4368
4369 assert_eq!(queue.last_built_timestamp, 5000);
4370 }
4371
4372 #[test]
4373 fn test_settlement_info_sent_through_channel() {
4374 let mut queue = CheckpointQueue::new_for_testing(0, 0, 5, 1000, 0);
4375 let versions = default_versions();
4376
4377 let chunk1 = Chunk {
4378 schedulables: vec![
4379 Schedulable::ConsensusCommitPrologue(0, 1, 0),
4380 Schedulable::ConsensusCommitPrologue(0, 2, 0),
4381 Schedulable::ConsensusCommitPrologue(0, 3, 0),
4382 ],
4383 settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
4384 height: 1,
4385 };
4386
4387 let chunk2 = Chunk {
4388 schedulables: vec![
4389 Schedulable::ConsensusCommitPrologue(0, 4, 0),
4390 Schedulable::ConsensusCommitPrologue(0, 5, 0),
4391 ],
4392 settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
4393 height: 2,
4394 };
4395
4396 queue.push_chunk(
4397 chunk1,
4398 &versions,
4399 1000,
4400 make_commit_ref(1),
4401 Digest::default(),
4402 );
4403 queue.push_chunk(
4404 chunk2,
4405 &versions,
4406 1000,
4407 make_commit_ref(1),
4408 Digest::default(),
4409 );
4410 }
4411
4412 #[test]
4413 fn test_settlement_checkpoint_seq_correct_after_flush() {
4414 let max_tx = 10;
4415 let initial_seq = 5;
4416 let (sender, mut receiver) = monitored_mpsc::unbounded_channel("test_settlement_seq");
4417 let mut queue =
4418 CheckpointQueue::new_for_testing_with_sender(0, 0, initial_seq, max_tx, 0, sender);
4419 let versions = default_versions();
4420
4421 let chunk1 = Chunk {
4423 schedulables: (0..max_tx / 2 + 1)
4424 .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4425 .collect(),
4426 settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
4427 height: 1,
4428 };
4429 queue.push_chunk(
4430 chunk1,
4431 &versions,
4432 1000,
4433 make_commit_ref(1),
4434 Digest::default(),
4435 );
4436
4437 let msg1 = receiver.try_recv().unwrap();
4439 let settlement1 = msg1.1.unwrap();
4440 assert_eq!(settlement1.checkpoint_seq, initial_seq);
4441
4442 let chunk2 = Chunk {
4444 schedulables: (0..max_tx / 2 + 1)
4445 .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4446 .collect(),
4447 settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
4448 height: 2,
4449 };
4450 let flushed = queue.push_chunk(
4451 chunk2,
4452 &versions,
4453 1000,
4454 make_commit_ref(2),
4455 Digest::default(),
4456 );
4457 assert_eq!(flushed.len(), 1);
4458 assert_eq!(flushed[0].details.checkpoint_seq, Some(initial_seq));
4459
4460 let msg2 = receiver.try_recv().unwrap();
4463 let settlement2 = msg2.1.unwrap();
4464 assert_eq!(settlement2.checkpoint_seq, initial_seq + 1);
4465
4466 let pending = queue.flush_forced().unwrap();
4469 assert_eq!(
4470 pending.details.checkpoint_seq,
4471 Some(settlement2.checkpoint_seq)
4472 );
4473 }
4474
4475 #[test]
4476 fn test_checkpoint_seq_increments_on_flush() {
4477 let mut queue = CheckpointQueue::new_for_testing(0, 0, 10, 1000, 0);
4478 let versions = default_versions();
4479
4480 queue.push_chunk(
4481 make_chunk(5, 1),
4482 &versions,
4483 1000,
4484 make_commit_ref(1),
4485 Digest::default(),
4486 );
4487
4488 let pending = queue.flush(1000, true).unwrap();
4489
4490 assert_eq!(pending.details.checkpoint_seq, Some(10));
4491 assert_eq!(queue.current_checkpoint_seq, 11);
4492 }
4493
4494 #[test]
4495 fn test_multiple_chunks_with_overflow() {
4496 let max_tx = 10;
4497 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4498 let versions = default_versions();
4499
4500 let flushed1 = queue.push_chunk(
4501 make_chunk(max_tx / 2 + 1, 1),
4502 &versions,
4503 1000,
4504 make_commit_ref(1),
4505 Digest::default(),
4506 );
4507 let flushed2 = queue.push_chunk(
4508 make_chunk(max_tx / 2 + 1, 2),
4509 &versions,
4510 1000,
4511 make_commit_ref(1),
4512 Digest::default(),
4513 );
4514 let flushed3 = queue.push_chunk(
4515 make_chunk(max_tx / 2 + 1, 3),
4516 &versions,
4517 1000,
4518 make_commit_ref(1),
4519 Digest::default(),
4520 );
4521
4522 let all_flushed: Vec<_> = flushed1
4523 .into_iter()
4524 .chain(flushed2)
4525 .chain(flushed3)
4526 .collect();
4527 assert_eq!(all_flushed.len(), 2);
4528 assert_eq!(queue.pending_roots.len(), 1);
4529
4530 for p in &all_flushed {
4531 let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4532 assert!(tx_count <= max_tx);
4533 }
4534 }
4535 }
4536}