1use std::{
5 collections::{BTreeMap, HashMap, HashSet, VecDeque},
6 hash::Hash,
7 num::NonZeroUsize,
8 sync::{Arc, Mutex},
9 time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use consensus_config::Committee as ConsensusCommittee;
13use consensus_core::{CommitConsumerMonitor, CommitIndex, CommitRef};
14use consensus_types::block::TransactionIndex;
15use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
16use lru::LruCache;
17use mysten_common::{
18 assert_reachable, assert_sometimes, debug_fatal, random_util::randomize_cache_capacity_in_tests,
19};
20use mysten_metrics::{
21 monitored_future,
22 monitored_mpsc::{self, UnboundedReceiver},
23 monitored_scope, spawn_monitored_task,
24};
25use nonempty::NonEmpty;
26use parking_lot::RwLockWriteGuard;
27use serde::{Deserialize, Serialize};
28use sui_config::node::CongestionLogConfig;
29use sui_macros::{fail_point, fail_point_arg, fail_point_if};
30use sui_protocol_config::{PerObjectCongestionControlMode, ProtocolConfig};
31use sui_types::{
32 SUI_RANDOMNESS_STATE_OBJECT_ID,
33 authenticator_state::ActiveJwk,
34 base_types::{
35 AuthorityName, ConciseableName, ConsensusObjectSequenceKey, ObjectID, ObjectRef,
36 SequenceNumber, TransactionDigest,
37 },
38 crypto::RandomnessRound,
39 digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest, Digest},
40 executable_transaction::{
41 TrustedExecutableTransaction, VerifiedExecutableTransaction,
42 VerifiedExecutableTransactionWithAliases,
43 },
44 messages_checkpoint::{
45 CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointTimestamp,
46 },
47 messages_consensus::{
48 AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments,
49 ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
50 ExecutionTimeObservation,
51 },
52 sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
53 transaction::{
54 InputObjectKind, SenderSignedData, TransactionDataAPI, TransactionKey, VerifiedTransaction,
55 WithAliases,
56 },
57};
58use tokio::task::JoinSet;
59use tracing::{debug, error, info, instrument, trace, warn};
60
61use crate::{
62 authority::{
63 AuthorityMetrics, AuthorityState, ExecutionEnv,
64 authority_per_epoch_store::{
65 AuthorityPerEpochStore, CancelConsensusCertificateReason, ConsensusStats,
66 ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStatsV2,
67 consensus_quarantine::ConsensusCommitOutput,
68 },
69 backpressure::{BackpressureManager, BackpressureSubscriber},
70 congestion_log::CongestionCommitLogger,
71 consensus_tx_status_cache::ConsensusTxStatus,
72 epoch_start_configuration::EpochStartConfigTrait,
73 execution_time_estimator::ExecutionTimeEstimator,
74 shared_object_congestion_tracker::SharedObjectCongestionTracker,
75 shared_object_version_manager::{AssignedTxAndVersions, AssignedVersions, Schedulable},
76 transaction_deferral::{DeferralKey, DeferralReason, transaction_deferral_within_limit},
77 },
78 checkpoints::{
79 CheckpointHeight, CheckpointRoots, CheckpointService, CheckpointServiceNotify,
80 PendingCheckpoint, PendingCheckpointInfo, PendingCheckpointV2,
81 },
82 consensus_adapter::ConsensusAdapter,
83 consensus_throughput_calculator::ConsensusThroughputCalculator,
84 consensus_types::consensus_output_api::ConsensusCommitAPI,
85 epoch::{
86 randomness::{DkgStatus, RandomnessManager},
87 reconfiguration::ReconfigState,
88 },
89 execution_cache::ObjectCacheRead,
90 execution_scheduler::{SettlementBatchInfo, SettlementScheduler},
91 gasless_rate_limiter::ConsensusGaslessCounter,
92 post_consensus_tx_reorder::PostConsensusTxReorder,
93 traffic_controller::{TrafficController, policies::TrafficTally},
94};
95
/// Consensus output after filtering, holding the transactions that survived
/// filtering and the owned-object locks claimed during this commit.
struct FilteredConsensusOutput {
    // Surviving transactions, each paired with a u32 — presumably the
    // transaction's index within the commit; TODO confirm at construction site.
    transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
    // Maps an owned object ref to the digest of the transaction that locked it.
    owned_object_locks: HashMap<ObjectRef, TransactionDigest>,
}
102
/// Bundles everything needed to construct a [`ConsensusHandler`] for the
/// current epoch (see [`ConsensusHandlerInitializer::new_consensus_handler`]).
pub struct ConsensusHandlerInitializer {
    /// Authority state; also the source of the gasless counter and metrics.
    state: Arc<AuthorityState>,
    /// Checkpoint service notified of newly pending checkpoints.
    checkpoint_service: Arc<CheckpointService>,
    /// Per-epoch store the handler will operate against.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Adapter used to submit transactions back to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Shared throughput calculator handed to each new handler.
    throughput_calculator: Arc<ConsensusThroughputCalculator>,
    /// Backpressure manager; handlers subscribe to it on construction.
    backpressure_manager: Arc<BackpressureManager>,
    /// Optional congestion commit logger; `None` when logging is disabled or
    /// logger creation failed (see `new`).
    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
    /// Counter shared with `AuthorityState` for gasless transactions.
    consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
}
113
impl ConsensusHandlerInitializer {
    /// Builds an initializer from its constituent services.
    ///
    /// Creation of the congestion logger is best-effort: on failure the error
    /// is reported via `debug_fatal!` and the node runs without congestion
    /// logging.
    pub fn new(
        state: Arc<AuthorityState>,
        checkpoint_service: Arc<CheckpointService>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
        throughput_calculator: Arc<ConsensusThroughputCalculator>,
        backpressure_manager: Arc<BackpressureManager>,
        congestion_log_config: Option<CongestionLogConfig>,
    ) -> Self {
        let congestion_logger =
            congestion_log_config.and_then(|config| match CongestionCommitLogger::new(&config) {
                Ok(logger) => Some(Arc::new(Mutex::new(logger))),
                Err(e) => {
                    debug_fatal!("Failed to create congestion logger: {e}");
                    None
                }
            });
        // The gasless counter is shared with the authority state rather than
        // created fresh here.
        let consensus_gasless_counter = state.consensus_gasless_counter.clone();
        Self {
            state,
            checkpoint_service,
            epoch_store,
            consensus_adapter,
            throughput_calculator,
            backpressure_manager,
            congestion_logger,
            consensus_gasless_counter,
        }
    }

    /// Test-only constructor that fabricates the consensus adapter,
    /// backpressure manager, and throughput calculator from `state`.
    #[cfg(test)]
    pub(crate) fn new_for_testing(
        state: Arc<AuthorityState>,
        checkpoint_service: Arc<CheckpointService>,
    ) -> Self {
        use crate::consensus_test_utils::make_consensus_adapter_for_test;
        use std::collections::HashSet;

        let backpressure_manager = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let consensus_gasless_counter = state.consensus_gasless_counter.clone();
        Self {
            state: state.clone(),
            checkpoint_service,
            epoch_store: state.epoch_store_for_testing().clone(),
            consensus_adapter,
            throughput_calculator: Arc::new(ConsensusThroughputCalculator::new(
                None,
                state.metrics.clone(),
            )),
            backpressure_manager,
            congestion_logger: None,
            consensus_gasless_counter,
        }
    }

    /// Assembles a fresh [`ConsensusHandler`] for the current epoch, wiring in
    /// a newly created settlement scheduler and the consensus committee taken
    /// from the epoch start state.
    pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
        let new_epoch_start_state = self.epoch_store.epoch_start_state();
        let consensus_committee = new_epoch_start_state.get_consensus_committee();

        let settlement_scheduler = SettlementScheduler::new(
            self.state.execution_scheduler().as_ref().clone(),
            self.state.get_transaction_cache_reader().clone(),
            self.state.metrics.clone(),
        );
        ConsensusHandler::new(
            self.epoch_store.clone(),
            self.checkpoint_service.clone(),
            settlement_scheduler,
            self.consensus_adapter.clone(),
            self.state.get_object_cache_reader().clone(),
            consensus_committee,
            self.state.metrics.clone(),
            self.throughput_calculator.clone(),
            self.backpressure_manager.subscribe(),
            self.state.traffic_controller.clone(),
            self.congestion_logger.clone(),
            self.consensus_gasless_counter.clone(),
        )
    }
}
197
198mod additional_consensus_state {
199 use std::marker::PhantomData;
200
201 use consensus_core::CommitRef;
202 use fastcrypto::hash::HashFunction as _;
203 use sui_types::{crypto::DefaultHash, digests::Digest};
204
205 use super::*;
    /// Rolling state derived from observed consensus commits. It is
    /// BCS-serialized to produce a digest (see `digest`), so field order and
    /// contents must remain deterministic and identical across validators.
    #[derive(Serialize, Deserialize)]
    pub(super) struct AdditionalConsensusState {
        /// Sliding-window observer of commit timestamps used to estimate the
        /// commit interval.
        commit_interval_observer: CommitIntervalObserver,
    }
218
219 impl AdditionalConsensusState {
220 pub fn new(additional_consensus_state_window_size: u32) -> Self {
221 Self {
222 commit_interval_observer: CommitIntervalObserver::new(
223 additional_consensus_state_window_size,
224 ),
225 }
226 }
227
228 pub(crate) fn observe_commit(
230 &mut self,
231 protocol_config: &ProtocolConfig,
232 epoch_start_time: u64,
233 consensus_commit: &impl ConsensusCommitAPI,
234 ) -> ConsensusCommitInfo {
235 self.commit_interval_observer
236 .observe_commit_time(consensus_commit);
237
238 let estimated_commit_period = self
239 .commit_interval_observer
240 .commit_interval_estimate()
241 .unwrap_or(Duration::from_millis(
242 protocol_config.min_checkpoint_interval_ms(),
243 ));
244
245 info!("estimated commit rate: {:?}", estimated_commit_period);
246
247 self.commit_info_impl(
248 epoch_start_time,
249 consensus_commit,
250 Some(estimated_commit_period),
251 )
252 }
253
254 fn commit_info_impl(
255 &self,
256 epoch_start_time: u64,
257 consensus_commit: &impl ConsensusCommitAPI,
258 estimated_commit_period: Option<Duration>,
259 ) -> ConsensusCommitInfo {
260 let leader_author = consensus_commit.leader_author_index();
261 let timestamp = consensus_commit.commit_timestamp_ms();
262
263 let timestamp = if timestamp < epoch_start_time {
264 error!(
265 "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start_time}, author {leader_author:?}"
266 );
267 epoch_start_time
268 } else {
269 timestamp
270 };
271
272 ConsensusCommitInfo {
273 _phantom: PhantomData,
274 round: consensus_commit.leader_round(),
275 timestamp,
276 leader_author,
277 consensus_commit_ref: consensus_commit.commit_ref(),
278 rejected_transactions_digest: consensus_commit.rejected_transactions_digest(),
279 additional_state_digest: Some(self.digest()),
280 estimated_commit_period,
281 skip_consensus_commit_prologue_in_test: false,
282 }
283 }
284
285 fn digest(&self) -> AdditionalConsensusStateDigest {
287 let mut hash = DefaultHash::new();
288 bcs::serialize_into(&mut hash, self).unwrap();
289 AdditionalConsensusStateDigest::new(hash.finalize().into())
290 }
291 }
292
    /// Per-commit metadata derived from a consensus commit, used to build the
    /// consensus commit prologue transaction.
    pub struct ConsensusCommitInfo {
        // Prevents construction outside this module except via the provided
        // constructors.
        _phantom: PhantomData<()>,

        /// Leader round of the commit.
        pub round: u64,
        /// Commit timestamp in ms, clamped to be >= the epoch start time.
        pub timestamp: u64,
        /// Index of the commit's leader authority.
        pub leader_author: AuthorityIndex,
        /// Reference (index + digest) of the consensus commit.
        pub consensus_commit_ref: CommitRef,
        /// Digest over the commit's rejected transactions.
        pub rejected_transactions_digest: Digest,

        /// Digest of `AdditionalConsensusState` at this commit; `None` is
        /// possible by construction but `additional_state_digest()` panics on
        /// it.
        additional_state_digest: Option<AdditionalConsensusStateDigest>,
        /// Estimated interval between commits; accessor panics when absent.
        estimated_commit_period: Option<Duration>,

        /// Test hook: when set, the commit prologue is skipped.
        pub skip_consensus_commit_prologue_in_test: bool,
    }
308
    impl ConsensusCommitInfo {
        /// Test constructor with a fabricated leader (index 0), default commit
        /// ref, and a zero additional-state digest.
        pub fn new_for_test(
            commit_round: u64,
            commit_timestamp: u64,
            estimated_commit_period: Option<Duration>,
            skip_consensus_commit_prologue_in_test: bool,
        ) -> Self {
            Self {
                _phantom: PhantomData,
                round: commit_round,
                timestamp: commit_timestamp,
                leader_author: 0,
                consensus_commit_ref: CommitRef::default(),
                rejected_transactions_digest: Digest::default(),
                additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
                estimated_commit_period,
                skip_consensus_commit_prologue_in_test,
            }
        }

        /// Test constructor for congestion tests: like `new_for_test` but
        /// with a mandatory commit period and the prologue always skipped.
        pub fn new_for_congestion_test(
            commit_round: u64,
            commit_timestamp: u64,
            estimated_commit_period: Duration,
        ) -> Self {
            Self::new_for_test(
                commit_round,
                commit_timestamp,
                Some(estimated_commit_period),
                true,
            )
        }

        /// Returns the additional-state digest.
        ///
        /// # Panics
        /// Panics if the digest was not populated.
        pub fn additional_state_digest(&self) -> AdditionalConsensusStateDigest {
            self.additional_state_digest
                .expect("additional_state_digest is not available")
        }

        /// Returns the estimated commit period.
        ///
        /// # Panics
        /// Panics if no estimate was populated.
        pub fn estimated_commit_period(&self) -> Duration {
            self.estimated_commit_period
                .expect("estimated commit period is not available")
        }

        /// Converts the commit ref's digest into a `ConsensusCommitDigest`.
        fn consensus_commit_digest(&self) -> ConsensusCommitDigest {
            ConsensusCommitDigest::new(self.consensus_commit_ref.digest.into_inner())
        }

        /// V1 prologue: epoch, round, and timestamp only.
        fn consensus_commit_prologue_transaction(
            &self,
            epoch: u64,
        ) -> VerifiedExecutableTransaction {
            let transaction = VerifiedTransaction::new_consensus_commit_prologue(
                epoch,
                self.round,
                self.timestamp,
            );
            VerifiedExecutableTransaction::new_system(transaction, epoch)
        }

        /// V2 prologue: adds the consensus commit digest.
        fn consensus_commit_prologue_v2_transaction(
            &self,
            epoch: u64,
        ) -> VerifiedExecutableTransaction {
            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v2(
                epoch,
                self.round,
                self.timestamp,
                self.consensus_commit_digest(),
            );
            VerifiedExecutableTransaction::new_system(transaction, epoch)
        }

        /// V3 prologue: adds consensus-determined version assignments.
        fn consensus_commit_prologue_v3_transaction(
            &self,
            epoch: u64,
            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
        ) -> VerifiedExecutableTransaction {
            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v3(
                epoch,
                self.round,
                self.timestamp,
                self.consensus_commit_digest(),
                consensus_determined_version_assignments,
            );
            VerifiedExecutableTransaction::new_system(transaction, epoch)
        }

        /// V4 prologue: additionally records the additional-state digest.
        fn consensus_commit_prologue_v4_transaction(
            &self,
            epoch: u64,
            consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
            additional_state_digest: AdditionalConsensusStateDigest,
        ) -> VerifiedExecutableTransaction {
            let transaction = VerifiedTransaction::new_consensus_commit_prologue_v4(
                epoch,
                self.round,
                self.timestamp,
                self.consensus_commit_digest(),
                consensus_determined_version_assignments,
                additional_state_digest,
            );
            VerifiedExecutableTransaction::new_system(transaction, epoch)
        }

        /// Builds the commit prologue transaction, selecting the prologue
        /// version (v1..v4) from protocol feature flags.
        ///
        /// Version assignment encoding: the V2 flag uses
        /// `ConsensusObjectSequenceKey`s as-is; the older flag strips each key
        /// down to its object id (`id.0`). With the additional-state-digest
        /// flag, the digest is optionally folded together with the
        /// `indirect_state_observer`'s accumulated hash.
        ///
        /// NOTE(review): with `record_additional_state_digest_in_prologue`
        /// enabled, `version_assignments.unwrap()` assumes one of the version
        /// assignment flags is also enabled — presumably guaranteed by
        /// protocol-config invariants; confirm.
        pub fn create_consensus_commit_prologue_transaction(
            &self,
            epoch: u64,
            protocol_config: &ProtocolConfig,
            cancelled_txn_version_assignment: Vec<(
                TransactionDigest,
                Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
            )>,
            commit_info: &ConsensusCommitInfo,
            indirect_state_observer: IndirectStateObserver,
        ) -> VerifiedExecutableTransaction {
            let version_assignments = if protocol_config
                .record_consensus_determined_version_assignments_in_prologue_v2()
            {
                Some(
                    ConsensusDeterminedVersionAssignments::CancelledTransactionsV2(
                        cancelled_txn_version_assignment,
                    ),
                )
            } else if protocol_config.record_consensus_determined_version_assignments_in_prologue()
            {
                Some(
                    ConsensusDeterminedVersionAssignments::CancelledTransactions(
                        cancelled_txn_version_assignment
                            .into_iter()
                            .map(|(tx_digest, versions)| {
                                (
                                    tx_digest,
                                    versions.into_iter().map(|(id, v)| (id.0, v)).collect(),
                                )
                            })
                            .collect(),
                    ),
                )
            } else {
                None
            };

            if protocol_config.record_additional_state_digest_in_prologue() {
                let additional_state_digest =
                    if protocol_config.additional_consensus_digest_indirect_state() {
                        let d1 = commit_info.additional_state_digest();
                        indirect_state_observer.fold_with(d1)
                    } else {
                        commit_info.additional_state_digest()
                    };

                self.consensus_commit_prologue_v4_transaction(
                    epoch,
                    version_assignments.unwrap(),
                    additional_state_digest,
                )
            } else if let Some(version_assignments) = version_assignments {
                self.consensus_commit_prologue_v3_transaction(epoch, version_assignments)
            } else if protocol_config.include_consensus_digest_in_prologue() {
                self.consensus_commit_prologue_v2_transaction(epoch)
            } else {
                self.consensus_commit_prologue_transaction(epoch)
            }
        }
    }
476
    /// Accumulates a running hash over "indirect state" observed while
    /// processing a commit; it can then be folded into the additional-state
    /// digest recorded in the commit prologue (see `fold_with`).
    #[derive(Default)]
    pub struct IndirectStateObserver {
        /// Running hash over all observed state, in observation order.
        hash: DefaultHash,
    }
481
482 impl IndirectStateObserver {
483 pub fn new() -> Self {
484 Self::default()
485 }
486
487 pub fn observe_indirect_state<T: Serialize>(&mut self, state: &T) {
488 bcs::serialize_into(&mut self.hash, state).unwrap();
489 }
490
491 pub fn fold_with(
492 self,
493 d1: AdditionalConsensusStateDigest,
494 ) -> AdditionalConsensusStateDigest {
495 let hash = self.hash.finalize();
496 let d2 = AdditionalConsensusStateDigest::new(hash.into());
497
498 let mut hasher = DefaultHash::new();
499 bcs::serialize_into(&mut hasher, &d1).unwrap();
500 bcs::serialize_into(&mut hasher, &d2).unwrap();
501 AdditionalConsensusStateDigest::new(hasher.finalize().into())
502 }
503 }
504
    /// Verifies the rolling-window property of the state digest: with a
    /// window of size 3, two states that have observed the same last three
    /// commits produce equal digests, and remain equal after another
    /// identical observation.
    #[test]
    fn test_additional_consensus_state() {
        use crate::consensus_test_utils::TestConsensusCommit;

        // Feeds a synthetic empty commit (given round/timestamp, epoch start
        // time fixed at 100) into `state`.
        fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
            let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
            state.observe_commit(
                &protocol_config,
                100,
                &TestConsensusCommit::empty(round, timestamp, 0),
            );
        }

        // s1 observes rounds 1..=4; with window size 3 only the last three
        // observations are retained.
        let mut s1 = AdditionalConsensusState::new(3);
        observe(&mut s1, 1, 1000);
        observe(&mut s1, 2, 2000);
        observe(&mut s1, 3, 3000);
        observe(&mut s1, 4, 4000);

        // s2 observes only rounds 2..=4 — the same retained window as s1.
        let mut s2 = AdditionalConsensusState::new(3);
        observe(&mut s2, 2, 2000);
        observe(&mut s2, 3, 3000);
        observe(&mut s2, 4, 4000);

        assert_eq!(s1.digest(), s2.digest());

        // Digests stay equal after an identical further observation.
        observe(&mut s1, 5, 5000);
        observe(&mut s2, 5, 5000);

        assert_eq!(s1.digest(), s2.digest());
    }
538}
539use additional_consensus_state::AdditionalConsensusState;
540pub(crate) use additional_consensus_state::{ConsensusCommitInfo, IndirectStateObserver};
541
/// Roots of one chunk queued in [`CheckpointQueue`], awaiting inclusion in a
/// pending checkpoint, together with the commit metadata carried into the
/// checkpoint details on flush.
struct QueuedCheckpointRoots {
    /// Transaction/settlement roots and checkpoint height for the chunk.
    roots: CheckpointRoots,
    /// Commit timestamp; the last queued entry's timestamp becomes the
    /// flushed checkpoint's `timestamp_ms`.
    timestamp: CheckpointTimestamp,
    /// Reference to the consensus commit this chunk came from.
    consensus_commit_ref: CommitRef,
    /// Digest over the commit's rejected transactions.
    rejected_transactions_digest: Digest,
}
548
/// A slice of a consensus commit destined for one pending checkpoint: the
/// user schedulables, an optional settlement schedulable, and the checkpoint
/// height they are assigned to. Generic over the transaction type so chunks
/// can carry alias information before it is stripped (see the `From` impl).
struct Chunk<
    T: crate::authority::shared_object_version_manager::AsTx = VerifiedExecutableTransaction,
> {
    /// User transactions in this chunk.
    schedulables: Vec<Schedulable<T>>,
    /// Optional settlement transaction scheduled after the user transactions.
    settlement: Option<Schedulable<T>>,
    /// Checkpoint height assigned to this chunk.
    height: CheckpointHeight,
}
556
557impl<T: crate::authority::shared_object_version_manager::AsTx + Clone> Chunk<T> {
558 fn all_schedulables(&self) -> impl Iterator<Item = &Schedulable<T>> + Clone {
559 self.schedulables.iter().chain(self.settlement.iter())
560 }
561
562 fn all_schedulables_from(chunks: &[Self]) -> impl Iterator<Item = &Schedulable<T>> + Clone {
563 chunks.iter().flat_map(|c| c.all_schedulables())
564 }
565
566 fn to_checkpoint_roots(&self) -> CheckpointRoots {
567 let tx_roots: Vec<_> = self.schedulables.iter().map(|s| s.key()).collect();
568 let settlement_root = self.settlement.as_ref().map(|s| s.key());
569 CheckpointRoots {
570 tx_roots,
571 settlement_root,
572 height: self.height,
573 }
574 }
575}
576
577impl From<Chunk<VerifiedExecutableTransactionWithAliases>> for Chunk {
578 fn from(chunk: Chunk<VerifiedExecutableTransactionWithAliases>) -> Self {
579 Chunk {
580 schedulables: chunk.schedulables.into_iter().map(|s| s.into()).collect(),
581 settlement: chunk.settlement.map(|s| s.into()),
582 height: chunk.height,
583 }
584 }
585}
586
/// Accumulates per-commit chunks of checkpoint roots and flushes them into
/// `PendingCheckpointV2`s, enforcing a per-checkpoint transaction cap and a
/// minimum interval between non-forced flushes.
pub(crate) struct CheckpointQueue {
    /// Timestamp of the last flushed checkpoint; gates non-forced flushes.
    last_built_timestamp: CheckpointTimestamp,
    /// Chunks queued since the last flush, in arrival order.
    pending_roots: VecDeque<QueuedCheckpointRoots>,
    /// Last assigned checkpoint height (`next_height` pre-increments).
    height: u64,
    /// User transactions queued since the last flush.
    pending_tx_count: usize,
    /// Sequence number that the next flushed checkpoint will receive;
    /// incremented on each flush.
    current_checkpoint_seq: CheckpointSequenceNumber,
    /// Max user transactions per checkpoint; exceeding it forces a flush.
    max_tx: usize,
    /// Minimum interval between non-forced flushes, in ms.
    min_checkpoint_interval_ms: u64,
    /// Channel to hand schedulables (and settlement info) to the execution
    /// scheduler.
    execution_scheduler_sender: ExecutionSchedulerSender,
}
604
impl CheckpointQueue {
    /// Creates an empty queue resuming from the given height, next sequence
    /// number, and last flush timestamp (typically restored from the last
    /// persisted consensus stats).
    pub(crate) fn new(
        last_built_timestamp: CheckpointTimestamp,
        checkpoint_height: u64,
        next_checkpoint_seq: CheckpointSequenceNumber,
        max_tx: usize,
        min_checkpoint_interval_ms: u64,
        execution_scheduler_sender: ExecutionSchedulerSender,
    ) -> Self {
        Self {
            last_built_timestamp,
            pending_roots: VecDeque::new(),
            height: checkpoint_height,
            pending_tx_count: 0,
            current_checkpoint_seq: next_checkpoint_seq,
            max_tx,
            min_checkpoint_interval_ms,
            execution_scheduler_sender,
        }
    }

    /// Test constructor with a throwaway scheduler channel (the receiver is
    /// dropped immediately).
    #[cfg(test)]
    fn new_for_testing(
        last_built_timestamp: CheckpointTimestamp,
        checkpoint_height: u64,
        next_checkpoint_seq: CheckpointSequenceNumber,
        max_tx: usize,
        min_checkpoint_interval_ms: u64,
    ) -> Self {
        let (sender, _receiver) = monitored_mpsc::unbounded_channel("test_checkpoint_queue_sender");
        Self {
            last_built_timestamp,
            pending_roots: VecDeque::new(),
            height: checkpoint_height,
            pending_tx_count: 0,
            current_checkpoint_seq: next_checkpoint_seq,
            max_tx,
            min_checkpoint_interval_ms,
            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
        }
    }

    /// Test constructor using a caller-supplied scheduler channel so the test
    /// can observe what is sent.
    #[cfg(test)]
    fn new_for_testing_with_sender(
        last_built_timestamp: CheckpointTimestamp,
        checkpoint_height: u64,
        next_checkpoint_seq: CheckpointSequenceNumber,
        max_tx: usize,
        min_checkpoint_interval_ms: u64,
        sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
    ) -> Self {
        Self {
            last_built_timestamp,
            pending_roots: VecDeque::new(),
            height: checkpoint_height,
            pending_tx_count: 0,
            current_checkpoint_seq: next_checkpoint_seq,
            max_tx,
            min_checkpoint_interval_ms,
            execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
        }
    }

    /// Timestamp of the most recently flushed checkpoint.
    pub(crate) fn last_built_timestamp(&self) -> CheckpointTimestamp {
        self.last_built_timestamp
    }

    /// True when no chunks are queued for the next checkpoint.
    pub(crate) fn is_empty(&self) -> bool {
        self.pending_roots.is_empty()
    }

    /// Allocates and returns the next checkpoint height (pre-increment).
    fn next_height(&mut self) -> u64 {
        self.height += 1;
        self.height
    }

    /// Queues a chunk: sends its schedulables (with assigned versions) and
    /// optional settlement info to the execution scheduler, and records its
    /// roots for the next flush.
    ///
    /// If adding the chunk's user transactions would push the pending count
    /// past `max_tx` (and something is already pending), the queue is flushed
    /// first; any checkpoint produced that way is returned.
    fn push_chunk(
        &mut self,
        chunk: Chunk,
        assigned_versions: &HashMap<TransactionKey, AssignedVersions>,
        timestamp: CheckpointTimestamp,
        consensus_commit_ref: CommitRef,
        rejected_transactions_digest: Digest,
    ) -> Vec<PendingCheckpointV2> {
        let max_tx = self.max_tx;
        let user_tx_count = chunk.schedulables.len();

        // Capture the roots before the schedulables are moved below.
        let roots = chunk.to_checkpoint_roots();

        // Pair each schedulable with its assigned versions (empty when none
        // were assigned).
        let schedulables: Vec<_> = chunk
            .schedulables
            .into_iter()
            .map(|s| {
                let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
                (s, versions)
            })
            .collect();

        let mut flushed_checkpoints = Vec::new();

        // Force a flush when this chunk would overflow the per-checkpoint
        // transaction cap. Note: flushing happens BEFORE queuing the chunk, so
        // a single oversized chunk still lands in one checkpoint.
        if self.pending_tx_count > 0
            && self.pending_tx_count + user_tx_count > max_tx
            && let Some(checkpoint) = self.flush_forced()
        {
            flushed_checkpoints.push(checkpoint);
        }

        // Settlement info ties the settlement to the chunk's transactions,
        // height, and the sequence number of the checkpoint it will join.
        let settlement_info = chunk.settlement.as_ref().map(|s| {
            let settlement_key = s.key();
            let tx_keys: Vec<_> = schedulables.iter().map(|(s, _)| s.key()).collect();
            SettlementBatchInfo {
                settlement_key,
                tx_keys,
                checkpoint_height: chunk.height,
                checkpoint_seq: self.current_checkpoint_seq,
                assigned_versions: assigned_versions
                    .get(&settlement_key)
                    .cloned()
                    .unwrap_or_default(),
            }
        });

        self.execution_scheduler_sender
            .send(schedulables, settlement_info);

        self.pending_tx_count += user_tx_count;
        self.pending_roots.push_back(QueuedCheckpointRoots {
            roots,
            timestamp,
            consensus_commit_ref,
            rejected_transactions_digest,
        });

        flushed_checkpoints
    }

    /// Flushes pending chunks into a checkpoint. Unless `force` is set, the
    /// flush is skipped when less than `min_checkpoint_interval_ms` has
    /// elapsed since the last built checkpoint.
    pub(crate) fn flush(
        &mut self,
        current_timestamp: CheckpointTimestamp,
        force: bool,
    ) -> Option<PendingCheckpointV2> {
        if !force && current_timestamp < self.last_built_timestamp + self.min_checkpoint_interval_ms
        {
            return None;
        }
        self.flush_forced()
    }

    /// Drains all queued chunks into a single `PendingCheckpointV2` (or
    /// `None` when nothing is queued). The checkpoint's details (timestamp,
    /// height, commit ref, rejected-tx digest) come from the LAST queued
    /// chunk; the sequence counter then advances.
    fn flush_forced(&mut self) -> Option<PendingCheckpointV2> {
        if self.pending_roots.is_empty() {
            return None;
        }

        let to_flush: Vec<_> = self.pending_roots.drain(..).collect();
        let last_root = to_flush.last().unwrap();

        let checkpoint = PendingCheckpointV2 {
            roots: to_flush.iter().map(|q| q.roots.clone()).collect(),
            details: PendingCheckpointInfo {
                timestamp_ms: last_root.timestamp,
                last_of_epoch: false,
                checkpoint_height: last_root.roots.height,
                consensus_commit_ref: last_root.consensus_commit_ref,
                rejected_transactions_digest: last_root.rejected_transactions_digest,
                checkpoint_seq: Some(self.current_checkpoint_seq),
            },
        };

        self.last_built_timestamp = last_root.timestamp;
        self.pending_tx_count = 0;
        self.current_checkpoint_seq += 1;

        Some(checkpoint)
    }

    /// Sequence number of the most recently flushed checkpoint.
    ///
    /// # Panics
    /// Panics if called before any checkpoint was assigned (counter still 0).
    pub(crate) fn checkpoint_seq(&self) -> CheckpointSequenceNumber {
        self.current_checkpoint_seq
            .checked_sub(1)
            .expect("checkpoint_seq called before any checkpoint was assigned")
    }
}
786
/// Processes sequenced consensus commits: schedules transactions for
/// execution and builds pending checkpoints. Generic over the checkpoint
/// service `C` (notified of new pending checkpoints).
pub struct ConsensusHandler<C> {
    /// Per-epoch store this handler operates against.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Execution indices and stats as of the last processed commit.
    last_consensus_stats: ExecutionIndicesWithStatsV2,
    /// Checkpoint service to notify of pending checkpoints.
    checkpoint_service: Arc<C>,
    /// Read access to the object cache.
    cache_reader: Arc<dyn ObjectCacheRead>,
    /// Consensus committee for the current epoch.
    committee: ConsensusCommittee,
    metrics: Arc<AuthorityMetrics>,
    /// LRU of already-processed consensus transaction keys (dedup).
    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
    /// Channel to the execution scheduler; also held by `checkpoint_queue`.
    execution_scheduler_sender: ExecutionSchedulerSender,
    /// Adapter for submitting transactions back to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,

    /// Shared throughput calculator.
    throughput_calculator: Arc<ConsensusThroughputCalculator>,

    /// Rolling state over observed commits; its digest can be recorded in the
    /// commit prologue.
    additional_consensus_state: AdditionalConsensusState,

    /// Subscription used to pause commit handling under backpressure.
    backpressure_subscriber: BackpressureSubscriber,

    /// Optional traffic controller for tallying traffic.
    traffic_controller: Option<Arc<TrafficController>>,

    /// Optional congestion commit logger.
    congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,

    /// Counter for gasless transactions, shared with the authority state.
    consensus_gasless_counter: Arc<ConsensusGaslessCounter>,

    /// Queue that batches commit chunks into pending checkpoints.
    checkpoint_queue: Mutex<CheckpointQueue>,
}
825
/// Capacity of the processed-consensus-transaction LRU cache (randomized in
/// tests via `randomize_cache_capacity_in_tests`).
const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
827
impl<C> ConsensusHandler<C> {
    /// Creates a handler, restoring its position from the epoch store's
    /// persisted consensus stats and spawning the execution scheduler sender.
    ///
    /// # Panics
    /// Panics if the congestion control mode is not `ExecutionTimeEstimate`,
    /// or if the last consensus stats cannot be read.
    pub(crate) fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_service: Arc<C>,
        settlement_scheduler: SettlementScheduler,
        consensus_adapter: Arc<ConsensusAdapter>,
        cache_reader: Arc<dyn ObjectCacheRead>,
        committee: ConsensusCommittee,
        metrics: Arc<AuthorityMetrics>,
        throughput_calculator: Arc<ConsensusThroughputCalculator>,
        backpressure_subscriber: BackpressureSubscriber,
        traffic_controller: Option<Arc<TrafficController>>,
        congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
        consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
    ) -> Self {
        assert!(
            matches!(
                epoch_store
                    .protocol_config()
                    .per_object_congestion_control_mode(),
                PerObjectCongestionControlMode::ExecutionTimeEstimate(_)
            ),
            "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
        );

        let mut last_consensus_stats = epoch_store
            .get_last_consensus_stats()
            .expect("Should be able to read last consensus index");
        // Freshly-created (uninitialized) stats: size per-authority counters
        // to the committee and, when checkpoints are split in the consensus
        // handler, resume the sequence from the previous epoch's last
        // checkpoint.
        if !last_consensus_stats.stats.is_initialized() {
            last_consensus_stats.stats = ConsensusStats::new(committee.size());
            if epoch_store
                .protocol_config()
                .split_checkpoints_in_consensus_handler()
            {
                last_consensus_stats.checkpoint_seq = epoch_store.previous_epoch_last_checkpoint();
            }
        }
        let max_tx = epoch_store
            .protocol_config()
            .max_transactions_per_checkpoint() as usize;
        let min_checkpoint_interval_ms = epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let execution_scheduler_sender =
            ExecutionSchedulerSender::start(settlement_scheduler, epoch_store.clone());
        let commit_rate_estimate_window_size = epoch_store
            .protocol_config()
            .get_consensus_commit_rate_estimation_window_size();
        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
        let checkpoint_height = last_consensus_stats.height;
        // Next sequence to assign; when checkpoints are not split in the
        // consensus handler the queue's sequence numbers are unused — 0.
        let next_checkpoint_seq = if epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler()
        {
            last_consensus_stats.checkpoint_seq + 1
        } else {
            0
        };
        Self {
            epoch_store,
            last_consensus_stats,
            checkpoint_service,
            cache_reader,
            committee,
            metrics,
            processed_cache: LruCache::new(
                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
            ),
            execution_scheduler_sender: execution_scheduler_sender.clone(),
            consensus_adapter,
            throughput_calculator,
            additional_consensus_state: AdditionalConsensusState::new(
                commit_rate_estimate_window_size,
            ),
            backpressure_subscriber,
            traffic_controller,
            congestion_logger,
            consensus_gasless_counter,
            checkpoint_queue: Mutex::new(CheckpointQueue::new(
                last_built_timestamp,
                checkpoint_height,
                next_checkpoint_seq,
                max_tx,
                min_checkpoint_interval_ms,
                execution_scheduler_sender,
            )),
        }
    }

    /// Sub-DAG index of the last processed commit.
    pub(crate) fn last_processed_subdag_index(&self) -> u64 {
        self.last_consensus_stats.index.sub_dag_index
    }

    /// Test constructor taking the stats and scheduler sender directly;
    /// congestion logging is disabled and the checkpoint queue's sequence
    /// numbering starts at 0.
    pub(crate) fn new_for_testing(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_service: Arc<C>,
        execution_scheduler_sender: ExecutionSchedulerSender,
        consensus_adapter: Arc<ConsensusAdapter>,
        cache_reader: Arc<dyn ObjectCacheRead>,
        committee: ConsensusCommittee,
        metrics: Arc<AuthorityMetrics>,
        throughput_calculator: Arc<ConsensusThroughputCalculator>,
        backpressure_subscriber: BackpressureSubscriber,
        traffic_controller: Option<Arc<TrafficController>>,
        last_consensus_stats: ExecutionIndicesWithStatsV2,
    ) -> Self {
        let commit_rate_estimate_window_size = epoch_store
            .protocol_config()
            .get_consensus_commit_rate_estimation_window_size();
        let max_tx = epoch_store
            .protocol_config()
            .max_transactions_per_checkpoint() as usize;
        let min_checkpoint_interval_ms = epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
        let checkpoint_height = last_consensus_stats.height;
        Self {
            epoch_store,
            last_consensus_stats,
            checkpoint_service,
            cache_reader,
            committee,
            metrics,
            processed_cache: LruCache::new(
                NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
            ),
            execution_scheduler_sender: execution_scheduler_sender.clone(),
            consensus_adapter,
            throughput_calculator,
            additional_consensus_state: AdditionalConsensusState::new(
                commit_rate_estimate_window_size,
            ),
            backpressure_subscriber,
            traffic_controller,
            congestion_logger: None,
            consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
            checkpoint_queue: Mutex::new(CheckpointQueue::new(
                last_built_timestamp,
                checkpoint_height,
                0,
                max_tx,
                min_checkpoint_interval_ms,
                execution_scheduler_sender,
            )),
        }
    }
}
981
/// Consensus transactions from a single commit, demultiplexed by kind for
/// processing by the commit handler.
#[derive(Default)]
struct CommitHandlerInput {
    /// User transactions (with alias info) to schedule for execution.
    user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    /// Authority capability announcements (v2).
    capability_notifications: Vec<AuthorityCapabilitiesV2>,
    /// Execution-time observations shared by validators.
    execution_time_observations: Vec<ExecutionTimeObservation>,
    /// Checkpoint signature messages to forward to the checkpoint service.
    checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
    /// Randomness DKG messages, keyed by sending authority.
    randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
    /// Randomness DKG confirmations, keyed by sending authority.
    randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
    /// Authorities that announced end-of-publish.
    end_of_publish_transactions: Vec<AuthorityName>,
    /// New JWKs observed, with the authority that reported each.
    new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
}
993
/// Mutable state threaded through the processing of one consensus commit.
struct CommitHandlerState {
    /// True when DKG failed for this epoch (set by `init_randomness`).
    dkg_failed: bool,
    /// Randomness round reserved for this commit, if any
    /// (set by `init_randomness`).
    randomness_round: Option<RandomnessRound>,
    /// Pending epoch-store writes accumulated during commit processing.
    output: ConsensusCommitOutput,
    /// Observer folded into the prologue's additional-state digest; `Option`
    /// presumably so it can be taken by value later — confirm at use site.
    indirect_state_observer: Option<IndirectStateObserver>,
    /// Reconfiguration state captured at the start of commit processing;
    /// consulted before reserving randomness.
    initial_reconfig_state: ReconfigState,
    /// Per-digest occurrence counts — presumably for duplicate tracking
    /// within the commit; TODO confirm at the use site.
    occurrence_counts: HashMap<TransactionDigest, u32>,
}
1003
impl CommitHandlerState {
    /// Returns the keys of all consensus messages recorded as processed in
    /// the pending output.
    fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
        self.output
            .get_consensus_messages_processed()
            .cloned()
            .collect()
    }

    /// Locks the randomness manager (when one exists) and determines the
    /// randomness round for this commit, storing `randomness_round` and
    /// `dkg_failed` on `self`. Returns the held guard so the caller can keep
    /// using the manager for the rest of the commit.
    ///
    /// A round is reserved only when randomness is enabled, DKG has
    /// succeeded, and the epoch is still accepting transactions.
    ///
    /// # Panics
    /// Panics if the manager lock is contended (must only run on the commit
    /// handler thread), if randomness is enabled without a manager, or if
    /// reserving a round fails because the epoch ended.
    fn init_randomness<'a, 'epoch>(
        &'a mut self,
        epoch_store: &'epoch AuthorityPerEpochStore,
        commit_info: &'a ConsensusCommitInfo,
    ) -> Option<tokio::sync::MutexGuard<'epoch, RandomnessManager>> {
        let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
            rm.try_lock()
                .expect("should only ever be called from the commit handler thread")
        });

        let mut dkg_failed = false;
        let randomness_round = if epoch_store.randomness_state_enabled() {
            let randomness_manager = randomness_manager
                .as_mut()
                .expect("randomness manager should exist if randomness is enabled");
            match randomness_manager.dkg_status() {
                DkgStatus::Pending => None,
                DkgStatus::Failed => {
                    dkg_failed = true;
                    None
                }
                DkgStatus::Successful => {
                    // Generate randomness for this commit only while the epoch
                    // still accepts transactions; the commit timestamp is
                    // recorded with the reservation.
                    if self.initial_reconfig_state.should_accept_tx() {
                        randomness_manager
                            .reserve_next_randomness(commit_info.timestamp, &mut self.output)
                            .expect("epoch ended")
                    } else {
                        None
                    }
                }
            }
        } else {
            None
        };

        // A reserved round and a failed DKG are mutually exclusive.
        if randomness_round.is_some() {
            assert!(!dkg_failed);
        }

        self.randomness_round = randomness_round;
        self.dkg_failed = dkg_failed;

        randomness_manager
    }
}
1060
1061impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
1062 fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
1066 assert!(
1067 self.epoch_store
1068 .protocol_config()
1069 .record_additional_state_digest_in_prologue()
1070 );
1071 let protocol_config = self.epoch_store.protocol_config();
1072 let epoch_start_time = self
1073 .epoch_store
1074 .epoch_start_config()
1075 .epoch_start_timestamp_ms();
1076
1077 self.additional_consensus_state.observe_commit(
1078 protocol_config,
1079 epoch_start_time,
1080 &consensus_commit,
1081 );
1082 }
1083
    /// Test-only entry point that forwards directly to
    /// [`Self::handle_consensus_commit`].
    #[cfg(test)]
    pub(crate) async fn handle_consensus_commit_for_test(
        &mut self,
        consensus_commit: impl ConsensusCommitAPI,
    ) {
        self.handle_consensus_commit(consensus_commit).await;
    }
1091
    /// Handles one committed consensus sub-dag end to end: filters and
    /// deduplicates its transactions, reserves randomness, processes system
    /// messages, schedules transactions for execution, creates pending
    /// checkpoints, and durably persists the commit output.
    #[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))]
    pub(crate) async fn handle_consensus_commit(
        &mut self,
        consensus_commit: impl ConsensusCommitAPI,
    ) {
        {
            // The logic below assumes these protocol features are enabled;
            // fail fast otherwise.
            let protocol_config = self.epoch_store.protocol_config();

            assert!(protocol_config.ignore_execution_time_observations_after_certs_closed());
            assert!(protocol_config.record_time_estimate_processed());
            assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints());
            assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest());
            assert!(protocol_config.authority_capabilities_v2());
            assert!(protocol_config.cancel_for_failed_dkg_early());
        }

        // Apply backpressure before doing any work for this commit.
        self.backpressure_subscriber.await_no_backpressure().await;

        let epoch = self.epoch_store.epoch();

        let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit");

        let last_committed_round = self.last_consensus_stats.index.last_committed_round;

        // Advance the status/reject-reason caches to the last committed
        // leader round.
        self.epoch_store
            .consensus_tx_status_cache
            .update_last_committed_leader_round(last_committed_round as u32);
        self.epoch_store
            .tx_reject_reason_cache
            .set_last_committed_leader_round(last_committed_round as u32);

        let commit_info = self.additional_consensus_state.observe_commit(
            self.epoch_store.protocol_config(),
            self.epoch_store
                .epoch_start_config()
                .epoch_start_timestamp_ms(),
            &consensus_commit,
        );
        // Commit rounds must be strictly increasing.
        assert!(commit_info.round > last_committed_round);

        let (timestamp, leader_author, commit_sub_dag_index) =
            self.gather_commit_metadata(&consensus_commit);

        info!(
            %consensus_commit,
            "Received consensus output {}. Rejected transactions {}",
            consensus_commit.commit_ref(),
            consensus_commit.rejected_transactions_debug_string(),
        );

        // Reset the execution indices for the new commit; the per-commit
        // transaction index restarts at 0.
        self.last_consensus_stats.index = ExecutionIndices {
            last_committed_round: commit_info.round,
            sub_dag_index: commit_sub_dag_index,
            transaction_index: 0_u64,
        };

        self.metrics
            .consensus_committed_subdags
            .with_label_values(&[&leader_author.to_string()])
            .inc();

        let mut state = CommitHandlerState {
            output: ConsensusCommitOutput::new(commit_info.round),
            dkg_failed: false,
            randomness_round: None,
            indirect_state_observer: Some(IndirectStateObserver::new()),
            initial_reconfig_state: self
                .epoch_store
                .get_reconfig_state_read_lock_guard()
                .clone(),
            occurrence_counts: HashMap::new(),
        };

        let FilteredConsensusOutput {
            transactions,
            owned_object_locks,
        } = self.filter_consensus_txns(
            state.initial_reconfig_state.clone(),
            &commit_info,
            &consensus_commit,
        );
        if !owned_object_locks.is_empty() {
            state.output.set_owned_object_locks(owned_object_locks);
        }
        let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions);

        // Determine DKG status and possibly reserve a randomness round; the
        // returned guard keeps the randomness manager locked for the rest of
        // this call.
        let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info);

        let CommitHandlerInput {
            user_transactions,
            capability_notifications,
            execution_time_observations,
            checkpoint_signature_messages,
            randomness_dkg_messages,
            randomness_dkg_confirmations,
            end_of_publish_transactions,
            new_jwks,
        } = self.build_commit_handler_input(transactions);

        // Process each category of system message carried in the commit.
        self.process_gasless_transactions(&commit_info, &user_transactions);
        self.process_jwks(&mut state, &commit_info, new_jwks);
        self.process_capability_notifications(capability_notifications);
        self.process_execution_time_observations(&mut state, execution_time_observations);
        self.process_checkpoint_signature_messages(checkpoint_signature_messages);

        self.process_dkg_updates(
            &mut state,
            &commit_info,
            randomness_manager.as_deref_mut(),
            randomness_dkg_messages,
            randomness_dkg_confirmations,
        )
        .await;

        let mut execution_time_estimator = self
            .epoch_store
            .execution_time_estimator
            .try_lock()
            .expect("should only ever be called from the commit handler thread");

        let authenticator_state_update_transaction =
            self.create_authenticator_state_update(last_committed_round, &commit_info);

        let split_checkpoints = self
            .epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler();

        // Two checkpointing paths: the legacy one (at most two checkpoints per
        // commit, scheduled here) and the split path (checkpoint queue, v2).
        let (lock, final_round, num_schedulables, checkpoint_height) = if !split_checkpoints {
            let (schedulables, randomness_schedulables, assigned_versions) = self
                .process_transactions(
                    &mut state,
                    &mut execution_time_estimator,
                    &commit_info,
                    authenticator_state_update_transaction,
                    user_transactions,
                );

            let (should_accept_tx, lock, final_round) =
                self.handle_eop(&mut state, end_of_publish_transactions);

            // If the epoch has stopped accepting transactions and this is not
            // the final round, there is nothing left to checkpoint.
            let make_checkpoint = should_accept_tx || final_round;
            if !make_checkpoint {
                return;
            }

            if final_round {
                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
            }

            let checkpoint_height = self
                .epoch_store
                .calculate_pending_checkpoint_height(commit_info.round);

            self.create_pending_checkpoints(
                &mut state,
                &commit_info,
                &schedulables,
                &randomness_schedulables,
                final_round,
            );

            let num_schedulables = schedulables.len();
            let mut all_schedulables = schedulables;
            all_schedulables.extend(randomness_schedulables);
            let assigned_versions = assigned_versions.into_map();
            // Pair every schedulable with its assigned shared-object versions
            // and hand the batch to the execution scheduler.
            let paired: Vec<_> = all_schedulables
                .into_iter()
                .map(|s| {
                    let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
                    (s, versions)
                })
                .collect();
            self.execution_scheduler_sender.send(paired, None);

            (lock, final_round, num_schedulables, checkpoint_height)
        } else {
            let (
                transactions_to_schedule,
                randomness_transactions_to_schedule,
                cancelled_txns,
                randomness_state_update_transaction,
            ) = self.collect_transactions_to_schedule(
                &mut state,
                &mut execution_time_estimator,
                &commit_info,
                user_transactions,
            );

            let (should_accept_tx, lock, final_round) =
                self.handle_eop(&mut state, end_of_publish_transactions);

            let make_checkpoint = should_accept_tx || final_round;
            if !make_checkpoint {
                return;
            }

            if final_round {
                self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
            }

            let epoch = self.epoch_store.epoch();
            // The prologue is only skipped in certain tests.
            let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
                .then_some(Schedulable::ConsensusCommitPrologue(
                    epoch,
                    commit_info.round,
                    commit_info.consensus_commit_ref.index,
                ));

            let schedulables: Vec<_> = itertools::chain!(
                consensus_commit_prologue.into_iter(),
                authenticator_state_update_transaction
                    .into_iter()
                    .map(Schedulable::Transaction),
                transactions_to_schedule
                    .into_iter()
                    .map(Schedulable::Transaction),
            )
            .collect();

            let randomness_schedulables: Vec<_> = randomness_state_update_transaction
                .into_iter()
                .chain(
                    randomness_transactions_to_schedule
                        .into_iter()
                        .map(Schedulable::Transaction),
                )
                .collect();

            let num_schedulables = schedulables.len();
            let checkpoint_height = self.create_pending_checkpoints_v2(
                &mut state,
                &commit_info,
                schedulables,
                randomness_schedulables,
                &cancelled_txns,
                final_round,
            );

            (lock, final_round, num_schedulables, checkpoint_height)
        };

        let notifications = state.get_notifications();

        let mut stats_to_record = self.last_consensus_stats.clone();
        stats_to_record.height = checkpoint_height;
        if split_checkpoints {
            let queue = self.checkpoint_queue.lock().unwrap();
            stats_to_record.last_checkpoint_flush_timestamp = queue.last_built_timestamp();
            stats_to_record.checkpoint_seq = queue.checkpoint_seq();
        }

        state.output.record_consensus_commit_stats(stats_to_record);

        self.record_deferral_deletion(&mut state);

        // Persist everything produced for this commit via the quarantine.
        self.epoch_store
            .consensus_quarantine
            .write()
            .push_consensus_output(state.output, &self.epoch_store)
            .expect("push_consensus_output should not fail");

        debug!(
            ?commit_info.round,
            "Notifying checkpoint service about new pending checkpoint(s)",
        );
        self.checkpoint_service
            .notify_checkpoint()
            .expect("failed to notify checkpoint service");

        // Randomness generation is started only after the commit output
        // (including the reserved round) has been pushed above.
        if let Some(randomness_round) = state.randomness_round {
            randomness_manager
                .as_ref()
                .expect("randomness manager should exist if randomness round is provided")
                .generate_randomness(epoch, randomness_round);
        }

        self.epoch_store.process_notifications(notifications.iter());

        self.log_final_round(lock, final_round);

        self.throughput_calculator
            .add_transactions(timestamp, num_schedulables as u64);

        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
            let key = [commit_sub_dag_index, epoch];
            if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
                sui_simulator::task::kill_current_node(None);
            }
        });

        fail_point!("crash");

        self.send_end_of_publish_if_needed().await;
    }
1401
1402 fn handle_eop(
1403 &self,
1404 state: &mut CommitHandlerState,
1405 end_of_publish_transactions: Vec<AuthorityName>,
1406 ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
1407 let collected_eop =
1408 self.process_end_of_publish_transactions(state, end_of_publish_transactions);
1409 if collected_eop {
1410 let (lock, final_round) = self.advance_eop_state_machine(state);
1411 (lock.should_accept_tx(), Some(lock), final_round)
1412 } else {
1413 (true, None, false)
1414 }
1415 }
1416
1417 fn record_end_of_epoch_execution_time_observations(
1418 &self,
1419 estimator: &mut ExecutionTimeEstimator,
1420 ) {
1421 self.epoch_store
1422 .end_of_epoch_execution_time_observations
1423 .set(estimator.take_observations())
1424 .expect("`stored_execution_time_observations` should only be set once at end of epoch");
1425 }
1426
1427 fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
1428 let mut deferred_transactions = self
1429 .epoch_store
1430 .consensus_output_cache
1431 .deferred_transactions
1432 .lock();
1433 for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
1434 deferred_transactions.remove(&deleted_deferred_key);
1435 }
1436 }
1437
1438 fn log_final_round(&self, lock: Option<RwLockWriteGuard<ReconfigState>>, final_round: bool) {
1439 if final_round {
1440 let epoch = self.epoch_store.epoch();
1441 info!(
1442 ?epoch,
1443 lock=?lock.as_ref(),
1444 final_round=?final_round,
1445 "Notified last checkpoint"
1446 );
1447 self.epoch_store.record_end_of_message_quorum_time_metric();
1448 }
1449 }
1450
1451 fn create_pending_checkpoints(
1452 &self,
1453 state: &mut CommitHandlerState,
1454 commit_info: &ConsensusCommitInfo,
1455 schedulables: &[Schedulable],
1456 randomness_schedulables: &[Schedulable],
1457 final_round: bool,
1458 ) {
1459 assert!(
1460 !self
1461 .epoch_store
1462 .protocol_config()
1463 .split_checkpoints_in_consensus_handler()
1464 );
1465
1466 let checkpoint_height = self
1467 .epoch_store
1468 .calculate_pending_checkpoint_height(commit_info.round);
1469
1470 let should_write_random_checkpoint = state.randomness_round.is_some()
1477 || (state.dkg_failed && !randomness_schedulables.is_empty());
1478
1479 let pending_checkpoint = PendingCheckpoint {
1480 roots: schedulables.iter().map(|s| s.key()).collect(),
1481 details: PendingCheckpointInfo {
1482 timestamp_ms: commit_info.timestamp,
1483 last_of_epoch: final_round && !should_write_random_checkpoint,
1484 checkpoint_height,
1485 consensus_commit_ref: commit_info.consensus_commit_ref,
1486 rejected_transactions_digest: commit_info.rejected_transactions_digest,
1487 checkpoint_seq: None,
1488 },
1489 };
1490 self.epoch_store
1491 .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1492 .expect("failed to write pending checkpoint");
1493
1494 info!(
1495 "Written pending checkpoint: {:?}",
1496 pending_checkpoint.details,
1497 );
1498
1499 if should_write_random_checkpoint {
1500 let pending_checkpoint = PendingCheckpoint {
1501 roots: randomness_schedulables.iter().map(|s| s.key()).collect(),
1502 details: PendingCheckpointInfo {
1503 timestamp_ms: commit_info.timestamp,
1504 last_of_epoch: final_round,
1505 checkpoint_height: checkpoint_height + 1,
1506 consensus_commit_ref: commit_info.consensus_commit_ref,
1507 rejected_transactions_digest: commit_info.rejected_transactions_digest,
1508 checkpoint_seq: None,
1509 },
1510 };
1511 self.epoch_store
1512 .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
1513 .expect("failed to write pending checkpoint");
1514 }
1515 }
1516
    /// Orders the commit's user transactions, applies congestion control,
    /// deferral, and cancellation rules, and returns
    /// `(transactions_to_schedule, randomness_transactions_to_schedule,
    /// cancelled_txns, optional randomness state update)`.
    #[allow(clippy::type_complexity)]
    fn collect_transactions_to_schedule(
        &self,
        state: &mut CommitHandlerState,
        execution_time_estimator: &mut ExecutionTimeEstimator,
        commit_info: &ConsensusCommitInfo,
        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    ) -> (
        Vec<VerifiedExecutableTransactionWithAliases>,
        Vec<VerifiedExecutableTransactionWithAliases>,
        BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        Option<Schedulable<VerifiedExecutableTransactionWithAliases>>,
    ) {
        let protocol_config = self.epoch_store.protocol_config();
        let epoch = self.epoch_store.epoch();

        // Split user transactions into regular vs randomness-using, in
        // deterministic order, with the set of previously deferred digests.
        let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) =
            self.merge_and_reorder_transactions(state, commit_info, user_transactions);

        // Separate congestion trackers for the regular and randomness queues.
        let mut shared_object_congestion_tracker =
            self.init_congestion_tracker(commit_info, false, &ordered_txns);
        let mut shared_object_using_randomness_congestion_tracker =
            self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns);

        let randomness_state_update_transaction = state
            .randomness_round
            .map(|round| Schedulable::RandomnessStateUpdate(epoch, round));
        debug!(
            "Randomness state update transaction: {:?}",
            randomness_state_update_transaction
                .as_ref()
                .map(|t| t.key())
        );

        let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len());
        let mut randomness_transactions_to_schedule =
            Vec::with_capacity(ordered_randomness_txns.len());
        let mut deferred_txns = BTreeMap::new();
        let mut cancelled_txns = BTreeMap::new();

        for transaction in ordered_txns {
            self.handle_deferral_and_cancellation(
                state,
                &mut cancelled_txns,
                &mut deferred_txns,
                &mut transactions_to_schedule,
                protocol_config,
                commit_info,
                transaction,
                &mut shared_object_congestion_tracker,
                &previously_deferred_tx_digests,
                execution_time_estimator,
            );
        }

        for transaction in ordered_randomness_txns {
            // With failed DKG, randomness-using transactions are still
            // scheduled but marked cancelled so they can be finalized.
            if state.dkg_failed {
                debug!(
                    "Canceling randomness-using transaction {:?} because DKG failed",
                    transaction.tx().digest(),
                );
                cancelled_txns.insert(
                    *transaction.tx().digest(),
                    CancelConsensusCertificateReason::DkgFailed,
                );
                randomness_transactions_to_schedule.push(transaction);
                continue;
            }
            self.handle_deferral_and_cancellation(
                state,
                &mut cancelled_txns,
                &mut deferred_txns,
                &mut randomness_transactions_to_schedule,
                protocol_config,
                commit_info,
                transaction,
                &mut shared_object_using_randomness_congestion_tracker,
                &previously_deferred_tx_digests,
                execution_time_estimator,
            );
        }

        // Record all deferrals both in the in-memory cache and in the commit
        // output (for persistence).
        let mut total_deferred_txns = 0;
        {
            let mut deferred_transactions = self
                .epoch_store
                .consensus_output_cache
                .deferred_transactions
                .lock();
            for (key, txns) in deferred_txns.into_iter() {
                total_deferred_txns += txns.len();
                deferred_transactions.insert(key, txns.clone());
                state.output.defer_transactions(key, txns);
            }
        }

        self.metrics
            .consensus_handler_deferred_transactions
            .inc_by(total_deferred_txns as u64);
        self.metrics
            .consensus_handler_cancelled_transactions
            .inc_by(cancelled_txns.len() as u64);
        self.metrics
            .consensus_handler_max_object_costs
            .with_label_values(&["regular_commit"])
            .set(shared_object_congestion_tracker.max_cost() as i64);
        self.metrics
            .consensus_handler_max_object_costs
            .with_label_values(&["randomness_commit"])
            .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);

        let congestion_commit_data = shared_object_congestion_tracker.finish_commit(commit_info);
        let randomness_congestion_commit_data =
            shared_object_using_randomness_congestion_tracker.finish_commit(commit_info);

        // Optional per-commit congestion logging (config-gated).
        if let Some(logger) = &self.congestion_logger {
            let epoch = self.epoch_store.epoch();
            let mut logger = logger.lock().unwrap();
            logger.write_commit_log(epoch, commit_info, false, &congestion_commit_data);
            logger.write_commit_log(epoch, commit_info, true, &randomness_congestion_commit_data);
        }

        // Best-effort notification of accumulated object debts; a full
        // channel is logged, not treated as an error.
        if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get()
            && let Err(e) = tx_object_debts.try_send(
                congestion_commit_data
                    .accumulated_debts
                    .iter()
                    .chain(randomness_congestion_commit_data.accumulated_debts.iter())
                    .map(|(id, _)| *id)
                    .collect(),
            )
        {
            info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
        }

        state
            .output
            .set_congestion_control_object_debts(congestion_commit_data.accumulated_debts);
        state.output.set_congestion_control_randomness_object_debts(
            randomness_congestion_commit_data.accumulated_debts,
        );

        (
            transactions_to_schedule,
            randomness_transactions_to_schedule,
            cancelled_txns,
            randomness_state_update_transaction,
        )
    }
1666
    /// Legacy (non-split) path: builds the full schedulable lists for this
    /// commit (prologue, authenticator update, user transactions, optional
    /// accumulator settlements and randomness state update), assigns shared
    /// object versions, and splices the finalized prologue transaction in.
    fn process_transactions(
        &self,
        state: &mut CommitHandlerState,
        execution_time_estimator: &mut ExecutionTimeEstimator,
        commit_info: &ConsensusCommitInfo,
        authenticator_state_update_transaction: Option<VerifiedExecutableTransactionWithAliases>,
        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    ) -> (Vec<Schedulable>, Vec<Schedulable>, AssignedTxAndVersions) {
        let protocol_config = self.epoch_store.protocol_config();
        assert!(!protocol_config.split_checkpoints_in_consensus_handler());
        let epoch = self.epoch_store.epoch();

        let (
            transactions_to_schedule,
            randomness_transactions_to_schedule,
            cancelled_txns,
            randomness_state_update_transaction,
        ) = self.collect_transactions_to_schedule(
            state,
            execution_time_estimator,
            commit_info,
            user_transactions,
        );

        // With accumulators enabled, each checkpoint gets a settlement
        // transaction; the randomness checkpoint (height + 1) gets its own.
        let mut settlement = None;
        let mut randomness_settlement = None;
        if self.epoch_store.accumulators_enabled() {
            let checkpoint_height = self
                .epoch_store
                .calculate_pending_checkpoint_height(commit_info.round);

            settlement = Some(Schedulable::AccumulatorSettlement(epoch, checkpoint_height));

            if state.randomness_round.is_some() || !randomness_transactions_to_schedule.is_empty() {
                randomness_settlement = Some(Schedulable::AccumulatorSettlement(
                    epoch,
                    checkpoint_height + 1,
                ));
            }
        }

        // Placeholder prologue; replaced below once versions are assigned.
        let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
            .then_some(Schedulable::ConsensusCommitPrologue(
                epoch,
                commit_info.round,
                commit_info.consensus_commit_ref.index,
            ));

        let schedulables: Vec<_> = itertools::chain!(
            consensus_commit_prologue.into_iter(),
            authenticator_state_update_transaction
                .into_iter()
                .map(Schedulable::Transaction),
            transactions_to_schedule
                .into_iter()
                .map(Schedulable::Transaction),
        )
        .collect();

        let randomness_schedulables: Vec<_> = randomness_state_update_transaction
            .into_iter()
            .chain(
                randomness_transactions_to_schedule
                    .into_iter()
                    .map(Schedulable::Transaction),
            )
            .chain(randomness_settlement)
            .collect();

        let assigned_versions = self
            .epoch_store
            .process_consensus_transaction_shared_object_versions(
                self.cache_reader.as_ref(),
                schedulables.iter(),
                randomness_schedulables.iter(),
                &cancelled_txns,
                &mut state.output,
            )
            .expect("failed to assign shared object versions");

        // The real prologue transaction embeds cancelled-version assignments,
        // so it can only be built after version assignment.
        let consensus_commit_prologue =
            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);

        let mut schedulables = schedulables;
        let mut assigned_versions = assigned_versions;
        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
            // The placeholder prologue is always at index 0 of both lists;
            // replace it (and its version-assignment key) with the real tx.
            assert!(matches!(
                schedulables[0],
                Schedulable::ConsensusCommitPrologue(..)
            ));
            assert!(matches!(
                assigned_versions.0[0].0,
                TransactionKey::ConsensusCommitPrologue(..)
            ));
            assigned_versions.0[0].0 =
                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
            schedulables[0] = Schedulable::Transaction(consensus_commit_prologue);
        }

        self.epoch_store
            .process_user_signatures(schedulables.iter().chain(randomness_schedulables.iter()));

        // Convert Schedulable<VerifiedExecutableTransactionWithAliases> into
        // the plain Schedulable form returned to the caller.
        let schedulables: Vec<Schedulable> = schedulables.into_iter().map(|s| s.into()).collect();
        let randomness_schedulables: Vec<Schedulable> = randomness_schedulables
            .into_iter()
            .map(|s| s.into())
            .collect();

        (schedulables, randomness_schedulables, assigned_versions)
    }
1779
    /// Split-checkpoint path: chunks the commit's schedulables by
    /// `max_transactions_per_checkpoint`, assigns shared object versions,
    /// splices in the prologue, pushes chunks through the checkpoint queue,
    /// and writes the resulting pending checkpoints. Returns the commit's
    /// final checkpoint height.
    #[allow(clippy::type_complexity)]
    fn create_pending_checkpoints_v2(
        &self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
        randomness_schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
        cancelled_txns: &BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        final_round: bool,
    ) -> CheckpointHeight {
        let protocol_config = self.epoch_store.protocol_config();
        assert!(protocol_config.split_checkpoints_in_consensus_handler());

        let epoch = self.epoch_store.epoch();
        let accumulators_enabled = self.epoch_store.accumulators_enabled();
        let max_transactions_per_checkpoint =
            protocol_config.max_transactions_per_checkpoint() as usize;

        // A randomness checkpoint is needed when a round was reserved, or
        // when DKG failed but cancelled randomness txns must still commit.
        let should_write_random_checkpoint = state.randomness_round.is_some()
            || (state.dkg_failed && !randomness_schedulables.is_empty());

        let mut checkpoint_queue = self.checkpoint_queue.lock().unwrap();

        // Split a schedulable list into per-checkpoint chunks, each taking a
        // fresh height from the queue and (optionally) its own settlement.
        let build_chunks =
            |schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
             queue: &mut CheckpointQueue|
             -> Vec<Chunk<VerifiedExecutableTransactionWithAliases>> {
                schedulables
                    .chunks(max_transactions_per_checkpoint)
                    .map(|chunk| {
                        let height = queue.next_height();
                        let schedulables: Vec<_> = chunk.to_vec();
                        let settlement = if accumulators_enabled {
                            Some(Schedulable::AccumulatorSettlement(epoch, height))
                        } else {
                            None
                        };
                        Chunk {
                            schedulables,
                            settlement,
                            height,
                        }
                    })
                    .collect()
            };

        let num_schedulables = schedulables.len();
        let chunked_schedulables = build_chunks(schedulables, &mut checkpoint_queue);
        if chunked_schedulables.len() > 1 {
            info!(
                "Splitting transactions into {} checkpoint chunks (num_schedulables={}, max_tx={})",
                chunked_schedulables.len(),
                num_schedulables,
                max_transactions_per_checkpoint
            );
            assert_reachable!("checkpoint split due to transaction limit");
        }
        let chunked_randomness_schedulables = if should_write_random_checkpoint {
            build_chunks(randomness_schedulables, &mut checkpoint_queue)
        } else {
            vec![]
        };

        let schedulables_for_version_assignment =
            Chunk::all_schedulables_from(&chunked_schedulables);
        let randomness_schedulables_for_version_assignment =
            Chunk::all_schedulables_from(&chunked_randomness_schedulables);

        let assigned_versions = self
            .epoch_store
            .process_consensus_transaction_shared_object_versions(
                self.cache_reader.as_ref(),
                schedulables_for_version_assignment,
                randomness_schedulables_for_version_assignment,
                cancelled_txns,
                &mut state.output,
            )
            .expect("failed to assign shared object versions");

        // The real prologue can only be built after version assignment (it
        // embeds cancelled-version assignments).
        let consensus_commit_prologue =
            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);

        let mut chunked_schedulables = chunked_schedulables;
        let mut assigned_versions = assigned_versions;
        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
            // The placeholder prologue occupies slot 0 of the first chunk and
            // of the version assignments; replace both with the real tx.
            assert!(matches!(
                chunked_schedulables[0].schedulables[0],
                Schedulable::ConsensusCommitPrologue(..)
            ));
            assert!(matches!(
                assigned_versions.0[0].0,
                TransactionKey::ConsensusCommitPrologue(..)
            ));
            assigned_versions.0[0].0 =
                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
            chunked_schedulables[0].schedulables[0] =
                Schedulable::Transaction(consensus_commit_prologue);
        }

        let assigned_versions = assigned_versions.into_map();

        self.epoch_store.process_user_signatures(
            chunked_schedulables
                .iter()
                .flat_map(|c| c.all_schedulables())
                .chain(
                    chunked_randomness_schedulables
                        .iter()
                        .flat_map(|c| c.all_schedulables()),
                ),
        );

        // The commit's reported height is the last chunk's height (randomness
        // chunks come after regular ones when present).
        let commit_height = chunked_randomness_schedulables
            .last()
            .or(chunked_schedulables.last())
            .map(|c| c.height)
            .expect("at least one checkpoint root must be created per commit");

        let mut pending_checkpoints = Vec::new();
        for chunk in chunked_schedulables {
            pending_checkpoints.extend(checkpoint_queue.push_chunk(
                chunk.into(),
                &assigned_versions,
                commit_info.timestamp,
                commit_info.consensus_commit_ref,
                commit_info.rejected_transactions_digest,
            ));
        }

        // Flush ordering differs by protocol flag: either randomness chunks
        // merge into the same queue stream, or the queue is force-flushed
        // before and after pushing them.
        if protocol_config.merge_randomness_into_checkpoint() {
            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, final_round));

            if should_write_random_checkpoint {
                for chunk in chunked_randomness_schedulables {
                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
                        chunk.into(),
                        &assigned_versions,
                        commit_info.timestamp,
                        commit_info.consensus_commit_ref,
                        commit_info.rejected_transactions_digest,
                    ));
                }
                if final_round {
                    pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
                }
            }
        } else {
            let force = final_round || should_write_random_checkpoint;
            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, force));

            if should_write_random_checkpoint {
                for chunk in chunked_randomness_schedulables {
                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
                        chunk.into(),
                        &assigned_versions,
                        commit_info.timestamp,
                        commit_info.consensus_commit_ref,
                        commit_info.rejected_transactions_digest,
                    ));
                }
                pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
            }
        }

        // Mark only the very last checkpoint of the epoch.
        if final_round && let Some(last) = pending_checkpoints.last_mut() {
            last.details.last_of_epoch = true;
        }

        let queue_drained = checkpoint_queue.is_empty();
        drop(checkpoint_queue);

        for pending_checkpoint in pending_checkpoints {
            debug!(
                checkpoint_height = pending_checkpoint.details.checkpoint_height,
                roots_count = pending_checkpoint.num_roots(),
                "Writing pending checkpoint",
            );
            self.epoch_store
                .write_pending_checkpoint_v2(&mut state.output, &pending_checkpoint)
                .expect("failed to write pending checkpoint");
        }

        state.output.set_checkpoint_queue_drained(queue_drained);

        commit_height
    }
1970
1971 fn add_consensus_commit_prologue_transaction<'a>(
1975 &'a self,
1976 state: &'a mut CommitHandlerState,
1977 commit_info: &'a ConsensusCommitInfo,
1978 assigned_versions: &AssignedTxAndVersions,
1979 ) -> Option<VerifiedExecutableTransactionWithAliases> {
1980 {
1981 if commit_info.skip_consensus_commit_prologue_in_test {
1982 return None;
1983 }
1984 }
1985
1986 let mut cancelled_txn_version_assignment = Vec::new();
1987
1988 let protocol_config = self.epoch_store.protocol_config();
1989
1990 for (txn_key, assigned_versions) in assigned_versions.0.iter() {
1991 let Some(d) = txn_key.as_digest() else {
1992 continue;
1993 };
1994
1995 if !protocol_config.include_cancelled_randomness_txns_in_prologue()
1996 && assigned_versions
1997 .shared_object_versions
1998 .iter()
1999 .any(|((id, _), _)| *id == SUI_RANDOMNESS_STATE_OBJECT_ID)
2000 {
2001 continue;
2002 }
2003
2004 if assigned_versions
2005 .shared_object_versions
2006 .iter()
2007 .any(|(_, version)| version.is_cancelled())
2008 {
2009 assert_reachable!("cancelled transactions");
2010 cancelled_txn_version_assignment
2011 .push((*d, assigned_versions.shared_object_versions.clone()));
2012 }
2013 }
2014
2015 fail_point_arg!(
2016 "additional_cancelled_txns_for_tests",
2017 |additional_cancelled_txns: Vec<(
2018 TransactionDigest,
2019 Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
2020 )>| {
2021 cancelled_txn_version_assignment.extend(additional_cancelled_txns);
2022 }
2023 );
2024
2025 let transaction = commit_info.create_consensus_commit_prologue_transaction(
2026 self.epoch_store.epoch(),
2027 self.epoch_store.protocol_config(),
2028 cancelled_txn_version_assignment,
2029 commit_info,
2030 state.indirect_state_observer.take().unwrap(),
2031 );
2032 Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2033 transaction,
2034 ))
2035 }
2036
    /// Decides the fate of one user transaction in this commit: schedule it for
    /// execution, defer it to a later consensus round, or cancel it due to
    /// congestion.
    ///
    /// Exactly one of the following happens:
    /// - pushed into `deferred_txns` under a `DeferralKey` (retried later),
    /// - recorded in `cancelled_txns` *and* pushed into `scheduled_txns`
    ///   (cancelled transactions are still scheduled so the cancellation itself
    ///   is executed),
    /// - pushed into `scheduled_txns` normally, charging its cost to the
    ///   congestion tracker.
    fn handle_deferral_and_cancellation(
        &self,
        state: &mut CommitHandlerState,
        cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>,
        scheduled_txns: &mut Vec<VerifiedExecutableTransactionWithAliases>,
        protocol_config: &ProtocolConfig,
        commit_info: &ConsensusCommitInfo,
        transaction: VerifiedExecutableTransactionWithAliases,
        shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
        execution_time_estimator: &ExecutionTimeEstimator,
    ) {
        if protocol_config.defer_unpaid_amplification() {
            // How many times this digest appeared in the current commit
            // (populated from occurrence counting during deduplication).
            let occurrence_count = state
                .occurrence_counts
                .get(transaction.tx().digest())
                .copied()
                .unwrap_or(0);

            // Allowed occurrences scale with the gas price as a multiple of the
            // reference gas price, plus one; `.max(1)` guards against division
            // by a zero RGP.
            let rgp = self.epoch_store.reference_gas_price();
            let gas_price = transaction.tx().transaction_data().gas_price();
            let allowed_count = (gas_price / rgp.max(1)) + 1;

            if occurrence_count as u64 > allowed_count {
                self.metrics
                    .consensus_handler_unpaid_amplification_deferrals
                    .inc();

                // Preserve the original deferral round if this tx was deferred
                // before, so the deferral limit counts from its first deferral.
                let deferred_from_round = previously_deferred_tx_digests
                    .get(transaction.tx().digest())
                    .map(|k| k.deferred_from_round())
                    .unwrap_or(commit_info.round);

                let deferral_key = DeferralKey::new_for_consensus_round(
                    commit_info.round + 1,
                    deferred_from_round,
                );

                if transaction_deferral_within_limit(
                    &deferral_key,
                    protocol_config.max_deferral_rounds_for_congestion_control(),
                ) {
                    assert_reachable!("unpaid amplification deferral");
                    debug!(
                        "Deferring transaction {:?} due to unpaid amplification (count={}, allowed={})",
                        transaction.tx().digest(),
                        occurrence_count,
                        allowed_count
                    );
                    deferred_txns
                        .entry(deferral_key)
                        .or_default()
                        .push(transaction);
                    return;
                }
                // Deferral limit exhausted: fall through to the normal
                // congestion handling below.
            }
        }

        // NOTE(review): computed before should_defer but only consumed on the
        // non-deferred path; get_tx_cost also feeds the indirect state observer.
        let tx_cost = shared_object_congestion_tracker.get_tx_cost(
            execution_time_estimator,
            transaction.tx(),
            state.indirect_state_observer.as_mut().unwrap(),
        );

        let deferral_info = self.epoch_store.should_defer(
            transaction.tx(),
            commit_info,
            state.dkg_failed,
            state.randomness_round.is_some(),
            previously_deferred_tx_digests,
            shared_object_congestion_tracker,
        );

        if let Some((deferral_key, deferral_reason)) = deferral_info {
            debug!(
                "Deferring consensus certificate for transaction {:?} until {:?}",
                transaction.tx().digest(),
                deferral_key
            );

            match deferral_reason {
                // Randomness-using tx before randomness is available: defer
                // unconditionally (no deferral-limit check on this path).
                DeferralReason::RandomnessNotReady => {
                    deferred_txns
                        .entry(deferral_key)
                        .or_default()
                        .push(transaction);
                }
                DeferralReason::SharedObjectCongestion(congested_objects) => {
                    self.metrics.consensus_handler_congested_transactions.inc();
                    if transaction_deferral_within_limit(
                        &deferral_key,
                        protocol_config.max_deferral_rounds_for_congestion_control(),
                    ) {
                        deferred_txns
                            .entry(deferral_key)
                            .or_default()
                            .push(transaction);
                    } else {
                        // Deferred too many times: cancel instead of deferring
                        // again, but still schedule it so the cancellation is
                        // executed.
                        assert_sometimes!(
                            transaction.tx().data().transaction_data().uses_randomness(),
                            "cancelled randomness-using transaction"
                        );
                        assert_sometimes!(
                            !transaction.tx().data().transaction_data().uses_randomness(),
                            "cancelled non-randomness-using transaction"
                        );

                        debug!(
                            "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
                            transaction.tx().digest(),
                            deferral_key,
                            congested_objects
                        );
                        cancelled_txns.insert(
                            *transaction.tx().digest(),
                            CancelConsensusCertificateReason::CongestionOnObjects(
                                congested_objects,
                            ),
                        );
                        scheduled_txns.push(transaction);
                    }
                }
            }
        } else {
            // Not deferred: charge its cost to the tracker and schedule it.
            shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, transaction.tx());
            scheduled_txns.push(transaction);
        }
    }
2171
2172 fn merge_and_reorder_transactions(
2173 &self,
2174 state: &mut CommitHandlerState,
2175 commit_info: &ConsensusCommitInfo,
2176 user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
2177 ) -> (
2178 Vec<VerifiedExecutableTransactionWithAliases>,
2179 Vec<VerifiedExecutableTransactionWithAliases>,
2180 HashMap<TransactionDigest, DeferralKey>,
2181 ) {
2182 let protocol_config = self.epoch_store.protocol_config();
2183
2184 let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
2185 self.load_deferred_transactions(state, commit_info);
2186
2187 txns.reserve(user_transactions.len());
2188 randomness_txns.reserve(user_transactions.len());
2189
2190 let mut txns: Vec<_> = txns
2193 .into_iter()
2194 .filter_map(|tx| {
2195 if tx.tx().transaction_data().uses_randomness() {
2196 randomness_txns.push(tx);
2197 None
2198 } else {
2199 Some(tx)
2200 }
2201 })
2202 .collect();
2203
2204 for txn in user_transactions {
2205 if txn.tx().transaction_data().uses_randomness() {
2206 randomness_txns.push(txn);
2207 } else {
2208 txns.push(txn);
2209 }
2210 }
2211
2212 PostConsensusTxReorder::reorder(
2213 &mut txns,
2214 protocol_config.consensus_transaction_ordering(),
2215 );
2216 PostConsensusTxReorder::reorder(
2217 &mut randomness_txns,
2218 protocol_config.consensus_transaction_ordering(),
2219 );
2220
2221 (txns, randomness_txns, previously_deferred_tx_digests)
2222 }
2223
2224 fn load_deferred_transactions(
2225 &self,
2226 state: &mut CommitHandlerState,
2227 commit_info: &ConsensusCommitInfo,
2228 ) -> (
2229 Vec<VerifiedExecutableTransactionWithAliases>,
2230 Vec<VerifiedExecutableTransactionWithAliases>,
2231 HashMap<TransactionDigest, DeferralKey>,
2232 ) {
2233 let mut previously_deferred_tx_digests = HashMap::new();
2234
2235 let deferred_txs: Vec<_> = self
2236 .epoch_store
2237 .load_deferred_transactions_for_up_to_consensus_round_v2(
2238 &mut state.output,
2239 commit_info.round,
2240 )
2241 .expect("db error")
2242 .into_iter()
2243 .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2244 .map(|(key, tx)| {
2245 previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2246 tx
2247 })
2248 .collect();
2249 trace!(
2250 "loading deferred transactions: {:?}",
2251 deferred_txs.iter().map(|tx| tx.tx().digest())
2252 );
2253
2254 let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
2255 let txns: Vec<_> = self
2256 .epoch_store
2257 .load_deferred_transactions_for_randomness_v2(&mut state.output)
2258 .expect("db error")
2259 .into_iter()
2260 .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2261 .map(|(key, tx)| {
2262 previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2263 tx
2264 })
2265 .collect();
2266 trace!(
2267 "loading deferred randomness transactions: {:?}",
2268 txns.iter().map(|tx| tx.tx().digest())
2269 );
2270 txns
2271 } else {
2272 vec![]
2273 };
2274
2275 (
2276 deferred_txs,
2277 deferred_randomness_txs,
2278 previously_deferred_tx_digests,
2279 )
2280 }
2281
    /// Builds the shared-object congestion tracker for this commit, seeded with
    /// the per-object debts loaded from the epoch store's consensus quarantine.
    fn init_congestion_tracker(
        &self,
        commit_info: &ConsensusCommitInfo,
        for_randomness: bool,
        txns: &[VerifiedExecutableTransactionWithAliases],
    ) -> SharedObjectCongestionTracker {
        // `mut` is only needed when the fail point below overwrites the tracker.
        #[allow(unused_mut)]
        let mut ret = SharedObjectCongestionTracker::from_protocol_config(
            self.epoch_store
                .consensus_quarantine
                .read()
                .load_initial_object_debts(
                    &self.epoch_store,
                    commit_info.round,
                    for_randomness,
                    txns,
                )
                .expect("db error"),
            self.epoch_store.protocol_config(),
            for_randomness,
            self.congestion_logger.is_some(),
        );

        // Test-only override of the initial tracker state.
        fail_point_arg!(
            "initial_congestion_tracker",
            |tracker: SharedObjectCongestionTracker| {
                info!(
                    "Initialize shared_object_congestion_tracker to {:?}",
                    tracker
                );
                ret = tracker;
            }
        );

        ret
    }
2318
2319 fn process_gasless_transactions(
2320 &self,
2321 commit_info: &ConsensusCommitInfo,
2322 user_transactions: &[VerifiedExecutableTransactionWithAliases],
2323 ) {
2324 let gasless_count = user_transactions
2325 .iter()
2326 .filter(|txn| txn.tx().transaction_data().is_gasless_transaction())
2327 .count() as u64;
2328 self.consensus_gasless_counter
2329 .record_commit(commit_info.timestamp, gasless_count);
2330 }
2331
2332 fn process_jwks(
2333 &self,
2334 state: &mut CommitHandlerState,
2335 commit_info: &ConsensusCommitInfo,
2336 new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
2337 ) {
2338 for (authority_name, jwk_id, jwk) in new_jwks {
2339 self.epoch_store.record_jwk_vote(
2340 &mut state.output,
2341 commit_info.round,
2342 authority_name,
2343 &jwk_id,
2344 &jwk,
2345 );
2346 }
2347 }
2348
2349 fn process_capability_notifications(
2350 &self,
2351 capability_notifications: Vec<AuthorityCapabilitiesV2>,
2352 ) {
2353 for capabilities in capability_notifications {
2354 self.epoch_store
2355 .record_capabilities_v2(&capabilities)
2356 .expect("db error");
2357 }
2358 }
2359
2360 fn process_execution_time_observations(
2361 &self,
2362 state: &mut CommitHandlerState,
2363 execution_time_observations: Vec<ExecutionTimeObservation>,
2364 ) {
2365 let mut execution_time_estimator = self
2366 .epoch_store
2367 .execution_time_estimator
2368 .try_lock()
2369 .expect("should only ever be called from the commit handler thread");
2370
2371 for ExecutionTimeObservation {
2372 authority,
2373 generation,
2374 estimates,
2375 } in execution_time_observations
2376 {
2377 let authority_index = self
2378 .epoch_store
2379 .committee()
2380 .authority_index(&authority)
2381 .unwrap();
2382 execution_time_estimator.process_observations_from_consensus(
2383 authority_index,
2384 Some(generation),
2385 &estimates,
2386 );
2387 state
2388 .output
2389 .insert_execution_time_observation(authority_index, generation, estimates);
2390 }
2391 }
2392
2393 fn process_checkpoint_signature_messages(
2394 &self,
2395 checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
2396 ) {
2397 for checkpoint_signature_message in checkpoint_signature_messages {
2398 self.checkpoint_service
2399 .notify_checkpoint_signature(&checkpoint_signature_message)
2400 .expect("db error");
2401 }
2402 }
2403
2404 async fn process_dkg_updates(
2405 &self,
2406 state: &mut CommitHandlerState,
2407 commit_info: &ConsensusCommitInfo,
2408 randomness_manager: Option<&mut RandomnessManager>,
2409 randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2410 randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2411 ) {
2412 if !self.epoch_store.randomness_state_enabled() {
2413 let num_dkg_messages = randomness_dkg_messages.len();
2414 let num_dkg_confirmations = randomness_dkg_confirmations.len();
2415 if num_dkg_messages + num_dkg_confirmations > 0 {
2416 debug_fatal!(
2417 "received {} RandomnessDkgMessage and {} RandomnessDkgConfirmation messages when randomness is not enabled",
2418 num_dkg_messages,
2419 num_dkg_confirmations
2420 );
2421 }
2422 return;
2423 }
2424
2425 let randomness_manager =
2426 randomness_manager.expect("randomness manager should exist if randomness is enabled");
2427
2428 let randomness_dkg_updates =
2429 self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);
2430
2431 let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
2432 state,
2433 randomness_manager,
2434 randomness_dkg_confirmations,
2435 );
2436
2437 if randomness_dkg_updates || randomness_dkg_confirmation_updates {
2438 randomness_manager
2439 .advance_dkg(&mut state.output, commit_info.round)
2440 .await
2441 .expect("epoch ended");
2442 }
2443 }
2444
2445 fn process_randomness_dkg_messages(
2446 &self,
2447 randomness_manager: &mut RandomnessManager,
2448 randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2449 ) -> bool {
2450 if randomness_dkg_messages.is_empty() {
2451 return false;
2452 }
2453
2454 let mut randomness_state_updated = false;
2455 for (authority, bytes) in randomness_dkg_messages {
2456 match bcs::from_bytes(&bytes) {
2457 Ok(message) => {
2458 randomness_manager
2459 .add_message(&authority, message)
2460 .expect("epoch ended");
2462 randomness_state_updated = true;
2463 }
2464
2465 Err(e) => {
2466 warn!(
2467 "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
2468 authority.concise(),
2469 );
2470 }
2471 }
2472 }
2473
2474 randomness_state_updated
2475 }
2476
2477 fn process_randomness_dkg_confirmations(
2478 &self,
2479 state: &mut CommitHandlerState,
2480 randomness_manager: &mut RandomnessManager,
2481 randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2482 ) -> bool {
2483 if randomness_dkg_confirmations.is_empty() {
2484 return false;
2485 }
2486
2487 let mut randomness_state_updated = false;
2488 for (authority, bytes) in randomness_dkg_confirmations {
2489 match bcs::from_bytes(&bytes) {
2490 Ok(message) => {
2491 randomness_manager
2492 .add_confirmation(&mut state.output, &authority, message)
2493 .expect("epoch ended");
2495 randomness_state_updated = true;
2496 }
2497 Err(e) => {
2498 warn!(
2499 "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
2500 authority.concise(),
2501 );
2502 }
2503 }
2504 }
2505
2506 randomness_state_updated
2507 }
2508
2509 fn process_end_of_publish_transactions(
2511 &self,
2512 state: &mut CommitHandlerState,
2513 end_of_publish_transactions: Vec<AuthorityName>,
2514 ) -> bool {
2515 let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
2516 "No contention on end_of_publish as it is only accessed from consensus handler",
2517 );
2518
2519 if eop_aggregator.has_quorum() {
2520 return true;
2521 }
2522
2523 if end_of_publish_transactions.is_empty() {
2524 return false;
2525 }
2526
2527 for authority in end_of_publish_transactions {
2528 info!("Received EndOfPublish from {:?}", authority.concise());
2529
2530 state.output.insert_end_of_publish(authority);
2533 if eop_aggregator
2534 .insert_generic(authority, ())
2535 .is_quorum_reached()
2536 {
2537 debug!(
2538 "Collected enough end_of_publish messages with last message from validator {:?}",
2539 authority.concise(),
2540 );
2541 return true;
2542 }
2543 }
2544
2545 false
2546 }
2547
2548 fn advance_eop_state_machine(
2551 &self,
2552 state: &mut CommitHandlerState,
2553 ) -> (
2554 RwLockWriteGuard<'_, ReconfigState>,
2555 bool, ) {
2557 let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
2558 let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
2559
2560 reconfig_state.close_all_certs();
2561
2562 let commit_has_deferred_txns = state.output.has_deferred_transactions();
2563 let previous_commits_have_deferred_txns = !self.epoch_store.deferred_transactions_empty();
2564
2565 if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
2566 if !start_state_is_reject_all_tx {
2567 info!("Transitioning to RejectAllTx");
2568 }
2569 reconfig_state.close_all_tx();
2570 } else {
2571 debug!(
2572 "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
2573 previous_commits_have_deferred_txns, commit_has_deferred_txns,
2574 );
2575 }
2576
2577 state.output.store_reconfig_state(reconfig_state.clone());
2578
2579 if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
2580 (reconfig_state, true)
2581 } else {
2582 (reconfig_state, false)
2583 }
2584 }
2585
2586 fn gather_commit_metadata(
2587 &self,
2588 consensus_commit: &impl ConsensusCommitAPI,
2589 ) -> (u64, AuthorityIndex, u64) {
2590 let timestamp = consensus_commit.commit_timestamp_ms();
2591 let leader_author = consensus_commit.leader_author_index();
2592 let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
2593
2594 let system_time_ms = SystemTime::now()
2595 .duration_since(UNIX_EPOCH)
2596 .unwrap()
2597 .as_millis() as i64;
2598
2599 let consensus_timestamp_bias_ms = system_time_ms - (timestamp as i64);
2600 let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
2601 self.metrics
2602 .consensus_timestamp_bias
2603 .observe(consensus_timestamp_bias_seconds);
2604
2605 let epoch_start = self
2606 .epoch_store
2607 .epoch_start_config()
2608 .epoch_start_timestamp_ms();
2609 let timestamp = if timestamp < epoch_start {
2610 error!(
2611 "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}"
2612 );
2613 epoch_start
2614 } else {
2615 timestamp
2616 };
2617
2618 (timestamp, leader_author, commit_sub_dag_index)
2619 }
2620
2621 fn create_authenticator_state_update(
2622 &self,
2623 last_committed_round: u64,
2624 commit_info: &ConsensusCommitInfo,
2625 ) -> Option<VerifiedExecutableTransactionWithAliases> {
2626 let new_jwks = self
2634 .epoch_store
2635 .get_new_jwks(last_committed_round)
2636 .expect("Unrecoverable error in consensus handler");
2637
2638 if !new_jwks.is_empty() {
2639 let authenticator_state_update_transaction = authenticator_state_update_transaction(
2640 &self.epoch_store,
2641 commit_info.round,
2642 new_jwks,
2643 );
2644 debug!(
2645 "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
2646 authenticator_state_update_transaction.digest(),
2647 authenticator_state_update_transaction,
2648 );
2649
2650 Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2651 authenticator_state_update_transaction,
2652 ))
2653 } else {
2654 None
2655 }
2656 }
2657
2658 #[instrument(level = "trace", skip_all)]
2661 fn filter_consensus_txns(
2662 &mut self,
2663 initial_reconfig_state: ReconfigState,
2664 commit_info: &ConsensusCommitInfo,
2665 consensus_commit: &impl ConsensusCommitAPI,
2666 ) -> FilteredConsensusOutput {
2667 let mut transactions = Vec::new();
2668 let mut owned_object_locks = HashMap::new();
2669 let epoch = self.epoch_store.epoch();
2670 let mut num_finalized_user_transactions = vec![0; self.committee.size()];
2671 let mut num_rejected_user_transactions = vec![0; self.committee.size()];
2672 for (block, parsed_transactions) in consensus_commit.transactions() {
2673 let author = block.author.value();
2674 self.last_consensus_stats.stats.inc_num_messages(author);
2676
2677 self.epoch_store.set_consensus_tx_status(
2679 ConsensusPosition::ping(epoch, block),
2680 ConsensusTxStatus::Finalized,
2681 );
2682
2683 for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
2684 let position = ConsensusPosition {
2685 epoch,
2686 block,
2687 index: tx_index as TransactionIndex,
2688 };
2689
2690 if let Some(tx) = parsed.transaction.kind.as_user_transaction() {
2693 let digest = tx.digest();
2694 if let Some((spam_weight, submitter_client_addrs)) = self
2695 .epoch_store
2696 .submitted_transaction_cache
2697 .increment_submission_count(digest)
2698 {
2699 if let Some(ref traffic_controller) = self.traffic_controller {
2700 debug!(
2701 "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
2702 submitter_client_addrs.len()
2703 );
2704
2705 for addr in submitter_client_addrs {
2707 traffic_controller.tally(TrafficTally::new(
2708 Some(addr),
2709 None,
2710 None,
2711 spam_weight.clone(),
2712 ));
2713 }
2714 } else {
2715 warn!(
2716 "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
2717 submitter_client_addrs.len()
2718 );
2719 }
2720 }
2721 }
2722
2723 if parsed.rejected {
2724 if parsed.transaction.is_user_transaction() {
2726 self.epoch_store
2727 .set_consensus_tx_status(position, ConsensusTxStatus::Rejected);
2728 num_rejected_user_transactions[author] += 1;
2729 }
2730 continue;
2732 }
2733
2734 let kind = classify(&parsed.transaction);
2735 self.metrics
2736 .consensus_handler_processed
2737 .with_label_values(&[kind])
2738 .inc();
2739 self.metrics
2740 .consensus_handler_transaction_sizes
2741 .with_label_values(&[kind])
2742 .observe(parsed.serialized_len as f64);
2743 if parsed.transaction.is_user_transaction() {
2744 self.last_consensus_stats
2745 .stats
2746 .inc_num_user_transactions(author);
2747 }
2748
2749 if !initial_reconfig_state.should_accept_consensus_certs() {
2750 match &parsed.transaction.kind {
2753 ConsensusTransactionKind::UserTransactionV2(_)
2754 | ConsensusTransactionKind::UserTransaction(_)
2756 | ConsensusTransactionKind::CertifiedTransaction(_)
2757 | ConsensusTransactionKind::CapabilityNotification(_)
2758 | ConsensusTransactionKind::CapabilityNotificationV2(_)
2759 | ConsensusTransactionKind::EndOfPublish(_)
2760 | ConsensusTransactionKind::ExecutionTimeObservation(_)
2762 | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
2763 debug!(
2764 "Ignoring consensus transaction {:?} because of end of epoch",
2765 parsed.transaction.key()
2766 );
2767 continue;
2768 }
2769
2770 ConsensusTransactionKind::CheckpointSignature(_)
2772 | ConsensusTransactionKind::CheckpointSignatureV2(_)
2773 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2774 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
2775 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
2776 }
2777 }
2778
2779 if !initial_reconfig_state.should_accept_tx() {
2780 match &parsed.transaction.kind {
2781 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
2782 | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
2783 _ => {}
2784 }
2785 }
2786
2787 match &parsed.transaction.kind {
2789 ConsensusTransactionKind::CapabilityNotification(_)
2790 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2791 | ConsensusTransactionKind::CheckpointSignature(_) => {
2792 debug_fatal!(
2793 "BUG: saw deprecated tx {:?}for commit round {}",
2794 parsed.transaction.key(),
2795 commit_info.round
2796 );
2797 continue;
2798 }
2799 _ => {}
2800 }
2801
2802 if parsed.transaction.is_user_transaction() {
2803 let author_name = self
2804 .epoch_store
2805 .committee()
2806 .authority_by_index(author as u32)
2807 .unwrap();
2808 if self
2809 .epoch_store
2810 .has_received_end_of_publish_from(author_name)
2811 {
2812 warn!(
2816 "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
2817 author_name.concise(),
2818 parsed.transaction.key(),
2819 );
2820 continue;
2821 }
2822 }
2823
2824 if let ConsensusTransactionKind::UserTransactionV2(tx_with_claims) =
2830 &parsed.transaction.kind
2831 {
2832 let immutable_object_ids: HashSet<ObjectID> =
2833 tx_with_claims.get_immutable_objects().into_iter().collect();
2834 let tx = tx_with_claims.tx();
2835
2836 let Ok(input_objects) = tx.transaction_data().input_objects() else {
2837 debug_fatal!("Invalid input objects for transaction {}", tx.digest());
2838 continue;
2839 };
2840
2841 let owned_object_refs: Vec<_> = input_objects
2844 .iter()
2845 .filter_map(|obj| match obj {
2846 InputObjectKind::ImmOrOwnedMoveObject(obj_ref)
2847 if !immutable_object_ids.contains(&obj_ref.0) =>
2848 {
2849 Some(*obj_ref)
2850 }
2851 _ => None,
2852 })
2853 .collect();
2854
2855 match self
2856 .epoch_store
2857 .try_acquire_owned_object_locks_post_consensus(
2858 &owned_object_refs,
2859 *tx.digest(),
2860 &owned_object_locks,
2861 ) {
2862 Ok(new_locks) => {
2863 owned_object_locks.extend(new_locks.into_iter());
2864 self.epoch_store
2866 .set_consensus_tx_status(position, ConsensusTxStatus::Finalized);
2867 num_finalized_user_transactions[author] += 1;
2868 }
2869 Err(e) => {
2870 debug!("Dropping transaction {}: {}", tx.digest(), e);
2871 self.epoch_store
2872 .set_consensus_tx_status(position, ConsensusTxStatus::Dropped);
2873 self.epoch_store.set_rejection_vote_reason(position, &e);
2874 continue;
2875 }
2876 }
2877 }
2878
2879 let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
2880 transactions.push((transaction, author as u32));
2881 }
2882 }
2883
2884 for (i, authority) in self.committee.authorities() {
2885 let hostname = &authority.hostname;
2886 self.metrics
2887 .consensus_committed_messages
2888 .with_label_values(&[hostname])
2889 .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
2890 self.metrics
2891 .consensus_committed_user_transactions
2892 .with_label_values(&[hostname])
2893 .set(
2894 self.last_consensus_stats
2895 .stats
2896 .get_num_user_transactions(i.value()) as i64,
2897 );
2898 self.metrics
2899 .consensus_finalized_user_transactions
2900 .with_label_values(&[hostname])
2901 .add(num_finalized_user_transactions[i.value()] as i64);
2902 self.metrics
2903 .consensus_rejected_user_transactions
2904 .with_label_values(&[hostname])
2905 .add(num_rejected_user_transactions[i.value()] as i64);
2906 }
2907
2908 FilteredConsensusOutput {
2909 transactions,
2910 owned_object_locks,
2911 }
2912 }
2913
    /// Verifies and deduplicates the commit's transactions, keeping only the
    /// first occurrence of each key that has not already been processed (in
    /// this commit, in the LRU cache, or in the database).
    ///
    /// Side effects: updates `last_consensus_stats.index`, records processed
    /// keys in the commit output, and populates `state.occurrence_counts` with
    /// per-digest occurrence counts for user transactions (consumed later by
    /// the unpaid-amplification deferral check).
    fn deduplicate_consensus_txns(
        &mut self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
    ) -> Vec<VerifiedSequencedConsensusTransaction> {
        let mut all_transactions = Vec::new();

        // How many times each key appears in this commit.
        let mut occurrence_counts: HashMap<SequencedConsensusTransactionKey, u32> = HashMap::new();
        // Keys whose first occurrence was accepted in this commit.
        let mut first_commit_keys: HashSet<SequencedConsensusTransactionKey> = HashSet::new();

        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
            // transaction_index is 1-based within the commit.
            let current_tx_index = ExecutionIndices {
                last_committed_round: commit_info.round,
                sub_dag_index: commit_info.consensus_commit_ref.index.into(),
                transaction_index: (seq + 1) as u64,
            };

            self.last_consensus_stats.index = current_tx_index;

            let certificate_author = *self
                .epoch_store
                .committee()
                .authority_by_index(cert_origin)
                .unwrap();

            let sequenced_transaction = SequencedConsensusTransaction {
                certificate_author_index: cert_origin,
                certificate_author,
                consensus_index: current_tx_index,
                transaction,
            };

            // Transactions that fail verification are silently skipped.
            let Some(verified_transaction) = self
                .epoch_store
                .verify_consensus_transaction(sequenced_transaction)
            else {
                continue;
            };

            let key = verified_transaction.0.key();

            // Cache the digest even for duplicates, so recent finality checks see it.
            if let Some(tx_digest) = key.user_transaction_digest() {
                self.epoch_store
                    .cache_recently_finalized_transaction(tx_digest);
            }

            // Count every occurrence; anything past the first within this
            // commit is a duplicate.
            let count = occurrence_counts.entry(key.clone()).or_insert(0);
            *count += 1;
            let in_commit = *count > 1;

            // `put` returns Some if the key was already in the LRU cache
            // (i.e. seen in a recent commit).
            let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
            if in_commit || in_cache {
                self.metrics.skipped_consensus_txns_cache_hit.inc();
                continue;
            }
            // Slow path: check the database for keys evicted from the cache.
            if self
                .epoch_store
                .is_consensus_message_processed(&key)
                .expect("db error")
            {
                self.metrics.skipped_consensus_txns.inc();
                continue;
            }

            first_commit_keys.insert(key.clone());

            state.output.record_consensus_message_processed(key);

            all_transactions.push(verified_transaction);
        }

        // For keys first accepted in this commit, record how many duplicates
        // arrived alongside them.
        for key in first_commit_keys {
            if let Some(&count) = occurrence_counts.get(&key)
                && count > 1
            {
                self.metrics
                    .consensus_handler_duplicate_tx_count
                    .observe(count as f64);
            }
        }

        // Hand the per-digest occurrence counts to the commit handler state.
        assert!(
            state.occurrence_counts.is_empty(),
            "occurrence_counts should be empty before populating"
        );
        state.occurrence_counts.reserve(occurrence_counts.len());
        state.occurrence_counts.extend(
            occurrence_counts
                .into_iter()
                .filter_map(|(key, count)| key.user_transaction_digest().map(|d| (d, count))),
        );

        all_transactions
    }
3018
    /// Buckets verified consensus transactions by kind into a
    /// `CommitHandlerInput` for downstream commit processing.
    ///
    /// Deprecated kinds and system transactions are unreachable here: they are
    /// filtered out earlier in `filter_consensus_txns`.
    fn build_commit_handler_input(
        &self,
        transactions: Vec<VerifiedSequencedConsensusTransaction>,
    ) -> CommitHandlerInput {
        let epoch = self.epoch_store.epoch();
        let mut commit_handler_input = CommitHandlerInput::default();

        for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
            match transaction.transaction {
                SequencedConsensusTransactionKind::External(consensus_transaction) => {
                    match consensus_transaction.kind {
                        ConsensusTransactionKind::UserTransactionV2(tx) => {
                            // Which alias-version representation to use is
                            // protocol-gated; the legacy (v1) path re-keys
                            // aliases by positional index as a u8.
                            // NOTE(review): the gate is named
                            // fix_checkpoint_signature_mapping — presumably it
                            // bundles both fixes; confirm.
                            let used_alias_versions = if self
                                .epoch_store
                                .protocol_config()
                                .fix_checkpoint_signature_mapping()
                            {
                                tx.aliases()
                            } else {
                                tx.aliases_v1().map(|a| {
                                    NonEmpty::from_vec(
                                        a.into_iter()
                                            .enumerate()
                                            .map(|(idx, (_, seq))| (idx as u8, seq))
                                            .collect(),
                                    )
                                    .unwrap()
                                })
                            };
                            let inner_tx = tx.into_tx();
                            // Safe to skip verification here: the transaction
                            // was verified before sequencing.
                            let tx = VerifiedTransaction::new_unchecked(inner_tx);
                            let transaction =
                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
                            if let Some(used_alias_versions) = used_alias_versions {
                                commit_handler_input
                                    .user_transactions
                                    .push(WithAliases::new(transaction, used_alias_versions));
                            } else {
                                commit_handler_input.user_transactions.push(
                                    VerifiedExecutableTransactionWithAliases::no_aliases(
                                        transaction,
                                    ),
                                );
                            }
                        }

                        ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => {
                            commit_handler_input
                                .end_of_publish_transactions
                                .push(authority_public_key_bytes);
                        }
                        ConsensusTransactionKind::NewJWKFetched(
                            authority_public_key_bytes,
                            jwk_id,
                            jwk,
                        ) => {
                            commit_handler_input.new_jwks.push((
                                authority_public_key_bytes,
                                jwk_id,
                                jwk,
                            ));
                        }
                        ConsensusTransactionKind::RandomnessDkgMessage(
                            authority_public_key_bytes,
                            items,
                        ) => {
                            commit_handler_input
                                .randomness_dkg_messages
                                .push((authority_public_key_bytes, items));
                        }
                        ConsensusTransactionKind::RandomnessDkgConfirmation(
                            authority_public_key_bytes,
                            items,
                        ) => {
                            commit_handler_input
                                .randomness_dkg_confirmations
                                .push((authority_public_key_bytes, items));
                        }
                        ConsensusTransactionKind::CapabilityNotificationV2(
                            authority_capabilities_v2,
                        ) => {
                            commit_handler_input
                                .capability_notifications
                                .push(authority_capabilities_v2);
                        }
                        ConsensusTransactionKind::ExecutionTimeObservation(
                            execution_time_observation,
                        ) => {
                            commit_handler_input
                                .execution_time_observations
                                .push(execution_time_observation);
                        }
                        ConsensusTransactionKind::CheckpointSignatureV2(
                            checkpoint_signature_message,
                        ) => {
                            commit_handler_input
                                .checkpoint_signature_messages
                                .push(*checkpoint_signature_message);
                        }

                        // Deprecated kinds are rejected in filter_consensus_txns.
                        ConsensusTransactionKind::CheckpointSignature(_)
                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
                        | ConsensusTransactionKind::CapabilityNotification(_)
                        | ConsensusTransactionKind::CertifiedTransaction(_)
                        | ConsensusTransactionKind::UserTransaction(_) => {
                            unreachable!("filtered earlier")
                        }
                    }
                }
                SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
            }
        }

        commit_handler_input
    }
3145
3146 async fn send_end_of_publish_if_needed(&self) {
3147 if !self.epoch_store.should_send_end_of_publish() {
3148 return;
3149 }
3150
3151 let end_of_publish = ConsensusTransaction::new_end_of_publish(self.epoch_store.name);
3152 if let Err(err) =
3153 self.consensus_adapter
3154 .submit(end_of_publish, None, &self.epoch_store, None, None)
3155 {
3156 warn!(
3157 "Error when sending EndOfPublish message from ConsensusHandler: {:?}",
3158 err
3159 );
3160 } else {
3161 info!(epoch=?self.epoch_store.epoch(), "Sending EndOfPublish message to consensus");
3162 }
3163 }
3164}
3165
/// Message sent to the execution-scheduler task: a batch of schedulables with
/// their assigned shared-object versions, plus an optional settlement batch
/// descriptor (which selects the v2 enqueue path).
pub(crate) type SchedulerMessage = (
    Vec<(Schedulable, AssignedVersions)>,
    Option<SettlementBatchInfo>,
);
3172
/// Cheaply cloneable handle for forwarding scheduled transaction batches to the
/// background task that feeds the settlement scheduler.
#[derive(Clone)]
pub(crate) struct ExecutionSchedulerSender {
    // Unbounded channel into the task spawned by `start`.
    sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
}
3177
3178impl ExecutionSchedulerSender {
3179 fn start(
3180 settlement_scheduler: SettlementScheduler,
3181 epoch_store: Arc<AuthorityPerEpochStore>,
3182 ) -> Self {
3183 let (sender, recv) = monitored_mpsc::unbounded_channel("execution_scheduler_sender");
3184 spawn_monitored_task!(Self::run(recv, settlement_scheduler, epoch_store));
3185 Self { sender }
3186 }
3187
3188 pub(crate) fn new_for_testing(
3189 sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
3190 ) -> Self {
3191 Self { sender }
3192 }
3193
3194 fn send(
3195 &self,
3196 transactions: Vec<(Schedulable, AssignedVersions)>,
3197 settlement: Option<SettlementBatchInfo>,
3198 ) {
3199 let _ = self.sender.send((transactions, settlement));
3200 }
3201
3202 async fn run(
3203 mut recv: monitored_mpsc::UnboundedReceiver<SchedulerMessage>,
3204 settlement_scheduler: SettlementScheduler,
3205 epoch_store: Arc<AuthorityPerEpochStore>,
3206 ) {
3207 while let Some((transactions, settlement)) = recv.recv().await {
3208 let _guard = monitored_scope("ConsensusHandler::enqueue");
3209 let txns = transactions
3210 .into_iter()
3211 .map(|(txn, versions)| (txn, ExecutionEnv::new().with_assigned_versions(versions)))
3212 .collect();
3213 if let Some(settlement) = settlement {
3214 settlement_scheduler.enqueue_v2(txns, settlement, &epoch_store);
3215 } else {
3216 settlement_scheduler.enqueue(txns, &epoch_store);
3217 }
3218 }
3219 }
3220}
3221
/// Owns the background task that consumes committed sub-dags from Mysticeti
/// consensus and feeds them into the `ConsensusHandler`.
pub(crate) struct MysticetiConsensusHandler {
    // Holds the commit-consuming task; `abort` shuts it down.
    tasks: JoinSet<()>,
}
3226
3227impl MysticetiConsensusHandler {
3228 pub(crate) fn new(
3229 last_processed_commit_at_startup: CommitIndex,
3230 mut consensus_handler: ConsensusHandler<CheckpointService>,
3231 mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
3232 commit_consumer_monitor: Arc<CommitConsumerMonitor>,
3233 ) -> Self {
3234 debug!(
3235 last_processed_commit_at_startup,
3236 "Starting consensus replay"
3237 );
3238 let mut tasks = JoinSet::new();
3239 tasks.spawn(monitored_future!(async move {
3240 while let Some(consensus_commit) = commit_receiver.recv().await {
3242 let commit_index = consensus_commit.commit_ref.index;
3243 if commit_index <= last_processed_commit_at_startup {
3244 consensus_handler.handle_prior_consensus_commit(consensus_commit);
3245 } else {
3246 consensus_handler
3247 .handle_consensus_commit(consensus_commit)
3248 .await;
3249 }
3250 commit_consumer_monitor.set_highest_handled_commit(commit_index);
3251 }
3252 }));
3253 Self { tasks }
3254 }
3255
3256 pub(crate) async fn abort(&mut self) {
3257 self.tasks.shutdown().await;
3258 }
3259}
3260
3261fn authenticator_state_update_transaction(
3262 epoch_store: &AuthorityPerEpochStore,
3263 round: u64,
3264 mut new_active_jwks: Vec<ActiveJwk>,
3265) -> VerifiedExecutableTransaction {
3266 let epoch = epoch_store.epoch();
3267 new_active_jwks.sort();
3268
3269 info!("creating authenticator state update transaction");
3270 assert!(epoch_store.authenticator_state_enabled());
3271 let transaction = VerifiedTransaction::new_authenticator_state_update(
3272 epoch,
3273 round,
3274 new_active_jwks,
3275 epoch_store
3276 .epoch_start_config()
3277 .authenticator_obj_initial_shared_version()
3278 .expect("authenticator state obj must exist"),
3279 );
3280 VerifiedExecutableTransaction::new_system(transaction, epoch)
3281}
3282
3283pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
3284 match &transaction.kind {
3285 ConsensusTransactionKind::CertifiedTransaction(_) => "_deprecated_certificate",
3287 ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
3288 ConsensusTransactionKind::CheckpointSignatureV2(_) => "checkpoint_signature",
3289 ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
3290 ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
3291 ConsensusTransactionKind::CapabilityNotificationV2(_) => "capability_notification_v2",
3292 ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
3293 ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
3294 ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
3295 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
3296 ConsensusTransactionKind::UserTransaction(_) => "_deprecated_user_transaction",
3297 ConsensusTransactionKind::UserTransactionV2(tx) => {
3298 if tx.tx().is_consensus_tx() {
3299 "shared_user_transaction_v2"
3300 } else {
3301 "owned_user_transaction_v2"
3302 }
3303 }
3304 ConsensusTransactionKind::ExecutionTimeObservation(_) => "execution_time_observation",
3305 }
3306}
3307
/// A consensus transaction together with its provenance: which authority's
/// block carried it and at which position within the commit it appeared.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequencedConsensusTransaction {
    // Index of the authoring authority within the consensus committee.
    pub certificate_author_index: AuthorityIndex,
    // Name of the authoring authority.
    pub certificate_author: AuthorityName,
    // Position of this transaction in the consensus output stream.
    pub consensus_index: ExecutionIndices,
    // The transaction payload (external consensus message or system transaction).
    pub transaction: SequencedConsensusTransactionKind,
}
3315
/// The payload of a sequenced consensus transaction: either a message received
/// through consensus (`External`) or a transaction generated internally by the
/// validator (`System`).
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SequencedConsensusTransactionKind {
    External(ConsensusTransaction),
    System(VerifiedExecutableTransaction),
}
3322
3323impl Serialize for SequencedConsensusTransactionKind {
3324 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
3325 let serializable = SerializableSequencedConsensusTransactionKind::from(self);
3326 serializable.serialize(serializer)
3327 }
3328}
3329
3330impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
3331 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
3332 let serializable =
3333 SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
3334 Ok(serializable.into())
3335 }
3336}
3337
/// Serde mirror of `SequencedConsensusTransactionKind`: the `System` variant
/// stores the transaction in its trusted (serializable) envelope form.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
enum SerializableSequencedConsensusTransactionKind {
    External(ConsensusTransaction),
    System(TrustedExecutableTransaction),
}
3347
3348impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
3349 fn from(kind: &SequencedConsensusTransactionKind) -> Self {
3350 match kind {
3351 SequencedConsensusTransactionKind::External(ext) => {
3352 SerializableSequencedConsensusTransactionKind::External(ext.clone())
3353 }
3354 SequencedConsensusTransactionKind::System(txn) => {
3355 SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
3356 }
3357 }
3358 }
3359}
3360
3361impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
3362 fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
3363 match kind {
3364 SerializableSequencedConsensusTransactionKind::External(ext) => {
3365 SequencedConsensusTransactionKind::External(ext)
3366 }
3367 SerializableSequencedConsensusTransactionKind::System(txn) => {
3368 SequencedConsensusTransactionKind::System(txn.into())
3369 }
3370 }
3371 }
3372}
3373
/// Deduplication/processing key for a sequenced consensus transaction:
/// external messages are keyed by their consensus key, system transactions by
/// their digest.
#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
pub enum SequencedConsensusTransactionKey {
    External(ConsensusTransactionKey),
    System(TransactionDigest),
}
3379
3380impl SequencedConsensusTransactionKey {
3381 pub fn user_transaction_digest(&self) -> Option<TransactionDigest> {
3382 match self {
3383 SequencedConsensusTransactionKey::External(key) => match key {
3384 ConsensusTransactionKey::Certificate(digest) => Some(*digest),
3385 _ => None,
3386 },
3387 SequencedConsensusTransactionKey::System(_) => None,
3388 }
3389 }
3390}
3391
3392impl SequencedConsensusTransactionKind {
3393 pub fn key(&self) -> SequencedConsensusTransactionKey {
3394 match self {
3395 SequencedConsensusTransactionKind::External(ext) => {
3396 SequencedConsensusTransactionKey::External(ext.key())
3397 }
3398 SequencedConsensusTransactionKind::System(txn) => {
3399 SequencedConsensusTransactionKey::System(*txn.digest())
3400 }
3401 }
3402 }
3403
3404 pub fn get_tracking_id(&self) -> u64 {
3405 match self {
3406 SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
3407 SequencedConsensusTransactionKind::System(_txn) => 0,
3408 }
3409 }
3410
3411 pub fn is_executable_transaction(&self) -> bool {
3412 match self {
3413 SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
3414 SequencedConsensusTransactionKind::System(_) => true,
3415 }
3416 }
3417
3418 pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
3419 match self {
3420 SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
3421 ConsensusTransactionKind::UserTransactionV2(txn) => Some(*txn.tx().digest()),
3422 _ => None,
3423 },
3424 SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
3425 }
3426 }
3427
3428 pub fn is_end_of_publish(&self) -> bool {
3429 match self {
3430 SequencedConsensusTransactionKind::External(ext) => {
3431 matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
3432 }
3433 SequencedConsensusTransactionKind::System(_) => false,
3434 }
3435 }
3436}
3437
3438impl SequencedConsensusTransaction {
3439 pub fn sender_authority(&self) -> AuthorityName {
3440 self.certificate_author
3441 }
3442
3443 pub fn key(&self) -> SequencedConsensusTransactionKey {
3444 self.transaction.key()
3445 }
3446
3447 pub fn is_end_of_publish(&self) -> bool {
3448 if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
3449 matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
3450 } else {
3451 false
3452 }
3453 }
3454
3455 pub fn try_take_execution_time_observation(&mut self) -> Option<ExecutionTimeObservation> {
3456 if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
3457 kind: ConsensusTransactionKind::ExecutionTimeObservation(observation),
3458 ..
3459 }) = &mut self.transaction
3460 {
3461 Some(std::mem::take(observation))
3462 } else {
3463 None
3464 }
3465 }
3466
3467 pub fn is_system(&self) -> bool {
3468 matches!(
3469 self.transaction,
3470 SequencedConsensusTransactionKind::System(_)
3471 )
3472 }
3473
3474 pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
3475 if !randomness_state_enabled {
3476 return false;
3479 }
3480 match &self.transaction {
3481 SequencedConsensusTransactionKind::External(ConsensusTransaction {
3482 kind: ConsensusTransactionKind::UserTransactionV2(txn),
3483 ..
3484 }) => txn.tx().transaction_data().uses_randomness(),
3485 _ => false,
3486 }
3487 }
3488
3489 pub fn as_consensus_txn(&self) -> Option<&SenderSignedData> {
3490 match &self.transaction {
3491 SequencedConsensusTransactionKind::External(ConsensusTransaction {
3492 kind: ConsensusTransactionKind::UserTransactionV2(txn),
3493 ..
3494 }) if txn.tx().is_consensus_tx() => Some(txn.tx().data()),
3495 SequencedConsensusTransactionKind::System(txn) if txn.is_consensus_tx() => {
3496 Some(txn.data())
3497 }
3498 _ => None,
3499 }
3500 }
3501}
3502
/// Newtype marking a `SequencedConsensusTransaction` that has passed
/// verification (e.g. authority/signature consistency checks).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
3505
#[cfg(test)]
impl VerifiedSequencedConsensusTransaction {
    /// Test-only constructor that skips verification entirely.
    pub fn new_test(transaction: ConsensusTransaction) -> Self {
        Self(SequencedConsensusTransaction::new_test(transaction))
    }
}
3512
impl SequencedConsensusTransaction {
    /// Test-only constructor with placeholder provenance (authority index 0,
    /// zero authority name, default consensus indices).
    pub fn new_test(transaction: ConsensusTransaction) -> Self {
        Self {
            certificate_author_index: 0,
            certificate_author: AuthorityName::ZERO,
            consensus_index: Default::default(),
            transaction: SequencedConsensusTransactionKind::External(transaction),
        }
    }
}
3523
/// Sliding window of recent consensus commit timestamps (in milliseconds),
/// used to estimate the average interval between commits.
#[derive(Serialize, Deserialize)]
pub(crate) struct CommitIntervalObserver {
    // Oldest timestamp at the front, newest at the back.
    ring_buffer: VecDeque<u64>,
}
3528
3529impl CommitIntervalObserver {
3530 pub fn new(window_size: u32) -> Self {
3531 Self {
3532 ring_buffer: VecDeque::with_capacity(window_size as usize),
3533 }
3534 }
3535
3536 pub fn observe_commit_time(&mut self, consensus_commit: &impl ConsensusCommitAPI) {
3537 let commit_time = consensus_commit.commit_timestamp_ms();
3538 if self.ring_buffer.len() == self.ring_buffer.capacity() {
3539 self.ring_buffer.pop_front();
3540 }
3541 self.ring_buffer.push_back(commit_time);
3542 }
3543
3544 pub fn commit_interval_estimate(&self) -> Option<Duration> {
3545 if self.ring_buffer.len() <= 1 {
3546 None
3547 } else {
3548 let first = self.ring_buffer.front().unwrap();
3549 let last = self.ring_buffer.back().unwrap();
3550 let duration = last.saturating_sub(*first);
3551 let num_commits = self.ring_buffer.len() as u64;
3552 Some(Duration::from_millis(duration.div_ceil(num_commits)))
3553 }
3554 }
3555}
3556
3557#[cfg(test)]
3558mod tests {
3559 use std::collections::HashSet;
3560
3561 use consensus_core::{
3562 BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock,
3563 };
3564 use futures::pin_mut;
3565 use prometheus::Registry;
3566 use sui_protocol_config::{ConsensusTransactionOrdering, ProtocolConfig};
3567 use sui_types::{
3568 base_types::ExecutionDigests,
3569 base_types::{AuthorityName, FullObjectRef, ObjectID, SuiAddress, random_object_ref},
3570 committee::Committee,
3571 crypto::deterministic_random_account_key,
3572 gas::GasCostSummary,
3573 message_envelope::Message,
3574 messages_checkpoint::{
3575 CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
3576 SignedCheckpointSummary,
3577 },
3578 messages_consensus::ConsensusTransaction,
3579 object::Object,
3580 transaction::{
3581 CertifiedTransaction, TransactionData, TransactionDataAPI, VerifiedCertificate,
3582 },
3583 };
3584
3585 use super::*;
3586 use crate::{
3587 authority::{
3588 authority_per_epoch_store::ConsensusStatsAPI,
3589 test_authority_builder::TestAuthorityBuilder,
3590 },
3591 checkpoints::CheckpointServiceNoop,
3592 consensus_adapter::consensus_tests::test_user_transaction,
3593 consensus_test_utils::make_consensus_adapter_for_test,
3594 post_consensus_tx_reorder::PostConsensusTxReorder,
3595 };
3596
    /// End-to-end test: builds a committee with gas/owned/shared objects,
    /// feeds a commit of user transactions through `ConsensusHandler`, and
    /// verifies stats bookkeeping, backpressure behavior, and that every
    /// transaction eventually executes.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_consensus_commit_handler() {
        telemetry_subscribers::init_for_testing();

        // 12 gas objects, 4 owned and 6 shared input objects for the test txns.
        let (sender, keypair) = deterministic_random_account_key();
        let gas_objects: Vec<Object> = (0..12)
            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
            .collect();
        let owned_objects: Vec<Object> = (0..4)
            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
            .collect();
        let shared_objects: Vec<Object> = (0..6)
            .map(|_| Object::shared_for_testing())
            .collect::<Vec<_>>();
        let mut all_objects = gas_objects.clone();
        all_objects.extend(owned_objects.clone());
        all_objects.extend(shared_objects.clone());

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .with_objects(all_objects.clone())
                .build();

        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let new_epoch_start_state = epoch_store.epoch_start_state();
        let consensus_committee = new_epoch_start_state.get_consensus_committee();

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));

        let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());

        // Wire up a ConsensusHandler with test doubles for checkpointing,
        // backpressure, and the consensus adapter.
        let backpressure_manager = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let settlement_scheduler = SettlementScheduler::new(
            state.execution_scheduler().as_ref().clone(),
            state.get_transaction_cache_reader().clone(),
            state.metrics.clone(),
        );
        let mut consensus_handler = ConsensusHandler::new(
            epoch_store,
            Arc::new(CheckpointServiceNoop {}),
            settlement_scheduler,
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput_calculator),
            backpressure_manager.subscribe(),
            state.traffic_controller.clone(),
            None,
            state.consensus_gasless_counter.clone(),
        );

        // First 8 transactions alternate between owned and shared inputs.
        let mut user_transactions = vec![];
        for (i, gas_object) in gas_objects[0..8].iter().enumerate() {
            let input_object = if i % 2 == 0 {
                owned_objects.get(i / 2).unwrap().clone()
            } else {
                shared_objects.get(i / 2).unwrap().clone()
            };
            let transaction = test_user_transaction(
                &state,
                sender,
                &keypair,
                gas_object.clone(),
                vec![input_object],
            )
            .await;
            user_transactions.push(transaction);
        }

        // Last 4 transactions deliberately contend: two per shared object.
        for (i, gas_object) in gas_objects[8..12].iter().enumerate() {
            let shared_object = if i < 2 {
                shared_objects[4].clone()
            } else {
                shared_objects[5].clone()
            };
            let transaction = test_user_transaction(
                &state,
                sender,
                &keypair,
                gas_object.clone(),
                vec![shared_object],
            )
            .await;
            user_transactions.push(transaction);
        }

        // One consensus block per transaction, authors rotating round-robin.
        let mut blocks = Vec::new();
        for (i, consensus_transaction) in user_transactions
            .iter()
            .cloned()
            .map(|t| ConsensusTransaction::new_user_transaction_v2_message(&state.name, t.into()))
            .enumerate()
        {
            let transaction_bytes = bcs::to_bytes(&consensus_transaction).unwrap();
            let block = VerifiedBlock::new_for_test(
                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
                    .set_transactions(vec![Transaction::new(transaction_bytes)])
                    .build(),
            );

            blocks.push(block);
        }

        let leader_block = blocks[0].clone();
        let committed_sub_dag = CommittedSubDag::new(
            leader_block.reference(),
            blocks.clone(),
            leader_block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
        );

        // Engage backpressure before handling so the commit must wait.
        backpressure_manager.set_backpressure(true);
        backpressure_manager.update_highest_certified_checkpoint(1);

        {
            let waiter = consensus_handler.handle_consensus_commit(committed_sub_dag.clone());
            pin_mut!(waiter);

            // With backpressure on, the commit handler should not complete.
            tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
                .await
                .unwrap_err();

            backpressure_manager.set_backpressure(false);

            // Once released, the commit should be processed.
            tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
                .await
                .unwrap();
        }

        // Stats should reflect exactly one commit (sub-dag index 10, round 100)
        // with one message and one user transaction per block.
        let num_blocks = blocks.len();
        let num_transactions = user_transactions.len();
        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
        assert_eq!(
            last_consensus_stats_1.index.transaction_index,
            num_transactions as u64
        );
        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
        assert_eq!(
            last_consensus_stats_1.stats.get_num_messages(0),
            num_blocks as u64
        );
        assert_eq!(
            last_consensus_stats_1.stats.get_num_user_transactions(0),
            num_transactions as u64
        );

        // Every transaction must produce effects within the timeout.
        for (i, t) in user_transactions.iter().enumerate() {
            let digest = t.tx().digest();
            if tokio::time::timeout(
                std::time::Duration::from_secs(10),
                state.notify_read_effects_for_testing("", *digest),
            )
            .await
            .is_ok()
            {
            } else {
                panic!("User transaction {} {} did not execute", i, digest);
            }
        }

        // No stragglers left in the scheduler.
        state.execution_scheduler().check_empty_for_testing().await;
    }
3788
3789 fn to_short_strings(txs: Vec<VerifiedExecutableTransactionWithAliases>) -> Vec<String> {
3790 txs.into_iter()
3791 .map(|tx| format!("transaction({})", tx.tx().transaction_data().gas_price()))
3792 .collect()
3793 }
3794
    /// Verifies that `PostConsensusTxReorder::reorder` sorts user transactions
    /// by descending gas price, keeping duplicates adjacent.
    #[test]
    fn test_order_by_gas_price() {
        // Two elements: higher price must come first.
        let mut v = vec![user_txn(42), user_txn(100)];
        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
        assert_eq!(
            to_short_strings(v),
            vec![
                "transaction(100)".to_string(),
                "transaction(42)".to_string(),
            ]
        );

        // Larger input including a duplicate price (1000).
        let mut v = vec![
            user_txn(1200),
            user_txn(12),
            user_txn(1000),
            user_txn(42),
            user_txn(100),
            user_txn(1000),
        ];
        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
        assert_eq!(
            to_short_strings(v),
            vec![
                "transaction(1200)".to_string(),
                "transaction(1000)".to_string(),
                "transaction(1000)".to_string(),
                "transaction(100)".to_string(),
                "transaction(42)".to_string(),
                "transaction(12)".to_string(),
            ]
        );
    }
3828
    /// Feeds two distinct CheckpointSignatureV2 messages plus an exact
    /// duplicate of the first through a commit, and checks both distinct keys
    /// are marked processed (the duplicate is deduplicated by key).
    #[tokio::test(flavor = "current_thread")]
    async fn test_checkpoint_signature_dedup() {
        telemetry_subscribers::init_for_testing();

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();

        // Builds a signed checkpoint summary for sequence 42 with random
        // contents (so each call yields a distinct digest).
        let make_signed = || {
            let epoch = epoch_store.epoch();
            let contents =
                CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
            let summary = CheckpointSummary::new(
                &ProtocolConfig::get_for_max_version_UNSAFE(),
                epoch,
                42, 10, &contents,
                None, GasCostSummary::default(),
                None, 0, Vec::new(), Vec::new(), );
            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name)
        };

        // First message, cloned so we can craft an exact duplicate later.
        let v2_s1 = make_signed();
        let v2_s1_clone = v2_s1.clone();
        let v2_digest_a = v2_s1.data().digest();
        let v2_a =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: v2_s1,
            });

        // Second, distinct message.
        let v2_s2 = make_signed();
        let v2_digest_b = v2_s2.data().digest();
        let v2_b =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: v2_s2,
            });

        assert_ne!(v2_digest_a, v2_digest_b);

        // Duplicate of the first message (same digest, same key).
        assert_eq!(v2_s1_clone.data().digest(), v2_digest_a);
        let v2_dup =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: v2_s1_clone,
            });

        // All three messages go into a single block / single commit.
        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
        let block = VerifiedBlock::new_for_test(
            TestBlock::new(100, 0)
                .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)])
                .build(),
        );
        let commit = CommittedSubDag::new(
            block.reference(),
            vec![block.clone()],
            block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
        );

        // Standard test wiring for the handler.
        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
        let backpressure = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let settlement_scheduler = SettlementScheduler::new(
            state.execution_scheduler().as_ref().clone(),
            state.get_transaction_cache_reader().clone(),
            state.metrics.clone(),
        );
        let mut handler = ConsensusHandler::new(
            epoch_store.clone(),
            Arc::new(CheckpointServiceNoop {}),
            settlement_scheduler,
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput),
            backpressure.subscribe(),
            state.traffic_controller.clone(),
            None,
            state.consensus_gasless_counter.clone(),
        );

        handler.handle_consensus_commit(commit).await;

        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
        use sui_types::messages_consensus::ConsensusTransactionKey as CK;

        // Both distinct signature keys must be recorded as processed.
        let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a));
        let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b));
        assert!(
            epoch_store
                .is_consensus_message_processed(&v2_key_a)
                .unwrap()
        );
        assert!(
            epoch_store
                .is_consensus_message_processed(&v2_key_b)
                .unwrap()
        );
    }
3945
    /// Checks that messages claiming to come from an authority other than the
    /// block author are filtered out by transaction verification: valid
    /// EndOfPublish/CheckpointSignatureV2 messages are processed, mismatched
    /// ones are not.
    #[tokio::test(flavor = "current_thread")]
    async fn test_verify_consensus_transaction_filters_mismatched_authorities() {
        telemetry_subscribers::init_for_testing();

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();

        // An authority key that is NOT the block author (block author index 0
        // corresponds to `state.name`).
        use fastcrypto::traits::KeyPair;
        let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair();
        let wrong_authority: AuthorityName = wrong_keypair.public().into();

        // EndOfPublish claiming the wrong authority vs. a valid one.
        let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority);

        let valid_eop = ConsensusTransaction::new_end_of_publish(state.name);

        // A checkpoint summary signed twice: once by the wrong authority,
        // once by the block author.
        let epoch = epoch_store.epoch();
        let contents =
            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
        let summary = CheckpointSummary::new(
            &ProtocolConfig::get_for_max_version_UNSAFE(),
            epoch,
            42, 10, &contents,
            None, GasCostSummary::default(),
            None, 0, Vec::new(), Vec::new(), );

        let mismatched_checkpoint_signed =
            SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority);
        let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest();
        let mismatched_checkpoint =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: mismatched_checkpoint_signed,
            });

        let valid_checkpoint_signed =
            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name);
        let valid_checkpoint_digest = valid_checkpoint_signed.data().digest();
        let valid_checkpoint =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: valid_checkpoint_signed,
            });

        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());

        // All four messages in one block authored by authority index 0.
        let block = VerifiedBlock::new_for_test(
            TestBlock::new(100, 0)
                .set_transactions(vec![
                    to_tx(&mismatched_eop),
                    to_tx(&valid_eop),
                    to_tx(&mismatched_checkpoint),
                    to_tx(&valid_checkpoint),
                ])
                .build(),
        );
        let commit = CommittedSubDag::new(
            block.reference(),
            vec![block.clone()],
            block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
        );

        // Standard test wiring for the handler.
        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
        let backpressure = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let settlement_scheduler = SettlementScheduler::new(
            state.execution_scheduler().as_ref().clone(),
            state.get_transaction_cache_reader().clone(),
            state.metrics.clone(),
        );
        let mut handler = ConsensusHandler::new(
            epoch_store.clone(),
            Arc::new(CheckpointServiceNoop {}),
            settlement_scheduler,
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput),
            backpressure.subscribe(),
            state.traffic_controller.clone(),
            None,
            state.consensus_gasless_counter.clone(),
        );

        handler.handle_consensus_commit(commit).await;

        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
        use sui_types::messages_consensus::ConsensusTransactionKey as CK;

        // Valid messages must be processed...
        let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
        assert!(
            epoch_store
                .is_consensus_message_processed(&valid_eop_key)
                .unwrap(),
            "Valid EndOfPublish should have been processed"
        );

        let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
            state.name,
            42,
            valid_checkpoint_digest,
        ));
        assert!(
            epoch_store
                .is_consensus_message_processed(&valid_checkpoint_key)
                .unwrap(),
            "Valid CheckpointSignature should have been processed"
        );

        // ...while mismatched ones must have been filtered out.
        let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
        assert!(
            !epoch_store
                .is_consensus_message_processed(&mismatched_eop_key)
                .unwrap(),
            "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
        );

        let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
            wrong_authority,
            42,
            mismatched_checkpoint_digest,
        ));
        assert!(
            !epoch_store
                .is_consensus_message_processed(&mismatched_checkpoint_key)
                .unwrap(),
            "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
        );
    }
4099
4100 fn user_txn(gas_price: u64) -> VerifiedExecutableTransactionWithAliases {
4101 let (committee, keypairs) = Committee::new_simple_test_committee();
4102 let (sender, sender_keypair) = deterministic_random_account_key();
4103 let tx = sui_types::transaction::Transaction::from_data_and_signer(
4104 TransactionData::new_transfer(
4105 SuiAddress::default(),
4106 FullObjectRef::from_fastpath_ref(random_object_ref()),
4107 sender,
4108 random_object_ref(),
4109 1000 * gas_price,
4110 gas_price,
4111 ),
4112 vec![&sender_keypair],
4113 );
4114 let tx = VerifiedExecutableTransaction::new_from_certificate(
4115 VerifiedCertificate::new_unchecked(
4116 CertifiedTransaction::new_from_keypairs_for_testing(
4117 tx.into_data(),
4118 &keypairs,
4119 &committee,
4120 ),
4121 ),
4122 );
4123 VerifiedExecutableTransactionWithAliases::no_aliases(tx)
4124 }
4125
4126 mod checkpoint_queue_tests {
4127 use super::*;
4128 use consensus_core::CommitRef;
4129 use sui_types::digests::Digest;
4130
        /// Builds a chunk of `tx_count` dummy user transactions at the given
        /// commit height, with no settlement info.
        fn make_chunk(tx_count: usize, height: u64) -> Chunk {
            Chunk {
                schedulables: (0..tx_count)
                    .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
                    .collect(),
                settlement: None,
                height,
            }
        }
4140
4141 fn make_commit_ref(index: u32) -> CommitRef {
4142 CommitRef {
4143 index,
4144 digest: CommitDigest::MIN,
4145 }
4146 }
4147
4148 fn default_versions() -> HashMap<TransactionKey, AssignedVersions> {
4149 HashMap::new()
4150 }
4151
        /// A forced flush (`force = true`) drains all queued chunks at once.
        #[test]
        fn test_flush_all_checkpoint_roots() {
            let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
            let versions = default_versions();

            queue.push_chunk(
                make_chunk(5, 1),
                &versions,
                1000,
                make_commit_ref(1),
                Digest::default(),
            );
            queue.push_chunk(
                make_chunk(3, 2),
                &versions,
                1000,
                make_commit_ref(1),
                Digest::default(),
            );

            let pending = queue.flush(1000, true);

            // Everything is flushed; no roots remain queued.
            assert!(pending.is_some());
            assert!(queue.pending_roots.is_empty());
        }
4177
        /// A non-forced flush is a no-op until the minimum checkpoint interval
        /// has elapsed since the last checkpoint timestamp.
        #[test]
        fn test_flush_respects_min_checkpoint_interval() {
            let min_interval = 200;
            let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, 1000, min_interval);
            let versions = default_versions();

            queue.push_chunk(
                make_chunk(5, 1),
                &versions,
                1000,
                make_commit_ref(1),
                Digest::default(),
            );

            // One ms short of the interval: nothing flushes.
            let pending = queue.flush(1000 + min_interval - 1, false);
            assert!(pending.is_none());
            assert_eq!(queue.pending_roots.len(), 1);

            // Exactly at the interval boundary: flush happens.
            let pending = queue.flush(1000 + min_interval, false);
            assert!(pending.is_some());
            assert!(queue.pending_roots.is_empty());
        }
4200
        /// Pushing a chunk that would push the pending total past the max
        /// transaction count triggers an eager flush of the earlier chunk.
        #[test]
        fn test_push_chunk_flushes_when_exceeds_max() {
            let max_tx = 10;
            let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, max_tx, 0);
            let versions = default_versions();

            // First chunk alone (6 txns) stays below the limit.
            queue.push_chunk(
                make_chunk(max_tx / 2 + 1, 1),
                &versions,
                1000,
                make_commit_ref(1),
                Digest::default(),
            );

            // Second chunk would exceed max_tx, so the first is flushed out.
            let flushed = queue.push_chunk(
                make_chunk(max_tx / 2 + 1, 2),
                &versions,
                1000,
                make_commit_ref(2),
                Digest::default(),
            );

            assert_eq!(flushed.len(), 1);
            assert_eq!(queue.pending_roots.len(), 1);
        }
4226
4227 #[test]
4228 fn test_multiple_chunks_merged_into_one_checkpoint() {
4229 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 200);
4230 let versions = default_versions();
4231
4232 queue.push_chunk(
4233 make_chunk(10, 1),
4234 &versions,
4235 1000,
4236 make_commit_ref(1),
4237 Digest::default(),
4238 );
4239 queue.push_chunk(
4240 make_chunk(10, 2),
4241 &versions,
4242 1000,
4243 make_commit_ref(2),
4244 Digest::default(),
4245 );
4246 queue.push_chunk(
4247 make_chunk(10, 3),
4248 &versions,
4249 1000,
4250 make_commit_ref(3),
4251 Digest::default(),
4252 );
4253
4254 let pending = queue.flush(1000, true).unwrap();
4255
4256 assert_eq!(pending.roots.len(), 3);
4257 }
4258
4259 #[test]
4260 fn test_push_chunk_handles_overflow() {
4261 let max_tx = 10;
4262 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4263 let versions = default_versions();
4264
4265 let flushed1 = queue.push_chunk(
4266 make_chunk(max_tx / 2, 1),
4267 &versions,
4268 1000,
4269 make_commit_ref(1),
4270 Digest::default(),
4271 );
4272 assert!(flushed1.is_empty());
4273
4274 let flushed2 = queue.push_chunk(
4275 make_chunk(max_tx / 2, 2),
4276 &versions,
4277 1000,
4278 make_commit_ref(2),
4279 Digest::default(),
4280 );
4281 assert!(flushed2.is_empty());
4282
4283 let flushed3 = queue.push_chunk(
4284 make_chunk(max_tx / 2, 3),
4285 &versions,
4286 1000,
4287 make_commit_ref(3),
4288 Digest::default(),
4289 );
4290 assert_eq!(flushed3.len(), 1);
4291
4292 let pending = queue.flush(1000, true);
4293
4294 for p in pending.iter().chain(flushed3.iter()) {
4295 let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4296 assert!(tx_count <= max_tx);
4297 }
4298 }
4299
4300 #[test]
4301 fn test_checkpoint_uses_last_chunk_height() {
4302 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4303 let versions = default_versions();
4304
4305 queue.push_chunk(
4306 make_chunk(10, 100),
4307 &versions,
4308 1000,
4309 make_commit_ref(1),
4310 Digest::default(),
4311 );
4312 queue.push_chunk(
4313 make_chunk(10, 200),
4314 &versions,
4315 1000,
4316 make_commit_ref(2),
4317 Digest::default(),
4318 );
4319
4320 let pending = queue.flush(1000, true).unwrap();
4321
4322 assert_eq!(pending.details.checkpoint_height, 200);
4323 }
4324
4325 #[test]
4326 fn test_last_built_timestamp_updated_on_flush() {
4327 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4328 let versions = default_versions();
4329
4330 queue.push_chunk(
4331 make_chunk(10, 1),
4332 &versions,
4333 5000,
4334 make_commit_ref(1),
4335 Digest::default(),
4336 );
4337
4338 assert_eq!(queue.last_built_timestamp, 0);
4339
4340 let _ = queue.flush(5000, true);
4341
4342 assert_eq!(queue.last_built_timestamp, 5000);
4343 }
4344
    #[test]
    fn test_settlement_info_sent_through_channel() {
        // NOTE(review): this test makes no explicit assertions — it only
        // verifies that pushing settlement-bearing chunks completes without
        // panicking. The settlement channel is presumably not observable via
        // `new_for_testing`; see `test_settlement_checkpoint_seq_correct_after_flush`,
        // which uses `new_for_testing_with_sender` to inspect the messages.
        let mut queue = CheckpointQueue::new_for_testing(0, 0, 5, 1000, 0);
        let versions = default_versions();

        // First chunk: three prologue schedulables plus a settlement at height 1.
        let chunk1 = Chunk {
            schedulables: vec![
                Schedulable::ConsensusCommitPrologue(0, 1, 0),
                Schedulable::ConsensusCommitPrologue(0, 2, 0),
                Schedulable::ConsensusCommitPrologue(0, 3, 0),
            ],
            settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
            height: 1,
        };

        // Second chunk: two more prologues plus a settlement at height 2.
        let chunk2 = Chunk {
            schedulables: vec![
                Schedulable::ConsensusCommitPrologue(0, 4, 0),
                Schedulable::ConsensusCommitPrologue(0, 5, 0),
            ],
            settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
            height: 2,
        };

        queue.push_chunk(
            chunk1,
            &versions,
            1000,
            make_commit_ref(1),
            Digest::default(),
        );
        queue.push_chunk(
            chunk2,
            &versions,
            1000,
            make_commit_ref(1),
            Digest::default(),
        );
    }
4384
    #[test]
    fn test_settlement_checkpoint_seq_correct_after_flush() {
        let max_tx = 10;
        let initial_seq = 5;
        // Install an explicit sender so settlement messages can be observed.
        let (sender, mut receiver) = monitored_mpsc::unbounded_channel("test_settlement_seq");
        let mut queue =
            CheckpointQueue::new_for_testing_with_sender(0, 0, initial_seq, max_tx, 0, sender);
        let versions = default_versions();

        // First chunk: just over half the budget, with a settlement attached.
        let chunk1 = Chunk {
            schedulables: (0..max_tx / 2 + 1)
                .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
                .collect(),
            settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
            height: 1,
        };
        queue.push_chunk(
            chunk1,
            &versions,
            1000,
            make_commit_ref(1),
            Digest::default(),
        );

        // The first settlement is tagged with the queue's current
        // (initial) checkpoint sequence.
        let msg1 = receiver.try_recv().unwrap();
        let settlement1 = msg1.1.unwrap();
        assert_eq!(settlement1.checkpoint_seq, initial_seq);

        // Second chunk overflows max_tx, forcing the first chunk to flush.
        let chunk2 = Chunk {
            schedulables: (0..max_tx / 2 + 1)
                .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
                .collect(),
            settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
            height: 2,
        };
        let flushed = queue.push_chunk(
            chunk2,
            &versions,
            1000,
            make_commit_ref(2),
            Digest::default(),
        );
        assert_eq!(flushed.len(), 1);
        // The flushed checkpoint carries the initial sequence number...
        assert_eq!(flushed[0].details.checkpoint_seq, Some(initial_seq));

        // ...and the second settlement is tagged with the incremented one.
        let msg2 = receiver.try_recv().unwrap();
        let settlement2 = msg2.1.unwrap();
        assert_eq!(settlement2.checkpoint_seq, initial_seq + 1);

        // A forced flush of the remaining chunk must agree with the
        // sequence the second settlement was tagged with.
        let pending = queue.flush_forced().unwrap();
        assert_eq!(
            pending.details.checkpoint_seq,
            Some(settlement2.checkpoint_seq)
        );
    }
4447
4448 #[test]
4449 fn test_checkpoint_seq_increments_on_flush() {
4450 let mut queue = CheckpointQueue::new_for_testing(0, 0, 10, 1000, 0);
4451 let versions = default_versions();
4452
4453 queue.push_chunk(
4454 make_chunk(5, 1),
4455 &versions,
4456 1000,
4457 make_commit_ref(1),
4458 Digest::default(),
4459 );
4460
4461 let pending = queue.flush(1000, true).unwrap();
4462
4463 assert_eq!(pending.details.checkpoint_seq, Some(10));
4464 assert_eq!(queue.current_checkpoint_seq, 11);
4465 }
4466
4467 #[test]
4468 fn test_multiple_chunks_with_overflow() {
4469 let max_tx = 10;
4470 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
4471 let versions = default_versions();
4472
4473 let flushed1 = queue.push_chunk(
4474 make_chunk(max_tx / 2 + 1, 1),
4475 &versions,
4476 1000,
4477 make_commit_ref(1),
4478 Digest::default(),
4479 );
4480 let flushed2 = queue.push_chunk(
4481 make_chunk(max_tx / 2 + 1, 2),
4482 &versions,
4483 1000,
4484 make_commit_ref(1),
4485 Digest::default(),
4486 );
4487 let flushed3 = queue.push_chunk(
4488 make_chunk(max_tx / 2 + 1, 3),
4489 &versions,
4490 1000,
4491 make_commit_ref(1),
4492 Digest::default(),
4493 );
4494
4495 let all_flushed: Vec<_> = flushed1
4496 .into_iter()
4497 .chain(flushed2)
4498 .chain(flushed3)
4499 .collect();
4500 assert_eq!(all_flushed.len(), 2);
4501 assert_eq!(queue.pending_roots.len(), 1);
4502
4503 for p in &all_flushed {
4504 let tx_count: usize = p.roots.iter().map(|r| r.tx_roots.len()).sum();
4505 assert!(tx_count <= max_tx);
4506 }
4507 }
4508 }
4509}