1use std::{
5 collections::{BTreeMap, HashMap, HashSet, VecDeque},
6 hash::Hash,
7 num::NonZeroUsize,
8 sync::{Arc, Mutex},
9 time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use consensus_config::Committee as ConsensusCommittee;
13use consensus_core::{CommitConsumerMonitor, CommitIndex, CommitRef};
14use consensus_types::block::TransactionIndex;
15use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
16use lru::LruCache;
17use mysten_common::{
18 assert_reachable, assert_sometimes, debug_fatal, random_util::randomize_cache_capacity_in_tests,
19};
20use mysten_metrics::{
21 monitored_future,
22 monitored_mpsc::{self, UnboundedReceiver},
23 monitored_scope, spawn_monitored_task,
24};
25use nonempty::NonEmpty;
26use parking_lot::RwLockWriteGuard;
27use serde::{Deserialize, Serialize};
28use sui_config::node::CongestionLogConfig;
29use sui_macros::{fail_point, fail_point_arg, fail_point_if};
30use sui_protocol_config::{Chain, PerObjectCongestionControlMode, ProtocolConfig};
31use sui_types::{
32 SUI_RANDOMNESS_STATE_OBJECT_ID,
33 authenticator_state::ActiveJwk,
34 base_types::{
35 AuthorityName, ConciseableName, ConsensusObjectSequenceKey, ObjectID, ObjectRef,
36 SequenceNumber, TransactionDigest,
37 },
38 crypto::RandomnessRound,
39 digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest, Digest},
40 executable_transaction::{
41 TrustedExecutableTransaction, VerifiedExecutableTransaction,
42 VerifiedExecutableTransactionWithAliases,
43 },
44 messages_checkpoint::{
45 CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointTimestamp,
46 },
47 messages_consensus::{
48 AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments,
49 ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
50 ExecutionTimeObservation,
51 },
52 node_role::NodeRole,
53 sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
54 transaction::{
55 InputObjectKind, SenderSignedData, TransactionDataAPI, TransactionKey, VerifiedTransaction,
56 WithAliases,
57 },
58};
59use tokio::task::JoinSet;
60use tracing::{debug, error, info, instrument, trace, warn};
61
62use crate::{
63 authority::{
64 AuthorityMetrics, AuthorityState, ExecutionEnv,
65 authority_per_epoch_store::{
66 AuthorityPerEpochStore, CancelConsensusCertificateReason, ConsensusStats,
67 ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStatsV2,
68 consensus_quarantine::ConsensusCommitOutput,
69 },
70 backpressure::{BackpressureManager, BackpressureSubscriber},
71 congestion_log::CongestionCommitLogger,
72 consensus_tx_status_cache::ConsensusTxStatus,
73 epoch_start_configuration::EpochStartConfigTrait,
74 execution_time_estimator::ExecutionTimeEstimator,
75 shared_object_congestion_tracker::SharedObjectCongestionTracker,
76 shared_object_version_manager::{AssignedTxAndVersions, AssignedVersions, Schedulable},
77 transaction_deferral::{DeferralKey, DeferralReason, transaction_deferral_within_limit},
78 },
79 checkpoints::{
80 CheckpointHeight, CheckpointRoots, CheckpointService, CheckpointServiceNotify,
81 PendingCheckpoint, PendingCheckpointInfo, PendingCheckpointV2,
82 },
83 consensus_adapter::ConsensusAdapter,
84 consensus_throughput_calculator::ConsensusThroughputCalculator,
85 consensus_types::consensus_output_api::ConsensusCommitAPI,
86 epoch::{
87 randomness::{DkgStatus, RandomnessManager},
88 reconfiguration::ReconfigState,
89 },
90 execution_cache::ObjectCacheRead,
91 execution_scheduler::{SettlementBatchInfo, SettlementScheduler},
92 gasless_rate_limiter::ConsensusGaslessCounter,
93 post_consensus_tx_reorder::PostConsensusTxReorder,
94 traffic_controller::{TrafficController, policies::TrafficTally},
95};
96
/// Bundle of sequenced consensus transactions plus an object-ref -> digest map.
///
/// NOTE(review): the code that fills and consumes this struct is outside the
/// visible part of the file; the field descriptions below are inferred from
/// names and types only and should be confirmed.
struct FilteredConsensusOutput {
    /// Sequenced transactions, each paired with a `u32`
    /// (presumably a per-transaction count or index -- TODO confirm).
    transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
    /// Map from an object reference to a transaction digest
    /// (presumably the transaction that locks that owned object -- TODO confirm).
    owned_object_locks: HashMap<ObjectRef, TransactionDigest>,
}
103
/// Holds the shared components needed to build a `ConsensusHandler`.
///
/// The fields are captured once by the constructors and then cloned (or
/// queried) each time `new_consensus_handler` creates a handler.
pub struct ConsensusHandlerInitializer {
    /// Authority state; `new_consensus_handler` reads the execution scheduler,
    /// the cache readers, the metrics and the traffic controller from it.
    state: Arc<AuthorityState>,
    /// Checkpoint service handed to every handler that is created.
    checkpoint_service: Arc<CheckpointService>,
    /// Per-epoch store; also the source of the epoch start state from which
    /// the consensus committee is derived.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Consensus adapter handed to every handler that is created.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Throughput calculator handed to every handler that is created.
    throughput_calculator: Arc<ConsensusThroughputCalculator>,
    /// Each handler receives its own subscription via `subscribe()`.
    backpressure_manager: Arc<BackpressureManager>,
    /// Optional congestion logger; `None` when no config was supplied or when
    /// the logger could not be created.
    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
    /// Counter cloned from `state.consensus_gasless_counter` at construction.
    consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
}
114
115impl ConsensusHandlerInitializer {
116 pub fn new(
117 state: Arc<AuthorityState>,
118 checkpoint_service: Arc<CheckpointService>,
119 epoch_store: Arc<AuthorityPerEpochStore>,
120 consensus_adapter: Arc<ConsensusAdapter>,
121 throughput_calculator: Arc<ConsensusThroughputCalculator>,
122 backpressure_manager: Arc<BackpressureManager>,
123 congestion_log_config: Option<CongestionLogConfig>,
124 ) -> Self {
125 let congestion_logger =
126 congestion_log_config.and_then(|config| match CongestionCommitLogger::new(&config) {
127 Ok(logger) => Some(Arc::new(Mutex::new(logger))),
128 Err(e) => {
129 debug_fatal!("Failed to create congestion logger: {e}");
130 None
131 }
132 });
133 let consensus_gasless_counter = state.consensus_gasless_counter.clone();
134 Self {
135 state,
136 checkpoint_service,
137 epoch_store,
138 consensus_adapter,
139 throughput_calculator,
140 backpressure_manager,
141 congestion_logger,
142 consensus_gasless_counter,
143 }
144 }
145
/// Test-only constructor: the caller supplies the authority state and the
/// checkpoint service, and every other component is created here.
///
/// The epoch store comes from `state.epoch_store_for_testing()`, the
/// backpressure manager from `BackpressureManager::new_for_tests()`, and no
/// congestion logger is installed.
#[cfg(test)]
pub(crate) fn new_for_testing(
    state: Arc<AuthorityState>,
    checkpoint_service: Arc<CheckpointService>,
) -> Self {
    use crate::consensus_test_utils::make_consensus_adapter_for_test;
    use std::collections::HashSet;

    let backpressure_manager = BackpressureManager::new_for_tests();
    let consensus_adapter =
        make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
    let consensus_gasless_counter = state.consensus_gasless_counter.clone();
    let epoch_store: Arc<AuthorityPerEpochStore> = state.epoch_store_for_testing().clone();
    let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
        None,
        state.metrics.clone(),
    ));

    // Everything derived from `state` is computed above, so it can be moved here.
    Self {
        state,
        checkpoint_service,
        epoch_store,
        consensus_adapter,
        throughput_calculator,
        backpressure_manager,
        congestion_logger: None,
        consensus_gasless_counter,
    }
}
172
173 pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
174 let new_epoch_start_state = self.epoch_store.epoch_start_state();
175 let consensus_committee = new_epoch_start_state.get_consensus_committee();
176
177 let settlement_scheduler = SettlementScheduler::new(
178 self.state.execution_scheduler().as_ref().clone(),
179 self.state.get_transaction_cache_reader().clone(),
180 self.state.metrics.clone(),
181 );
182 ConsensusHandler::new(
183 self.epoch_store.clone(),
184 self.checkpoint_service.clone(),
185 settlement_scheduler,
186 self.consensus_adapter.clone(),
187 self.state.get_object_cache_reader().clone(),
188 consensus_committee,
189 self.state.metrics.clone(),
190 self.throughput_calculator.clone(),
191 self.backpressure_manager.subscribe(),
192 self.state.traffic_controller.clone(),
193 self.congestion_logger.clone(),
194 self.consensus_gasless_counter.clone(),
195 )
196 }
197}
198
199mod additional_consensus_state {
200 use std::marker::PhantomData;
201
202 use consensus_core::CommitRef;
203 use fastcrypto::hash::HashFunction as _;
204 use sui_types::{crypto::DefaultHash, digests::Digest};
205
206 use super::*;
/// Extra consensus-derived state whose BCS serialization is hashed into an
/// `AdditionalConsensusStateDigest` (see `digest`).
///
/// Because the whole struct is serialized for hashing, adding, removing or
/// reordering fields changes the resulting digest.
#[derive(Serialize, Deserialize)]
pub(super) struct AdditionalConsensusState {
    /// Fed every observed commit and queried for the commit-interval estimate.
    commit_interval_observer: CommitIntervalObserver,
}
219
220 impl AdditionalConsensusState {
221 pub fn new(additional_consensus_state_window_size: u32) -> Self {
222 Self {
223 commit_interval_observer: CommitIntervalObserver::new(
224 additional_consensus_state_window_size,
225 ),
226 }
227 }
228
229 pub(crate) fn observe_commit(
231 &mut self,
232 protocol_config: &ProtocolConfig,
233 epoch_start_time: u64,
234 consensus_commit: &impl ConsensusCommitAPI,
235 ) -> ConsensusCommitInfo {
236 self.commit_interval_observer
237 .observe_commit_time(consensus_commit);
238
239 let estimated_commit_period = self
240 .commit_interval_observer
241 .commit_interval_estimate()
242 .unwrap_or(Duration::from_millis(
243 protocol_config.min_checkpoint_interval_ms(),
244 ));
245
246 info!("estimated commit rate: {:?}", estimated_commit_period);
247
248 self.commit_info_impl(
249 epoch_start_time,
250 consensus_commit,
251 Some(estimated_commit_period),
252 )
253 }
254
255 fn commit_info_impl(
256 &self,
257 epoch_start_time: u64,
258 consensus_commit: &impl ConsensusCommitAPI,
259 estimated_commit_period: Option<Duration>,
260 ) -> ConsensusCommitInfo {
261 let leader_author = consensus_commit.leader_author_index();
262 let timestamp = consensus_commit.commit_timestamp_ms();
263
264 let timestamp = if timestamp < epoch_start_time {
265 error!(
266 "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start_time}, author {leader_author:?}"
267 );
268 epoch_start_time
269 } else {
270 timestamp
271 };
272
273 ConsensusCommitInfo {
274 _phantom: PhantomData,
275 round: consensus_commit.leader_round(),
276 timestamp,
277 leader_author,
278 consensus_commit_ref: consensus_commit.commit_ref(),
279 rejected_transactions_digest: consensus_commit.rejected_transactions_digest(),
280 additional_state_digest: Some(self.digest()),
281 estimated_commit_period,
282 skip_consensus_commit_prologue_in_test: false,
283 }
284 }
285
286 fn digest(&self) -> AdditionalConsensusStateDigest {
288 let mut hash = DefaultHash::new();
289 bcs::serialize_into(&mut hash, self).unwrap();
290 AdditionalConsensusStateDigest::new(hash.finalize().into())
291 }
292 }
293
/// Per-commit information used to build the consensus commit prologue
/// transaction.
///
/// Some fields are private, so code outside this module cannot construct the
/// struct with a literal and has to go through the constructors defined here.
pub struct ConsensusCommitInfo {
    /// Private marker field; carries no data.
    _phantom: PhantomData<()>,

    /// Leader round of the commit.
    pub round: u64,
    /// Commit timestamp; when built by `commit_info_impl` it is never lower
    /// than the epoch start time passed to that function.
    pub timestamp: u64,
    /// Index of the commit leader's author.
    pub leader_author: AuthorityIndex,
    /// Reference of the consensus commit.
    pub consensus_commit_ref: CommitRef,
    /// Digest of the rejected transactions of the commit.
    pub rejected_transactions_digest: Digest,

    /// Read through `additional_state_digest()`, which panics when `None`.
    additional_state_digest: Option<AdditionalConsensusStateDigest>,
    /// Read through `estimated_commit_period()`, which panics when `None`.
    estimated_commit_period: Option<Duration>,

    /// Always `false` when built by `commit_info_impl`; only the test
    /// constructors can set it.
    /// NOTE(review): the code that reads this flag is not visible here.
    pub skip_consensus_commit_prologue_in_test: bool,
}
309
310 impl ConsensusCommitInfo {
311 pub fn new_for_test(
312 commit_round: u64,
313 commit_timestamp: u64,
314 estimated_commit_period: Option<Duration>,
315 skip_consensus_commit_prologue_in_test: bool,
316 ) -> Self {
317 Self {
318 _phantom: PhantomData,
319 round: commit_round,
320 timestamp: commit_timestamp,
321 leader_author: 0,
322 consensus_commit_ref: CommitRef::default(),
323 rejected_transactions_digest: Digest::default(),
324 additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
325 estimated_commit_period,
326 skip_consensus_commit_prologue_in_test,
327 }
328 }
329
330 pub fn new_for_congestion_test(
331 commit_round: u64,
332 commit_timestamp: u64,
333 estimated_commit_period: Duration,
334 ) -> Self {
335 Self::new_for_test(
336 commit_round,
337 commit_timestamp,
338 Some(estimated_commit_period),
339 true,
340 )
341 }
342
/// Returns the additional consensus state digest.
///
/// # Panics
///
/// Panics with "additional_state_digest is not available" if the digest
/// was not set.
pub fn additional_state_digest(&self) -> AdditionalConsensusStateDigest {
    self.additional_state_digest
        .expect("additional_state_digest is not available")
}
348
/// Returns the estimated period between consensus commits.
///
/// # Panics
///
/// Panics with "estimated commit period is not available" if the period
/// was not set.
pub fn estimated_commit_period(&self) -> Duration {
    self.estimated_commit_period
        .expect("estimated commit period is not available")
}
354
355 fn consensus_commit_digest(&self) -> ConsensusCommitDigest {
356 ConsensusCommitDigest::new(self.consensus_commit_ref.digest.into_inner())
357 }
358
359 fn consensus_commit_prologue_transaction(
360 &self,
361 epoch: u64,
362 ) -> VerifiedExecutableTransaction {
363 let transaction = VerifiedTransaction::new_consensus_commit_prologue(
364 epoch,
365 self.round,
366 self.timestamp,
367 );
368 VerifiedExecutableTransaction::new_system(transaction, epoch)
369 }
370
371 fn consensus_commit_prologue_v2_transaction(
372 &self,
373 epoch: u64,
374 ) -> VerifiedExecutableTransaction {
375 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v2(
376 epoch,
377 self.round,
378 self.timestamp,
379 self.consensus_commit_digest(),
380 );
381 VerifiedExecutableTransaction::new_system(transaction, epoch)
382 }
383
384 fn consensus_commit_prologue_v3_transaction(
385 &self,
386 epoch: u64,
387 consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
388 ) -> VerifiedExecutableTransaction {
389 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v3(
390 epoch,
391 self.round,
392 self.timestamp,
393 self.consensus_commit_digest(),
394 consensus_determined_version_assignments,
395 );
396 VerifiedExecutableTransaction::new_system(transaction, epoch)
397 }
398
399 fn consensus_commit_prologue_v4_transaction(
400 &self,
401 epoch: u64,
402 consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
403 additional_state_digest: AdditionalConsensusStateDigest,
404 ) -> VerifiedExecutableTransaction {
405 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v4(
406 epoch,
407 self.round,
408 self.timestamp,
409 self.consensus_commit_digest(),
410 consensus_determined_version_assignments,
411 additional_state_digest,
412 );
413 VerifiedExecutableTransaction::new_system(transaction, epoch)
414 }
415
/// Builds the consensus commit prologue transaction in the newest format
/// enabled by `protocol_config`.
///
/// Selection, in priority order:
/// 1. `record_additional_state_digest_in_prologue` -> v4,
/// 2. otherwise, if version assignments are recorded (either flag) -> v3,
/// 3. otherwise, if `include_consensus_digest_in_prologue` -> v2,
/// 4. otherwise -> v1.
///
/// `cancelled_txn_version_assignment` is consumed; it is only used when one
/// of the two "record version assignments" flags is enabled.
/// `indirect_state_observer` is consumed as well and only folded into the
/// digest when `additional_consensus_digest_indirect_state` is enabled.
///
/// Round, timestamp and commit digest come from `self`, while the additional
/// state digest is read from `commit_info`.
/// NOTE(review): callers presumably pass the same object for both -- confirm.
///
/// # Panics
///
/// In the v4 branch, panics if neither version-assignment flag is enabled
/// (the `unwrap` on `version_assignments`), or if `commit_info` has no
/// additional state digest.
pub fn create_consensus_commit_prologue_transaction(
    &self,
    epoch: u64,
    protocol_config: &ProtocolConfig,
    cancelled_txn_version_assignment: Vec<(
        TransactionDigest,
        Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
    )>,
    commit_info: &ConsensusCommitInfo,
    indirect_state_observer: IndirectStateObserver,
) -> VerifiedExecutableTransaction {
    // Pick the newest enabled encoding of the version assignments, if any.
    let version_assignments = if protocol_config
        .record_consensus_determined_version_assignments_in_prologue_v2()
    {
        Some(
            ConsensusDeterminedVersionAssignments::CancelledTransactionsV2(
                cancelled_txn_version_assignment,
            ),
        )
    } else if protocol_config.record_consensus_determined_version_assignments_in_prologue()
    {
        Some(
            // The older encoding keeps only the first component (`id.0`) of
            // each `ConsensusObjectSequenceKey`.
            ConsensusDeterminedVersionAssignments::CancelledTransactions(
                cancelled_txn_version_assignment
                    .into_iter()
                    .map(|(tx_digest, versions)| {
                        (
                            tx_digest,
                            versions.into_iter().map(|(id, v)| (id.0, v)).collect(),
                        )
                    })
                    .collect(),
            ),
        )
    } else {
        None
    };

    if protocol_config.record_additional_state_digest_in_prologue() {
        let additional_state_digest =
            if protocol_config.additional_consensus_digest_indirect_state() {
                // Combine the commit's digest with the indirect-state hash.
                let d1 = commit_info.additional_state_digest();
                indirect_state_observer.fold_with(d1)
            } else {
                commit_info.additional_state_digest()
            };

        self.consensus_commit_prologue_v4_transaction(
            epoch,
            // v4 requires version assignments; see `# Panics` above.
            version_assignments.unwrap(),
            additional_state_digest,
        )
    } else if let Some(version_assignments) = version_assignments {
        self.consensus_commit_prologue_v3_transaction(epoch, version_assignments)
    } else if protocol_config.include_consensus_digest_in_prologue() {
        self.consensus_commit_prologue_v2_transaction(epoch)
    } else {
        self.consensus_commit_prologue_transaction(epoch)
    }
}
476 }
477
/// Accumulates a running hash over every value passed to
/// `observe_indirect_state`; `fold_with` later combines that hash with
/// another digest.
#[derive(Default)]
pub struct IndirectStateObserver {
    /// Running hasher; each observed value is BCS-serialized into it.
    hash: DefaultHash,
}
482
483 impl IndirectStateObserver {
484 pub fn new() -> Self {
485 Self::default()
486 }
487
488 pub fn observe_indirect_state<T: Serialize>(&mut self, state: &T) {
489 bcs::serialize_into(&mut self.hash, state).unwrap();
490 }
491
492 pub fn fold_with(
493 self,
494 d1: AdditionalConsensusStateDigest,
495 ) -> AdditionalConsensusStateDigest {
496 let hash = self.hash.finalize();
497 let d2 = AdditionalConsensusStateDigest::new(hash.into());
498
499 let mut hasher = DefaultHash::new();
500 bcs::serialize_into(&mut hasher, &d1).unwrap();
501 bcs::serialize_into(&mut hasher, &d2).unwrap();
502 AdditionalConsensusStateDigest::new(hasher.finalize().into())
503 }
504 }
505
/// Checks that two `AdditionalConsensusState`s with window size 3 produce
/// the same digest once they have observed the same three most recent
/// commits, even though one of them observed an extra, older commit first.
#[test]
fn test_additional_consensus_state() {
    use crate::consensus_test_utils::TestConsensusCommit;

    // Feeds one empty test commit into `state`, with epoch start time 100.
    fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
        let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
        state.observe_commit(
            &protocol_config,
            100,
            &TestConsensusCommit::empty(round, timestamp, 0),
        );
    }

    // `s1` observes four commits (rounds 1..=4).
    let mut s1 = AdditionalConsensusState::new(3);
    observe(&mut s1, 1, 1000);
    observe(&mut s1, 2, 2000);
    observe(&mut s1, 3, 3000);
    observe(&mut s1, 4, 4000);

    // `s2` observes only the last three of those commits (rounds 2..=4).
    let mut s2 = AdditionalConsensusState::new(3);
    observe(&mut s2, 2, 2000);
    observe(&mut s2, 3, 3000);
    observe(&mut s2, 4, 4000);

    assert_eq!(s1.digest(), s2.digest());

    // The digests must stay equal after both observe one more commit.
    observe(&mut s1, 5, 5000);
    observe(&mut s2, 5, 5000);

    assert_eq!(s1.digest(), s2.digest());
}
539}
540use additional_consensus_state::AdditionalConsensusState;
541pub(crate) use additional_consensus_state::{ConsensusCommitInfo, IndirectStateObserver};
542
/// One entry of `CheckpointQueue::pending_roots`: the roots of a chunk plus
/// the commit metadata that was passed to `push_chunk` alongside it.
struct QueuedCheckpointRoots {
    /// Roots produced by `Chunk::to_checkpoint_roots`.
    roots: CheckpointRoots,
    /// Timestamp passed to `push_chunk`; on flush, the last entry's timestamp
    /// becomes the checkpoint timestamp.
    timestamp: CheckpointTimestamp,
    /// Reference of the consensus commit the chunk was pushed with.
    consensus_commit_ref: CommitRef,
    /// Rejected-transactions digest the chunk was pushed with.
    rejected_transactions_digest: Digest,
}
549
/// A group of schedulables, with an optional settlement, tied to one
/// checkpoint height.
///
/// `T` is the transaction representation and defaults to
/// `VerifiedExecutableTransaction`.
struct Chunk<
    T: crate::authority::shared_object_version_manager::AsTx = VerifiedExecutableTransaction,
> {
    /// The chunk's schedulables; their keys become `tx_roots`.
    schedulables: Vec<Schedulable<T>>,
    /// Optional settlement; its key becomes `settlement_root`.
    settlement: Option<Schedulable<T>>,
    /// Checkpoint height recorded in the chunk's `CheckpointRoots`.
    height: CheckpointHeight,
}
557
558impl<T: crate::authority::shared_object_version_manager::AsTx + Clone> Chunk<T> {
559 fn all_schedulables(&self) -> impl Iterator<Item = &Schedulable<T>> + Clone {
560 self.schedulables.iter().chain(self.settlement.iter())
561 }
562
563 fn all_schedulables_from(chunks: &[Self]) -> impl Iterator<Item = &Schedulable<T>> + Clone {
564 chunks.iter().flat_map(|c| c.all_schedulables())
565 }
566
567 fn to_checkpoint_roots(&self) -> CheckpointRoots {
568 let tx_roots: Vec<_> = self.schedulables.iter().map(|s| s.key()).collect();
569 let settlement_root = self.settlement.as_ref().map(|s| s.key());
570 CheckpointRoots {
571 tx_roots,
572 settlement_root,
573 height: self.height,
574 }
575 }
576}
577
578impl From<Chunk<VerifiedExecutableTransactionWithAliases>> for Chunk {
579 fn from(chunk: Chunk<VerifiedExecutableTransactionWithAliases>) -> Self {
580 Chunk {
581 schedulables: chunk.schedulables.into_iter().map(|s| s.into()).collect(),
582 settlement: chunk.settlement.map(|s| s.into()),
583 height: chunk.height,
584 }
585 }
586}
587
/// Queue of checkpoint roots that have been handed to the execution
/// scheduler but not yet emitted as a `PendingCheckpointV2`.
///
/// `push_chunk` appends roots and `flush` / `flush_forced` turn everything
/// currently queued into one pending checkpoint.
pub(crate) struct CheckpointQueue {
    /// Timestamp of the last root in the most recently flushed checkpoint;
    /// the initial value is supplied to the constructor.
    last_built_timestamp: CheckpointTimestamp,
    /// Roots queued by `push_chunk` since the last flush, oldest first.
    pending_roots: VecDeque<QueuedCheckpointRoots>,
    /// Height counter; `next_height` increments it and returns the new value.
    height: u64,
    /// Number of `chunk.schedulables` queued since the last flush
    /// (settlements are not counted).
    pending_tx_count: usize,
    /// Sequence number given to the next checkpoint that is flushed;
    /// incremented after every flush.
    current_checkpoint_seq: CheckpointSequenceNumber,
    /// When adding a chunk would raise `pending_tx_count` above this value,
    /// `push_chunk` flushes what is already pending first.
    max_tx: usize,
    /// Minimum distance (in ms) between `last_built_timestamp` and the
    /// timestamp passed to `flush` for a non-forced flush to happen.
    min_checkpoint_interval_ms: u64,
    /// Used by `push_chunk` to send schedulables (and settlement info) on.
    execution_scheduler_sender: ExecutionSchedulerSender,
}
605
606impl CheckpointQueue {
607 pub(crate) fn new(
608 last_built_timestamp: CheckpointTimestamp,
609 checkpoint_height: u64,
610 next_checkpoint_seq: CheckpointSequenceNumber,
611 max_tx: usize,
612 min_checkpoint_interval_ms: u64,
613 execution_scheduler_sender: ExecutionSchedulerSender,
614 ) -> Self {
615 Self {
616 last_built_timestamp,
617 pending_roots: VecDeque::new(),
618 height: checkpoint_height,
619 pending_tx_count: 0,
620 current_checkpoint_seq: next_checkpoint_seq,
621 max_tx,
622 min_checkpoint_interval_ms,
623 execution_scheduler_sender,
624 }
625 }
626
/// Test-only constructor that creates its own unbounded channel for the
/// scheduler sender. The receiving half is dropped when this function
/// returns.
#[cfg(test)]
fn new_for_testing(
    last_built_timestamp: CheckpointTimestamp,
    checkpoint_height: u64,
    next_checkpoint_seq: CheckpointSequenceNumber,
    max_tx: usize,
    min_checkpoint_interval_ms: u64,
) -> Self {
    let (sender, _receiver) = monitored_mpsc::unbounded_channel("test_checkpoint_queue_sender");
    Self::new_for_testing_with_sender(
        last_built_timestamp,
        checkpoint_height,
        next_checkpoint_seq,
        max_tx,
        min_checkpoint_interval_ms,
        sender,
    )
}
647
/// Test-only constructor that wraps a caller-supplied channel sender in an
/// `ExecutionSchedulerSender`, so the test can keep the receiving half.
#[cfg(test)]
fn new_for_testing_with_sender(
    last_built_timestamp: CheckpointTimestamp,
    checkpoint_height: u64,
    next_checkpoint_seq: CheckpointSequenceNumber,
    max_tx: usize,
    min_checkpoint_interval_ms: u64,
    sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
) -> Self {
    Self::new(
        last_built_timestamp,
        checkpoint_height,
        next_checkpoint_seq,
        max_tx,
        min_checkpoint_interval_ms,
        ExecutionSchedulerSender::new_for_testing(sender),
    )
}
668
/// Returns the timestamp recorded by the most recent flush, or the value
/// supplied at construction if nothing has been flushed yet.
pub(crate) fn last_built_timestamp(&self) -> CheckpointTimestamp {
    self.last_built_timestamp
}
672
/// Returns `true` when no roots are waiting to be flushed.
pub(crate) fn is_empty(&self) -> bool {
    self.pending_roots.is_empty()
}
676
/// Increments the height counter and returns the new value.
fn next_height(&mut self) -> u64 {
    self.height += 1;
    self.height
}
681
/// Queues the roots of `chunk` and sends its schedulables to the execution
/// scheduler.
///
/// If roots are already pending and adding this chunk would raise the
/// pending transaction count above `max_tx`, the pending roots are flushed
/// into a checkpoint first; that checkpoint is returned. The returned vector
/// therefore holds at most one element.
///
/// `assigned_versions` is looked up by key for every schedulable and for the
/// settlement; a missing entry falls back to the default `AssignedVersions`.
/// `timestamp`, `consensus_commit_ref` and `rejected_transactions_digest`
/// are stored with the queued roots and used when they are flushed.
fn push_chunk(
    &mut self,
    chunk: Chunk,
    assigned_versions: &HashMap<TransactionKey, AssignedVersions>,
    timestamp: CheckpointTimestamp,
    consensus_commit_ref: CommitRef,
    rejected_transactions_digest: Digest,
) -> Vec<PendingCheckpointV2> {
    let max_tx = self.max_tx;
    // Only the regular schedulables count towards `max_tx`, not the settlement.
    let user_tx_count = chunk.schedulables.len();

    // Captured before `chunk.schedulables` is moved out below.
    let roots = chunk.to_checkpoint_roots();

    let schedulables: Vec<_> = chunk
        .schedulables
        .into_iter()
        .map(|s| {
            let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
            (s, versions)
        })
        .collect();

    let mut flushed_checkpoints = Vec::new();

    // Flush what is already pending if this chunk would not fit with it.
    if self.pending_tx_count > 0
        && self.pending_tx_count + user_tx_count > max_tx
        && let Some(checkpoint) = self.flush_forced()
    {
        flushed_checkpoints.push(checkpoint);
    }

    // Built after the possible flush above, so `current_checkpoint_seq` is
    // the sequence number of the checkpoint this chunk will be flushed into.
    let settlement_info = chunk.settlement.as_ref().map(|s| {
        let settlement_key = s.key();
        let tx_keys: Vec<_> = schedulables.iter().map(|(s, _)| s.key()).collect();
        SettlementBatchInfo {
            settlement_key,
            tx_keys,
            checkpoint_height: chunk.height,
            checkpoint_seq: self.current_checkpoint_seq,
            assigned_versions: assigned_versions
                .get(&settlement_key)
                .cloned()
                .unwrap_or_default(),
        }
    });

    self.execution_scheduler_sender
        .send(schedulables, settlement_info);

    self.pending_tx_count += user_tx_count;
    self.pending_roots.push_back(QueuedCheckpointRoots {
        roots,
        timestamp,
        consensus_commit_ref,
        rejected_transactions_digest,
    });

    flushed_checkpoints
}
741
742 pub(crate) fn flush(
743 &mut self,
744 current_timestamp: CheckpointTimestamp,
745 force: bool,
746 ) -> Option<PendingCheckpointV2> {
747 if !force && current_timestamp < self.last_built_timestamp + self.min_checkpoint_interval_ms
748 {
749 return None;
750 }
751 self.flush_forced()
752 }
753
754 fn flush_forced(&mut self) -> Option<PendingCheckpointV2> {
755 if self.pending_roots.is_empty() {
756 return None;
757 }
758
759 let to_flush: Vec<_> = self.pending_roots.drain(..).collect();
760 let last_root = to_flush.last().unwrap();
761
762 let checkpoint = PendingCheckpointV2 {
763 roots: to_flush.iter().map(|q| q.roots.clone()).collect(),
764 details: PendingCheckpointInfo {
765 timestamp_ms: last_root.timestamp,
766 last_of_epoch: false,
767 checkpoint_height: last_root.roots.height,
768 consensus_commit_ref: last_root.consensus_commit_ref,
769 rejected_transactions_digest: last_root.rejected_transactions_digest,
770 checkpoint_seq: Some(self.current_checkpoint_seq),
771 },
772 };
773
774 self.last_built_timestamp = last_root.timestamp;
775 self.pending_tx_count = 0;
776 self.current_checkpoint_seq += 1;
777
778 Some(checkpoint)
779 }
780
/// Returns the sequence number just before the one the next flushed
/// checkpoint will receive (`current_checkpoint_seq - 1`).
///
/// # Panics
///
/// Panics if `current_checkpoint_seq` is `0`, because the subtraction
/// would underflow.
pub(crate) fn checkpoint_seq(&self) -> CheckpointSequenceNumber {
    self.current_checkpoint_seq
        .checked_sub(1)
        .expect("checkpoint_seq called before any checkpoint was assigned")
}
786}
787
/// Processes consensus commits for one epoch.
///
/// `C` is the checkpoint service type; the commit-handling methods require
/// it to implement `CheckpointServiceNotify`.
pub struct ConsensusHandler<C> {
    /// Store of the epoch this handler works in.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Consensus stats loaded from the epoch store at construction;
    /// `index.sub_dag_index` is exposed by `last_processed_subdag_index`.
    last_consensus_stats: ExecutionIndicesWithStatsV2,
    /// Checkpoint service supplied at construction.
    checkpoint_service: Arc<C>,
    /// Reader for the object cache.
    cache_reader: Arc<dyn ObjectCacheRead>,
    /// Consensus committee; its size is used to initialize fresh stats.
    committee: ConsensusCommittee,
    /// Authority metrics.
    metrics: Arc<AuthorityMetrics>,
    /// LRU keyed by sequenced transaction key with unit values, i.e. used as
    /// a set (presumably of already processed transactions -- TODO confirm).
    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
    /// Sender towards the execution scheduler; a clone of it is also owned
    /// by `checkpoint_queue`.
    execution_scheduler_sender: ExecutionSchedulerSender,
    /// Consensus adapter supplied at construction.
    consensus_adapter: Arc<ConsensusAdapter>,

    /// Throughput calculator supplied at construction.
    throughput_calculator: Arc<ConsensusThroughputCalculator>,

    /// Updated with every observed commit; provides the additional state
    /// digest and the commit-period estimate.
    additional_consensus_state: AdditionalConsensusState,

    /// Awaited at the start of commit handling until there is no backpressure.
    backpressure_subscriber: BackpressureSubscriber,

    /// Optional traffic controller supplied at construction.
    traffic_controller: Option<Arc<TrafficController>>,

    /// Optional congestion logger supplied at construction.
    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,

    /// Gasless counter supplied at construction.
    consensus_gasless_counter: Arc<ConsensusGaslessCounter>,

    /// Queue of checkpoint roots waiting to be flushed.
    checkpoint_queue: Mutex<CheckpointQueue>,
}
826
/// Base capacity (1,048,576 entries) of `ConsensusHandler::processed_cache`;
/// it is passed through `randomize_cache_capacity_in_tests` before use.
const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
828
829impl<C> ConsensusHandler<C> {
/// Creates a handler for the epoch of `epoch_store`.
///
/// Loads the last consensus stats from the epoch store, starts the
/// execution scheduler sender from `settlement_scheduler`, and sets up the
/// checkpoint queue from the protocol config and the loaded stats.
///
/// # Panics
///
/// Panics if the protocol config's per-object congestion control mode is
/// not `ExecutionTimeEstimate`, or if the last consensus stats cannot be
/// read from the epoch store.
pub(crate) fn new(
    epoch_store: Arc<AuthorityPerEpochStore>,
    checkpoint_service: Arc<C>,
    settlement_scheduler: SettlementScheduler,
    consensus_adapter: Arc<ConsensusAdapter>,
    cache_reader: Arc<dyn ObjectCacheRead>,
    committee: ConsensusCommittee,
    metrics: Arc<AuthorityMetrics>,
    throughput_calculator: Arc<ConsensusThroughputCalculator>,
    backpressure_subscriber: BackpressureSubscriber,
    traffic_controller: Option<Arc<TrafficController>>,
    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
    consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
) -> Self {
    assert!(
        matches!(
            epoch_store
                .protocol_config()
                .per_object_congestion_control_mode(),
            PerObjectCongestionControlMode::ExecutionTimeEstimate(_)
        ),
        "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
    );

    let mut last_consensus_stats = epoch_store
        .get_last_consensus_stats()
        .expect("Should be able to read last consensus index");
    if !last_consensus_stats.stats.is_initialized() {
        // Stats were never initialized: size them for the committee.
        last_consensus_stats.stats = ConsensusStats::new(committee.size());
        if epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler()
        {
            // Continue checkpoint numbering from the previous epoch.
            last_consensus_stats.checkpoint_seq = epoch_store.previous_epoch_last_checkpoint();
        }
    }
    let max_tx = epoch_store
        .protocol_config()
        .max_transactions_per_checkpoint() as usize;
    // Falls back to the default (0) when the interval is not configured.
    let min_checkpoint_interval_ms = epoch_store
        .protocol_config()
        .min_checkpoint_interval_ms_as_option()
        .unwrap_or_default();
    let execution_scheduler_sender =
        ExecutionSchedulerSender::start(settlement_scheduler, epoch_store.clone());
    let commit_rate_estimate_window_size = epoch_store
        .protocol_config()
        .get_consensus_commit_rate_estimation_window_size();
    let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
    let checkpoint_height = last_consensus_stats.height;
    // The queue's next sequence number follows the stored one only when
    // checkpoints are split in the consensus handler; otherwise it is 0.
    let next_checkpoint_seq = if epoch_store
        .protocol_config()
        .split_checkpoints_in_consensus_handler()
    {
        last_consensus_stats.checkpoint_seq + 1
    } else {
        0
    };
    Self {
        epoch_store,
        last_consensus_stats,
        checkpoint_service,
        cache_reader,
        committee,
        metrics,
        processed_cache: LruCache::new(
            NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
        ),
        // The handler keeps a clone; the original moves into the queue below.
        execution_scheduler_sender: execution_scheduler_sender.clone(),
        consensus_adapter,
        throughput_calculator,
        additional_consensus_state: AdditionalConsensusState::new(
            commit_rate_estimate_window_size,
        ),
        backpressure_subscriber,
        traffic_controller,
        congestion_logger,
        consensus_gasless_counter,
        checkpoint_queue: Mutex::new(CheckpointQueue::new(
            last_built_timestamp,
            checkpoint_height,
            next_checkpoint_seq,
            max_tx,
            min_checkpoint_interval_ms,
            execution_scheduler_sender,
        )),
    }
}
920
/// Returns the sub-dag index stored in the handler's last consensus stats.
pub(crate) fn last_processed_subdag_index(&self) -> u64 {
    self.last_consensus_stats.index.sub_dag_index
}
925
926 pub(crate) fn new_for_testing(
927 epoch_store: Arc<AuthorityPerEpochStore>,
928 checkpoint_service: Arc<C>,
929 execution_scheduler_sender: ExecutionSchedulerSender,
930 consensus_adapter: Arc<ConsensusAdapter>,
931 cache_reader: Arc<dyn ObjectCacheRead>,
932 committee: ConsensusCommittee,
933 metrics: Arc<AuthorityMetrics>,
934 throughput_calculator: Arc<ConsensusThroughputCalculator>,
935 backpressure_subscriber: BackpressureSubscriber,
936 traffic_controller: Option<Arc<TrafficController>>,
937 last_consensus_stats: ExecutionIndicesWithStatsV2,
938 ) -> Self {
939 let commit_rate_estimate_window_size = epoch_store
940 .protocol_config()
941 .get_consensus_commit_rate_estimation_window_size();
942 let max_tx = epoch_store
943 .protocol_config()
944 .max_transactions_per_checkpoint() as usize;
945 let min_checkpoint_interval_ms = epoch_store
946 .protocol_config()
947 .min_checkpoint_interval_ms_as_option()
948 .unwrap_or_default();
949 let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
950 let checkpoint_height = last_consensus_stats.height;
951 Self {
952 epoch_store,
953 last_consensus_stats,
954 checkpoint_service,
955 cache_reader,
956 committee,
957 metrics,
958 processed_cache: LruCache::new(
959 NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
960 ),
961 execution_scheduler_sender: execution_scheduler_sender.clone(),
962 consensus_adapter,
963 throughput_calculator,
964 additional_consensus_state: AdditionalConsensusState::new(
965 commit_rate_estimate_window_size,
966 ),
967 backpressure_subscriber,
968 traffic_controller,
969 congestion_logger: None,
970 consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
971 checkpoint_queue: Mutex::new(CheckpointQueue::new(
972 last_built_timestamp,
973 checkpoint_height,
974 0,
975 max_tx,
976 min_checkpoint_interval_ms,
977 execution_scheduler_sender,
978 )),
979 }
980 }
981}
982
/// Contents of a commit grouped by message kind.
///
/// NOTE(review): this struct is filled outside the visible part of the
/// file; the field descriptions are inferred from names and types.
#[derive(Default)]
struct CommitHandlerInput {
    /// User transactions, carried together with their aliases.
    user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    /// Authority capability messages (V2).
    capability_notifications: Vec<AuthorityCapabilitiesV2>,
    /// Execution time observations.
    execution_time_observations: Vec<ExecutionTimeObservation>,
    /// Checkpoint signature messages.
    checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
    /// Randomness DKG messages as (authority, raw bytes).
    randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
    /// Randomness DKG confirmations as (authority, raw bytes).
    randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
    /// Authorities that sent an end-of-publish message.
    end_of_publish_transactions: Vec<AuthorityName>,
    /// New JWKs as (authority, key id, key).
    new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
}
994
/// Mutable state carried while one commit is being handled.
struct CommitHandlerState {
    /// Set by `init_randomness`: `true` when the DKG status is `Failed`.
    dkg_failed: bool,
    /// Set by `init_randomness`: the randomness round reserved for this
    /// commit, if one was reserved.
    randomness_round: Option<RandomnessRound>,
    /// Output accumulated for this commit; also the source of the processed
    /// message keys returned by `get_notifications`.
    output: ConsensusCommitOutput,
    /// Optional observer of indirect state.
    /// NOTE(review): not used in the visible part of the file.
    indirect_state_observer: Option<IndirectStateObserver>,
    /// Reconfiguration state; `init_randomness` only reserves a randomness
    /// round when it `should_accept_tx()`.
    initial_reconfig_state: ReconfigState,
    /// Counter per transaction digest.
    /// NOTE(review): not used in the visible part of the file.
    occurrence_counts: HashMap<TransactionDigest, u32>,
}
1004
1005impl CommitHandlerState {
1006 fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
1007 self.output
1008 .get_consensus_messages_processed()
1009 .cloned()
1010 .collect()
1011 }
1012
/// Determines the randomness state for this commit and stores it in
/// `self.randomness_round` and `self.dkg_failed`.
///
/// When randomness is enabled for the epoch, the DKG status decides the
/// outcome:
/// - `Pending`: no round, DKG not failed;
/// - `Failed`: no round, `dkg_failed` is set;
/// - `Successful`: a round is reserved via `reserve_next_randomness` (using
///   the commit timestamp and writing into `self.output`), but only if the
///   initial reconfig state still accepts transactions.
///
/// Returns the locked randomness manager so the caller can keep using it,
/// or `None` if the epoch store has no randomness manager.
///
/// # Panics
///
/// Panics if the randomness manager lock is already held, if randomness is
/// enabled but no manager exists, or if reserving the next randomness
/// fails ("epoch ended").
fn init_randomness<'a, 'epoch>(
    &'a mut self,
    epoch_store: &'epoch AuthorityPerEpochStore,
    commit_info: &'a ConsensusCommitInfo,
) -> Option<tokio::sync::MutexGuard<'epoch, RandomnessManager>> {
    // `try_lock` rather than awaiting: contention is treated as a bug.
    let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
        rm.try_lock()
            .expect("should only ever be called from the commit handler thread")
    });

    let mut dkg_failed = false;
    let randomness_round = if epoch_store.randomness_state_enabled() {
        let randomness_manager = randomness_manager
            .as_mut()
            .expect("randomness manager should exist if randomness is enabled");
        match randomness_manager.dkg_status() {
            DkgStatus::Pending => None,
            DkgStatus::Failed => {
                dkg_failed = true;
                None
            }
            DkgStatus::Successful => {
                if self.initial_reconfig_state.should_accept_tx() {
                    randomness_manager
                        .reserve_next_randomness(commit_info.timestamp, &mut self.output)
                        .expect("epoch ended")
                } else {
                    None
                }
            }
        }
    } else {
        None
    };

    // A reserved round and a failed DKG are mutually exclusive.
    if randomness_round.is_some() {
        assert!(!dkg_failed);
    }

    self.randomness_round = randomness_round;
    self.dkg_failed = dkg_failed;

    randomness_manager
}
1060}
1061
1062impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
1063 fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
1067 assert!(
1068 self.epoch_store
1069 .protocol_config()
1070 .record_additional_state_digest_in_prologue()
1071 );
1072 let protocol_config = self.epoch_store.protocol_config();
1073 let epoch_start_time = self
1074 .epoch_store
1075 .epoch_start_config()
1076 .epoch_start_timestamp_ms();
1077
1078 self.additional_consensus_state.observe_commit(
1079 protocol_config,
1080 epoch_start_time,
1081 &consensus_commit,
1082 );
1083 }
1084
/// Test-only entry point that forwards to `handle_consensus_commit`.
#[cfg(test)]
pub(crate) async fn handle_consensus_commit_for_test(
    &mut self,
    consensus_commit: impl ConsensusCommitAPI,
) {
    self.handle_consensus_commit(consensus_commit).await
}
1092
/// Handles one consensus commit end to end.
///
/// In order, this:
/// 1. checks required protocol features and waits on the backpressure subscriber;
/// 2. observes the commit and updates the last consensus stats index;
/// 3. filters and deduplicates the commit's transactions and sorts them into per-kind inputs;
/// 4. processes the non-user inputs (JWKs, capabilities, execution time observations,
///    checkpoint signatures, DKG messages);
/// 5. builds schedulables and pending checkpoints, using either the legacy path or the
///    split-checkpoint path depending on `split_checkpoints_in_consensus_handler`;
/// 6. records stats, pushes the accumulated output to the consensus quarantine, notifies the
///    checkpoint service, and triggers randomness generation if a round was reserved.
///
/// Returns early, before pushing any output, when no checkpoint should be made for this
/// commit (transactions are no longer accepted and this is not the final round).
///
/// # Panics
///
/// Panics if a required protocol feature is disabled, if the commit round is not greater than
/// the last committed round, if the execution time estimator lock is already held, or if any
/// of the `expect`ed downstream operations fails.
#[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))]
pub(crate) async fn handle_consensus_commit(
    &mut self,
    consensus_commit: impl ConsensusCommitAPI,
) {
    {
        let protocol_config = self.epoch_store.protocol_config();

        // This handler requires all of the following protocol features to be enabled.
        assert!(protocol_config.ignore_execution_time_observations_after_certs_closed());
        assert!(protocol_config.record_time_estimate_processed());
        assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints());
        assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest());
        assert!(protocol_config.authority_capabilities_v2());
        assert!(protocol_config.cancel_for_failed_dkg_early());
    }

    // Wait on the backpressure subscriber before doing any work for this commit.
    self.backpressure_subscriber.await_no_backpressure().await;

    let epoch = self.epoch_store.epoch();

    let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit");

    // Round recorded for the previous commit; the stats index is only updated further below.
    let last_committed_round = self.last_consensus_stats.index.last_committed_round;

    self.epoch_store
        .consensus_tx_status_cache
        .update_last_committed_leader_round(last_committed_round as u32);
    self.epoch_store
        .tx_reject_reason_cache
        .set_last_committed_leader_round(last_committed_round as u32);

    let commit_info = self.additional_consensus_state.observe_commit(
        self.epoch_store.protocol_config(),
        self.epoch_store
            .epoch_start_config()
            .epoch_start_timestamp_ms(),
        &consensus_commit,
    );
    // Commits must arrive with strictly increasing rounds.
    assert!(commit_info.round > last_committed_round);

    let (timestamp, leader_author, commit_sub_dag_index) =
        self.gather_commit_metadata(&consensus_commit);

    info!(
        %consensus_commit,
        "Received consensus output {}. Rejected transactions {}",
        consensus_commit.commit_ref(),
        consensus_commit.rejected_transactions_debug_string(),
    );

    // Point the stats index at the start of this commit.
    self.last_consensus_stats.index = ExecutionIndices {
        last_committed_round: commit_info.round,
        sub_dag_index: commit_sub_dag_index,
        transaction_index: 0_u64,
    };

    self.metrics
        .consensus_committed_subdags
        .with_label_values(&[&leader_author.to_string()])
        .inc();

    // Fresh per-commit state; the reconfig state is cloned once here and reused below.
    let mut state = CommitHandlerState {
        output: ConsensusCommitOutput::new(commit_info.round),
        dkg_failed: false,
        randomness_round: None,
        indirect_state_observer: Some(IndirectStateObserver::new()),
        initial_reconfig_state: self
            .epoch_store
            .get_reconfig_state_read_lock_guard()
            .clone(),
        occurrence_counts: HashMap::new(),
    };

    let FilteredConsensusOutput {
        transactions,
        owned_object_locks,
    } = self.filter_consensus_txns(
        state.initial_reconfig_state.clone(),
        &commit_info,
        &consensus_commit,
    );
    // Only record owned object locks when the filter produced some.
    if !owned_object_locks.is_empty() {
        state.output.set_owned_object_locks(owned_object_locks);
    }
    let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions);

    // Sets `state.randomness_round` / `state.dkg_failed` and returns the manager guard, which
    // is held until the end of this function.
    let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info);

    let CommitHandlerInput {
        user_transactions,
        capability_notifications,
        execution_time_observations,
        checkpoint_signature_messages,
        randomness_dkg_messages,
        randomness_dkg_confirmations,
        end_of_publish_transactions,
        new_jwks,
    } = self.build_commit_handler_input(transactions);

    self.process_gasless_transactions(&commit_info, &user_transactions);
    self.process_jwks(&mut state, &commit_info, new_jwks);
    self.process_capability_notifications(capability_notifications);
    self.process_execution_time_observations(&mut state, execution_time_observations);
    self.process_checkpoint_signature_messages(checkpoint_signature_messages);

    self.process_dkg_updates(
        &mut state,
        &commit_info,
        randomness_manager.as_deref_mut(),
        randomness_dkg_messages,
        randomness_dkg_confirmations,
    )
    .await;

    let mut execution_time_estimator = self
        .epoch_store
        .execution_time_estimator
        .try_lock()
        .expect("should only ever be called from the commit handler thread");

    // Note: this uses the round of the previous commit, captured above.
    let authenticator_state_update_transaction =
        self.create_authenticator_state_update(last_committed_round, &commit_info);

    let split_checkpoints = self
        .epoch_store
        .protocol_config()
        .split_checkpoints_in_consensus_handler();

    let (lock, final_round, num_schedulables, checkpoint_height) = if !split_checkpoints {
        // Legacy path: schedulables and version assignments are produced together, pending
        // checkpoints are written directly, and the schedulables are sent to the execution
        // scheduler from here.
        let (schedulables, randomness_schedulables, assigned_versions) = self
            .process_transactions(
                &mut state,
                &mut execution_time_estimator,
                &commit_info,
                authenticator_state_update_transaction,
                user_transactions,
            );

        let (should_accept_tx, lock, final_round) =
            self.handle_close_epoch(&mut state, &commit_info, end_of_publish_transactions);

        let make_checkpoint = should_accept_tx || final_round;
        if !make_checkpoint {
            // Returning here skips everything below, including pushing `state.output`.
            return;
        }

        if final_round {
            self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
        }

        let checkpoint_height = self
            .epoch_store
            .calculate_pending_checkpoint_height(commit_info.round);

        self.create_pending_checkpoints(
            &mut state,
            &commit_info,
            &schedulables,
            &randomness_schedulables,
            final_round,
        );

        // Counted before the randomness schedulables are appended, so they are excluded.
        let num_schedulables = schedulables.len();
        let mut all_schedulables = schedulables;
        all_schedulables.extend(randomness_schedulables);
        let assigned_versions = assigned_versions.into_map();
        // Pair each schedulable with its assigned versions; keys missing from the map get the
        // default value.
        let paired: Vec<_> = all_schedulables
            .into_iter()
            .map(|s| {
                let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
                (s, versions)
            })
            .collect();
        self.execution_scheduler_sender.send(paired, None);

        (lock, final_round, num_schedulables, checkpoint_height)
    } else {
        // Split-checkpoint path: transactions are collected here and handed to
        // `create_pending_checkpoints_v2`, which returns the checkpoint height. Nothing is
        // sent to `execution_scheduler_sender` in this branch.
        // NOTE(review): presumably scheduling happens downstream of the checkpoint queue —
        // confirm.
        let (
            transactions_to_schedule,
            randomness_transactions_to_schedule,
            cancelled_txns,
            randomness_state_update_transaction,
        ) = self.collect_transactions_to_schedule(
            &mut state,
            &mut execution_time_estimator,
            &commit_info,
            user_transactions,
        );

        let (should_accept_tx, lock, final_round) =
            self.handle_close_epoch(&mut state, &commit_info, end_of_publish_transactions);

        let make_checkpoint = should_accept_tx || final_round;
        if !make_checkpoint {
            // Returning here skips everything below, including pushing `state.output`.
            return;
        }

        if final_round {
            self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
        }

        let epoch = self.epoch_store.epoch();
        // The prologue placeholder is omitted only when the test-only skip flag is set.
        let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
            .then_some(Schedulable::ConsensusCommitPrologue(
                epoch,
                commit_info.round,
                commit_info.consensus_commit_ref.index,
            ));

        // Order: prologue, authenticator state update, then user transactions.
        let schedulables: Vec<_> = itertools::chain!(
            consensus_commit_prologue.into_iter(),
            authenticator_state_update_transaction
                .into_iter()
                .map(Schedulable::Transaction),
            transactions_to_schedule
                .into_iter()
                .map(Schedulable::Transaction),
        )
        .collect();

        // Order: randomness state update first, then randomness-using transactions.
        let randomness_schedulables: Vec<_> = randomness_state_update_transaction
            .into_iter()
            .chain(
                randomness_transactions_to_schedule
                    .into_iter()
                    .map(Schedulable::Transaction),
            )
            .collect();

        let num_schedulables = schedulables.len();
        let checkpoint_height = self.create_pending_checkpoints_v2(
            &mut state,
            &commit_info,
            schedulables,
            randomness_schedulables,
            &cancelled_txns,
            final_round,
        );

        (lock, final_round, num_schedulables, checkpoint_height)
    };

    // Collected before `state.output` is moved into the quarantine below.
    let notifications = state.get_notifications();

    let mut stats_to_record = self.last_consensus_stats.clone();
    stats_to_record.height = checkpoint_height;
    if split_checkpoints {
        // In split mode the stats also carry the checkpoint queue's progress.
        let queue = self.checkpoint_queue.lock().unwrap();
        stats_to_record.last_checkpoint_flush_timestamp = queue.last_built_timestamp();
        stats_to_record.checkpoint_seq = queue.checkpoint_seq();
    }

    state.output.record_consensus_commit_stats(stats_to_record);

    self.record_deferral_deletion(&mut state);

    self.epoch_store
        .consensus_quarantine
        .write()
        .push_consensus_output(state.output, &self.epoch_store)
        .expect("push_consensus_output should not fail");

    debug!(
        ?commit_info.round,
        "Notifying checkpoint service about new pending checkpoint(s)",
    );
    self.checkpoint_service
        .notify_checkpoint()
        .expect("failed to notify checkpoint service");

    // Randomness generation is started only after the output has been pushed and the
    // checkpoint service notified.
    if let Some(randomness_round) = state.randomness_round {
        randomness_manager
            .as_ref()
            .expect("randomness manager should exist if randomness round is provided")
            .generate_randomness(epoch, randomness_round);
    }

    self.epoch_store.process_notifications(notifications.iter());

    self.log_final_round(lock, final_round);

    self.throughput_calculator
        .add_transactions(timestamp, num_schedulables as u64);

    // Test-only fail point: kills the current node with probability 0.01, keyed on the
    // sub-dag index and epoch.
    fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
        let key = [commit_sub_dag_index, epoch];
        if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
            sui_simulator::task::kill_current_node(None);
        }
    });

    fail_point!("crash");

    self.send_end_of_publish_if_needed().await;
}
1402
1403 fn handle_close_epoch(
1404 &self,
1405 state: &mut CommitHandlerState,
1406 commit_info: &ConsensusCommitInfo,
1407 end_of_publish_transactions: Vec<AuthorityName>,
1408 ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
1409 let timestamp_based_epoch_close = self
1410 .epoch_store
1411 .protocol_config()
1412 .timestamp_based_epoch_close();
1413 let timestamp_triggered = timestamp_based_epoch_close
1414 && commit_info.timestamp >= self.epoch_store.next_reconfiguration_timestamp_ms();
1415 if timestamp_triggered {
1416 let reconfig_guard = self.epoch_store.get_reconfig_state_write_lock_guard();
1418 if reconfig_guard.should_accept_user_certs() {
1419 self.epoch_store.close_user_certs(reconfig_guard);
1420 }
1421 }
1422 let collected_eop_quorum =
1423 self.process_end_of_publish_transactions(state, end_of_publish_transactions);
1424 if timestamp_triggered || collected_eop_quorum {
1425 let (lock, final_round) = self.advance_eop_state_machine(state);
1426 (lock.should_accept_tx(), Some(lock), final_round)
1427 } else {
1428 (true, None, false)
1429 }
1430 }
1431
1432 fn record_end_of_epoch_execution_time_observations(
1433 &self,
1434 estimator: &mut ExecutionTimeEstimator,
1435 ) {
1436 self.epoch_store
1437 .end_of_epoch_execution_time_observations
1438 .set(estimator.take_observations())
1439 .expect("`stored_execution_time_observations` should only be set once at end of epoch");
1440 }
1441
1442 fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
1443 let mut deferred_transactions = self
1444 .epoch_store
1445 .consensus_output_cache
1446 .deferred_transactions
1447 .lock();
1448 for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
1449 deferred_transactions.remove(&deleted_deferred_key);
1450 }
1451 }
1452
1453 fn log_final_round(&self, lock: Option<RwLockWriteGuard<ReconfigState>>, final_round: bool) {
1454 if final_round {
1455 let epoch = self.epoch_store.epoch();
1456 info!(
1457 ?epoch,
1458 lock=?lock.as_ref(),
1459 final_round=?final_round,
1460 "Notified last checkpoint"
1461 );
1462 self.epoch_store.record_end_of_message_quorum_time_metric();
1463 }
1464 }
1465
1466 fn create_pending_checkpoints(
1467 &self,
1468 state: &mut CommitHandlerState,
1469 commit_info: &ConsensusCommitInfo,
1470 schedulables: &[Schedulable],
1471 randomness_schedulables: &[Schedulable],
1472 final_round: bool,
1473 ) {
1474 assert!(
1475 !self
1476 .epoch_store
1477 .protocol_config()
1478 .split_checkpoints_in_consensus_handler()
1479 );
1480
1481 let checkpoint_height = self
1482 .epoch_store
1483 .calculate_pending_checkpoint_height(commit_info.round);
1484
1485 let should_write_random_checkpoint = state.randomness_round.is_some()
1492 || (state.dkg_failed && !randomness_schedulables.is_empty());
1493
1494 let pending_checkpoint = PendingCheckpoint {
1495 roots: schedulables.iter().map(|s| s.key()).collect(),
1496 details: PendingCheckpointInfo {
1497 timestamp_ms: commit_info.timestamp,
1498 last_of_epoch: final_round && !should_write_random_checkpoint,
1499 checkpoint_height,
1500 consensus_commit_ref: commit_info.consensus_commit_ref,
1501 rejected_transactions_digest: commit_info.rejected_transactions_digest,
1502 checkpoint_seq: None,
1503 },
1504 };
1505 self.epoch_store
1506 .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1507 .expect("failed to write pending checkpoint");
1508
1509 info!(
1510 "Written pending checkpoint: {:?}",
1511 pending_checkpoint.details,
1512 );
1513
1514 if should_write_random_checkpoint {
1515 let pending_checkpoint = PendingCheckpoint {
1516 roots: randomness_schedulables.iter().map(|s| s.key()).collect(),
1517 details: PendingCheckpointInfo {
1518 timestamp_ms: commit_info.timestamp,
1519 last_of_epoch: final_round,
1520 checkpoint_height: checkpoint_height + 1,
1521 consensus_commit_ref: commit_info.consensus_commit_ref,
1522 rejected_transactions_digest: commit_info.rejected_transactions_digest,
1523 checkpoint_seq: None,
1524 },
1525 };
1526 self.epoch_store
1527 .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1528 .expect("failed to write pending checkpoint");
1529 }
1530 }
1531
/// Decides, for every user transaction in this commit (plus any merged in by
/// `merge_and_reorder_transactions`), whether it is scheduled, deferred, or cancelled.
///
/// Regular and randomness-using transactions are handled separately, each with its own
/// congestion tracker. Deferred transactions are written both to the in-memory deferral cache
/// and to `state.output`. Congestion metrics, the optional congestion log, and accumulated
/// object debts are recorded before returning.
///
/// Returns, in order:
/// - the regular transactions to schedule;
/// - the randomness-using transactions to schedule;
/// - the cancelled transactions with their cancellation reasons;
/// - the randomness state update schedulable, present iff `state.randomness_round` is set.
#[allow(clippy::type_complexity)]
fn collect_transactions_to_schedule(
    &self,
    state: &mut CommitHandlerState,
    execution_time_estimator: &mut ExecutionTimeEstimator,
    commit_info: &ConsensusCommitInfo,
    user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
) -> (
    Vec<VerifiedExecutableTransactionWithAliases>,
    Vec<VerifiedExecutableTransactionWithAliases>,
    BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
    Option<Schedulable<VerifiedExecutableTransactionWithAliases>>,
) {
    let protocol_config = self.epoch_store.protocol_config();
    let epoch = self.epoch_store.epoch();

    let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) =
        self.merge_and_reorder_transactions(state, commit_info, user_transactions);

    // One congestion tracker per transaction class (the bool selects the randomness variant).
    let mut shared_object_congestion_tracker =
        self.init_congestion_tracker(commit_info, false, &ordered_txns);
    let mut shared_object_using_randomness_congestion_tracker =
        self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns);

    let randomness_state_update_transaction = state
        .randomness_round
        .map(|round| Schedulable::RandomnessStateUpdate(epoch, round));
    debug!(
        "Randomness state update transaction: {:?}",
        randomness_state_update_transaction
            .as_ref()
            .map(|t| t.key())
    );

    let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len());
    let mut randomness_transactions_to_schedule =
        Vec::with_capacity(ordered_randomness_txns.len());
    let mut deferred_txns = BTreeMap::new();
    let mut cancelled_txns = BTreeMap::new();

    for transaction in ordered_txns {
        self.handle_deferral_and_cancellation(
            state,
            &mut cancelled_txns,
            &mut deferred_txns,
            &mut transactions_to_schedule,
            protocol_config,
            commit_info,
            transaction,
            &mut shared_object_congestion_tracker,
            &previously_deferred_tx_digests,
            execution_time_estimator,
        );
    }

    for transaction in ordered_randomness_txns {
        if state.dkg_failed {
            // With DKG failed, every randomness-using transaction is marked cancelled but is
            // still placed in the schedule list; the deferral/congestion logic is bypassed.
            debug!(
                "Canceling randomness-using transaction {:?} because DKG failed",
                transaction.tx().digest(),
            );
            cancelled_txns.insert(
                *transaction.tx().digest(),
                CancelConsensusCertificateReason::DkgFailed,
            );
            randomness_transactions_to_schedule.push(transaction);
            continue;
        }
        self.handle_deferral_and_cancellation(
            state,
            &mut cancelled_txns,
            &mut deferred_txns,
            &mut randomness_transactions_to_schedule,
            protocol_config,
            commit_info,
            transaction,
            &mut shared_object_using_randomness_congestion_tracker,
            &previously_deferred_tx_digests,
            execution_time_estimator,
        );
    }

    let mut total_deferred_txns = 0;
    {
        // The cache lock is scoped to this block. Each deferred group is stored twice: once in
        // the in-memory cache and once in the commit output, hence the clone.
        let mut deferred_transactions = self
            .epoch_store
            .consensus_output_cache
            .deferred_transactions
            .lock();
        for (key, txns) in deferred_txns.into_iter() {
            total_deferred_txns += txns.len();
            deferred_transactions.insert(key, txns.clone());
            state.output.defer_transactions(key, txns);
        }
    }

    self.metrics
        .consensus_handler_deferred_transactions
        .inc_by(total_deferred_txns as u64);
    self.metrics
        .consensus_handler_cancelled_transactions
        .inc_by(cancelled_txns.len() as u64);
    self.metrics
        .consensus_handler_max_object_costs
        .with_label_values(&["regular_commit"])
        .set(shared_object_congestion_tracker.max_cost() as i64);
    self.metrics
        .consensus_handler_max_object_costs
        .with_label_values(&["randomness_commit"])
        .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);

    // `max_cost` is read above, before the trackers are finished for this commit.
    let congestion_commit_data = shared_object_congestion_tracker.finish_commit(commit_info);
    let randomness_congestion_commit_data =
        shared_object_using_randomness_congestion_tracker.finish_commit(commit_info);

    // Optional congestion log: one entry per transaction class.
    if let Some(logger) = &self.congestion_logger {
        let epoch = self.epoch_store.epoch();
        let mut logger = logger.lock().unwrap();
        logger.write_commit_log(epoch, commit_info, false, &congestion_commit_data);
        logger.write_commit_log(epoch, commit_info, true, &randomness_congestion_commit_data);
    }

    // Best effort: send the ids of all objects with accumulated debt from both trackers; a
    // failed send is only logged.
    if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get()
        && let Err(e) = tx_object_debts.try_send(
            congestion_commit_data
                .accumulated_debts
                .iter()
                .chain(randomness_congestion_commit_data.accumulated_debts.iter())
                .map(|(id, _)| *id)
                .collect(),
        )
    {
        info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
    }

    state
        .output
        .set_congestion_control_object_debts(congestion_commit_data.accumulated_debts);
    state.output.set_congestion_control_randomness_object_debts(
        randomness_congestion_commit_data.accumulated_debts,
    );

    (
        transactions_to_schedule,
        randomness_transactions_to_schedule,
        cancelled_txns,
        randomness_state_update_transaction,
    )
}
1681
/// Builds the final schedulables and shared-object version assignments for a commit on the
/// non-split path.
///
/// Collects the transactions to schedule, adds accumulator settlements when accumulators are
/// enabled, assigns shared object versions, and then replaces the prologue placeholder at
/// index 0 with the real consensus commit prologue transaction.
///
/// Returns `(schedulables, randomness_schedulables, assigned_versions)`.
///
/// # Panics
///
/// Panics if `split_checkpoints_in_consensus_handler` is enabled, if version assignment
/// fails, or if a prologue transaction was built but the first schedulable / first assigned
/// version entry is not the prologue placeholder.
fn process_transactions(
    &self,
    state: &mut CommitHandlerState,
    execution_time_estimator: &mut ExecutionTimeEstimator,
    commit_info: &ConsensusCommitInfo,
    authenticator_state_update_transaction: Option<VerifiedExecutableTransactionWithAliases>,
    user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
) -> (Vec<Schedulable>, Vec<Schedulable>, AssignedTxAndVersions) {
    let protocol_config = self.epoch_store.protocol_config();
    assert!(!protocol_config.split_checkpoints_in_consensus_handler());
    let epoch = self.epoch_store.epoch();

    let (
        transactions_to_schedule,
        randomness_transactions_to_schedule,
        cancelled_txns,
        randomness_state_update_transaction,
    ) = self.collect_transactions_to_schedule(
        state,
        execution_time_estimator,
        commit_info,
        user_transactions,
    );

    let mut settlement = None;
    let mut randomness_settlement = None;
    if self.epoch_store.accumulators_enabled() {
        let checkpoint_height = self
            .epoch_store
            .calculate_pending_checkpoint_height(commit_info.round);

        // The regular settlement is always added; it uses the commit's checkpoint height.
        settlement = Some(Schedulable::AccumulatorSettlement(epoch, checkpoint_height));

        // The randomness settlement uses the next height and is only added when there is
        // randomness work in this commit.
        if state.randomness_round.is_some() || !randomness_transactions_to_schedule.is_empty() {
            randomness_settlement = Some(Schedulable::AccumulatorSettlement(
                epoch,
                checkpoint_height + 1,
            ));
        }
    }

    // Placeholder for the prologue; replaced with the real transaction further below once the
    // version assignments are known.
    let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
        .then_some(Schedulable::ConsensusCommitPrologue(
            epoch,
            commit_info.round,
            commit_info.consensus_commit_ref.index,
        ));

    // Order: prologue, authenticator state update, user transactions, settlement.
    let schedulables: Vec<_> = itertools::chain!(
        consensus_commit_prologue.into_iter(),
        authenticator_state_update_transaction
            .into_iter()
            .map(Schedulable::Transaction),
        transactions_to_schedule
            .into_iter()
            .map(Schedulable::Transaction),
        settlement,
    )
    .collect();

    // Order: randomness state update, randomness-using transactions, settlement.
    let randomness_schedulables: Vec<_> = randomness_state_update_transaction
        .into_iter()
        .chain(
            randomness_transactions_to_schedule
                .into_iter()
                .map(Schedulable::Transaction),
        )
        .chain(randomness_settlement)
        .collect();

    let assigned_versions = self
        .epoch_store
        .process_consensus_transaction_shared_object_versions(
            self.cache_reader.as_ref(),
            schedulables.iter(),
            randomness_schedulables.iter(),
            &cancelled_txns,
            &mut state.output,
        )
        .expect("failed to assign shared object versions");

    // Built after version assignment because it needs the assigned versions as input.
    let consensus_commit_prologue =
        self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);

    let mut schedulables = schedulables;
    let mut assigned_versions = assigned_versions;
    if let Some(consensus_commit_prologue) = consensus_commit_prologue {
        // Swap the placeholder at index 0 (in both lists) for the real prologue transaction,
        // re-keying its version assignment by the transaction digest.
        assert!(matches!(
            schedulables[0],
            Schedulable::ConsensusCommitPrologue(..)
        ));
        assert!(matches!(
            assigned_versions.0[0].0,
            TransactionKey::ConsensusCommitPrologue(..)
        ));
        assigned_versions.0[0].0 =
            TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
        schedulables[0] = Schedulable::Transaction(consensus_commit_prologue);
    }

    self.epoch_store
        .process_user_signatures(schedulables.iter().chain(randomness_schedulables.iter()));

    // Convert to the default `Schedulable` type expected by the return signature.
    let schedulables: Vec<Schedulable> = schedulables.into_iter().map(|s| s.into()).collect();
    let randomness_schedulables: Vec<Schedulable> = randomness_schedulables
        .into_iter()
        .map(|s| s.into())
        .collect();

    (schedulables, randomness_schedulables, assigned_versions)
}
1794
/// Split-checkpoint variant of pending checkpoint creation.
///
/// Splits the schedulables into chunks of at most `max_transactions_per_checkpoint`, assigns
/// shared object versions, swaps in the real consensus commit prologue transaction, pushes
/// the chunks through the checkpoint queue, and writes every pending checkpoint the queue
/// produces to `state.output`.
///
/// Returns the height of the last chunk created for this commit (the last randomness chunk
/// if any, otherwise the last regular chunk).
///
/// # Panics
///
/// Panics if `split_checkpoints_in_consensus_handler` is disabled, if version assignment or
/// a pending checkpoint write fails, if no chunk was created at all, or if a prologue
/// transaction was built but the first schedulable / first assigned version entry is not the
/// prologue placeholder.
#[allow(clippy::type_complexity)]
fn create_pending_checkpoints_v2(
    &self,
    state: &mut CommitHandlerState,
    commit_info: &ConsensusCommitInfo,
    schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
    randomness_schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
    cancelled_txns: &BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
    final_round: bool,
) -> CheckpointHeight {
    let protocol_config = self.epoch_store.protocol_config();
    assert!(protocol_config.split_checkpoints_in_consensus_handler());

    let epoch = self.epoch_store.epoch();
    let accumulators_enabled = self.epoch_store.accumulators_enabled();
    let max_transactions_per_checkpoint =
        protocol_config.max_transactions_per_checkpoint() as usize;

    // Same condition as on the non-split path: a reserved randomness round, or a failed DKG
    // with randomness schedulables present.
    let should_write_random_checkpoint = state.randomness_round.is_some()
        || (state.dkg_failed && !randomness_schedulables.is_empty());

    // Held until the explicit `drop` below, i.e. across version assignment and all pushes.
    let mut checkpoint_queue = self.checkpoint_queue.lock().unwrap();

    // Splits a list into chunks of at most `max_transactions_per_checkpoint`, taking one
    // height from the queue per chunk and attaching a settlement for that height when
    // accumulators are enabled.
    let build_chunks =
        |schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
         queue: &mut CheckpointQueue|
         -> Vec<Chunk<VerifiedExecutableTransactionWithAliases>> {
            schedulables
                .chunks(max_transactions_per_checkpoint)
                .map(|chunk| {
                    let height = queue.next_height();
                    let schedulables: Vec<_> = chunk.to_vec();
                    let settlement = if accumulators_enabled {
                        Some(Schedulable::AccumulatorSettlement(epoch, height))
                    } else {
                        None
                    };
                    Chunk {
                        schedulables,
                        settlement,
                        height,
                    }
                })
                .collect()
        };

    let num_schedulables = schedulables.len();
    let chunked_schedulables = build_chunks(schedulables, &mut checkpoint_queue);
    if chunked_schedulables.len() > 1 {
        info!(
            "Splitting transactions into {} checkpoint chunks (num_schedulables={}, max_tx={})",
            chunked_schedulables.len(),
            num_schedulables,
            max_transactions_per_checkpoint
        );
        assert_reachable!("checkpoint split due to transaction limit");
    }
    // Randomness chunks take their heights after all regular chunks. When no randomness
    // checkpoint is written, `randomness_schedulables` is dropped unused.
    let chunked_randomness_schedulables = if should_write_random_checkpoint {
        build_chunks(randomness_schedulables, &mut checkpoint_queue)
    } else {
        vec![]
    };

    let schedulables_for_version_assignment =
        Chunk::all_schedulables_from(&chunked_schedulables);
    let randomness_schedulables_for_version_assignment =
        Chunk::all_schedulables_from(&chunked_randomness_schedulables);

    let assigned_versions = self
        .epoch_store
        .process_consensus_transaction_shared_object_versions(
            self.cache_reader.as_ref(),
            schedulables_for_version_assignment,
            randomness_schedulables_for_version_assignment,
            cancelled_txns,
            &mut state.output,
        )
        .expect("failed to assign shared object versions");

    // Built after version assignment because it needs the assigned versions as input.
    let consensus_commit_prologue =
        self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);

    let mut chunked_schedulables = chunked_schedulables;
    let mut assigned_versions = assigned_versions;
    if let Some(consensus_commit_prologue) = consensus_commit_prologue {
        // Swap the placeholder at the head of the first chunk (and of the version
        // assignments) for the real prologue transaction, re-keyed by its digest.
        assert!(matches!(
            chunked_schedulables[0].schedulables[0],
            Schedulable::ConsensusCommitPrologue(..)
        ));
        assert!(matches!(
            assigned_versions.0[0].0,
            TransactionKey::ConsensusCommitPrologue(..)
        ));
        assigned_versions.0[0].0 =
            TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
        chunked_schedulables[0].schedulables[0] =
            Schedulable::Transaction(consensus_commit_prologue);
    }

    let assigned_versions = assigned_versions.into_map();

    self.epoch_store.process_user_signatures(
        chunked_schedulables
            .iter()
            .flat_map(|c| c.all_schedulables())
            .chain(
                chunked_randomness_schedulables
                    .iter()
                    .flat_map(|c| c.all_schedulables()),
            ),
    );

    // Height of the last chunk of this commit: prefer the randomness chunks, which were
    // allocated last.
    let commit_height = chunked_randomness_schedulables
        .last()
        .or(chunked_schedulables.last())
        .map(|c| c.height)
        .expect("at least one checkpoint root must be created per commit");

    let mut pending_checkpoints = Vec::new();
    for chunk in chunked_schedulables {
        pending_checkpoints.extend(checkpoint_queue.push_chunk(
            chunk.into(),
            &assigned_versions,
            commit_info.timestamp,
            commit_info.consensus_commit_ref,
            commit_info.rejected_transactions_digest,
        ));
    }

    if protocol_config.merge_randomness_into_checkpoint() {
        // Flush after the regular chunks, passing `final_round` as the second argument
        // (the same position that is named `force` in the other branch).
        pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, final_round));

        if should_write_random_checkpoint {
            for chunk in chunked_randomness_schedulables {
                pending_checkpoints.extend(checkpoint_queue.push_chunk(
                    chunk.into(),
                    &assigned_versions,
                    commit_info.timestamp,
                    commit_info.consensus_commit_ref,
                    commit_info.rejected_transactions_digest,
                ));
            }
            // In this branch the randomness chunks are flushed with `true` only on the final
            // round.
            if final_round {
                pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
            }
        }
    } else {
        // Here the regular flush is also forced whenever a randomness checkpoint follows.
        let force = final_round || should_write_random_checkpoint;
        pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, force));

        if should_write_random_checkpoint {
            for chunk in chunked_randomness_schedulables {
                pending_checkpoints.extend(checkpoint_queue.push_chunk(
                    chunk.into(),
                    &assigned_versions,
                    commit_info.timestamp,
                    commit_info.consensus_commit_ref,
                    commit_info.rejected_transactions_digest,
                ));
            }
            // In this branch the randomness chunks are always flushed with `true`.
            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
        }
    }

    // On the final round, the last pending checkpoint produced is the last of the epoch.
    if final_round && let Some(last) = pending_checkpoints.last_mut() {
        last.details.last_of_epoch = true;
    }

    // Capture the queue state, then release the lock before writing the checkpoints.
    let queue_drained = checkpoint_queue.is_empty();
    drop(checkpoint_queue);

    for pending_checkpoint in pending_checkpoints {
        debug!(
            checkpoint_height = pending_checkpoint.details.checkpoint_height,
            roots_count = pending_checkpoint.num_roots(),
            "Writing pending checkpoint",
        );
        self.epoch_store
            .write_pending_checkpoint_v2(&mut state.output, &pending_checkpoint)
            .expect("failed to write pending checkpoint");
    }

    state.output.set_checkpoint_queue_drained(queue_drained);

    commit_height
}
1985
1986 fn add_consensus_commit_prologue_transaction<'a>(
1990 &'a self,
1991 state: &'a mut CommitHandlerState,
1992 commit_info: &'a ConsensusCommitInfo,
1993 assigned_versions: &AssignedTxAndVersions,
1994 ) -> Option<VerifiedExecutableTransactionWithAliases> {
1995 {
1996 if commit_info.skip_consensus_commit_prologue_in_test {
1997 return None;
1998 }
1999 }
2000
2001 let mut cancelled_txn_version_assignment = Vec::new();
2002
2003 let protocol_config = self.epoch_store.protocol_config();
2004
2005 for (txn_key, assigned_versions) in assigned_versions.0.iter() {
2006 let Some(d) = txn_key.as_digest() else {
2007 continue;
2008 };
2009
2010 if !protocol_config.include_cancelled_randomness_txns_in_prologue()
2011 && assigned_versions
2012 .shared_object_versions
2013 .iter()
2014 .any(|((id, _), _)| *id == SUI_RANDOMNESS_STATE_OBJECT_ID)
2015 {
2016 continue;
2017 }
2018
2019 if assigned_versions
2020 .shared_object_versions
2021 .iter()
2022 .any(|(_, version)| version.is_cancelled())
2023 {
2024 assert_reachable!("cancelled transactions");
2025 cancelled_txn_version_assignment
2026 .push((*d, assigned_versions.shared_object_versions.clone()));
2027 }
2028 }
2029
2030 fail_point_arg!(
2031 "additional_cancelled_txns_for_tests",
2032 |additional_cancelled_txns: Vec<(
2033 TransactionDigest,
2034 Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
2035 )>| {
2036 cancelled_txn_version_assignment.extend(additional_cancelled_txns);
2037 }
2038 );
2039
2040 let transaction = commit_info.create_consensus_commit_prologue_transaction(
2041 self.epoch_store.epoch(),
2042 self.epoch_store.protocol_config(),
2043 cancelled_txn_version_assignment,
2044 commit_info,
2045 state.indirect_state_observer.take().unwrap(),
2046 );
2047 Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2048 transaction,
2049 ))
2050 }
2051
    /// Decides the fate of a single sequenced user transaction: defer it, cancel it,
    /// or schedule it for execution in this commit.
    ///
    /// Outcomes are written into the caller-provided collections:
    /// * `deferred_txns` - the transaction is pushed under its `DeferralKey`.
    /// * `cancelled_txns` - the digest is recorded with the congestion reason; the
    ///   transaction is *also* pushed to `scheduled_txns`.
    /// * `scheduled_txns` - the transaction is scheduled and its cost is added to
    ///   `shared_object_congestion_tracker`.
    ///
    /// # Panics
    ///
    /// Panics if `state.indirect_state_observer` is `None`.
    fn handle_deferral_and_cancellation(
        &self,
        state: &mut CommitHandlerState,
        cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>,
        scheduled_txns: &mut Vec<VerifiedExecutableTransactionWithAliases>,
        protocol_config: &ProtocolConfig,
        commit_info: &ConsensusCommitInfo,
        transaction: VerifiedExecutableTransactionWithAliases,
        shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
        execution_time_estimator: &ExecutionTimeEstimator,
    ) {
        // Unpaid-amplification check: a transaction seen more often in this commit than
        // its gas price pays for is deferred to the next round.
        if protocol_config.defer_unpaid_amplification() {
            // A digest with no recorded occurrences counts as zero.
            let occurrence_count = state
                .occurrence_counts
                .get(transaction.tx().digest())
                .copied()
                .unwrap_or(0);

            let rgp = self.epoch_store.reference_gas_price();
            let gas_price = transaction.tx().transaction_data().gas_price();
            // `rgp.max(1)` guards against division by zero.
            let allowed_count = (gas_price / rgp.max(1)) + 1;

            if occurrence_count as u64 > allowed_count {
                self.metrics
                    .consensus_handler_unpaid_amplification_deferrals
                    .inc();

                // Keep the round of the first deferral if this transaction was deferred
                // before; otherwise the deferral starts at the current round.
                let deferred_from_round = previously_deferred_tx_digests
                    .get(transaction.tx().digest())
                    .map(|k| k.deferred_from_round())
                    .unwrap_or(commit_info.round);

                let deferral_key = DeferralKey::new_for_consensus_round(
                    commit_info.round + 1,
                    deferred_from_round,
                );

                // Past the deferral limit we fall through to the regular path below
                // instead of deferring again.
                if transaction_deferral_within_limit(
                    &deferral_key,
                    protocol_config.max_deferral_rounds_for_congestion_control(),
                ) {
                    assert_reachable!("unpaid amplification deferral");
                    debug!(
                        "Deferring transaction {:?} due to unpaid amplification (count={}, allowed={})",
                        transaction.tx().digest(),
                        occurrence_count,
                        allowed_count
                    );
                    deferred_txns
                        .entry(deferral_key)
                        .or_default()
                        .push(transaction);
                    return;
                }
            }
        }

        // The cost is computed before the deferral decision and is only applied to the
        // tracker when the transaction ends up scheduled without cancellation.
        let tx_cost = shared_object_congestion_tracker.get_tx_cost(
            execution_time_estimator,
            transaction.tx(),
            state.indirect_state_observer.as_mut().unwrap(),
        );

        let deferral_info = self.epoch_store.should_defer(
            transaction.tx(),
            commit_info,
            state.dkg_failed,
            state.randomness_round.is_some(),
            previously_deferred_tx_digests,
            shared_object_congestion_tracker,
        );

        if let Some((deferral_key, deferral_reason)) = deferral_info {
            debug!(
                "Deferring consensus certificate for transaction {:?} until {:?}",
                transaction.tx().digest(),
                deferral_key
            );

            match deferral_reason {
                // Randomness deferrals are not subject to the deferral-round limit here.
                DeferralReason::RandomnessNotReady => {
                    deferred_txns
                        .entry(deferral_key)
                        .or_default()
                        .push(transaction);
                }
                DeferralReason::SharedObjectCongestion(congested_objects) => {
                    self.metrics.consensus_handler_congested_transactions.inc();
                    if transaction_deferral_within_limit(
                        &deferral_key,
                        protocol_config.max_deferral_rounds_for_congestion_control(),
                    ) {
                        deferred_txns
                            .entry(deferral_key)
                            .or_default()
                            .push(transaction);
                    } else {
                        // Deferral limit exceeded: cancel the transaction.
                        assert_sometimes!(
                            transaction.tx().data().transaction_data().uses_randomness(),
                            "cancelled randomness-using transaction"
                        );
                        assert_sometimes!(
                            !transaction.tx().data().transaction_data().uses_randomness(),
                            "cancelled non-randomness-using transaction"
                        );

                        debug!(
                            "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
                            transaction.tx().digest(),
                            deferral_key,
                            congested_objects
                        );
                        cancelled_txns.insert(
                            *transaction.tx().digest(),
                            CancelConsensusCertificateReason::CongestionOnObjects(
                                congested_objects,
                            ),
                        );
                        // Cancelled transactions are still scheduled; note that the
                        // congestion tracker is not bumped for them.
                        scheduled_txns.push(transaction);
                    }
                }
            }
        } else {
            // Not deferred: account for the cost and schedule the transaction.
            shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, transaction.tx());
            scheduled_txns.push(transaction);
        }
    }
2186
2187 fn merge_and_reorder_transactions(
2188 &self,
2189 state: &mut CommitHandlerState,
2190 commit_info: &ConsensusCommitInfo,
2191 user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
2192 ) -> (
2193 Vec<VerifiedExecutableTransactionWithAliases>,
2194 Vec<VerifiedExecutableTransactionWithAliases>,
2195 HashMap<TransactionDigest, DeferralKey>,
2196 ) {
2197 let protocol_config = self.epoch_store.protocol_config();
2198
2199 let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
2200 self.load_deferred_transactions(state, commit_info);
2201
2202 txns.reserve(user_transactions.len());
2203 randomness_txns.reserve(user_transactions.len());
2204
2205 let mut txns: Vec<_> = txns
2208 .into_iter()
2209 .filter_map(|tx| {
2210 if tx.tx().transaction_data().uses_randomness() {
2211 randomness_txns.push(tx);
2212 None
2213 } else {
2214 Some(tx)
2215 }
2216 })
2217 .collect();
2218
2219 for txn in user_transactions {
2220 if txn.tx().transaction_data().uses_randomness() {
2221 randomness_txns.push(txn);
2222 } else {
2223 txns.push(txn);
2224 }
2225 }
2226
2227 PostConsensusTxReorder::reorder(
2228 &mut txns,
2229 protocol_config.consensus_transaction_ordering(),
2230 );
2231 PostConsensusTxReorder::reorder(
2232 &mut randomness_txns,
2233 protocol_config.consensus_transaction_ordering(),
2234 );
2235
2236 (txns, randomness_txns, previously_deferred_tx_digests)
2237 }
2238
2239 fn load_deferred_transactions(
2240 &self,
2241 state: &mut CommitHandlerState,
2242 commit_info: &ConsensusCommitInfo,
2243 ) -> (
2244 Vec<VerifiedExecutableTransactionWithAliases>,
2245 Vec<VerifiedExecutableTransactionWithAliases>,
2246 HashMap<TransactionDigest, DeferralKey>,
2247 ) {
2248 let mut previously_deferred_tx_digests = HashMap::new();
2249
2250 let deferred_txs: Vec<_> = self
2251 .epoch_store
2252 .load_deferred_transactions_for_up_to_consensus_round_v2(
2253 &mut state.output,
2254 commit_info.round,
2255 )
2256 .expect("db error")
2257 .into_iter()
2258 .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2259 .map(|(key, tx)| {
2260 previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2261 tx
2262 })
2263 .collect();
2264 trace!(
2265 "loading deferred transactions: {:?}",
2266 deferred_txs.iter().map(|tx| tx.tx().digest())
2267 );
2268
2269 let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
2270 let txns: Vec<_> = self
2271 .epoch_store
2272 .load_deferred_transactions_for_randomness_v2(&mut state.output)
2273 .expect("db error")
2274 .into_iter()
2275 .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2276 .map(|(key, tx)| {
2277 previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2278 tx
2279 })
2280 .collect();
2281 trace!(
2282 "loading deferred randomness transactions: {:?}",
2283 txns.iter().map(|tx| tx.tx().digest())
2284 );
2285 txns
2286 } else {
2287 vec![]
2288 };
2289
2290 (
2291 deferred_txs,
2292 deferred_randomness_txs,
2293 previously_deferred_tx_digests,
2294 )
2295 }
2296
2297 fn init_congestion_tracker(
2298 &self,
2299 commit_info: &ConsensusCommitInfo,
2300 for_randomness: bool,
2301 txns: &[VerifiedExecutableTransactionWithAliases],
2302 ) -> SharedObjectCongestionTracker {
2303 #[allow(unused_mut)]
2304 let mut ret = SharedObjectCongestionTracker::from_protocol_config(
2305 self.epoch_store
2306 .consensus_quarantine
2307 .read()
2308 .load_initial_object_debts(
2309 &self.epoch_store,
2310 commit_info.round,
2311 for_randomness,
2312 txns,
2313 )
2314 .expect("db error"),
2315 self.epoch_store.protocol_config(),
2316 for_randomness,
2317 self.congestion_logger.is_some(),
2318 );
2319
2320 fail_point_arg!(
2321 "initial_congestion_tracker",
2322 |tracker: SharedObjectCongestionTracker| {
2323 info!(
2324 "Initialize shared_object_congestion_tracker to {:?}",
2325 tracker
2326 );
2327 ret = tracker;
2328 }
2329 );
2330
2331 ret
2332 }
2333
2334 fn process_gasless_transactions(
2335 &self,
2336 commit_info: &ConsensusCommitInfo,
2337 user_transactions: &[VerifiedExecutableTransactionWithAliases],
2338 ) {
2339 let gasless_count = user_transactions
2340 .iter()
2341 .filter(|txn| txn.tx().transaction_data().is_gasless_transaction())
2342 .count() as u64;
2343 self.consensus_gasless_counter
2344 .record_commit(commit_info.timestamp, gasless_count);
2345 }
2346
2347 fn process_jwks(
2348 &self,
2349 state: &mut CommitHandlerState,
2350 commit_info: &ConsensusCommitInfo,
2351 new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
2352 ) {
2353 for (authority_name, jwk_id, jwk) in new_jwks {
2354 self.epoch_store.record_jwk_vote(
2355 &mut state.output,
2356 commit_info.round,
2357 authority_name,
2358 &jwk_id,
2359 &jwk,
2360 );
2361 }
2362 }
2363
2364 fn process_capability_notifications(
2365 &self,
2366 capability_notifications: Vec<AuthorityCapabilitiesV2>,
2367 ) {
2368 for capabilities in capability_notifications {
2369 self.epoch_store
2370 .record_capabilities_v2(&capabilities)
2371 .expect("db error");
2372 }
2373 }
2374
2375 fn process_execution_time_observations(
2376 &self,
2377 state: &mut CommitHandlerState,
2378 execution_time_observations: Vec<ExecutionTimeObservation>,
2379 ) {
2380 let mut execution_time_estimator = self
2381 .epoch_store
2382 .execution_time_estimator
2383 .try_lock()
2384 .expect("should only ever be called from the commit handler thread");
2385
2386 for ExecutionTimeObservation {
2387 authority,
2388 generation,
2389 estimates,
2390 } in execution_time_observations
2391 {
2392 let authority_index = self
2393 .epoch_store
2394 .committee()
2395 .authority_index(&authority)
2396 .unwrap();
2397 execution_time_estimator.process_observations_from_consensus(
2398 authority_index,
2399 Some(generation),
2400 &estimates,
2401 );
2402 state
2403 .output
2404 .insert_execution_time_observation(authority_index, generation, estimates);
2405 }
2406 }
2407
2408 fn process_checkpoint_signature_messages(
2409 &self,
2410 checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
2411 ) {
2412 for checkpoint_signature_message in checkpoint_signature_messages {
2413 self.checkpoint_service
2414 .notify_checkpoint_signature(&checkpoint_signature_message)
2415 .expect("db error");
2416 }
2417 }
2418
2419 async fn process_dkg_updates(
2420 &self,
2421 state: &mut CommitHandlerState,
2422 commit_info: &ConsensusCommitInfo,
2423 randomness_manager: Option<&mut RandomnessManager>,
2424 randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2425 randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2426 ) {
2427 if !self.epoch_store.randomness_state_enabled() {
2428 let num_dkg_messages = randomness_dkg_messages.len();
2429 let num_dkg_confirmations = randomness_dkg_confirmations.len();
2430 if num_dkg_messages + num_dkg_confirmations > 0 {
2431 debug_fatal!(
2432 "received {} RandomnessDkgMessage and {} RandomnessDkgConfirmation messages when randomness is not enabled",
2433 num_dkg_messages,
2434 num_dkg_confirmations
2435 );
2436 }
2437 return;
2438 }
2439
2440 let randomness_manager =
2441 randomness_manager.expect("randomness manager should exist if randomness is enabled");
2442
2443 let randomness_dkg_updates =
2444 self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);
2445
2446 let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
2447 state,
2448 randomness_manager,
2449 randomness_dkg_confirmations,
2450 );
2451
2452 let always_advance_dkg_to_resolution = (self
2456 .epoch_store
2457 .protocol_config()
2458 .always_advance_dkg_to_resolution()
2459 || (self.epoch_store.get_chain() == Chain::Mainnet
2460 && self.epoch_store.epoch() >= 1143))
2461 && randomness_manager.dkg_status() == DkgStatus::Pending;
2462
2463 if randomness_dkg_updates
2464 || randomness_dkg_confirmation_updates
2465 || always_advance_dkg_to_resolution
2466 {
2467 randomness_manager
2468 .advance_dkg(&mut state.output, commit_info.round)
2469 .await
2470 .expect("epoch ended");
2471 }
2472 }
2473
2474 fn process_randomness_dkg_messages(
2475 &self,
2476 randomness_manager: &mut RandomnessManager,
2477 randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2478 ) -> bool {
2479 if randomness_dkg_messages.is_empty() {
2480 return false;
2481 }
2482
2483 let mut randomness_state_updated = false;
2484 for (authority, bytes) in randomness_dkg_messages {
2485 match bcs::from_bytes(&bytes) {
2486 Ok(message) => {
2487 randomness_manager
2488 .add_message(&authority, message)
2489 .expect("epoch ended");
2491 randomness_state_updated = true;
2492 }
2493
2494 Err(e) => {
2495 warn!(
2496 "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
2497 authority.concise(),
2498 );
2499 }
2500 }
2501 }
2502
2503 randomness_state_updated
2504 }
2505
2506 fn process_randomness_dkg_confirmations(
2507 &self,
2508 state: &mut CommitHandlerState,
2509 randomness_manager: &mut RandomnessManager,
2510 randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2511 ) -> bool {
2512 if randomness_dkg_confirmations.is_empty() {
2513 return false;
2514 }
2515
2516 let mut randomness_state_updated = false;
2517 for (authority, bytes) in randomness_dkg_confirmations {
2518 match bcs::from_bytes(&bytes) {
2519 Ok(message) => {
2520 randomness_manager
2521 .add_confirmation(&mut state.output, &authority, message)
2522 .expect("epoch ended");
2524 randomness_state_updated = true;
2525 }
2526 Err(e) => {
2527 warn!(
2528 "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
2529 authority.concise(),
2530 );
2531 }
2532 }
2533 }
2534
2535 randomness_state_updated
2536 }
2537
2538 fn process_end_of_publish_transactions(
2540 &self,
2541 state: &mut CommitHandlerState,
2542 end_of_publish_transactions: Vec<AuthorityName>,
2543 ) -> bool {
2544 let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
2545 "No contention on end_of_publish as it is only accessed from consensus handler",
2546 );
2547
2548 if eop_aggregator.has_quorum() {
2549 return true;
2550 }
2551
2552 if end_of_publish_transactions.is_empty() {
2553 return false;
2554 }
2555
2556 for authority in end_of_publish_transactions {
2557 info!("Received EndOfPublish from {:?}", authority.concise());
2558
2559 state.output.insert_end_of_publish(authority);
2562 if eop_aggregator
2563 .insert_generic(authority, ())
2564 .is_quorum_reached()
2565 {
2566 debug!(
2567 "Collected enough end_of_publish messages with last message from validator {:?}",
2568 authority.concise(),
2569 );
2570 return true;
2571 }
2572 }
2573
2574 false
2575 }
2576
2577 fn advance_eop_state_machine(
2580 &self,
2581 state: &mut CommitHandlerState,
2582 ) -> (
2583 RwLockWriteGuard<'_, ReconfigState>,
2584 bool, ) {
2586 let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
2587 let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
2588
2589 reconfig_state.close_all_certs();
2590
2591 let commit_has_deferred_txns = state.output.has_deferred_transactions();
2592 let previous_commits_have_deferred_txns = !self.epoch_store.deferred_transactions_empty();
2593
2594 if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
2595 if !start_state_is_reject_all_tx {
2596 info!("Transitioning to RejectAllTx");
2597 }
2598 reconfig_state.close_all_tx();
2599 } else {
2600 debug!(
2601 "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
2602 previous_commits_have_deferred_txns, commit_has_deferred_txns,
2603 );
2604 }
2605
2606 state.output.store_reconfig_state(reconfig_state.clone());
2607
2608 if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
2609 (reconfig_state, true)
2610 } else {
2611 (reconfig_state, false)
2612 }
2613 }
2614
2615 fn gather_commit_metadata(
2616 &self,
2617 consensus_commit: &impl ConsensusCommitAPI,
2618 ) -> (u64, AuthorityIndex, u64) {
2619 let timestamp = consensus_commit.commit_timestamp_ms();
2620 let leader_author = consensus_commit.leader_author_index();
2621 let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
2622
2623 let system_time_ms = SystemTime::now()
2624 .duration_since(UNIX_EPOCH)
2625 .unwrap()
2626 .as_millis() as i64;
2627
2628 let consensus_timestamp_bias_ms = system_time_ms - (timestamp as i64);
2629 let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
2630 self.metrics
2631 .consensus_timestamp_bias
2632 .observe(consensus_timestamp_bias_seconds);
2633
2634 let epoch_start = self
2635 .epoch_store
2636 .epoch_start_config()
2637 .epoch_start_timestamp_ms();
2638 let timestamp = if timestamp < epoch_start {
2639 error!(
2640 "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}"
2641 );
2642 epoch_start
2643 } else {
2644 timestamp
2645 };
2646
2647 (timestamp, leader_author, commit_sub_dag_index)
2648 }
2649
2650 fn create_authenticator_state_update(
2651 &self,
2652 last_committed_round: u64,
2653 commit_info: &ConsensusCommitInfo,
2654 ) -> Option<VerifiedExecutableTransactionWithAliases> {
2655 let new_jwks = self
2663 .epoch_store
2664 .get_new_jwks(last_committed_round)
2665 .expect("Unrecoverable error in consensus handler");
2666
2667 if !new_jwks.is_empty() {
2668 let authenticator_state_update_transaction = authenticator_state_update_transaction(
2669 &self.epoch_store,
2670 commit_info.round,
2671 new_jwks,
2672 );
2673 debug!(
2674 "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
2675 authenticator_state_update_transaction.digest(),
2676 authenticator_state_update_transaction,
2677 );
2678
2679 Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2680 authenticator_state_update_transaction,
2681 ))
2682 } else {
2683 None
2684 }
2685 }
2686
2687 #[instrument(level = "trace", skip_all)]
2690 fn filter_consensus_txns(
2691 &mut self,
2692 initial_reconfig_state: ReconfigState,
2693 commit_info: &ConsensusCommitInfo,
2694 consensus_commit: &impl ConsensusCommitAPI,
2695 ) -> FilteredConsensusOutput {
2696 let mut transactions = Vec::new();
2697 let mut owned_object_locks = HashMap::new();
2698 let epoch = self.epoch_store.epoch();
2699 let mut num_finalized_user_transactions = vec![0; self.committee.size()];
2700 let mut num_rejected_user_transactions = vec![0; self.committee.size()];
2701 for (block, parsed_transactions) in consensus_commit.transactions() {
2702 let author = block.author.value();
2703 self.last_consensus_stats.stats.inc_num_messages(author);
2705
2706 self.epoch_store.set_consensus_tx_status(
2708 ConsensusPosition::ping(epoch, block),
2709 ConsensusTxStatus::Finalized,
2710 );
2711
2712 for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
2713 let position = ConsensusPosition {
2714 epoch,
2715 block,
2716 index: tx_index as TransactionIndex,
2717 };
2718
2719 if let Some(tx) = parsed.transaction.kind.as_user_transaction() {
2722 let digest = tx.digest();
2723 if let Some((spam_weight, submitter_client_addrs)) = self
2724 .epoch_store
2725 .submitted_transaction_cache
2726 .increment_submission_count(digest)
2727 {
2728 if let Some(ref traffic_controller) = self.traffic_controller {
2729 debug!(
2730 "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
2731 submitter_client_addrs.len()
2732 );
2733
2734 for addr in submitter_client_addrs {
2736 traffic_controller.tally(TrafficTally::new(
2737 Some(addr),
2738 None,
2739 None,
2740 spam_weight.clone(),
2741 ));
2742 }
2743 } else {
2744 warn!(
2745 "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
2746 submitter_client_addrs.len()
2747 );
2748 }
2749 }
2750 }
2751
2752 if parsed.rejected {
2753 if parsed.transaction.is_user_transaction() {
2755 self.epoch_store
2756 .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
2757 num_rejected_user_transactions[author] += 1;
2758 }
2759 continue;
2761 }
2762
2763 let kind = classify(&parsed.transaction);
2764 self.metrics
2765 .consensus_handler_processed
2766 .with_label_values(&[kind])
2767 .inc();
2768 self.metrics
2769 .consensus_handler_transaction_sizes
2770 .with_label_values(&[kind])
2771 .observe(parsed.serialized_len as f64);
2772 if parsed.transaction.is_user_transaction() {
2773 self.last_consensus_stats
2774 .stats
2775 .inc_num_user_transactions(author);
2776 }
2777
2778 if !initial_reconfig_state.should_accept_consensus_certs() {
2779 match &parsed.transaction.kind {
2782 ConsensusTransactionKind::UserTransactionV2(_)
2783 | ConsensusTransactionKind::UserTransaction(_)
2785 | ConsensusTransactionKind::CertifiedTransaction(_)
2786 | ConsensusTransactionKind::CapabilityNotification(_)
2787 | ConsensusTransactionKind::CapabilityNotificationV2(_)
2788 | ConsensusTransactionKind::EndOfPublish(_)
2789 | ConsensusTransactionKind::ExecutionTimeObservation(_)
2791 | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
2792 debug!(
2793 "Ignoring consensus transaction {:?} because of end of epoch",
2794 parsed.transaction.key()
2795 );
2796 continue;
2797 }
2798
2799 ConsensusTransactionKind::CheckpointSignature(_)
2801 | ConsensusTransactionKind::CheckpointSignatureV2(_)
2802 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2803 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
2804 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
2805 }
2806 }
2807
2808 if !initial_reconfig_state.should_accept_tx() {
2809 match &parsed.transaction.kind {
2810 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
2811 | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
2812 _ => {}
2813 }
2814 }
2815
2816 match &parsed.transaction.kind {
2818 ConsensusTransactionKind::CapabilityNotification(_)
2819 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2820 | ConsensusTransactionKind::CheckpointSignature(_) => {
2821 debug_fatal!(
2822 "BUG: saw deprecated tx {:?}for commit round {}",
2823 parsed.transaction.key(),
2824 commit_info.round
2825 );
2826 continue;
2827 }
2828 _ => {}
2829 }
2830
2831 if parsed.transaction.is_user_transaction() {
2832 let author_name = self
2833 .epoch_store
2834 .committee()
2835 .authority_by_index(author as u32)
2836 .unwrap();
2837 if self
2838 .epoch_store
2839 .has_received_end_of_publish_from(author_name)
2840 {
2841 warn!(
2845 "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
2846 author_name.concise(),
2847 parsed.transaction.key(),
2848 );
2849 continue;
2850 }
2851 }
2852
2853 if let ConsensusTransactionKind::UserTransactionV2(tx_with_claims) =
2859 &parsed.transaction.kind
2860 {
2861 let immutable_object_ids: HashSet<ObjectID> =
2862 tx_with_claims.get_immutable_objects().into_iter().collect();
2863 let tx = tx_with_claims.tx();
2864
2865 let Ok(input_objects) = tx.transaction_data().input_objects() else {
2866 debug_fatal!("Invalid input objects for transaction {}", tx.digest());
2867 continue;
2868 };
2869
2870 let owned_object_refs: Vec<_> = input_objects
2873 .iter()
2874 .filter_map(|obj| match obj {
2875 InputObjectKind::ImmOrOwnedMoveObject(obj_ref)
2876 if !immutable_object_ids.contains(&obj_ref.0) =>
2877 {
2878 Some(*obj_ref)
2879 }
2880 _ => None,
2881 })
2882 .collect();
2883
2884 match self
2885 .epoch_store
2886 .try_acquire_owned_object_locks_post_consensus(
2887 &owned_object_refs,
2888 *tx.digest(),
2889 &owned_object_locks,
2890 ) {
2891 Ok(new_locks) => {
2892 owned_object_locks.extend(new_locks.into_iter());
2893 self.epoch_store
2895 .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
2896 num_finalized_user_transactions[author] += 1;
2897 }
2898 Err(e) => {
2899 debug!("Dropping transaction {}: {}", tx.digest(), e);
2900 self.epoch_store
2901 .set_consensus_tx_status(position, ConsensusTxStatus::Dropped);
2902 self.epoch_store.set_rejection_vote_reason(position, &e);
2903 continue;
2904 }
2905 }
2906 }
2907
2908 let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
2909 transactions.push((transaction, author as u32));
2910 }
2911 }
2912
2913 for (i, authority) in self.committee.authorities() {
2914 let hostname = &authority.hostname;
2915 self.metrics
2916 .consensus_committed_messages
2917 .with_label_values(&[hostname])
2918 .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
2919 self.metrics
2920 .consensus_committed_user_transactions
2921 .with_label_values(&[hostname])
2922 .set(
2923 self.last_consensus_stats
2924 .stats
2925 .get_num_user_transactions(i.value()) as i64,
2926 );
2927 self.metrics
2928 .consensus_finalized_user_transactions
2929 .with_label_values(&[hostname])
2930 .add(num_finalized_user_transactions[i.value()] as i64);
2931 self.metrics
2932 .consensus_rejected_user_transactions
2933 .with_label_values(&[hostname])
2934 .add(num_rejected_user_transactions[i.value()] as i64);
2935 }
2936
2937 FilteredConsensusOutput {
2938 transactions,
2939 owned_object_locks,
2940 }
2941 }
2942
    /// Verifies the filtered transactions and removes duplicates, returning only the
    /// first occurrence of each transaction that has not been processed before.
    ///
    /// A transaction is skipped when it fails verification, when its key was already
    /// seen in this commit or in the in-memory `processed_cache`, or when the epoch
    /// store reports its key as processed. Survivors are recorded as processed in
    /// `state.output`.
    ///
    /// Side effects: updates `last_consensus_stats.index` for every input transaction,
    /// caches user transaction digests as recently finalized, and fills
    /// `state.occurrence_counts` with per-digest occurrence counts for this commit.
    ///
    /// # Panics
    ///
    /// Panics if `state.occurrence_counts` is not empty on entry, on a "db error" from
    /// the processed check, or if `cert_origin` is not a valid committee index.
    fn deduplicate_consensus_txns(
        &mut self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
    ) -> Vec<VerifiedSequencedConsensusTransaction> {
        let mut all_transactions = Vec::new();

        // Number of times each key appears in this commit (verified transactions only).
        let mut occurrence_counts: HashMap<SequencedConsensusTransactionKey, u32> = HashMap::new();
        // Keys whose first occurrence in this commit was accepted for processing.
        let mut first_commit_keys: HashSet<SequencedConsensusTransactionKey> = HashSet::new();

        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
            // The transaction index is 1-based within the commit.
            let current_tx_index = ExecutionIndices {
                last_committed_round: commit_info.round,
                sub_dag_index: commit_info.consensus_commit_ref.index.into(),
                transaction_index: (seq + 1) as u64,
            };

            // Updated even for transactions that are skipped below.
            self.last_consensus_stats.index = current_tx_index;

            let certificate_author = *self
                .epoch_store
                .committee()
                .authority_by_index(cert_origin)
                .unwrap();

            let sequenced_transaction = SequencedConsensusTransaction {
                certificate_author_index: cert_origin,
                certificate_author,
                consensus_index: current_tx_index,
                transaction,
            };

            let Some(verified_transaction) = self
                .epoch_store
                .verify_consensus_transaction(sequenced_transaction)
            else {
                continue;
            };

            let key = verified_transaction.0.key();

            if let Some(tx_digest) = key.user_transaction_digest() {
                self.epoch_store
                    .cache_recently_finalized_transaction(tx_digest);
            }

            let count = occurrence_counts.entry(key.clone()).or_insert(0);
            *count += 1;
            let in_commit = *count > 1;

            // `put` runs unconditionally so the key is always inserted into the cache,
            // whether or not it was already seen in this commit.
            let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
            if in_commit || in_cache {
                self.metrics.skipped_consensus_txns_cache_hit.inc();
                continue;
            }
            // Cache miss: fall back to the epoch store's processed check.
            if self
                .epoch_store
                .is_consensus_message_processed(&key)
                .expect("db error")
            {
                self.metrics.skipped_consensus_txns.inc();
                continue;
            }

            first_commit_keys.insert(key.clone());

            state.output.record_consensus_message_processed(key);

            all_transactions.push(verified_transaction);
        }

        // Report duplicate counts only for keys that were accepted in this commit.
        for key in first_commit_keys {
            if let Some(&count) = occurrence_counts.get(&key)
                && count > 1
            {
                self.metrics
                    .consensus_handler_duplicate_tx_count
                    .observe(count as f64);
            }
        }

        assert!(
            state.occurrence_counts.is_empty(),
            "occurrence_counts should be empty before populating"
        );
        // Keep only keys that map to a user transaction digest.
        state.occurrence_counts.reserve(occurrence_counts.len());
        state.occurrence_counts.extend(
            occurrence_counts
                .into_iter()
                .filter_map(|(key, count)| key.user_transaction_digest().map(|d| (d, count))),
        );

        all_transactions
    }
3047
3048 fn build_commit_handler_input(
3049 &self,
3050 transactions: Vec<VerifiedSequencedConsensusTransaction>,
3051 ) -> CommitHandlerInput {
3052 let epoch = self.epoch_store.epoch();
3053 let mut commit_handler_input = CommitHandlerInput::default();
3054
3055 for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
3056 match transaction.transaction {
3057 SequencedConsensusTransactionKind::External(consensus_transaction) => {
3058 match consensus_transaction.kind {
3059 ConsensusTransactionKind::UserTransactionV2(tx) => {
3061 let used_alias_versions = if self
3063 .epoch_store
3064 .protocol_config()
3065 .fix_checkpoint_signature_mapping()
3066 {
3067 tx.aliases()
3068 } else {
3069 tx.aliases_v1().map(|a| {
3073 NonEmpty::from_vec(
3074 a.into_iter()
3075 .enumerate()
3076 .map(|(idx, (_, seq))| (idx as u8, seq))
3077 .collect(),
3078 )
3079 .unwrap()
3080 })
3081 };
3082 let inner_tx = tx.into_tx();
3083 let tx = VerifiedTransaction::new_unchecked(inner_tx);
3085 let transaction =
3087 VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
3088 if let Some(used_alias_versions) = used_alias_versions {
3089 commit_handler_input
3090 .user_transactions
3091 .push(WithAliases::new(transaction, used_alias_versions));
3092 } else {
3093 commit_handler_input.user_transactions.push(
3094 VerifiedExecutableTransactionWithAliases::no_aliases(
3095 transaction,
3096 ),
3097 );
3098 }
3099 }
3100
3101 ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => {
3103 commit_handler_input
3104 .end_of_publish_transactions
3105 .push(authority_public_key_bytes);
3106 }
3107 ConsensusTransactionKind::NewJWKFetched(
3108 authority_public_key_bytes,
3109 jwk_id,
3110 jwk,
3111 ) => {
3112 commit_handler_input.new_jwks.push((
3113 authority_public_key_bytes,
3114 jwk_id,
3115 jwk,
3116 ));
3117 }
3118 ConsensusTransactionKind::RandomnessDkgMessage(
3119 authority_public_key_bytes,
3120 items,
3121 ) => {
3122 commit_handler_input
3123 .randomness_dkg_messages
3124 .push((authority_public_key_bytes, items));
3125 }
3126 ConsensusTransactionKind::RandomnessDkgConfirmation(
3127 authority_public_key_bytes,
3128 items,
3129 ) => {
3130 commit_handler_input
3131 .randomness_dkg_confirmations
3132 .push((authority_public_key_bytes, items));
3133 }
3134 ConsensusTransactionKind::CapabilityNotificationV2(
3135 authority_capabilities_v2,
3136 ) => {
3137 commit_handler_input
3138 .capability_notifications
3139 .push(authority_capabilities_v2);
3140 }
3141 ConsensusTransactionKind::ExecutionTimeObservation(
3142 execution_time_observation,
3143 ) => {
3144 commit_handler_input
3145 .execution_time_observations
3146 .push(execution_time_observation);
3147 }
3148 ConsensusTransactionKind::CheckpointSignatureV2(
3149 checkpoint_signature_message,
3150 ) => {
3151 commit_handler_input
3152 .checkpoint_signature_messages
3153 .push(*checkpoint_signature_message);
3154 }
3155
3156 ConsensusTransactionKind::CheckpointSignature(_)
3159 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
3160 | ConsensusTransactionKind::CapabilityNotification(_)
3161 | ConsensusTransactionKind::CertifiedTransaction(_)
3162 | ConsensusTransactionKind::UserTransaction(_) => {
3163 unreachable!("filtered earlier")
3164 }
3165 }
3166 }
3167 SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
3169 }
3170 }
3171
3172 commit_handler_input
3173 }
3174
3175 async fn send_end_of_publish_if_needed(&self) {
3176 if self
3177 .epoch_store
3178 .protocol_config()
3179 .timestamp_based_epoch_close()
3180 {
3181 return;
3182 }
3183 if !self.epoch_store.should_send_end_of_publish() {
3184 return;
3185 }
3186
3187 let end_of_publish = ConsensusTransaction::new_end_of_publish(self.epoch_store.name);
3188 if let Err(err) =
3189 self.consensus_adapter
3190 .submit(end_of_publish, None, &self.epoch_store, None, None)
3191 {
3192 warn!(
3193 "Error when sending EndOfPublish message from ConsensusHandler: {:?}",
3194 err
3195 );
3196 } else {
3197 info!(epoch=?self.epoch_store.epoch(), "Sending EndOfPublish message to consensus");
3198 }
3199 }
3200}
3201
3202pub(crate) type SchedulerMessage = (
3205 Vec<(Schedulable, AssignedVersions)>,
3206 Option<SettlementBatchInfo>,
3207);
3208
3209#[derive(Clone)]
3210pub(crate) struct ExecutionSchedulerSender {
3211 sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
3212}
3213
3214impl ExecutionSchedulerSender {
3215 fn start(
3216 settlement_scheduler: SettlementScheduler,
3217 epoch_store: Arc<AuthorityPerEpochStore>,
3218 ) -> Self {
3219 let (sender, recv) = monitored_mpsc::unbounded_channel("execution_scheduler_sender");
3220 spawn_monitored_task!(Self::run(recv, settlement_scheduler, epoch_store));
3221 Self { sender }
3222 }
3223
3224 pub(crate) fn new_for_testing(
3225 sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
3226 ) -> Self {
3227 Self { sender }
3228 }
3229
3230 fn send(
3231 &self,
3232 transactions: Vec<(Schedulable, AssignedVersions)>,
3233 settlement: Option<SettlementBatchInfo>,
3234 ) {
3235 let _ = self.sender.send((transactions, settlement));
3236 }
3237
3238 async fn run(
3239 mut recv: monitored_mpsc::UnboundedReceiver<SchedulerMessage>,
3240 settlement_scheduler: SettlementScheduler,
3241 epoch_store: Arc<AuthorityPerEpochStore>,
3242 ) {
3243 while let Some((transactions, settlement)) = recv.recv().await {
3244 let _guard = monitored_scope("ConsensusHandler::enqueue");
3245 let txns = transactions
3246 .into_iter()
3247 .map(|(txn, versions)| (txn, ExecutionEnv::new().with_assigned_versions(versions)))
3248 .collect();
3249 if let Some(settlement) = settlement {
3250 settlement_scheduler.enqueue_v2(txns, settlement, &epoch_store);
3251 } else {
3252 settlement_scheduler.enqueue(txns, &epoch_store);
3253 }
3254 }
3255 }
3256}
3257
3258pub(crate) struct MysticetiConsensusHandler {
3260 tasks: JoinSet<()>,
3261}
3262
3263impl MysticetiConsensusHandler {
3264 pub(crate) fn new(
3265 last_processed_commit_at_startup: CommitIndex,
3266 mut consensus_handler: ConsensusHandler<CheckpointService>,
3267 mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
3268 commit_consumer_monitor: Arc<CommitConsumerMonitor>,
3269 node_role: NodeRole,
3270 ) -> Self {
3271 debug!(
3272 last_processed_commit_at_startup,
3273 "Starting consensus replay"
3274 );
3275 let mut tasks = JoinSet::new();
3276 tasks.spawn(monitored_future!(async move {
3277 while let Some(consensus_commit) = commit_receiver.recv().await {
3279 let commit_index = consensus_commit.commit_ref.index;
3280 if !node_role.process_consensus_commits() {
3281 debug!(
3282 commit_index,
3283 "Observer skipping consensus commit processing"
3284 );
3285 } else if commit_index <= last_processed_commit_at_startup {
3286 consensus_handler.handle_prior_consensus_commit(consensus_commit);
3287 } else {
3288 consensus_handler
3289 .handle_consensus_commit(consensus_commit)
3290 .await;
3291 }
3292 commit_consumer_monitor.set_highest_handled_commit(commit_index);
3293 }
3294 }));
3295 Self { tasks }
3296 }
3297
3298 pub(crate) async fn abort(&mut self) {
3299 self.tasks.shutdown().await;
3300 }
3301}
3302
3303fn authenticator_state_update_transaction(
3304 epoch_store: &AuthorityPerEpochStore,
3305 round: u64,
3306 mut new_active_jwks: Vec<ActiveJwk>,
3307) -> VerifiedExecutableTransaction {
3308 let epoch = epoch_store.epoch();
3309 new_active_jwks.sort();
3310
3311 info!("creating authenticator state update transaction");
3312 assert!(epoch_store.authenticator_state_enabled());
3313 let transaction = VerifiedTransaction::new_authenticator_state_update(
3314 epoch,
3315 round,
3316 new_active_jwks,
3317 epoch_store
3318 .epoch_start_config()
3319 .authenticator_obj_initial_shared_version()
3320 .expect("authenticator state obj must exist"),
3321 );
3322 VerifiedExecutableTransaction::new_system(transaction, epoch)
3323}
3324
3325pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
3326 match &transaction.kind {
3327 ConsensusTransactionKind::CertifiedTransaction(_) => "_deprecated_certificate",
3329 ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
3330 ConsensusTransactionKind::CheckpointSignatureV2(_) => "checkpoint_signature",
3331 ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
3332 ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
3333 ConsensusTransactionKind::CapabilityNotificationV2(_) => "capability_notification_v2",
3334 ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
3335 ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
3336 ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
3337 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
3338 ConsensusTransactionKind::UserTransaction(_) => "_deprecated_user_transaction",
3339 ConsensusTransactionKind::UserTransactionV2(tx) => {
3340 if tx.tx().is_consensus_tx() {
3341 "shared_user_transaction_v2"
3342 } else {
3343 "owned_user_transaction_v2"
3344 }
3345 }
3346 ConsensusTransactionKind::ExecutionTimeObservation(_) => "execution_time_observation",
3347 }
3348}
3349
3350#[derive(Debug, Clone, Serialize, Deserialize)]
3351pub struct SequencedConsensusTransaction {
3352 pub certificate_author_index: AuthorityIndex,
3353 pub certificate_author: AuthorityName,
3354 pub consensus_index: ExecutionIndices,
3355 pub transaction: SequencedConsensusTransactionKind,
3356}
3357
3358#[derive(Debug, Clone)]
3359#[allow(clippy::large_enum_variant)]
3360pub enum SequencedConsensusTransactionKind {
3361 External(ConsensusTransaction),
3362 System(VerifiedExecutableTransaction),
3363}
3364
3365impl Serialize for SequencedConsensusTransactionKind {
3366 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
3367 let serializable = SerializableSequencedConsensusTransactionKind::from(self);
3368 serializable.serialize(serializer)
3369 }
3370}
3371
3372impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
3373 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
3374 let serializable =
3375 SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
3376 Ok(serializable.into())
3377 }
3378}
3379
3380#[derive(Debug, Clone, Serialize, Deserialize)]
3384#[allow(clippy::large_enum_variant)]
3385enum SerializableSequencedConsensusTransactionKind {
3386 External(ConsensusTransaction),
3387 System(TrustedExecutableTransaction),
3388}
3389
3390impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
3391 fn from(kind: &SequencedConsensusTransactionKind) -> Self {
3392 match kind {
3393 SequencedConsensusTransactionKind::External(ext) => {
3394 SerializableSequencedConsensusTransactionKind::External(ext.clone())
3395 }
3396 SequencedConsensusTransactionKind::System(txn) => {
3397 SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
3398 }
3399 }
3400 }
3401}
3402
3403impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
3404 fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
3405 match kind {
3406 SerializableSequencedConsensusTransactionKind::External(ext) => {
3407 SequencedConsensusTransactionKind::External(ext)
3408 }
3409 SerializableSequencedConsensusTransactionKind::System(txn) => {
3410 SequencedConsensusTransactionKind::System(txn.into())
3411 }
3412 }
3413 }
3414}
3415
3416#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
3417pub enum SequencedConsensusTransactionKey {
3418 External(ConsensusTransactionKey),
3419 System(TransactionDigest),
3420}
3421
3422impl SequencedConsensusTransactionKey {
3423 pub fn user_transaction_digest(&self) -> Option<TransactionDigest> {
3424 match self {
3425 SequencedConsensusTransactionKey::External(key) => match key {
3426 ConsensusTransactionKey::Certificate(digest) => Some(*digest),
3427 _ => None,
3428 },
3429 SequencedConsensusTransactionKey::System(_) => None,
3430 }
3431 }
3432}
3433
3434impl SequencedConsensusTransactionKind {
3435 pub fn key(&self) -> SequencedConsensusTransactionKey {
3436 match self {
3437 SequencedConsensusTransactionKind::External(ext) => {
3438 SequencedConsensusTransactionKey::External(ext.key())
3439 }
3440 SequencedConsensusTransactionKind::System(txn) => {
3441 SequencedConsensusTransactionKey::System(*txn.digest())
3442 }
3443 }
3444 }
3445
3446 pub fn get_tracking_id(&self) -> u64 {
3447 match self {
3448 SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
3449 SequencedConsensusTransactionKind::System(_txn) => 0,
3450 }
3451 }
3452
3453 pub fn is_executable_transaction(&self) -> bool {
3454 match self {
3455 SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
3456 SequencedConsensusTransactionKind::System(_) => true,
3457 }
3458 }
3459
3460 pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
3461 match self {
3462 SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
3463 ConsensusTransactionKind::UserTransactionV2(txn) => Some(*txn.tx().digest()),
3464 _ => None,
3465 },
3466 SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
3467 }
3468 }
3469
3470 pub fn is_end_of_publish(&self) -> bool {
3471 match self {
3472 SequencedConsensusTransactionKind::External(ext) => {
3473 matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
3474 }
3475 SequencedConsensusTransactionKind::System(_) => false,
3476 }
3477 }
3478}
3479
3480impl SequencedConsensusTransaction {
3481 pub fn sender_authority(&self) -> AuthorityName {
3482 self.certificate_author
3483 }
3484
3485 pub fn key(&self) -> SequencedConsensusTransactionKey {
3486 self.transaction.key()
3487 }
3488
3489 pub fn is_end_of_publish(&self) -> bool {
3490 if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
3491 matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
3492 } else {
3493 false
3494 }
3495 }
3496
3497 pub fn try_take_execution_time_observation(&mut self) -> Option<ExecutionTimeObservation> {
3498 if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
3499 kind: ConsensusTransactionKind::ExecutionTimeObservation(observation),
3500 ..
3501 }) = &mut self.transaction
3502 {
3503 Some(std::mem::take(observation))
3504 } else {
3505 None
3506 }
3507 }
3508
3509 pub fn is_system(&self) -> bool {
3510 matches!(
3511 self.transaction,
3512 SequencedConsensusTransactionKind::System(_)
3513 )
3514 }
3515
3516 pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
3517 if !randomness_state_enabled {
3518 return false;
3521 }
3522 match &self.transaction {
3523 SequencedConsensusTransactionKind::External(ConsensusTransaction {
3524 kind: ConsensusTransactionKind::UserTransactionV2(txn),
3525 ..
3526 }) => txn.tx().transaction_data().uses_randomness(),
3527 _ => false,
3528 }
3529 }
3530
3531 pub fn as_consensus_txn(&self) -> Option<&SenderSignedData> {
3532 match &self.transaction {
3533 SequencedConsensusTransactionKind::External(ConsensusTransaction {
3534 kind: ConsensusTransactionKind::UserTransactionV2(txn),
3535 ..
3536 }) if txn.tx().is_consensus_tx() => Some(txn.tx().data()),
3537 SequencedConsensusTransactionKind::System(txn) if txn.is_consensus_tx() => {
3538 Some(txn.data())
3539 }
3540 _ => None,
3541 }
3542 }
3543}
3544
3545#[derive(Debug, Clone, Serialize, Deserialize)]
3546pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
3547
#[cfg(test)]
impl VerifiedSequencedConsensusTransaction {
    /// Test-only constructor: wraps `transaction` using
    /// `SequencedConsensusTransaction::new_test`.
    pub fn new_test(transaction: ConsensusTransaction) -> Self {
        let sequenced = SequencedConsensusTransaction::new_test(transaction);
        Self(sequenced)
    }
}
3554
3555impl SequencedConsensusTransaction {
3556 pub fn new_test(transaction: ConsensusTransaction) -> Self {
3557 Self {
3558 certificate_author_index: 0,
3559 certificate_author: AuthorityName::ZERO,
3560 consensus_index: Default::default(),
3561 transaction: SequencedConsensusTransactionKind::External(transaction),
3562 }
3563 }
3564}
3565
3566#[derive(Serialize, Deserialize)]
3567pub(crate) struct CommitIntervalObserver {
3568 ring_buffer: VecDeque<u64>,
3569}
3570
3571impl CommitIntervalObserver {
3572 pub fn new(window_size: u32) -> Self {
3573 Self {
3574 ring_buffer: VecDeque::with_capacity(window_size as usize),
3575 }
3576 }
3577
3578 pub fn observe_commit_time(&mut self, consensus_commit: &impl ConsensusCommitAPI) {
3579 let commit_time = consensus_commit.commit_timestamp_ms();
3580 if self.ring_buffer.len() == self.ring_buffer.capacity() {
3581 self.ring_buffer.pop_front();
3582 }
3583 self.ring_buffer.push_back(commit_time);
3584 }
3585
3586 pub fn commit_interval_estimate(&self) -> Option<Duration> {
3587 if self.ring_buffer.len() <= 1 {
3588 None
3589 } else {
3590 let first = self.ring_buffer.front().unwrap();
3591 let last = self.ring_buffer.back().unwrap();
3592 let duration = last.saturating_sub(*first);
3593 let num_commits = self.ring_buffer.len() as u64;
3594 Some(Duration::from_millis(duration.div_ceil(num_commits)))
3595 }
3596 }
3597}
3598
3599#[cfg(test)]
3600mod tests {
3601 use std::collections::HashSet;
3602
3603 use consensus_core::{
3604 BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock,
3605 };
3606 use futures::pin_mut;
3607 use prometheus::Registry;
3608 use sui_protocol_config::{ConsensusTransactionOrdering, ProtocolConfig};
3609 use sui_types::{
3610 base_types::ExecutionDigests,
3611 base_types::{AuthorityName, FullObjectRef, ObjectID, SuiAddress, random_object_ref},
3612 committee::Committee,
3613 crypto::deterministic_random_account_key,
3614 gas::GasCostSummary,
3615 message_envelope::Message,
3616 messages_checkpoint::{
3617 CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
3618 SignedCheckpointSummary,
3619 },
3620 messages_consensus::ConsensusTransaction,
3621 object::Object,
3622 transaction::{
3623 CertifiedTransaction, TransactionData, TransactionDataAPI, VerifiedCertificate,
3624 },
3625 };
3626
3627 use super::*;
3628 use crate::{
3629 authority::{
3630 authority_per_epoch_store::ConsensusStatsAPI,
3631 test_authority_builder::TestAuthorityBuilder,
3632 },
3633 checkpoints::CheckpointServiceNoop,
3634 consensus_adapter::consensus_tests::test_user_transaction,
3635 consensus_test_utils::make_consensus_adapter_for_test,
3636 post_consensus_tx_reorder::PostConsensusTxReorder,
3637 };
3638
3639 #[tokio::test(flavor = "current_thread", start_paused = true)]
3640 async fn test_consensus_commit_handler() {
3641 telemetry_subscribers::init_for_testing();
3642
3643 let (sender, keypair) = deterministic_random_account_key();
3646 let gas_objects: Vec<Object> = (0..12)
3648 .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3649 .collect();
3650 let owned_objects: Vec<Object> = (0..4)
3652 .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
3653 .collect();
3654 let shared_objects: Vec<Object> = (0..6)
3656 .map(|_| Object::shared_for_testing())
3657 .collect::<Vec<_>>();
3658 let mut all_objects = gas_objects.clone();
3659 all_objects.extend(owned_objects.clone());
3660 all_objects.extend(shared_objects.clone());
3661
3662 let network_config =
3663 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
3664 .with_objects(all_objects.clone())
3665 .build();
3666
3667 let state = TestAuthorityBuilder::new()
3668 .with_network_config(&network_config, 0)
3669 .build()
3670 .await;
3671
3672 let epoch_store = state.epoch_store_for_testing().clone();
3673 let new_epoch_start_state = epoch_store.epoch_start_state();
3674 let consensus_committee = new_epoch_start_state.get_consensus_committee();
3675
3676 let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3677
3678 let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
3679
3680 let backpressure_manager = BackpressureManager::new_for_tests();
3681 let consensus_adapter =
3682 make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3683 let settlement_scheduler = SettlementScheduler::new(
3684 state.execution_scheduler().as_ref().clone(),
3685 state.get_transaction_cache_reader().clone(),
3686 state.metrics.clone(),
3687 );
3688 let mut consensus_handler = ConsensusHandler::new(
3689 epoch_store,
3690 Arc::new(CheckpointServiceNoop {}),
3691 settlement_scheduler,
3692 consensus_adapter,
3693 state.get_object_cache_reader().clone(),
3694 consensus_committee.clone(),
3695 metrics,
3696 Arc::new(throughput_calculator),
3697 backpressure_manager.subscribe(),
3698 state.traffic_controller.clone(),
3699 None,
3700 state.consensus_gasless_counter.clone(),
3701 );
3702
3703 let mut user_transactions = vec![];
3705 for (i, gas_object) in gas_objects[0..8].iter().enumerate() {
3706 let input_object = if i % 2 == 0 {
3707 owned_objects.get(i / 2).unwrap().clone()
3708 } else {
3709 shared_objects.get(i / 2).unwrap().clone()
3710 };
3711 let transaction = test_user_transaction(
3712 &state,
3713 sender,
3714 &keypair,
3715 gas_object.clone(),
3716 vec![input_object],
3717 )
3718 .await;
3719 user_transactions.push(transaction);
3720 }
3721
3722 for (i, gas_object) in gas_objects[8..12].iter().enumerate() {
3725 let shared_object = if i < 2 {
3726 shared_objects[4].clone()
3727 } else {
3728 shared_objects[5].clone()
3729 };
3730 let transaction = test_user_transaction(
3731 &state,
3732 sender,
3733 &keypair,
3734 gas_object.clone(),
3735 vec![shared_object],
3736 )
3737 .await;
3738 user_transactions.push(transaction);
3739 }
3740
3741 let mut blocks = Vec::new();
3743 for (i, consensus_transaction) in user_transactions
3744 .iter()
3745 .cloned()
3746 .map(|t| ConsensusTransaction::new_user_transaction_v2_message(&state.name, t.into()))
3747 .enumerate()
3748 {
3749 let transaction_bytes = bcs::to_bytes(&consensus_transaction).unwrap();
3750 let block = VerifiedBlock::new_for_test(
3751 TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
3752 .set_transactions(vec![Transaction::new(transaction_bytes)])
3753 .build(),
3754 );
3755
3756 blocks.push(block);
3757 }
3758
3759 let leader_block = blocks[0].clone();
3761 let committed_sub_dag = CommittedSubDag::new(
3762 leader_block.reference(),
3763 blocks.clone(),
3764 leader_block.timestamp_ms(),
3765 CommitRef::new(10, CommitDigest::MIN),
3766 );
3767
3768 backpressure_manager.set_backpressure(true);
3770 backpressure_manager.update_highest_certified_checkpoint(1);
3772
3773 {
3775 let waiter = consensus_handler.handle_consensus_commit(committed_sub_dag.clone());
3776 pin_mut!(waiter);
3777
3778 tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
3780 .await
3781 .unwrap_err();
3782
3783 backpressure_manager.set_backpressure(false);
3785
3786 tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
3788 .await
3789 .unwrap();
3790 }
3791
3792 let num_blocks = blocks.len();
3794 let num_transactions = user_transactions.len();
3795 let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
3796 assert_eq!(
3797 last_consensus_stats_1.index.transaction_index,
3798 num_transactions as u64
3799 );
3800 assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
3801 assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
3802 assert_eq!(
3803 last_consensus_stats_1.stats.get_num_messages(0),
3804 num_blocks as u64
3805 );
3806 assert_eq!(
3807 last_consensus_stats_1.stats.get_num_user_transactions(0),
3808 num_transactions as u64
3809 );
3810
3811 for (i, t) in user_transactions.iter().enumerate() {
3813 let digest = t.tx().digest();
3814 if tokio::time::timeout(
3815 std::time::Duration::from_secs(10),
3816 state.notify_read_effects_for_testing("", *digest),
3817 )
3818 .await
3819 .is_ok()
3820 {
3821 } else {
3823 panic!("User transaction {} {} did not execute", i, digest);
3824 }
3825 }
3826
3827 state.execution_scheduler().check_empty_for_testing().await;
3829 }
3830
3831 fn to_short_strings(txs: Vec<VerifiedExecutableTransactionWithAliases>) -> Vec<String> {
3832 txs.into_iter()
3833 .map(|tx| format!("transaction({})", tx.tx().transaction_data().gas_price()))
3834 .collect()
3835 }
3836
3837 #[test]
3838 fn test_order_by_gas_price() {
3839 let mut v = vec![user_txn(42), user_txn(100)];
3840 PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3841 assert_eq!(
3842 to_short_strings(v),
3843 vec![
3844 "transaction(100)".to_string(),
3845 "transaction(42)".to_string(),
3846 ]
3847 );
3848
3849 let mut v = vec![
3850 user_txn(1200),
3851 user_txn(12),
3852 user_txn(1000),
3853 user_txn(42),
3854 user_txn(100),
3855 user_txn(1000),
3856 ];
3857 PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3858 assert_eq!(
3859 to_short_strings(v),
3860 vec![
3861 "transaction(1200)".to_string(),
3862 "transaction(1000)".to_string(),
3863 "transaction(1000)".to_string(),
3864 "transaction(100)".to_string(),
3865 "transaction(42)".to_string(),
3866 "transaction(12)".to_string(),
3867 ]
3868 );
3869 }
3870
3871 #[tokio::test(flavor = "current_thread")]
3872 async fn test_checkpoint_signature_dedup() {
3873 telemetry_subscribers::init_for_testing();
3874
3875 let network_config =
3876 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3877 let state = TestAuthorityBuilder::new()
3878 .with_network_config(&network_config, 0)
3879 .build()
3880 .await;
3881
3882 let epoch_store = state.epoch_store_for_testing().clone();
3883 let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
3884
3885 let make_signed = || {
3886 let epoch = epoch_store.epoch();
3887 let contents =
3888 CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
3889 let summary = CheckpointSummary::new(
3890 &ProtocolConfig::get_for_max_version_UNSAFE(),
3891 epoch,
3892 42, 10, &contents,
3895 None, GasCostSummary::default(),
3897 None, 0, Vec::new(), Vec::new(), );
3902 SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name)
3903 };
3904
3905 let v2_s1 = make_signed();
3907 let v2_s1_clone = v2_s1.clone();
3908 let v2_digest_a = v2_s1.data().digest();
3909 let v2_a =
3910 ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3911 summary: v2_s1,
3912 });
3913
3914 let v2_s2 = make_signed();
3915 let v2_digest_b = v2_s2.data().digest();
3916 let v2_b =
3917 ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3918 summary: v2_s2,
3919 });
3920
3921 assert_ne!(v2_digest_a, v2_digest_b);
3922
3923 assert_eq!(v2_s1_clone.data().digest(), v2_digest_a);
3925 let v2_dup =
3926 ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
3927 summary: v2_s1_clone,
3928 });
3929
3930 let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
3931 let block = VerifiedBlock::new_for_test(
3932 TestBlock::new(100, 0)
3933 .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)])
3934 .build(),
3935 );
3936 let commit = CommittedSubDag::new(
3937 block.reference(),
3938 vec![block.clone()],
3939 block.timestamp_ms(),
3940 CommitRef::new(10, CommitDigest::MIN),
3941 );
3942
3943 let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
3944 let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
3945 let backpressure = BackpressureManager::new_for_tests();
3946 let consensus_adapter =
3947 make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
3948 let settlement_scheduler = SettlementScheduler::new(
3949 state.execution_scheduler().as_ref().clone(),
3950 state.get_transaction_cache_reader().clone(),
3951 state.metrics.clone(),
3952 );
3953 let mut handler = ConsensusHandler::new(
3954 epoch_store.clone(),
3955 Arc::new(CheckpointServiceNoop {}),
3956 settlement_scheduler,
3957 consensus_adapter,
3958 state.get_object_cache_reader().clone(),
3959 consensus_committee.clone(),
3960 metrics,
3961 Arc::new(throughput),
3962 backpressure.subscribe(),
3963 state.traffic_controller.clone(),
3964 None,
3965 state.consensus_gasless_counter.clone(),
3966 );
3967
3968 handler.handle_consensus_commit(commit).await;
3969
3970 use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
3971 use sui_types::messages_consensus::ConsensusTransactionKey as CK;
3972
3973 let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a));
3975 let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b));
3976 assert!(
3977 epoch_store
3978 .is_consensus_message_processed(&v2_key_a)
3979 .unwrap()
3980 );
3981 assert!(
3982 epoch_store
3983 .is_consensus_message_processed(&v2_key_b)
3984 .unwrap()
3985 );
3986 }
3987
3988 #[tokio::test(flavor = "current_thread")]
3989 async fn test_verify_consensus_transaction_filters_mismatched_authorities() {
3990 telemetry_subscribers::init_for_testing();
3991
3992 let network_config =
3993 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
3994 let state = TestAuthorityBuilder::new()
3995 .with_network_config(&network_config, 0)
3996 .build()
3997 .await;
3998
3999 let epoch_store = state.epoch_store_for_testing().clone();
4000 let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
4001
4002 use fastcrypto::traits::KeyPair;
4004 let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair();
4005 let wrong_authority: AuthorityName = wrong_keypair.public().into();
4006
4007 let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority);
4009
4010 let valid_eop = ConsensusTransaction::new_end_of_publish(state.name);
4012
4013 let epoch = epoch_store.epoch();
4015 let contents =
4016 CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
4017 let summary = CheckpointSummary::new(
4018 &ProtocolConfig::get_for_max_version_UNSAFE(),
4019 epoch,
4020 42, 10, &contents,
4023 None, GasCostSummary::default(),
4025 None, 0, Vec::new(), Vec::new(), );
4030
4031 let mismatched_checkpoint_signed =
4033 SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority);
4034 let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest();
4035 let mismatched_checkpoint =
4036 ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
4037 summary: mismatched_checkpoint_signed,
4038 });
4039
4040 let valid_checkpoint_signed =
4042 SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name);
4043 let valid_checkpoint_digest = valid_checkpoint_signed.data().digest();
4044 let valid_checkpoint =
4045 ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
4046 summary: valid_checkpoint_signed,
4047 });
4048
4049 let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
4050
4051 let block = VerifiedBlock::new_for_test(
4053 TestBlock::new(100, 0)
4054 .set_transactions(vec![
4055 to_tx(&mismatched_eop),
4056 to_tx(&valid_eop),
4057 to_tx(&mismatched_checkpoint),
4058 to_tx(&valid_checkpoint),
4059 ])
4060 .build(),
4061 );
4062 let commit = CommittedSubDag::new(
4063 block.reference(),
4064 vec![block.clone()],
4065 block.timestamp_ms(),
4066 CommitRef::new(10, CommitDigest::MIN),
4067 );
4068
4069 let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
4070 let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
4071 let backpressure = BackpressureManager::new_for_tests();
4072 let consensus_adapter =
4073 make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
4074 let settlement_scheduler = SettlementScheduler::new(
4075 state.execution_scheduler().as_ref().clone(),
4076 state.get_transaction_cache_reader().clone(),
4077 state.metrics.clone(),
4078 );
4079 let mut handler = ConsensusHandler::new(
4080 epoch_store.clone(),
4081 Arc::new(CheckpointServiceNoop {}),
4082 settlement_scheduler,
4083 consensus_adapter,
4084 state.get_object_cache_reader().clone(),
4085 consensus_committee.clone(),
4086 metrics,
4087 Arc::new(throughput),
4088 backpressure.subscribe(),
4089 state.traffic_controller.clone(),
4090 None,
4091 state.consensus_gasless_counter.clone(),
4092 );
4093
4094 handler.handle_consensus_commit(commit).await;
4095
4096 use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
4097 use sui_types::messages_consensus::ConsensusTransactionKey as CK;
4098
4099 let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
4101 assert!(
4102 epoch_store
4103 .is_consensus_message_processed(&valid_eop_key)
4104 .unwrap(),
4105 "Valid EndOfPublish should have been processed"
4106 );
4107
4108 let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
4109 state.name,
4110 42,
4111 valid_checkpoint_digest,
4112 ));
4113 assert!(
4114 epoch_store
4115 .is_consensus_message_processed(&valid_checkpoint_key)
4116 .unwrap(),
4117 "Valid CheckpointSignature should have been processed"
4118 );
4119
4120 let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
4122 assert!(
4123 !epoch_store
4124 .is_consensus_message_processed(&mismatched_eop_key)
4125 .unwrap(),
4126 "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
4127 );
4128
4129 let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
4130 wrong_authority,
4131 42,
4132 mismatched_checkpoint_digest,
4133 ));
4134 assert!(
4135 !epoch_store
4136 .is_consensus_message_processed(&mismatched_checkpoint_key)
4137 .unwrap(),
4138 "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
4139 );
4140 }
4141
4142 fn user_txn(gas_price: u64) -> VerifiedExecutableTransactionWithAliases {
4143 let (committee, keypairs) = Committee::new_simple_test_committee();
4144 let (sender, sender_keypair) = deterministic_random_account_key();
4145 let tx = sui_types::transaction::Transaction::from_data_and_signer(
4146 TransactionData::new_transfer(
4147 SuiAddress::default(),
4148 FullObjectRef::from_fastpath_ref(random_object_ref()),
4149 sender,
4150 random_object_ref(),
4151 1000 * gas_price,
4152 gas_price,
4153 ),
4154 vec![&sender_keypair],
4155 );
4156 let tx = VerifiedExecutableTransaction::new_from_certificate(
4157 VerifiedCertificate::new_unchecked(
4158 CertifiedTransaction::new_from_keypairs_for_testing(
4159 tx.into_data(),
4160 &keypairs,
4161 &committee,
4162 ),
4163 ),
4164 );
4165 VerifiedExecutableTransactionWithAliases::no_aliases(tx)
4166 }
4167
4168 mod checkpoint_queue_tests {
4169 use super::*;
4170 use consensus_core::CommitRef;
4171 use sui_types::digests::Digest;
4172
4173 fn make_chunk(tx_count: usize, height: u64) -> Chunk {
4174 Chunk {
4175 schedulables: (0..tx_count)
4176 .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4177 .collect(),
4178 settlement: None,
4179 height,
4180 }
4181 }
4182
4183 fn make_commit_ref(index: u32) -> CommitRef {
4184 CommitRef {
4185 index,
4186 digest: CommitDigest::MIN,
4187 }
4188 }
4189
4190 fn default_versions() -> HashMap<TransactionKey, AssignedVersions> {
4191 HashMap::new()
4192 }
4193
/// After queueing two chunks, `flush(1000, true)` yields a pending checkpoint
/// and leaves no pending roots behind.
#[test]
fn test_flush_all_checkpoint_roots() {
    let versions = default_versions();
    let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);

    // (transaction count, chunk height); both chunks share commit index 1.
    for (tx_count, height) in [(5, 1), (3, 2)] {
        queue.push_chunk(
            make_chunk(tx_count, height),
            &versions,
            1000,
            make_commit_ref(1),
            Digest::default(),
        );
    }

    let flushed = queue.flush(1000, true);

    assert!(flushed.is_some());
    assert!(queue.pending_roots.is_empty());
}
4219
/// `flush(_, false)` holds the queued chunk back until `min_interval` has
/// elapsed past 1000 (the first constructor argument), then releases it.
#[test]
fn test_flush_respects_min_checkpoint_interval() {
    let min_interval = 200;
    let one_before_deadline = 1000 + min_interval - 1;
    let at_deadline = 1000 + min_interval;

    let versions = default_versions();
    let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, 1000, min_interval);

    queue.push_chunk(
        make_chunk(5, 1),
        &versions,
        1000,
        make_commit_ref(1),
        Digest::default(),
    );

    // One tick before the interval elapses: nothing is produced, root stays queued.
    let too_early = queue.flush(one_before_deadline, false);
    assert!(too_early.is_none());
    assert_eq!(queue.pending_roots.len(), 1);

    // Exactly at the interval boundary: the checkpoint is released.
    let on_time = queue.flush(at_deadline, false);
    assert!(on_time.is_some());
    assert!(queue.pending_roots.is_empty());
}
4242
/// Pushing a second chunk that would take the pending transaction count past
/// `max_tx` makes `push_chunk` return one flushed checkpoint, with the new
/// chunk left as the only pending root.
#[test]
fn test_push_chunk_flushes_when_exceeds_max() {
    let max_tx = 10;
    // Two chunks of this size together exceed `max_tx`.
    let chunk_size = max_tx / 2 + 1;

    let versions = default_versions();
    let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, max_tx, 0);

    queue.push_chunk(
        make_chunk(chunk_size, 1),
        &versions,
        1000,
        make_commit_ref(1),
        Digest::default(),
    );

    let flushed_by_second_push = queue.push_chunk(
        make_chunk(chunk_size, 2),
        &versions,
        1000,
        make_commit_ref(2),
        Digest::default(),
    );

    assert_eq!(flushed_by_second_push.len(), 1);
    assert_eq!(queue.pending_roots.len(), 1);
}
4268
/// Three small chunks pushed back to back end up as three roots of the single
/// checkpoint returned by `flush(1000, true)`.
#[test]
fn test_multiple_chunks_merged_into_one_checkpoint() {
    let versions = default_versions();
    let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 200);

    // Chunk height and commit index advance together: 1, 2, 3.
    for ordinal in 1..=3u32 {
        queue.push_chunk(
            make_chunk(10, u64::from(ordinal)),
            &versions,
            1000,
            make_commit_ref(ordinal),
            Digest::default(),
        );
    }

    let merged = queue.flush(1000, true).unwrap();

    assert_eq!(merged.roots.len(), 3);
}
4300
/// Two half-sized chunks fill the queue without flushing; a third one causes
/// exactly one flush. No checkpoint that comes out carries more than `max_tx`
/// transaction roots.
#[test]
fn test_push_chunk_handles_overflow() {
    let max_tx = 10;
    let half = max_tx / 2;

    let versions = default_versions();
    let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);

    // The first two pushes stay within the limit and must not flush anything.
    for ordinal in 1..=2u32 {
        let flushed = queue.push_chunk(
            make_chunk(half, u64::from(ordinal)),
            &versions,
            1000,
            make_commit_ref(ordinal),
            Digest::default(),
        );
        assert!(flushed.is_empty());
    }

    // The third push overflows the limit and flushes what was pending.
    let overflow_flush = queue.push_chunk(
        make_chunk(half, 3),
        &versions,
        1000,
        make_commit_ref(3),
        Digest::default(),
    );
    assert_eq!(overflow_flush.len(), 1);

    let remainder = queue.flush(1000, true);

    for checkpoint in remainder.iter().chain(overflow_flush.iter()) {
        let tx_count: usize = checkpoint
            .roots
            .iter()
            .map(|root| root.tx_roots.len())
            .sum();
        assert!(tx_count <= max_tx);
    }
}
4341
/// When chunks at heights 100 and 200 are merged, the flushed checkpoint
/// reports the height of the later chunk.
#[test]
fn test_checkpoint_uses_last_chunk_height() {
    let versions = default_versions();
    let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);

    // (chunk height, commit index)
    for (height, commit_index) in [(100, 1), (200, 2)] {
        queue.push_chunk(
            make_chunk(10, height),
            &versions,
            1000,
            make_commit_ref(commit_index),
            Digest::default(),
        );
    }

    let checkpoint = queue.flush(1000, true).unwrap();

    assert_eq!(checkpoint.details.checkpoint_height, 200);
}
4366
/// `last_built_timestamp` is untouched by `push_chunk` and takes the timestamp
/// passed to `flush` once a flush happens.
#[test]
fn test_last_built_timestamp_updated_on_flush() {
    let versions = default_versions();
    let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);

    queue.push_chunk(
        make_chunk(10, 1),
        &versions,
        5000,
        make_commit_ref(1),
        Digest::default(),
    );

    // Pushing alone must not move the timestamp.
    assert_eq!(queue.last_built_timestamp, 0);

    // Only the side effect on the queue matters here; the checkpoint is discarded.
    drop(queue.flush(5000, true));

    assert_eq!(queue.last_built_timestamp, 5000);
}
4386
/// Verifies that `push_chunk` reports each chunk's settlement over the
/// queue's channel.
///
/// The queue is built with an explicit sender so the test can read what is
/// sent. Both chunks stay far below the transaction limit (1000), so neither
/// push flushes and both settlements must carry the queue's initial checkpoint
/// sequence number.
#[test]
fn test_settlement_info_sent_through_channel() {
    let initial_seq = 5;
    let (sender, mut receiver) = monitored_mpsc::unbounded_channel("test_settlement_info");
    let mut queue =
        CheckpointQueue::new_for_testing_with_sender(0, 0, initial_seq, 1000, 0, sender);
    let versions = default_versions();

    let chunk1 = Chunk {
        schedulables: vec![
            Schedulable::ConsensusCommitPrologue(0, 1, 0),
            Schedulable::ConsensusCommitPrologue(0, 2, 0),
            Schedulable::ConsensusCommitPrologue(0, 3, 0),
        ],
        settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
        height: 1,
    };

    let chunk2 = Chunk {
        schedulables: vec![
            Schedulable::ConsensusCommitPrologue(0, 4, 0),
            Schedulable::ConsensusCommitPrologue(0, 5, 0),
        ],
        settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
        height: 2,
    };

    let flushed1 = queue.push_chunk(
        chunk1,
        &versions,
        1000,
        make_commit_ref(1),
        Digest::default(),
    );
    assert!(flushed1.is_empty());

    let msg1 = receiver
        .try_recv()
        .expect("first push_chunk should send a message through the channel");
    let settlement1 = msg1
        .1
        .expect("first chunk has a settlement, so settlement info must be present");
    assert_eq!(settlement1.checkpoint_seq, initial_seq);

    let flushed2 = queue.push_chunk(
        chunk2,
        &versions,
        1000,
        make_commit_ref(1),
        Digest::default(),
    );
    assert!(flushed2.is_empty());

    let msg2 = receiver
        .try_recv()
        .expect("second push_chunk should send a message through the channel");
    let settlement2 = msg2
        .1
        .expect("second chunk has a settlement, so settlement info must be present");
    // No flush happened in between, so the sequence number has not advanced.
    assert_eq!(settlement2.checkpoint_seq, initial_seq);
}
4426
/// Checks that the checkpoint sequence number reported with a chunk's
/// settlement matches the checkpoint the chunk ends up in, including when
/// pushing the chunk first flushes an earlier checkpoint.
#[test]
fn test_settlement_checkpoint_seq_correct_after_flush() {
    let initial_seq = 5;
    let max_tx = 10;
    // Two chunks of this size cannot share one checkpoint.
    let chunk_size = max_tx / 2 + 1;

    let versions = default_versions();
    let (sender, mut receiver) = monitored_mpsc::unbounded_channel("test_settlement_seq");
    let mut queue =
        CheckpointQueue::new_for_testing_with_sender(0, 0, initial_seq, max_tx, 0, sender);

    // First chunk: nothing pending yet, so its settlement uses the initial sequence.
    let first_chunk = Chunk {
        schedulables: (0..chunk_size)
            .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
            .collect(),
        settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
        height: 1,
    };
    queue.push_chunk(
        first_chunk,
        &versions,
        1000,
        make_commit_ref(1),
        Digest::default(),
    );

    let first_message = receiver.try_recv().unwrap();
    let first_settlement = first_message.1.unwrap();
    assert_eq!(first_settlement.checkpoint_seq, initial_seq);

    // Second chunk: overflows the limit, so the first chunk is flushed beforehand.
    let second_chunk = Chunk {
        schedulables: (0..chunk_size)
            .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
            .collect(),
        settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
        height: 2,
    };
    let flushed_on_push = queue.push_chunk(
        second_chunk,
        &versions,
        1000,
        make_commit_ref(2),
        Digest::default(),
    );
    assert_eq!(flushed_on_push.len(), 1);
    assert_eq!(flushed_on_push[0].details.checkpoint_seq, Some(initial_seq));

    let second_message = receiver.try_recv().unwrap();
    let second_settlement = second_message.1.unwrap();
    assert_eq!(second_settlement.checkpoint_seq, initial_seq + 1);

    // The checkpoint that finally holds the second chunk carries the announced sequence.
    let final_checkpoint = queue.flush_forced().unwrap();
    assert_eq!(
        final_checkpoint.details.checkpoint_seq,
        Some(second_settlement.checkpoint_seq)
    );
}
4489
/// A flush stamps the checkpoint with the queue's current sequence number and
/// then advances that number by one.
#[test]
fn test_checkpoint_seq_increments_on_flush() {
    let start_seq = 10;
    let versions = default_versions();
    let mut queue = CheckpointQueue::new_for_testing(0, 0, start_seq, 1000, 0);

    queue.push_chunk(
        make_chunk(5, 1),
        &versions,
        1000,
        make_commit_ref(1),
        Digest::default(),
    );

    let checkpoint = queue.flush(1000, true).unwrap();

    assert_eq!(checkpoint.details.checkpoint_seq, Some(start_seq));
    assert_eq!(queue.current_checkpoint_seq, start_seq + 1);
}
4508
/// Three chunks that each exceed half of `max_tx` cannot be paired up: the
/// second and third pushes each flush their predecessor, leaving one root
/// pending, and no flushed checkpoint goes over `max_tx` transaction roots.
#[test]
fn test_multiple_chunks_with_overflow() {
    let max_tx = 10;
    let versions = default_versions();
    let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);

    // Push chunks at heights 1, 2, 3 (all on commit index 1) and gather
    // everything the pushes flush, in push order.
    let all_flushed: Vec<_> = (1..=3)
        .flat_map(|height| {
            queue.push_chunk(
                make_chunk(max_tx / 2 + 1, height),
                &versions,
                1000,
                make_commit_ref(1),
                Digest::default(),
            )
        })
        .collect();

    assert_eq!(all_flushed.len(), 2);
    assert_eq!(queue.pending_roots.len(), 1);

    for checkpoint in &all_flushed {
        let tx_count: usize = checkpoint
            .roots
            .iter()
            .map(|root| root.tx_roots.len())
            .sum();
        assert!(tx_count <= max_tx);
    }
}
4550 }
4551}