1use std::{
5 collections::{BTreeMap, HashMap, HashSet, VecDeque},
6 hash::Hash,
7 num::NonZeroUsize,
8 sync::{Arc, Mutex},
9 time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use consensus_config::Committee as ConsensusCommittee;
13use consensus_core::{CommitConsumerMonitor, CommitIndex, CommitRef};
14use consensus_types::block::BlockRef;
15use consensus_types::block::TransactionIndex;
16use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
17use lru::LruCache;
18use mysten_common::{
19 assert_reachable, assert_sometimes, debug_fatal, random_util::randomize_cache_capacity_in_tests,
20};
21use mysten_metrics::{
22 monitored_future,
23 monitored_mpsc::{self, UnboundedReceiver},
24 monitored_scope, spawn_monitored_task,
25};
26use nonempty::NonEmpty;
27use parking_lot::RwLockWriteGuard;
28use serde::{Deserialize, Serialize};
29use sui_config::node::CongestionLogConfig;
30use sui_macros::{fail_point, fail_point_arg, fail_point_if};
31use sui_protocol_config::{Chain, PerObjectCongestionControlMode, ProtocolConfig};
32use sui_types::{
33 SUI_RANDOMNESS_STATE_OBJECT_ID,
34 authenticator_state::ActiveJwk,
35 base_types::{
36 AuthorityName, ConciseableName, ConsensusObjectSequenceKey, ObjectID, ObjectRef,
37 SequenceNumber, TransactionDigest,
38 },
39 crypto::RandomnessRound,
40 digests::{AdditionalConsensusStateDigest, ConsensusCommitDigest, Digest},
41 executable_transaction::{
42 TrustedExecutableTransaction, VerifiedExecutableTransaction,
43 VerifiedExecutableTransactionWithAliases,
44 },
45 messages_checkpoint::{
46 CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointTimestamp,
47 },
48 messages_consensus::{
49 AuthorityCapabilitiesV2, AuthorityIndex, ConsensusDeterminedVersionAssignments,
50 ConsensusPosition, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
51 ExecutionTimeObservation,
52 },
53 sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
54 transaction::{
55 InputObjectKind, PlainTransactionWithClaims, SenderSignedData, TransactionDataAPI,
56 TransactionKey, VerifiedTransaction, WithAliases,
57 },
58};
59use tokio::task::JoinSet;
60use tracing::{debug, error, info, instrument, trace, warn};
61
62use crate::{
63 authority::{
64 AuthorityMetrics, AuthorityState, ExecutionEnv,
65 authority_per_epoch_store::{
66 AuthorityPerEpochStore, CancelConsensusCertificateReason, ConsensusStats,
67 ConsensusStatsAPI, ExecutionIndices, ExecutionIndicesWithStatsV2,
68 consensus_quarantine::ConsensusCommitOutput,
69 },
70 backpressure::{BackpressureManager, BackpressureSubscriber},
71 congestion_log::CongestionCommitLogger,
72 consensus_tx_status_cache::ConsensusTxStatus,
73 execution_time_estimator::ExecutionTimeEstimator,
74 shared_object_congestion_tracker::SharedObjectCongestionTracker,
75 shared_object_version_manager::{AssignedTxAndVersions, AssignedVersions, Schedulable},
76 transaction_deferral::{DeferralKey, DeferralReason, transaction_deferral_within_limit},
77 },
78 checkpoints::{
79 CheckpointHeight, CheckpointRoots, CheckpointService, CheckpointServiceNotify,
80 PendingCheckpoint, PendingCheckpointInfo,
81 },
82 consensus_adapter::ConsensusAdapter,
83 consensus_throughput_calculator::ConsensusThroughputCalculator,
84 consensus_types::consensus_output_api::{ConsensusCommitAPI, ParsedTransaction},
85 epoch::{
86 randomness::{DkgStatus, RandomnessManager},
87 reconfiguration::ReconfigState,
88 },
89 execution_cache::ObjectCacheRead,
90 execution_scheduler::{SettlementBatchInfo, SettlementScheduler},
91 gasless_rate_limiter::ConsensusGaslessCounter,
92 post_consensus_tx_reorder::PostConsensusTxReorder,
93 traffic_controller::{TrafficController, policies::TrafficTally},
94};
95
96struct FilteredConsensusOutput {
99 transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
100 owned_object_locks: HashMap<ObjectRef, TransactionDigest>,
101 dropped_transaction_keys: Vec<ConsensusTransactionKey>,
102}
103
104pub struct ConsensusHandlerInitializer {
105 state: Arc<AuthorityState>,
106 checkpoint_service: Arc<CheckpointService>,
107 epoch_store: Arc<AuthorityPerEpochStore>,
108 consensus_adapter: Arc<ConsensusAdapter>,
109 throughput_calculator: Arc<ConsensusThroughputCalculator>,
110 backpressure_manager: Arc<BackpressureManager>,
111 congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
112 consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
113}
114
115impl ConsensusHandlerInitializer {
116 pub fn new(
117 state: Arc<AuthorityState>,
118 checkpoint_service: Arc<CheckpointService>,
119 epoch_store: Arc<AuthorityPerEpochStore>,
120 consensus_adapter: Arc<ConsensusAdapter>,
121 throughput_calculator: Arc<ConsensusThroughputCalculator>,
122 backpressure_manager: Arc<BackpressureManager>,
123 congestion_log_config: Option<CongestionLogConfig>,
124 ) -> Self {
125 let congestion_logger =
126 congestion_log_config.and_then(|config| match CongestionCommitLogger::new(&config) {
127 Ok(logger) => Some(Arc::new(Mutex::new(logger))),
128 Err(e) => {
129 debug_fatal!("Failed to create congestion logger: {e}");
130 None
131 }
132 });
133 let consensus_gasless_counter = state.consensus_gasless_counter.clone();
134 Self {
135 state,
136 checkpoint_service,
137 epoch_store,
138 consensus_adapter,
139 throughput_calculator,
140 backpressure_manager,
141 congestion_logger,
142 consensus_gasless_counter,
143 }
144 }
145
146 #[cfg(test)]
147 pub(crate) fn new_for_testing(
148 state: Arc<AuthorityState>,
149 checkpoint_service: Arc<CheckpointService>,
150 ) -> Self {
151 use crate::consensus_test_utils::make_consensus_adapter_for_test;
152 use std::collections::HashSet;
153
154 let backpressure_manager = BackpressureManager::new_for_tests();
155 let consensus_adapter =
156 make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
157 let consensus_gasless_counter = state.consensus_gasless_counter.clone();
158 Self {
159 state: state.clone(),
160 checkpoint_service,
161 epoch_store: state.epoch_store_for_testing().clone(),
162 consensus_adapter,
163 throughput_calculator: Arc::new(ConsensusThroughputCalculator::new(
164 None,
165 state.metrics.clone(),
166 )),
167 backpressure_manager,
168 congestion_logger: None,
169 consensus_gasless_counter,
170 }
171 }
172
173 pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
174 let new_epoch_start_state = self.epoch_store.epoch_start_state();
175 let consensus_committee = new_epoch_start_state.get_consensus_committee();
176
177 let settlement_scheduler = SettlementScheduler::new(
178 self.state.execution_scheduler().as_ref().clone(),
179 self.state.get_transaction_cache_reader().clone(),
180 self.state.metrics.clone(),
181 );
182 ConsensusHandler::new(
183 self.epoch_store.clone(),
184 self.checkpoint_service.clone(),
185 settlement_scheduler,
186 self.consensus_adapter.clone(),
187 self.state.get_object_cache_reader().clone(),
188 consensus_committee,
189 self.state.metrics.clone(),
190 self.throughput_calculator.clone(),
191 self.backpressure_manager.subscribe(),
192 self.state.traffic_controller.clone(),
193 self.congestion_logger.clone(),
194 self.consensus_gasless_counter.clone(),
195 )
196 }
197}
198
199mod additional_consensus_state {
200 use std::marker::PhantomData;
201
202 use consensus_core::CommitRef;
203 use fastcrypto::hash::HashFunction as _;
204 use sui_types::{crypto::DefaultHash, digests::Digest};
205
206 use super::*;
207 #[derive(Serialize, Deserialize)]
216 pub(super) struct AdditionalConsensusState {
217 commit_interval_observer: CommitIntervalObserver,
218 }
219
220 impl AdditionalConsensusState {
221 pub fn new(additional_consensus_state_window_size: u32) -> Self {
222 Self {
223 commit_interval_observer: CommitIntervalObserver::new(
224 additional_consensus_state_window_size,
225 ),
226 }
227 }
228
229 pub(crate) fn observe_commit(
231 &mut self,
232 protocol_config: &ProtocolConfig,
233 epoch_start_time: u64,
234 consensus_commit: &impl ConsensusCommitAPI,
235 ) -> ConsensusCommitInfo {
236 self.commit_interval_observer
237 .observe_commit_time(consensus_commit);
238
239 let estimated_commit_period = self
240 .commit_interval_observer
241 .commit_interval_estimate()
242 .unwrap_or(Duration::from_millis(
243 protocol_config.min_checkpoint_interval_ms(),
244 ));
245
246 info!("estimated commit rate: {:?}", estimated_commit_period);
247
248 self.commit_info_impl(
249 epoch_start_time,
250 consensus_commit,
251 Some(estimated_commit_period),
252 )
253 }
254
255 fn commit_info_impl(
256 &self,
257 epoch_start_time: u64,
258 consensus_commit: &impl ConsensusCommitAPI,
259 estimated_commit_period: Option<Duration>,
260 ) -> ConsensusCommitInfo {
261 let leader_author = consensus_commit.leader_author_index();
262 let timestamp = consensus_commit.commit_timestamp_ms();
263
264 let timestamp = if timestamp < epoch_start_time {
265 error!(
266 "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start_time}, author {leader_author:?}"
267 );
268 epoch_start_time
269 } else {
270 timestamp
271 };
272
273 ConsensusCommitInfo {
274 _phantom: PhantomData,
275 round: consensus_commit.leader_round(),
276 timestamp,
277 leader_author,
278 consensus_commit_ref: consensus_commit.commit_ref(),
279 rejected_transactions_digest: consensus_commit.rejected_transactions_digest(),
280 additional_state_digest: Some(self.digest()),
281 estimated_commit_period,
282 skip_consensus_commit_prologue_in_test: false,
283 }
284 }
285
286 fn digest(&self) -> AdditionalConsensusStateDigest {
288 let mut hash = DefaultHash::new();
289 bcs::serialize_into(&mut hash, self).unwrap();
290 AdditionalConsensusStateDigest::new(hash.finalize().into())
291 }
292 }
293
294 pub struct ConsensusCommitInfo {
295 _phantom: PhantomData<()>,
297
298 pub round: u64,
299 pub timestamp: u64,
300 pub leader_author: AuthorityIndex,
301 pub consensus_commit_ref: CommitRef,
302 pub rejected_transactions_digest: Digest,
303
304 additional_state_digest: Option<AdditionalConsensusStateDigest>,
305 estimated_commit_period: Option<Duration>,
306
307 pub skip_consensus_commit_prologue_in_test: bool,
308 }
309
310 impl ConsensusCommitInfo {
311 pub fn new_for_test(
312 commit_round: u64,
313 commit_timestamp: u64,
314 estimated_commit_period: Option<Duration>,
315 skip_consensus_commit_prologue_in_test: bool,
316 ) -> Self {
317 Self {
318 _phantom: PhantomData,
319 round: commit_round,
320 timestamp: commit_timestamp,
321 leader_author: 0,
322 consensus_commit_ref: CommitRef::default(),
323 rejected_transactions_digest: Digest::default(),
324 additional_state_digest: Some(AdditionalConsensusStateDigest::ZERO),
325 estimated_commit_period,
326 skip_consensus_commit_prologue_in_test,
327 }
328 }
329
330 pub fn new_for_congestion_test(
331 commit_round: u64,
332 commit_timestamp: u64,
333 estimated_commit_period: Duration,
334 ) -> Self {
335 Self::new_for_test(
336 commit_round,
337 commit_timestamp,
338 Some(estimated_commit_period),
339 true,
340 )
341 }
342
343 pub fn additional_state_digest(&self) -> AdditionalConsensusStateDigest {
344 self.additional_state_digest
346 .expect("additional_state_digest is not available")
347 }
348
349 pub fn estimated_commit_period(&self) -> Duration {
350 self.estimated_commit_period
352 .expect("estimated commit period is not available")
353 }
354
355 fn consensus_commit_digest(&self) -> ConsensusCommitDigest {
356 ConsensusCommitDigest::new(self.consensus_commit_ref.digest.into_inner())
357 }
358
359 fn consensus_commit_prologue_transaction(
360 &self,
361 epoch: u64,
362 ) -> VerifiedExecutableTransaction {
363 let transaction = VerifiedTransaction::new_consensus_commit_prologue(
364 epoch,
365 self.round,
366 self.timestamp,
367 );
368 VerifiedExecutableTransaction::new_system(transaction, epoch)
369 }
370
371 fn consensus_commit_prologue_v2_transaction(
372 &self,
373 epoch: u64,
374 ) -> VerifiedExecutableTransaction {
375 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v2(
376 epoch,
377 self.round,
378 self.timestamp,
379 self.consensus_commit_digest(),
380 );
381 VerifiedExecutableTransaction::new_system(transaction, epoch)
382 }
383
384 fn consensus_commit_prologue_v3_transaction(
385 &self,
386 epoch: u64,
387 consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
388 ) -> VerifiedExecutableTransaction {
389 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v3(
390 epoch,
391 self.round,
392 self.timestamp,
393 self.consensus_commit_digest(),
394 consensus_determined_version_assignments,
395 );
396 VerifiedExecutableTransaction::new_system(transaction, epoch)
397 }
398
399 fn consensus_commit_prologue_v4_transaction(
400 &self,
401 epoch: u64,
402 consensus_determined_version_assignments: ConsensusDeterminedVersionAssignments,
403 additional_state_digest: AdditionalConsensusStateDigest,
404 ) -> VerifiedExecutableTransaction {
405 let transaction = VerifiedTransaction::new_consensus_commit_prologue_v4(
406 epoch,
407 self.round,
408 self.timestamp,
409 self.consensus_commit_digest(),
410 consensus_determined_version_assignments,
411 additional_state_digest,
412 );
413 VerifiedExecutableTransaction::new_system(transaction, epoch)
414 }
415
416 pub fn create_consensus_commit_prologue_transaction(
417 &self,
418 epoch: u64,
419 protocol_config: &ProtocolConfig,
420 cancelled_txn_version_assignment: Vec<(
421 TransactionDigest,
422 Vec<(ConsensusObjectSequenceKey, SequenceNumber)>,
423 )>,
424 commit_info: &ConsensusCommitInfo,
425 indirect_state_observer: IndirectStateObserver,
426 ) -> VerifiedExecutableTransaction {
427 let version_assignments = if protocol_config
428 .record_consensus_determined_version_assignments_in_prologue_v2()
429 {
430 Some(
431 ConsensusDeterminedVersionAssignments::CancelledTransactionsV2(
432 cancelled_txn_version_assignment,
433 ),
434 )
435 } else if protocol_config.record_consensus_determined_version_assignments_in_prologue()
436 {
437 Some(
438 ConsensusDeterminedVersionAssignments::CancelledTransactions(
439 cancelled_txn_version_assignment
440 .into_iter()
441 .map(|(tx_digest, versions)| {
442 (
443 tx_digest,
444 versions.into_iter().map(|(id, v)| (id.0, v)).collect(),
445 )
446 })
447 .collect(),
448 ),
449 )
450 } else {
451 None
452 };
453
454 if protocol_config.record_additional_state_digest_in_prologue() {
455 let additional_state_digest =
456 if protocol_config.additional_consensus_digest_indirect_state() {
457 let d1 = commit_info.additional_state_digest();
458 indirect_state_observer.fold_with(d1)
459 } else {
460 commit_info.additional_state_digest()
461 };
462
463 self.consensus_commit_prologue_v4_transaction(
464 epoch,
465 version_assignments.unwrap(),
466 additional_state_digest,
467 )
468 } else if let Some(version_assignments) = version_assignments {
469 self.consensus_commit_prologue_v3_transaction(epoch, version_assignments)
470 } else if protocol_config.include_consensus_digest_in_prologue() {
471 self.consensus_commit_prologue_v2_transaction(epoch)
472 } else {
473 self.consensus_commit_prologue_transaction(epoch)
474 }
475 }
476 }
477
478 #[derive(Default)]
479 pub struct IndirectStateObserver {
480 hash: DefaultHash,
481 }
482
483 impl IndirectStateObserver {
484 pub fn new() -> Self {
485 Self::default()
486 }
487
488 pub fn observe_indirect_state<T: Serialize>(&mut self, state: &T) {
489 bcs::serialize_into(&mut self.hash, state).unwrap();
490 }
491
492 pub fn fold_with(
493 self,
494 d1: AdditionalConsensusStateDigest,
495 ) -> AdditionalConsensusStateDigest {
496 let hash = self.hash.finalize();
497 let d2 = AdditionalConsensusStateDigest::new(hash.into());
498
499 let mut hasher = DefaultHash::new();
500 bcs::serialize_into(&mut hasher, &d1).unwrap();
501 bcs::serialize_into(&mut hasher, &d2).unwrap();
502 AdditionalConsensusStateDigest::new(hasher.finalize().into())
503 }
504 }
505
/// Two states with window size 3 must produce the same digest when one of them also
/// observed an earlier commit (round 1) that the other never saw, and must stay in
/// agreement after both observe a further commit.
#[test]
fn test_additional_consensus_state() {
    use crate::consensus_test_utils::TestConsensusCommit;

    // Observes one empty commit at `round` with the given timestamp (epoch start = 100).
    fn observe(state: &mut AdditionalConsensusState, round: u64, timestamp: u64) {
        let protocol_config = ProtocolConfig::get_for_max_version_UNSAFE();
        state.observe_commit(
            &protocol_config,
            100,
            &TestConsensusCommit::empty(round, timestamp, 0),
        );
    }

    // Observes every round in `rounds`, using `round * 1000` as the timestamp.
    fn observe_rounds(
        state: &mut AdditionalConsensusState,
        rounds: impl IntoIterator<Item = u64>,
    ) {
        for round in rounds {
            observe(state, round, round * 1000);
        }
    }

    let mut s1 = AdditionalConsensusState::new(3);
    observe_rounds(&mut s1, 1..=4);

    let mut s2 = AdditionalConsensusState::new(3);
    observe_rounds(&mut s2, 2..=4);

    assert_eq!(s1.digest(), s2.digest());

    observe_rounds(&mut s1, 5..=5);
    observe_rounds(&mut s2, 5..=5);

    assert_eq!(s1.digest(), s2.digest());
}
539}
540use additional_consensus_state::AdditionalConsensusState;
541pub(crate) use additional_consensus_state::{ConsensusCommitInfo, IndirectStateObserver};
542
543struct QueuedCheckpointRoots {
544 roots: CheckpointRoots,
545 timestamp: CheckpointTimestamp,
546 consensus_commit_ref: CommitRef,
547 rejected_transactions_digest: Digest,
548}
549
550struct Chunk<
551 T: crate::authority::shared_object_version_manager::AsTx = VerifiedExecutableTransaction,
552> {
553 schedulables: Vec<Schedulable<T>>,
554 settlement: Option<Schedulable<T>>,
555 height: CheckpointHeight,
556}
557
558impl<T: crate::authority::shared_object_version_manager::AsTx + Clone> Chunk<T> {
559 fn all_schedulables(&self) -> impl Iterator<Item = &Schedulable<T>> + Clone {
560 self.schedulables.iter().chain(self.settlement.iter())
561 }
562
563 fn all_schedulables_from(chunks: &[Self]) -> impl Iterator<Item = &Schedulable<T>> + Clone {
564 chunks.iter().flat_map(|c| c.all_schedulables())
565 }
566
567 fn to_checkpoint_roots(&self) -> CheckpointRoots {
568 let tx_roots: Vec<_> = self.schedulables.iter().map(|s| s.key()).collect();
569 let settlement_root = self.settlement.as_ref().map(|s| s.key());
570 CheckpointRoots {
571 tx_roots,
572 settlement_root,
573 height: self.height,
574 }
575 }
576}
577
578impl From<Chunk<VerifiedExecutableTransactionWithAliases>> for Chunk {
579 fn from(chunk: Chunk<VerifiedExecutableTransactionWithAliases>) -> Self {
580 Chunk {
581 schedulables: chunk.schedulables.into_iter().map(|s| s.into()).collect(),
582 settlement: chunk.settlement.map(|s| s.into()),
583 height: chunk.height,
584 }
585 }
586}
587
588pub(crate) struct CheckpointQueue {
596 last_built_timestamp: CheckpointTimestamp,
597 pending_roots: VecDeque<QueuedCheckpointRoots>,
598 height: u64,
599 pending_tx_count: usize,
600 current_checkpoint_seq: CheckpointSequenceNumber,
601 max_tx: usize,
602 min_checkpoint_interval_ms: u64,
603 execution_scheduler_sender: ExecutionSchedulerSender,
604}
605
606impl CheckpointQueue {
607 pub(crate) fn new(
608 last_built_timestamp: CheckpointTimestamp,
609 checkpoint_height: u64,
610 next_checkpoint_seq: CheckpointSequenceNumber,
611 max_tx: usize,
612 min_checkpoint_interval_ms: u64,
613 execution_scheduler_sender: ExecutionSchedulerSender,
614 ) -> Self {
615 Self {
616 last_built_timestamp,
617 pending_roots: VecDeque::new(),
618 height: checkpoint_height,
619 pending_tx_count: 0,
620 current_checkpoint_seq: next_checkpoint_seq,
621 max_tx,
622 min_checkpoint_interval_ms,
623 execution_scheduler_sender,
624 }
625 }
626
627 #[cfg(test)]
628 fn new_for_testing(
629 last_built_timestamp: CheckpointTimestamp,
630 checkpoint_height: u64,
631 next_checkpoint_seq: CheckpointSequenceNumber,
632 max_tx: usize,
633 min_checkpoint_interval_ms: u64,
634 ) -> Self {
635 let (sender, _receiver) = monitored_mpsc::unbounded_channel("test_checkpoint_queue_sender");
636 Self {
637 last_built_timestamp,
638 pending_roots: VecDeque::new(),
639 height: checkpoint_height,
640 pending_tx_count: 0,
641 current_checkpoint_seq: next_checkpoint_seq,
642 max_tx,
643 min_checkpoint_interval_ms,
644 execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
645 }
646 }
647
648 #[cfg(test)]
649 fn new_for_testing_with_sender(
650 last_built_timestamp: CheckpointTimestamp,
651 checkpoint_height: u64,
652 next_checkpoint_seq: CheckpointSequenceNumber,
653 max_tx: usize,
654 min_checkpoint_interval_ms: u64,
655 sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
656 ) -> Self {
657 Self {
658 last_built_timestamp,
659 pending_roots: VecDeque::new(),
660 height: checkpoint_height,
661 pending_tx_count: 0,
662 current_checkpoint_seq: next_checkpoint_seq,
663 max_tx,
664 min_checkpoint_interval_ms,
665 execution_scheduler_sender: ExecutionSchedulerSender::new_for_testing(sender),
666 }
667 }
668
669 pub(crate) fn last_built_timestamp(&self) -> CheckpointTimestamp {
670 self.last_built_timestamp
671 }
672
673 pub(crate) fn is_empty(&self) -> bool {
674 self.pending_roots.is_empty()
675 }
676
677 fn next_height(&mut self) -> u64 {
678 self.height += 1;
679 self.height
680 }
681
682 fn push_chunk(
683 &mut self,
684 chunk: Chunk,
685 assigned_versions: &HashMap<TransactionKey, AssignedVersions>,
686 timestamp: CheckpointTimestamp,
687 consensus_commit_ref: CommitRef,
688 rejected_transactions_digest: Digest,
689 ) -> Vec<PendingCheckpoint> {
690 let max_tx = self.max_tx;
691 let user_tx_count = chunk.schedulables.len();
692
693 let roots = chunk.to_checkpoint_roots();
694
695 let schedulables: Vec<_> = chunk
696 .schedulables
697 .into_iter()
698 .map(|s| {
699 let versions = assigned_versions.get(&s.key()).cloned().unwrap_or_default();
700 (s, versions)
701 })
702 .collect();
703
704 let mut flushed_checkpoints = Vec::new();
705
706 if self.pending_tx_count > 0
707 && self.pending_tx_count + user_tx_count > max_tx
708 && let Some(checkpoint) = self.flush_forced()
709 {
710 flushed_checkpoints.push(checkpoint);
711 }
712
713 let settlement_info = chunk.settlement.as_ref().map(|s| {
714 let settlement_key = s.key();
715 let tx_keys: Vec<_> = schedulables.iter().map(|(s, _)| s.key()).collect();
716 SettlementBatchInfo {
717 settlement_key,
718 tx_keys,
719 checkpoint_height: chunk.height,
720 checkpoint_seq: self.current_checkpoint_seq,
721 assigned_versions: assigned_versions
722 .get(&settlement_key)
723 .cloned()
724 .unwrap_or_default(),
725 }
726 });
727
728 self.execution_scheduler_sender
729 .send(schedulables, settlement_info);
730
731 self.pending_tx_count += user_tx_count;
732 self.pending_roots.push_back(QueuedCheckpointRoots {
733 roots,
734 timestamp,
735 consensus_commit_ref,
736 rejected_transactions_digest,
737 });
738
739 flushed_checkpoints
740 }
741
742 pub(crate) fn flush(
743 &mut self,
744 current_timestamp: CheckpointTimestamp,
745 force: bool,
746 ) -> Option<PendingCheckpoint> {
747 if !force && current_timestamp < self.last_built_timestamp + self.min_checkpoint_interval_ms
748 {
749 return None;
750 }
751 self.flush_forced()
752 }
753
754 fn flush_forced(&mut self) -> Option<PendingCheckpoint> {
755 if self.pending_roots.is_empty() {
756 return None;
757 }
758
759 let to_flush: Vec<_> = self.pending_roots.drain(..).collect();
760 let last_root = to_flush.last().unwrap();
761
762 let checkpoint = PendingCheckpoint {
763 roots: to_flush.iter().map(|q| q.roots.clone()).collect(),
764 details: PendingCheckpointInfo {
765 timestamp_ms: last_root.timestamp,
766 last_of_epoch: false,
767 checkpoint_height: last_root.roots.height,
768 consensus_commit_ref: last_root.consensus_commit_ref,
769 rejected_transactions_digest: last_root.rejected_transactions_digest,
770 checkpoint_seq: self.current_checkpoint_seq,
771 },
772 };
773
774 self.last_built_timestamp = last_root.timestamp;
775 self.pending_tx_count = 0;
776 self.current_checkpoint_seq += 1;
777
778 Some(checkpoint)
779 }
780
781 pub(crate) fn checkpoint_seq(&self) -> CheckpointSequenceNumber {
782 self.current_checkpoint_seq
783 .checked_sub(1)
784 .expect("checkpoint_seq called before any checkpoint was assigned")
785 }
786}
787
788pub struct ConsensusHandler<C> {
789 epoch_store: Arc<AuthorityPerEpochStore>,
792 last_consensus_stats: ExecutionIndicesWithStatsV2,
796 checkpoint_service: Arc<C>,
797 cache_reader: Arc<dyn ObjectCacheRead>,
799 committee: ConsensusCommittee,
801 metrics: Arc<AuthorityMetrics>,
804 processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
806 consensus_adapter: Arc<ConsensusAdapter>,
808
809 throughput_calculator: Arc<ConsensusThroughputCalculator>,
811
812 additional_consensus_state: AdditionalConsensusState,
813
814 backpressure_subscriber: BackpressureSubscriber,
815
816 traffic_controller: Option<Arc<TrafficController>>,
817
818 congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
819
820 consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
821
822 checkpoint_queue: Mutex<CheckpointQueue>,
823}
824
/// Base capacity of the handler's processed-transaction LRU cache: 2^20 (1,048,576) entries.
const PROCESSED_CACHE_CAP: usize = 1 << 20;
826
827impl<C> ConsensusHandler<C> {
828 pub(crate) fn new(
829 epoch_store: Arc<AuthorityPerEpochStore>,
830 checkpoint_service: Arc<C>,
831 settlement_scheduler: SettlementScheduler,
832 consensus_adapter: Arc<ConsensusAdapter>,
833 cache_reader: Arc<dyn ObjectCacheRead>,
834 committee: ConsensusCommittee,
835 metrics: Arc<AuthorityMetrics>,
836 throughput_calculator: Arc<ConsensusThroughputCalculator>,
837 backpressure_subscriber: BackpressureSubscriber,
838 traffic_controller: Option<Arc<TrafficController>>,
839 congestion_logger: Option<Arc<Mutex<CongestionCommitLogger>>>,
840 consensus_gasless_counter: Arc<ConsensusGaslessCounter>,
841 ) -> Self {
842 assert!(
843 matches!(
844 epoch_store
845 .protocol_config()
846 .per_object_congestion_control_mode(),
847 PerObjectCongestionControlMode::ExecutionTimeEstimate(_)
848 ),
849 "support for congestion control modes other than PerObjectCongestionControlMode::ExecutionTimeEstimate has been removed"
850 );
851
852 assert!(
853 epoch_store
854 .protocol_config()
855 .split_checkpoints_in_consensus_handler(),
856 "support for splitting checkpoints outside of consensus handler has been removed"
857 );
858
859 let mut last_consensus_stats = epoch_store
861 .get_last_consensus_stats()
862 .expect("Should be able to read last consensus index");
863 if !last_consensus_stats.stats.is_initialized() {
865 last_consensus_stats.stats = ConsensusStats::new(committee.size());
866 last_consensus_stats.checkpoint_seq = epoch_store.previous_epoch_last_checkpoint();
867 }
868 let max_tx = epoch_store
869 .protocol_config()
870 .max_transactions_per_checkpoint() as usize;
871 let min_checkpoint_interval_ms = epoch_store
872 .protocol_config()
873 .min_checkpoint_interval_ms_as_option()
874 .unwrap_or_default();
875 let execution_scheduler_sender =
876 ExecutionSchedulerSender::start(settlement_scheduler, epoch_store.clone());
877 let commit_rate_estimate_window_size = epoch_store
878 .protocol_config()
879 .get_consensus_commit_rate_estimation_window_size();
880 let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
881 let checkpoint_height = last_consensus_stats.height;
882 let next_checkpoint_seq = last_consensus_stats.checkpoint_seq + 1;
883 Self {
884 epoch_store,
885 last_consensus_stats,
886 checkpoint_service,
887 cache_reader,
888 committee,
889 metrics,
890 processed_cache: LruCache::new(
891 NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
892 ),
893 consensus_adapter,
894 throughput_calculator,
895 additional_consensus_state: AdditionalConsensusState::new(
896 commit_rate_estimate_window_size,
897 ),
898 backpressure_subscriber,
899 traffic_controller,
900 congestion_logger,
901 consensus_gasless_counter,
902 checkpoint_queue: Mutex::new(CheckpointQueue::new(
903 last_built_timestamp,
904 checkpoint_height,
905 next_checkpoint_seq,
906 max_tx,
907 min_checkpoint_interval_ms,
908 execution_scheduler_sender,
909 )),
910 }
911 }
912
913 pub(crate) fn last_processed_subdag_index(&self) -> u64 {
915 self.last_consensus_stats.index.sub_dag_index
916 }
917
918 pub(crate) fn new_for_testing(
919 epoch_store: Arc<AuthorityPerEpochStore>,
920 checkpoint_service: Arc<C>,
921 execution_scheduler_sender: ExecutionSchedulerSender,
922 consensus_adapter: Arc<ConsensusAdapter>,
923 cache_reader: Arc<dyn ObjectCacheRead>,
924 committee: ConsensusCommittee,
925 metrics: Arc<AuthorityMetrics>,
926 throughput_calculator: Arc<ConsensusThroughputCalculator>,
927 backpressure_subscriber: BackpressureSubscriber,
928 traffic_controller: Option<Arc<TrafficController>>,
929 last_consensus_stats: ExecutionIndicesWithStatsV2,
930 ) -> Self {
931 let commit_rate_estimate_window_size = epoch_store
932 .protocol_config()
933 .get_consensus_commit_rate_estimation_window_size();
934 let max_tx = epoch_store
935 .protocol_config()
936 .max_transactions_per_checkpoint() as usize;
937 let min_checkpoint_interval_ms = epoch_store
938 .protocol_config()
939 .min_checkpoint_interval_ms_as_option()
940 .unwrap_or_default();
941 let last_built_timestamp = last_consensus_stats.last_checkpoint_flush_timestamp;
942 let checkpoint_height = last_consensus_stats.height;
943 Self {
944 epoch_store,
945 last_consensus_stats,
946 checkpoint_service,
947 cache_reader,
948 committee,
949 metrics,
950 processed_cache: LruCache::new(
951 NonZeroUsize::new(randomize_cache_capacity_in_tests(PROCESSED_CACHE_CAP)).unwrap(),
952 ),
953 consensus_adapter,
954 throughput_calculator,
955 additional_consensus_state: AdditionalConsensusState::new(
956 commit_rate_estimate_window_size,
957 ),
958 backpressure_subscriber,
959 traffic_controller,
960 congestion_logger: None,
961 consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
962 checkpoint_queue: Mutex::new(CheckpointQueue::new(
963 last_built_timestamp,
964 checkpoint_height,
965 0,
966 max_tx,
967 min_checkpoint_interval_ms,
968 execution_scheduler_sender,
969 )),
970 }
971 }
972}
973
974#[derive(Default)]
975struct CommitHandlerInput {
976 user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
977 capability_notifications: Vec<AuthorityCapabilitiesV2>,
978 execution_time_observations: Vec<ExecutionTimeObservation>,
979 checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
980 randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
981 randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
982 end_of_publish_transactions: Vec<AuthorityName>,
983 new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
984}
985
986struct CommitHandlerState {
987 dkg_failed: bool,
988 randomness_round: Option<RandomnessRound>,
989 output: ConsensusCommitOutput,
990 indirect_state_observer: Option<IndirectStateObserver>,
991 initial_reconfig_state: ReconfigState,
992 occurrence_counts: HashMap<TransactionDigest, u32>,
994}
995
996impl CommitHandlerState {
997 fn get_notifications(&self) -> Vec<SequencedConsensusTransactionKey> {
998 self.output
999 .get_consensus_messages_processed()
1000 .cloned()
1001 .collect()
1002 }
1003
1004 fn init_randomness<'a, 'epoch>(
1005 &'a mut self,
1006 epoch_store: &'epoch AuthorityPerEpochStore,
1007 commit_info: &'a ConsensusCommitInfo,
1008 ) -> Option<tokio::sync::MutexGuard<'epoch, RandomnessManager>> {
1009 let mut randomness_manager = epoch_store.randomness_manager.get().map(|rm| {
1010 rm.try_lock()
1011 .expect("should only ever be called from the commit handler thread")
1012 });
1013
1014 let mut dkg_failed = false;
1015 let randomness_round = if epoch_store.randomness_state_enabled() {
1016 let randomness_manager = randomness_manager
1017 .as_mut()
1018 .expect("randomness manager should exist if randomness is enabled");
1019 match randomness_manager.dkg_status() {
1020 DkgStatus::Pending => None,
1021 DkgStatus::Failed => {
1022 dkg_failed = true;
1023 None
1024 }
1025 DkgStatus::Successful => {
1026 if self.initial_reconfig_state.should_accept_tx() {
1029 randomness_manager
1030 .reserve_next_randomness(commit_info.timestamp, &mut self.output)
1032 .expect("epoch ended")
1033 } else {
1034 None
1035 }
1036 }
1037 }
1038 } else {
1039 None
1040 };
1041
1042 if randomness_round.is_some() {
1043 assert!(!dkg_failed); }
1045
1046 self.randomness_round = randomness_round;
1047 self.dkg_failed = dkg_failed;
1048
1049 randomness_manager
1050 }
1051}
1052
1053impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
1054 fn handle_prior_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) {
1058 assert!(
1059 self.epoch_store
1060 .protocol_config()
1061 .record_additional_state_digest_in_prologue()
1062 );
1063 let protocol_config = self.epoch_store.protocol_config();
1064 let epoch_start_time = self
1065 .epoch_store
1066 .epoch_start_config()
1067 .epoch_start_timestamp_ms();
1068
1069 self.additional_consensus_state.observe_commit(
1070 protocol_config,
1071 epoch_start_time,
1072 &consensus_commit,
1073 );
1074 }
1075
1076 #[cfg(test)]
1077 pub(crate) async fn handle_consensus_commit_for_test(
1078 &mut self,
1079 consensus_commit: impl ConsensusCommitAPI,
1080 ) {
1081 let transactions = consensus_commit.transactions();
1082 self.handle_consensus_commit(consensus_commit, transactions)
1083 .await;
1084 }
1085
    /// Processes one consensus commit end to end.
    ///
    /// In order, this method:
    /// 1. asserts the protocol features it relies on and waits on the backpressure
    ///    subscriber,
    /// 2. observes the commit to obtain its `ConsensusCommitInfo` and updates
    ///    `last_consensus_stats`,
    /// 3. filters and deduplicates the transactions, initializes randomness, and handles
    ///    each non-user message kind,
    /// 4. decides which user transactions to schedule, defer or cancel,
    /// 5. advances the end-of-epoch logic, creates pending checkpoints, pushes the
    ///    accumulated output to the consensus quarantine and notifies the checkpoint
    ///    service.
    ///
    /// Returns early, before any checkpoint is created or output is pushed, when
    /// `handle_close_epoch` reports that transactions are no longer accepted and this is
    /// not the final round.
    ///
    /// # Panics
    ///
    /// Panics if a required protocol feature is disabled, if the commit round is not
    /// greater than the last committed round, if the execution time estimator is already
    /// locked, or if pushing the output / notifying the checkpoint service fails.
    #[instrument(level = "debug", skip_all, fields(epoch = self.epoch_store.epoch(), round = consensus_commit.leader_round()))]
    pub(crate) async fn handle_consensus_commit(
        &mut self,
        consensus_commit: impl ConsensusCommitAPI,
        transactions: ParsedConsensusTransactions,
    ) {
        {
            // The rest of this method assumes all of these protocol features are enabled.
            let protocol_config = self.epoch_store.protocol_config();

            assert!(protocol_config.ignore_execution_time_observations_after_certs_closed());
            assert!(protocol_config.record_time_estimate_processed());
            assert!(protocol_config.prepend_prologue_tx_in_consensus_commit_in_checkpoints());
            assert!(protocol_config.consensus_checkpoint_signature_key_includes_digest());
            assert!(protocol_config.authority_capabilities_v2());
            assert!(protocol_config.cancel_for_failed_dkg_early());
        }

        // Suspend until the backpressure subscriber allows processing to continue.
        self.backpressure_subscriber.await_no_backpressure().await;

        let epoch = self.epoch_store.epoch();

        let _scope = monitored_scope("ConsensusCommitHandler::handle_consensus_commit");

        // Round recorded by the previously handled commit (stats are updated further below).
        let last_committed_round = self.last_consensus_stats.index.last_committed_round;

        // Publish the previous round to both caches. Note the narrowing `as u32` casts.
        self.epoch_store
            .consensus_tx_status_cache
            .update_last_committed_leader_round(last_committed_round as u32);
        self.epoch_store
            .tx_reject_reason_cache
            .set_last_committed_leader_round(last_committed_round as u32);

        let commit_info = self.additional_consensus_state.observe_commit(
            self.epoch_store.protocol_config(),
            self.epoch_store
                .epoch_start_config()
                .epoch_start_timestamp_ms(),
            &consensus_commit,
        );
        // Commit rounds must be strictly increasing.
        assert!(commit_info.round > last_committed_round);

        let (timestamp, leader_author, commit_sub_dag_index) =
            self.gather_commit_metadata(&consensus_commit);

        info!(
            %consensus_commit,
            "Received consensus output {}. Rejected transactions {}",
            consensus_commit.commit_ref(),
            consensus_commit.rejected_transactions_debug_string(),
        );

        // Record this commit as the latest one; the transaction index restarts at zero.
        self.last_consensus_stats.index = ExecutionIndices {
            last_committed_round: commit_info.round,
            sub_dag_index: commit_sub_dag_index,
            transaction_index: 0_u64,
        };

        self.metrics
            .consensus_committed_subdags
            .with_label_values(&[&leader_author.to_string()])
            .inc();

        // Per-commit working state; the reconfig state is cloned so the read guard is
        // released immediately.
        let mut state = CommitHandlerState {
            output: ConsensusCommitOutput::new(commit_info.round),
            dkg_failed: false,
            randomness_round: None,
            indirect_state_observer: Some(IndirectStateObserver::new()),
            initial_reconfig_state: self
                .epoch_store
                .get_reconfig_state_read_lock_guard()
                .clone(),
            occurrence_counts: HashMap::new(),
        };

        let FilteredConsensusOutput {
            transactions,
            owned_object_locks,
            dropped_transaction_keys,
        } = self.filter_consensus_txns(
            state.initial_reconfig_state.clone(),
            &commit_info,
            transactions,
        );
        if !owned_object_locks.is_empty() {
            state.output.set_owned_object_locks(owned_object_locks);
        }

        // Transactions dropped by the filter are still recorded as processed.
        for key in dropped_transaction_keys {
            state.output.record_consensus_message_processed(
                SequencedConsensusTransactionKey::External(key),
            );
        }
        let transactions = self.deduplicate_consensus_txns(&mut state, &commit_info, transactions);

        // Sets `state.randomness_round` / `state.dkg_failed`; the returned guard (if any)
        // is held until the end of this method.
        let mut randomness_manager = state.init_randomness(&self.epoch_store, &commit_info);

        // Split the remaining transactions by kind.
        let CommitHandlerInput {
            user_transactions,
            capability_notifications,
            execution_time_observations,
            checkpoint_signature_messages,
            randomness_dkg_messages,
            randomness_dkg_confirmations,
            end_of_publish_transactions,
            new_jwks,
        } = self.build_commit_handler_input(transactions);

        self.process_gasless_transactions(&commit_info, &user_transactions);
        self.process_jwks(&mut state, &commit_info, new_jwks);
        self.process_capability_notifications(capability_notifications);
        self.process_execution_time_observations(&mut state, execution_time_observations);
        self.process_checkpoint_signature_messages(checkpoint_signature_messages);

        self.process_dkg_updates(
            &mut state,
            &commit_info,
            randomness_manager.as_deref_mut(),
            randomness_dkg_messages,
            randomness_dkg_confirmations,
        )
        .await;

        // `try_lock` + `expect`: contention on the estimator is treated as a bug.
        let mut execution_time_estimator = self
            .epoch_store
            .execution_time_estimator
            .try_lock()
            .expect("should only ever be called from the commit handler thread");

        // Uses the round of the previous commit, captured before the stats were updated.
        let authenticator_state_update_transaction =
            self.create_authenticator_state_update(last_committed_round, &commit_info);

        let (
            transactions_to_schedule,
            randomness_transactions_to_schedule,
            cancelled_txns,
            randomness_state_update_transaction,
        ) = self.collect_transactions_to_schedule(
            &mut state,
            &mut execution_time_estimator,
            &commit_info,
            user_transactions,
        );

        // `lock` is `Some` only when the end-of-epoch logic advanced in this commit.
        let (should_accept_tx, lock, final_round) =
            self.handle_close_epoch(&mut state, &commit_info, end_of_publish_transactions);

        let make_checkpoint = should_accept_tx || final_round;
        if !make_checkpoint {
            // Nothing below runs: no checkpoint is created and `state.output` is not
            // pushed to the quarantine.
            return;
        }

        if final_round {
            self.record_end_of_epoch_execution_time_observations(&mut execution_time_estimator);
        }

        // Placeholder for the prologue; `create_pending_checkpoints` replaces it with the
        // real transaction once shared object versions have been assigned.
        let consensus_commit_prologue = (!commit_info.skip_consensus_commit_prologue_in_test)
            .then_some(Schedulable::ConsensusCommitPrologue(
                epoch,
                commit_info.round,
                commit_info.consensus_commit_ref.index,
            ));

        // Order: prologue, authenticator state update, then the scheduled transactions.
        let schedulables: Vec<_> = itertools::chain!(
            consensus_commit_prologue.into_iter(),
            authenticator_state_update_transaction
                .into_iter()
                .map(Schedulable::Transaction),
            transactions_to_schedule
                .into_iter()
                .map(Schedulable::Transaction),
        )
        .collect();

        // Order: randomness state update first, then the randomness-using transactions.
        let randomness_schedulables: Vec<_> = randomness_state_update_transaction
            .into_iter()
            .chain(
                randomness_transactions_to_schedule
                    .into_iter()
                    .map(Schedulable::Transaction),
            )
            .collect();

        // Captured before `schedulables` is moved; reported to the throughput calculator.
        let num_schedulables = schedulables.len();
        let checkpoint_height = self.create_pending_checkpoints(
            &mut state,
            &commit_info,
            schedulables,
            randomness_schedulables,
            &cancelled_txns,
            final_round,
        );

        // Collected before `state.output` is moved into the quarantine below.
        let notifications = state.get_notifications();

        let mut stats_to_record = self.last_consensus_stats.clone();
        stats_to_record.height = checkpoint_height;
        {
            // Scoped so the checkpoint queue lock is released right after reading.
            let queue = self.checkpoint_queue.lock().unwrap();
            stats_to_record.last_checkpoint_flush_timestamp = queue.last_built_timestamp();
            stats_to_record.checkpoint_seq = queue.checkpoint_seq();
        }

        state.output.record_consensus_commit_stats(stats_to_record);

        self.record_deferral_deletion(&mut state);

        self.epoch_store
            .consensus_quarantine
            .write()
            .push_consensus_output(state.output, &self.epoch_store)
            .expect("push_consensus_output should not fail");

        debug!(
            ?commit_info.round,
            "Notifying checkpoint service about new pending checkpoint(s)",
        );
        self.checkpoint_service
            .notify_checkpoint()
            .expect("failed to notify checkpoint service");

        // Randomness generation is started only after the output has been pushed.
        if let Some(randomness_round) = state.randomness_round {
            randomness_manager
                .as_ref()
                .expect("randomness manager should exist if randomness round is provided")
                .generate_randomness(epoch, randomness_round);
        }

        self.epoch_store.process_notifications(notifications.iter());

        // Takes `lock` by value, so the reconfig write guard (if any) is dropped in there.
        self.log_final_round(lock, final_round);

        self.throughput_calculator
            .add_transactions(timestamp, num_schedulables as u64);

        // Test-only fail points.
        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
            let key = [commit_sub_dag_index, epoch];
            if sui_simulator::random::deterministic_probability_once(&key, 0.01) {
                sui_simulator::task::kill_current_node(None);
            }
        });

        fail_point!("crash");

        self.send_end_of_publish_if_needed().await;
    }
1343
1344 fn handle_close_epoch(
1345 &self,
1346 state: &mut CommitHandlerState,
1347 commit_info: &ConsensusCommitInfo,
1348 end_of_publish_transactions: Vec<AuthorityName>,
1349 ) -> (bool, Option<RwLockWriteGuard<'_, ReconfigState>>, bool) {
1350 let timestamp_based_epoch_close = self
1351 .epoch_store
1352 .protocol_config()
1353 .timestamp_based_epoch_close();
1354 let timestamp_triggered = timestamp_based_epoch_close
1355 && commit_info.timestamp >= self.epoch_store.next_reconfiguration_timestamp_ms();
1356 if timestamp_triggered {
1357 let reconfig_guard = self.epoch_store.get_reconfig_state_write_lock_guard();
1359 if reconfig_guard.should_accept_user_certs() {
1360 self.epoch_store.close_user_certs(reconfig_guard);
1361 }
1362 }
1363 let collected_eop_quorum =
1364 self.process_end_of_publish_transactions(state, end_of_publish_transactions);
1365 if timestamp_triggered || collected_eop_quorum {
1366 let (lock, final_round) = self.advance_eop_state_machine(state);
1367 (lock.should_accept_tx(), Some(lock), final_round)
1368 } else {
1369 (true, None, false)
1370 }
1371 }
1372
1373 fn record_end_of_epoch_execution_time_observations(
1374 &self,
1375 estimator: &mut ExecutionTimeEstimator,
1376 ) {
1377 self.epoch_store
1378 .end_of_epoch_execution_time_observations
1379 .set(estimator.take_observations())
1380 .expect("`stored_execution_time_observations` should only be set once at end of epoch");
1381 }
1382
1383 fn record_deferral_deletion(&self, state: &mut CommitHandlerState) {
1384 let mut deferred_transactions = self
1385 .epoch_store
1386 .consensus_output_cache
1387 .deferred_transactions
1388 .lock();
1389 for deleted_deferred_key in state.output.get_deleted_deferred_txn_keys() {
1390 deferred_transactions.remove(&deleted_deferred_key);
1391 }
1392 }
1393
1394 fn log_final_round(&self, lock: Option<RwLockWriteGuard<ReconfigState>>, final_round: bool) {
1395 if final_round {
1396 let epoch = self.epoch_store.epoch();
1397 info!(
1398 ?epoch,
1399 lock=?lock.as_ref(),
1400 final_round=?final_round,
1401 "Notified last checkpoint"
1402 );
1403 self.epoch_store.record_end_of_message_quorum_time_metric();
1404 }
1405 }
1406
    /// Decides, for every user transaction in this commit plus any previously deferred
    /// ones that are loaded, whether it is scheduled, deferred or cancelled.
    ///
    /// Returns, in order:
    /// * the regular transactions to schedule,
    /// * the randomness-using transactions to schedule,
    /// * the cancelled transaction digests with their cancellation reason (cancelled
    ///   transactions are also present in one of the two lists above),
    /// * the randomness state update schedulable, present iff `state.randomness_round`
    ///   is set.
    ///
    /// Side effects: newly deferred transactions are written both to the in-memory
    /// deferred cache and to `state.output`; congestion metrics, the optional congestion
    /// log and the accumulated object debts are updated.
    #[allow(clippy::type_complexity)]
    fn collect_transactions_to_schedule(
        &self,
        state: &mut CommitHandlerState,
        execution_time_estimator: &mut ExecutionTimeEstimator,
        commit_info: &ConsensusCommitInfo,
        user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    ) -> (
        Vec<VerifiedExecutableTransactionWithAliases>,
        Vec<VerifiedExecutableTransactionWithAliases>,
        BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        Option<Schedulable<VerifiedExecutableTransactionWithAliases>>,
    ) {
        let _scope = monitored_scope("ConsensusCommitHandler::collect_transactions_to_schedule");
        let protocol_config = self.epoch_store.protocol_config();
        let epoch = self.epoch_store.epoch();

        // Merge previously deferred transactions with the new ones and order each list.
        let (ordered_txns, ordered_randomness_txns, previously_deferred_tx_digests) =
            self.merge_and_reorder_transactions(state, commit_info, user_transactions);

        // Regular and randomness-using transactions are tracked by separate trackers.
        let mut shared_object_congestion_tracker =
            self.init_congestion_tracker(commit_info, false, &ordered_txns);
        let mut shared_object_using_randomness_congestion_tracker =
            self.init_congestion_tracker(commit_info, true, &ordered_randomness_txns);

        let randomness_state_update_transaction = state
            .randomness_round
            .map(|round| Schedulable::RandomnessStateUpdate(epoch, round));
        debug!(
            "Randomness state update transaction: {:?}",
            randomness_state_update_transaction
                .as_ref()
                .map(|t| t.key())
        );

        let mut transactions_to_schedule = Vec::with_capacity(ordered_txns.len());
        let mut randomness_transactions_to_schedule =
            Vec::with_capacity(ordered_randomness_txns.len());
        let mut deferred_txns = BTreeMap::new();
        let mut cancelled_txns = BTreeMap::new();

        for transaction in ordered_txns {
            self.handle_deferral_and_cancellation(
                state,
                &mut cancelled_txns,
                &mut deferred_txns,
                &mut transactions_to_schedule,
                protocol_config,
                commit_info,
                transaction,
                &mut shared_object_congestion_tracker,
                &previously_deferred_tx_digests,
                execution_time_estimator,
            );
        }

        for transaction in ordered_randomness_txns {
            if state.dkg_failed {
                // With DKG failed, every randomness-using transaction is cancelled. It is
                // still pushed to the schedule list, and the congestion tracker is skipped.
                debug!(
                    "Canceling randomness-using transaction {:?} because DKG failed",
                    transaction.tx().digest(),
                );
                cancelled_txns.insert(
                    *transaction.tx().digest(),
                    CancelConsensusCertificateReason::DkgFailed,
                );
                randomness_transactions_to_schedule.push(transaction);
                continue;
            }
            self.handle_deferral_and_cancellation(
                state,
                &mut cancelled_txns,
                &mut deferred_txns,
                &mut randomness_transactions_to_schedule,
                protocol_config,
                commit_info,
                transaction,
                &mut shared_object_using_randomness_congestion_tracker,
                &previously_deferred_tx_digests,
                execution_time_estimator,
            );
        }

        let mut total_deferred_txns = 0;
        {
            // Record each deferral twice: in the in-memory cache (a clone) and in the
            // commit output. The cache lock is released at the end of this block.
            let mut deferred_transactions = self
                .epoch_store
                .consensus_output_cache
                .deferred_transactions
                .lock();
            for (key, txns) in deferred_txns.into_iter() {
                total_deferred_txns += txns.len();
                deferred_transactions.insert(key, txns.clone());
                state.output.defer_transactions(key, txns);
            }
        }

        self.metrics
            .consensus_handler_deferred_transactions
            .inc_by(total_deferred_txns as u64);
        self.metrics
            .consensus_handler_cancelled_transactions
            .inc_by(cancelled_txns.len() as u64);
        self.metrics
            .consensus_handler_max_object_costs
            .with_label_values(&["regular_commit"])
            .set(shared_object_congestion_tracker.max_cost() as i64);
        self.metrics
            .consensus_handler_max_object_costs
            .with_label_values(&["randomness_commit"])
            .set(shared_object_using_randomness_congestion_tracker.max_cost() as i64);

        // `finish_commit` consumes each tracker and yields its per-commit data.
        let congestion_commit_data = shared_object_congestion_tracker.finish_commit(commit_info);
        let randomness_congestion_commit_data =
            shared_object_using_randomness_congestion_tracker.finish_commit(commit_info);

        if let Some(logger) = &self.congestion_logger {
            let epoch = self.epoch_store.epoch();
            let mut logger = logger.lock().unwrap();
            logger.write_commit_log(epoch, commit_info, false, &congestion_commit_data);
            logger.write_commit_log(epoch, commit_info, true, &randomness_congestion_commit_data);
        }

        // Best effort: send the ids of all objects with accumulated debts; a failed
        // `try_send` is only logged.
        if let Some(tx_object_debts) = self.epoch_store.tx_object_debts.get()
            && let Err(e) = tx_object_debts.try_send(
                congestion_commit_data
                    .accumulated_debts
                    .iter()
                    .chain(randomness_congestion_commit_data.accumulated_debts.iter())
                    .map(|(id, _)| *id)
                    .collect(),
            )
        {
            info!("failed to send updated object debts to ExecutionTimeObserver: {e:?}");
        }

        state
            .output
            .set_congestion_control_object_debts(congestion_commit_data.accumulated_debts);
        state.output.set_congestion_control_randomness_object_debts(
            randomness_congestion_commit_data.accumulated_debts,
        );

        (
            transactions_to_schedule,
            randomness_transactions_to_schedule,
            cancelled_txns,
            randomness_state_update_transaction,
        )
    }
1557
    /// Splits the schedulables of this commit into checkpoint-sized chunks, assigns shared
    /// object versions, swaps the prologue placeholder for the real prologue transaction,
    /// pushes the chunks through the checkpoint queue and writes every resulting pending
    /// checkpoint into `state.output`.
    ///
    /// Returns the height of the last chunk created for this commit (the last randomness
    /// chunk if there is one, otherwise the last regular chunk).
    ///
    /// # Panics
    ///
    /// Panics if version assignment or writing a pending checkpoint fails, if no chunk at
    /// all was created, or if a prologue transaction was built but the first schedulable /
    /// first assigned-version entry is not the prologue placeholder.
    #[allow(clippy::type_complexity)]
    fn create_pending_checkpoints(
        &self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
        randomness_schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
        cancelled_txns: &BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        final_round: bool,
    ) -> CheckpointHeight {
        let _scope = monitored_scope("ConsensusCommitHandler::create_pending_checkpoints");
        let protocol_config = self.epoch_store.protocol_config();
        let epoch = self.epoch_store.epoch();
        let accumulators_enabled = self.epoch_store.accumulators_enabled();
        let max_transactions_per_checkpoint =
            protocol_config.max_transactions_per_checkpoint() as usize;

        // Randomness chunks are written when a round was reserved, or when DKG failed and
        // there are (cancelled) randomness transactions to include.
        let should_write_random_checkpoint = state.randomness_round.is_some()
            || (state.dkg_failed && !randomness_schedulables.is_empty());

        // Held until the explicit `drop` near the end of this function.
        let mut checkpoint_queue = self.checkpoint_queue.lock().unwrap();

        // Splits a list into chunks of at most `max_transactions_per_checkpoint` items,
        // taking one height from the queue per chunk and attaching an accumulator
        // settlement schedulable when accumulators are enabled.
        let build_chunks =
            |schedulables: Vec<Schedulable<VerifiedExecutableTransactionWithAliases>>,
             queue: &mut CheckpointQueue|
             -> Vec<Chunk<VerifiedExecutableTransactionWithAliases>> {
                schedulables
                    .chunks(max_transactions_per_checkpoint)
                    .map(|chunk| {
                        let height = queue.next_height();
                        let schedulables: Vec<_> = chunk.to_vec();
                        let settlement = if accumulators_enabled {
                            Some(Schedulable::AccumulatorSettlement(epoch, height))
                        } else {
                            None
                        };
                        Chunk {
                            schedulables,
                            settlement,
                            height,
                        }
                    })
                    .collect()
            };

        let num_schedulables = schedulables.len();
        let chunked_schedulables = build_chunks(schedulables, &mut checkpoint_queue);
        if chunked_schedulables.len() > 1 {
            info!(
                "Splitting transactions into {} checkpoint chunks (num_schedulables={}, max_tx={})",
                chunked_schedulables.len(),
                num_schedulables,
                max_transactions_per_checkpoint
            );
            assert_reachable!("checkpoint split due to transaction limit");
        }
        // Heights for randomness chunks are taken after those of the regular chunks.
        let chunked_randomness_schedulables = if should_write_random_checkpoint {
            build_chunks(randomness_schedulables, &mut checkpoint_queue)
        } else {
            vec![]
        };

        let schedulables_for_version_assignment =
            Chunk::all_schedulables_from(&chunked_schedulables);
        let randomness_schedulables_for_version_assignment =
            Chunk::all_schedulables_from(&chunked_randomness_schedulables);

        let assigned_versions = self
            .epoch_store
            .process_consensus_transaction_shared_object_versions(
                self.cache_reader.as_ref(),
                schedulables_for_version_assignment,
                randomness_schedulables_for_version_assignment,
                cancelled_txns,
                &mut state.output,
            )
            .expect("failed to assign shared object versions");

        // The prologue is built after version assignment because it takes the assigned
        // versions as input (it lists the cancelled transactions' versions).
        let consensus_commit_prologue =
            self.add_consensus_commit_prologue_transaction(state, commit_info, &assigned_versions);

        let mut chunked_schedulables = chunked_schedulables;
        let mut assigned_versions = assigned_versions;
        if let Some(consensus_commit_prologue) = consensus_commit_prologue {
            // The placeholder must be the very first schedulable and the first
            // assigned-version entry; both are replaced by the real prologue transaction.
            assert!(matches!(
                chunked_schedulables[0].schedulables[0],
                Schedulable::ConsensusCommitPrologue(..)
            ));
            assert!(matches!(
                assigned_versions.0[0].0,
                TransactionKey::ConsensusCommitPrologue(..)
            ));
            assigned_versions.0[0].0 =
                TransactionKey::Digest(*consensus_commit_prologue.tx().digest());
            chunked_schedulables[0].schedulables[0] =
                Schedulable::Transaction(consensus_commit_prologue);
        }

        let assigned_versions = assigned_versions.into_map();

        self.epoch_store.process_user_signatures(
            chunked_schedulables
                .iter()
                .flat_map(|c| c.all_schedulables())
                .chain(
                    chunked_randomness_schedulables
                        .iter()
                        .flat_map(|c| c.all_schedulables()),
                ),
        );

        // Height reported back to the caller: the last chunk created for this commit.
        let commit_height = chunked_randomness_schedulables
            .last()
            .or(chunked_schedulables.last())
            .map(|c| c.height)
            .expect("at least one checkpoint root must be created per commit");

        let mut pending_checkpoints = Vec::new();
        for chunk in chunked_schedulables {
            pending_checkpoints.extend(checkpoint_queue.push_chunk(
                chunk.into(),
                &assigned_versions,
                commit_info.timestamp,
                commit_info.consensus_commit_ref,
                commit_info.rejected_transactions_digest,
            ));
        }

        if protocol_config.merge_randomness_into_checkpoint() {
            // Merged mode: the first flush is forced only on the final round; the
            // randomness chunks are pushed afterwards and force-flushed only on the final
            // round.
            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, final_round));

            if should_write_random_checkpoint {
                for chunk in chunked_randomness_schedulables {
                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
                        chunk.into(),
                        &assigned_versions,
                        commit_info.timestamp,
                        commit_info.consensus_commit_ref,
                        commit_info.rejected_transactions_digest,
                    ));
                }
                if final_round {
                    pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
                }
            }
        } else {
            // Separate mode: the first flush is forced whenever randomness chunks follow,
            // and the randomness chunks are always force-flushed after being pushed.
            let force = final_round || should_write_random_checkpoint;
            pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, force));

            if should_write_random_checkpoint {
                for chunk in chunked_randomness_schedulables {
                    pending_checkpoints.extend(checkpoint_queue.push_chunk(
                        chunk.into(),
                        &assigned_versions,
                        commit_info.timestamp,
                        commit_info.consensus_commit_ref,
                        commit_info.rejected_transactions_digest,
                    ));
                }
                pending_checkpoints.extend(checkpoint_queue.flush(commit_info.timestamp, true));
            }
        }

        // Only the very last pending checkpoint of the final round is marked.
        if final_round && let Some(last) = pending_checkpoints.last_mut() {
            last.details.last_of_epoch = true;
        }

        // Read the drained flag while still holding the lock, then release the lock
        // before writing the pending checkpoints.
        let queue_drained = checkpoint_queue.is_empty();
        drop(checkpoint_queue);

        for pending_checkpoint in pending_checkpoints {
            debug!(
                checkpoint_height = pending_checkpoint.details.checkpoint_height,
                roots_count = pending_checkpoint.num_roots(),
                "Writing pending checkpoint",
            );
            self.epoch_store
                .write_pending_checkpoint(&mut state.output, &pending_checkpoint)
                .expect("failed to write pending checkpoint");
        }

        state.output.set_checkpoint_queue_drained(queue_drained);

        commit_height
    }
1747
1748 fn add_consensus_commit_prologue_transaction<'a>(
1752 &'a self,
1753 state: &'a mut CommitHandlerState,
1754 commit_info: &'a ConsensusCommitInfo,
1755 assigned_versions: &AssignedTxAndVersions,
1756 ) -> Option<VerifiedExecutableTransactionWithAliases> {
1757 {
1758 if commit_info.skip_consensus_commit_prologue_in_test {
1759 return None;
1760 }
1761 }
1762
1763 let mut cancelled_txn_version_assignment = Vec::new();
1764
1765 let protocol_config = self.epoch_store.protocol_config();
1766
1767 for (txn_key, assigned_versions) in assigned_versions.0.iter() {
1768 let Some(d) = txn_key.as_digest() else {
1769 continue;
1770 };
1771
1772 if !protocol_config.include_cancelled_randomness_txns_in_prologue()
1773 && assigned_versions
1774 .shared_object_versions
1775 .iter()
1776 .any(|((id, _), _)| *id == SUI_RANDOMNESS_STATE_OBJECT_ID)
1777 {
1778 continue;
1779 }
1780
1781 if assigned_versions
1782 .shared_object_versions
1783 .iter()
1784 .any(|(_, version)| version.is_cancelled())
1785 {
1786 assert_reachable!("cancelled transactions");
1787 cancelled_txn_version_assignment
1788 .push((*d, assigned_versions.shared_object_versions.clone()));
1789 }
1790 }
1791
1792 fail_point_arg!(
1793 "additional_cancelled_txns_for_tests",
1794 |additional_cancelled_txns: Vec<(
1795 TransactionDigest,
1796 Vec<(ConsensusObjectSequenceKey, SequenceNumber)>
1797 )>| {
1798 cancelled_txn_version_assignment.extend(additional_cancelled_txns);
1799 }
1800 );
1801
1802 let transaction = commit_info.create_consensus_commit_prologue_transaction(
1803 self.epoch_store.epoch(),
1804 self.epoch_store.protocol_config(),
1805 cancelled_txn_version_assignment,
1806 commit_info,
1807 state.indirect_state_observer.take().unwrap(),
1808 );
1809 Some(VerifiedExecutableTransactionWithAliases::no_aliases(
1810 transaction,
1811 ))
1812 }
1813
    /// Decides the fate of a single transaction and records it in exactly one of the
    /// output collections:
    ///
    /// * deferred for unpaid amplification (only when `defer_unpaid_amplification` is
    ///   enabled, the occurrence count exceeds the allowance, and the deferral limit has
    ///   not been exceeded) -> `deferred_txns`;
    /// * deferred because `should_defer` said so -> `deferred_txns`;
    /// * congestion deferral beyond the limit -> cancelled: inserted into
    ///   `cancelled_txns` and still pushed to `scheduled_txns`;
    /// * otherwise -> its cost is added to the congestion tracker and it is pushed to
    ///   `scheduled_txns`.
    ///
    /// # Panics
    ///
    /// Panics if `state.indirect_state_observer` has already been taken.
    fn handle_deferral_and_cancellation(
        &self,
        state: &mut CommitHandlerState,
        cancelled_txns: &mut BTreeMap<TransactionDigest, CancelConsensusCertificateReason>,
        deferred_txns: &mut BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>,
        scheduled_txns: &mut Vec<VerifiedExecutableTransactionWithAliases>,
        protocol_config: &ProtocolConfig,
        commit_info: &ConsensusCommitInfo,
        transaction: VerifiedExecutableTransactionWithAliases,
        shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
        previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
        execution_time_estimator: &ExecutionTimeEstimator,
    ) {
        if protocol_config.defer_unpaid_amplification() {
            // A digest with no recorded count is treated as zero occurrences.
            let occurrence_count = state
                .occurrence_counts
                .get(transaction.tx().digest())
                .copied()
                .unwrap_or(0);

            // Allowance is `gas_price / reference_gas_price + 1`; `max(1)` guards against
            // division by zero.
            let rgp = self.epoch_store.reference_gas_price();
            let gas_price = transaction.tx().transaction_data().gas_price();
            let allowed_count = (gas_price / rgp.max(1)) + 1;

            if occurrence_count as u64 > allowed_count {
                self.metrics
                    .consensus_handler_unpaid_amplification_deferrals
                    .inc();

                // Keep the original "deferred from" round if this transaction was
                // already deferred before.
                let deferred_from_round = previously_deferred_tx_digests
                    .get(transaction.tx().digest())
                    .map(|k| k.deferred_from_round())
                    .unwrap_or(commit_info.round);

                let deferral_key = DeferralKey::new_for_consensus_round(
                    commit_info.round + 1,
                    deferred_from_round,
                );

                // If the limit is exceeded, the transaction is not deferred here and
                // falls through to the regular handling below.
                if transaction_deferral_within_limit(
                    &deferral_key,
                    protocol_config.max_deferral_rounds_for_congestion_control(),
                ) {
                    assert_reachable!("unpaid amplification deferral");
                    debug!(
                        "Deferring transaction {:?} due to unpaid amplification (count={}, allowed={})",
                        transaction.tx().digest(),
                        occurrence_count,
                        allowed_count
                    );
                    deferred_txns
                        .entry(deferral_key)
                        .or_default()
                        .push(transaction);
                    return;
                }
            }
        }

        // The cost is computed up front (this also feeds the indirect state observer) but
        // is only applied to the tracker if the transaction ends up scheduled normally.
        let tx_cost = shared_object_congestion_tracker.get_tx_cost(
            execution_time_estimator,
            transaction.tx(),
            state.indirect_state_observer.as_mut().unwrap(),
        );

        let deferral_info = self.epoch_store.should_defer(
            transaction.tx(),
            commit_info,
            state.dkg_failed,
            state.randomness_round.is_some(),
            previously_deferred_tx_digests,
            shared_object_congestion_tracker,
        );

        if let Some((deferral_key, deferral_reason)) = deferral_info {
            debug!(
                "Deferring consensus certificate for transaction {:?} until {:?}",
                transaction.tx().digest(),
                deferral_key
            );

            match deferral_reason {
                // Randomness deferrals are not subject to the deferral round limit.
                DeferralReason::RandomnessNotReady => {
                    deferred_txns
                        .entry(deferral_key)
                        .or_default()
                        .push(transaction);
                }
                DeferralReason::SharedObjectCongestion(congested_objects) => {
                    self.metrics.consensus_handler_congested_transactions.inc();
                    if transaction_deferral_within_limit(
                        &deferral_key,
                        protocol_config.max_deferral_rounds_for_congestion_control(),
                    ) {
                        deferred_txns
                            .entry(deferral_key)
                            .or_default()
                            .push(transaction);
                    } else {
                        // Deferred for too long: cancel instead of deferring again.
                        assert_sometimes!(
                            transaction.tx().data().transaction_data().uses_randomness(),
                            "cancelled randomness-using transaction"
                        );
                        assert_sometimes!(
                            !transaction.tx().data().transaction_data().uses_randomness(),
                            "cancelled non-randomness-using transaction"
                        );

                        debug!(
                            "Cancelling consensus transaction {:?} with deferral key {:?} due to congestion on objects {:?}",
                            transaction.tx().digest(),
                            deferral_key,
                            congested_objects
                        );
                        cancelled_txns.insert(
                            *transaction.tx().digest(),
                            CancelConsensusCertificateReason::CongestionOnObjects(
                                congested_objects,
                            ),
                        );
                        // Cancelled transactions are still scheduled; their cost is not
                        // added to the congestion tracker.
                        scheduled_txns.push(transaction);
                    }
                }
            }
        } else {
            // Not deferred: account for its cost and schedule it.
            shared_object_congestion_tracker.bump_object_execution_cost(tx_cost, transaction.tx());
            scheduled_txns.push(transaction);
        }
    }
1948
1949 fn merge_and_reorder_transactions(
1950 &self,
1951 state: &mut CommitHandlerState,
1952 commit_info: &ConsensusCommitInfo,
1953 user_transactions: Vec<VerifiedExecutableTransactionWithAliases>,
1954 ) -> (
1955 Vec<VerifiedExecutableTransactionWithAliases>,
1956 Vec<VerifiedExecutableTransactionWithAliases>,
1957 HashMap<TransactionDigest, DeferralKey>,
1958 ) {
1959 let protocol_config = self.epoch_store.protocol_config();
1960
1961 let (mut txns, mut randomness_txns, previously_deferred_tx_digests) =
1962 self.load_deferred_transactions(state, commit_info);
1963
1964 txns.reserve(user_transactions.len());
1965 randomness_txns.reserve(user_transactions.len());
1966
1967 let mut txns: Vec<_> = txns
1970 .into_iter()
1971 .filter_map(|tx| {
1972 if tx.tx().transaction_data().uses_randomness() {
1973 randomness_txns.push(tx);
1974 None
1975 } else {
1976 Some(tx)
1977 }
1978 })
1979 .collect();
1980
1981 for txn in user_transactions {
1982 if txn.tx().transaction_data().uses_randomness() {
1983 randomness_txns.push(txn);
1984 } else {
1985 txns.push(txn);
1986 }
1987 }
1988
1989 PostConsensusTxReorder::reorder(
1990 &mut txns,
1991 protocol_config.consensus_transaction_ordering(),
1992 );
1993 PostConsensusTxReorder::reorder(
1994 &mut randomness_txns,
1995 protocol_config.consensus_transaction_ordering(),
1996 );
1997
1998 (txns, randomness_txns, previously_deferred_tx_digests)
1999 }
2000
2001 fn load_deferred_transactions(
2002 &self,
2003 state: &mut CommitHandlerState,
2004 commit_info: &ConsensusCommitInfo,
2005 ) -> (
2006 Vec<VerifiedExecutableTransactionWithAliases>,
2007 Vec<VerifiedExecutableTransactionWithAliases>,
2008 HashMap<TransactionDigest, DeferralKey>,
2009 ) {
2010 let mut previously_deferred_tx_digests = HashMap::new();
2011
2012 let deferred_txs: Vec<_> = self
2013 .epoch_store
2014 .load_deferred_transactions_for_up_to_consensus_round_v2(
2015 &mut state.output,
2016 commit_info.round,
2017 )
2018 .expect("db error")
2019 .into_iter()
2020 .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2021 .map(|(key, tx)| {
2022 previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2023 tx
2024 })
2025 .collect();
2026 trace!(
2027 "loading deferred transactions: {:?}",
2028 deferred_txs.iter().map(|tx| tx.tx().digest())
2029 );
2030
2031 let deferred_randomness_txs = if state.dkg_failed || state.randomness_round.is_some() {
2032 let txns: Vec<_> = self
2033 .epoch_store
2034 .load_deferred_transactions_for_randomness_v2(&mut state.output)
2035 .expect("db error")
2036 .into_iter()
2037 .flat_map(|(key, txns)| txns.into_iter().map(move |tx| (key, tx)))
2038 .map(|(key, tx)| {
2039 previously_deferred_tx_digests.insert(*tx.tx().digest(), key);
2040 tx
2041 })
2042 .collect();
2043 trace!(
2044 "loading deferred randomness transactions: {:?}",
2045 txns.iter().map(|tx| tx.tx().digest())
2046 );
2047 txns
2048 } else {
2049 vec![]
2050 };
2051
2052 (
2053 deferred_txs,
2054 deferred_randomness_txs,
2055 previously_deferred_tx_digests,
2056 )
2057 }
2058
2059 fn init_congestion_tracker(
2060 &self,
2061 commit_info: &ConsensusCommitInfo,
2062 for_randomness: bool,
2063 txns: &[VerifiedExecutableTransactionWithAliases],
2064 ) -> SharedObjectCongestionTracker {
2065 #[allow(unused_mut)]
2066 let mut ret = SharedObjectCongestionTracker::from_protocol_config(
2067 self.epoch_store
2068 .consensus_quarantine
2069 .read()
2070 .load_initial_object_debts(
2071 &self.epoch_store,
2072 commit_info.round,
2073 for_randomness,
2074 txns,
2075 )
2076 .expect("db error"),
2077 self.epoch_store.protocol_config(),
2078 for_randomness,
2079 self.congestion_logger.is_some(),
2080 );
2081
2082 fail_point_arg!(
2083 "initial_congestion_tracker",
2084 |tracker: SharedObjectCongestionTracker| {
2085 info!(
2086 "Initialize shared_object_congestion_tracker to {:?}",
2087 tracker
2088 );
2089 ret = tracker;
2090 }
2091 );
2092
2093 ret
2094 }
2095
2096 fn process_gasless_transactions(
2097 &self,
2098 commit_info: &ConsensusCommitInfo,
2099 user_transactions: &[VerifiedExecutableTransactionWithAliases],
2100 ) {
2101 let gasless_count = user_transactions
2102 .iter()
2103 .filter(|txn| txn.tx().transaction_data().is_gasless_transaction())
2104 .count() as u64;
2105 self.consensus_gasless_counter
2106 .record_commit(commit_info.timestamp, gasless_count);
2107 }
2108
2109 fn process_jwks(
2110 &self,
2111 state: &mut CommitHandlerState,
2112 commit_info: &ConsensusCommitInfo,
2113 new_jwks: Vec<(AuthorityName, JwkId, JWK)>,
2114 ) {
2115 for (authority_name, jwk_id, jwk) in new_jwks {
2116 self.epoch_store.record_jwk_vote(
2117 &mut state.output,
2118 commit_info.round,
2119 authority_name,
2120 &jwk_id,
2121 &jwk,
2122 );
2123 }
2124 }
2125
2126 fn process_capability_notifications(
2127 &self,
2128 capability_notifications: Vec<AuthorityCapabilitiesV2>,
2129 ) {
2130 for capabilities in capability_notifications {
2131 self.epoch_store
2132 .record_capabilities_v2(&capabilities)
2133 .expect("db error");
2134 }
2135 }
2136
2137 fn process_execution_time_observations(
2138 &self,
2139 state: &mut CommitHandlerState,
2140 execution_time_observations: Vec<ExecutionTimeObservation>,
2141 ) {
2142 let _scope = monitored_scope("ConsensusCommitHandler::process_execution_time_observations");
2143 let mut execution_time_estimator = self
2144 .epoch_store
2145 .execution_time_estimator
2146 .try_lock()
2147 .expect("should only ever be called from the commit handler thread");
2148
2149 for ExecutionTimeObservation {
2150 authority,
2151 generation,
2152 estimates,
2153 } in execution_time_observations
2154 {
2155 let authority_index = self
2156 .epoch_store
2157 .committee()
2158 .authority_index(&authority)
2159 .unwrap();
2160 execution_time_estimator.process_observations_from_consensus(
2161 authority_index,
2162 Some(generation),
2163 &estimates,
2164 );
2165 state
2166 .output
2167 .insert_execution_time_observation(authority_index, generation, estimates);
2168 }
2169 }
2170
2171 fn process_checkpoint_signature_messages(
2172 &self,
2173 checkpoint_signature_messages: Vec<CheckpointSignatureMessage>,
2174 ) {
2175 for checkpoint_signature_message in checkpoint_signature_messages {
2176 self.checkpoint_service
2177 .notify_checkpoint_signature(&checkpoint_signature_message)
2178 .expect("db error");
2179 }
2180 }
2181
    /// Applies the randomness DKG messages and confirmations from this commit and, when
    /// warranted, advances the DKG protocol.
    ///
    /// If randomness is not enabled for the epoch, nothing is processed; receiving any
    /// DKG message or confirmation in that case is reported via `debug_fatal!`.
    ///
    /// # Panics
    ///
    /// Panics if randomness is enabled but `randomness_manager` is `None`, or if
    /// advancing the DKG fails (the `expect` message is "epoch ended").
    async fn process_dkg_updates(
        &self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        randomness_manager: Option<&mut RandomnessManager>,
        randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
        randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
    ) {
        if !self.epoch_store.randomness_state_enabled() {
            let num_dkg_messages = randomness_dkg_messages.len();
            let num_dkg_confirmations = randomness_dkg_confirmations.len();
            if num_dkg_messages + num_dkg_confirmations > 0 {
                debug_fatal!(
                    "received {} RandomnessDkgMessage and {} RandomnessDkgConfirmation messages when randomness is not enabled",
                    num_dkg_messages,
                    num_dkg_confirmations
                );
            }
            return;
        }

        let randomness_manager =
            randomness_manager.expect("randomness manager should exist if randomness is enabled");

        // Messages are applied before confirmations; each call reports whether at least
        // one entry was successfully deserialized and applied.
        let randomness_dkg_updates =
            self.process_randomness_dkg_messages(randomness_manager, randomness_dkg_messages);

        let randomness_dkg_confirmation_updates = self.process_randomness_dkg_confirmations(
            state,
            randomness_manager,
            randomness_dkg_confirmations,
        );

        // Evaluated after the messages/confirmations above were applied, so the DKG
        // status read here reflects them.
        // NOTE(review): the Mainnet / epoch >= 1143 gate is hard-coded alongside the
        // protocol config flag; its rationale is not visible here - confirm before
        // changing.
        let always_advance_dkg_to_resolution = (self
            .epoch_store
            .protocol_config()
            .always_advance_dkg_to_resolution()
            || (self.epoch_store.get_chain() == Chain::Mainnet
                && self.epoch_store.epoch() >= 1143))
            && randomness_manager.dkg_status() == DkgStatus::Pending;

        if randomness_dkg_updates
            || randomness_dkg_confirmation_updates
            || always_advance_dkg_to_resolution
        {
            randomness_manager
                .advance_dkg(&mut state.output, commit_info.round)
                .await
                .expect("epoch ended");
        }
    }
2236
2237 fn process_randomness_dkg_messages(
2238 &self,
2239 randomness_manager: &mut RandomnessManager,
2240 randomness_dkg_messages: Vec<(AuthorityName, Vec<u8>)>,
2241 ) -> bool {
2242 if randomness_dkg_messages.is_empty() {
2243 return false;
2244 }
2245
2246 let mut randomness_state_updated = false;
2247 for (authority, bytes) in randomness_dkg_messages {
2248 match bcs::from_bytes(&bytes) {
2249 Ok(message) => {
2250 randomness_manager
2251 .add_message(&authority, message)
2252 .expect("epoch ended");
2254 randomness_state_updated = true;
2255 }
2256
2257 Err(e) => {
2258 warn!(
2259 "Failed to deserialize RandomnessDkgMessage from {:?}: {e:?}",
2260 authority.concise(),
2261 );
2262 }
2263 }
2264 }
2265
2266 randomness_state_updated
2267 }
2268
2269 fn process_randomness_dkg_confirmations(
2270 &self,
2271 state: &mut CommitHandlerState,
2272 randomness_manager: &mut RandomnessManager,
2273 randomness_dkg_confirmations: Vec<(AuthorityName, Vec<u8>)>,
2274 ) -> bool {
2275 if randomness_dkg_confirmations.is_empty() {
2276 return false;
2277 }
2278
2279 let mut randomness_state_updated = false;
2280 for (authority, bytes) in randomness_dkg_confirmations {
2281 match bcs::from_bytes(&bytes) {
2282 Ok(message) => {
2283 randomness_manager
2284 .add_confirmation(&mut state.output, &authority, message)
2285 .expect("epoch ended");
2287 randomness_state_updated = true;
2288 }
2289 Err(e) => {
2290 warn!(
2291 "Failed to deserialize RandomnessDkgConfirmation from {:?}: {e:?}",
2292 authority.concise(),
2293 );
2294 }
2295 }
2296 }
2297
2298 randomness_state_updated
2299 }
2300
2301 fn process_end_of_publish_transactions(
2303 &self,
2304 state: &mut CommitHandlerState,
2305 end_of_publish_transactions: Vec<AuthorityName>,
2306 ) -> bool {
2307 let mut eop_aggregator = self.epoch_store.end_of_publish.try_lock().expect(
2308 "No contention on end_of_publish as it is only accessed from consensus handler",
2309 );
2310
2311 if eop_aggregator.has_quorum() {
2312 return true;
2313 }
2314
2315 if end_of_publish_transactions.is_empty() {
2316 return false;
2317 }
2318
2319 for authority in end_of_publish_transactions {
2320 info!("Received EndOfPublish from {:?}", authority.concise());
2321
2322 state.output.insert_end_of_publish(authority);
2325 if eop_aggregator
2326 .insert_generic(authority, ())
2327 .is_quorum_reached()
2328 {
2329 debug!(
2330 "Collected enough end_of_publish messages with last message from validator {:?}",
2331 authority.concise(),
2332 );
2333 return true;
2334 }
2335 }
2336
2337 false
2338 }
2339
2340 fn advance_eop_state_machine(
2343 &self,
2344 state: &mut CommitHandlerState,
2345 ) -> (
2346 RwLockWriteGuard<'_, ReconfigState>,
2347 bool, ) {
2349 let mut reconfig_state = self.epoch_store.get_reconfig_state_write_lock_guard();
2350 let start_state_is_reject_all_tx = reconfig_state.is_reject_all_tx();
2351
2352 reconfig_state.close_all_certs();
2353
2354 let commit_has_deferred_txns = state.output.has_deferred_transactions();
2355 let previous_commits_have_deferred_txns = !self.epoch_store.deferred_transactions_empty();
2356
2357 if !commit_has_deferred_txns && !previous_commits_have_deferred_txns {
2358 if !start_state_is_reject_all_tx {
2359 info!("Transitioning to RejectAllTx");
2360 }
2361 reconfig_state.close_all_tx();
2362 } else {
2363 debug!(
2364 "Blocking end of epoch on deferred transactions, from previous commits?={}, from this commit?={}",
2365 previous_commits_have_deferred_txns, commit_has_deferred_txns,
2366 );
2367 }
2368
2369 state.output.store_reconfig_state(reconfig_state.clone());
2370
2371 if !start_state_is_reject_all_tx && reconfig_state.is_reject_all_tx() {
2372 (reconfig_state, true)
2373 } else {
2374 (reconfig_state, false)
2375 }
2376 }
2377
2378 fn gather_commit_metadata(
2379 &self,
2380 consensus_commit: &impl ConsensusCommitAPI,
2381 ) -> (u64, AuthorityIndex, u64) {
2382 let timestamp = consensus_commit.commit_timestamp_ms();
2383 let leader_author = consensus_commit.leader_author_index();
2384 let commit_sub_dag_index = consensus_commit.commit_sub_dag_index();
2385
2386 let system_time_ms = SystemTime::now()
2387 .duration_since(UNIX_EPOCH)
2388 .unwrap()
2389 .as_millis() as i64;
2390
2391 let consensus_timestamp_bias_ms = system_time_ms - (timestamp as i64);
2392 let consensus_timestamp_bias_seconds = consensus_timestamp_bias_ms as f64 / 1000.0;
2393 self.metrics
2394 .consensus_timestamp_bias
2395 .observe(consensus_timestamp_bias_seconds);
2396
2397 let epoch_start = self
2398 .epoch_store
2399 .epoch_start_config()
2400 .epoch_start_timestamp_ms();
2401 let timestamp = if timestamp < epoch_start {
2402 error!(
2403 "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}"
2404 );
2405 epoch_start
2406 } else {
2407 timestamp
2408 };
2409
2410 (timestamp, leader_author, commit_sub_dag_index)
2411 }
2412
2413 fn create_authenticator_state_update(
2414 &self,
2415 last_committed_round: u64,
2416 commit_info: &ConsensusCommitInfo,
2417 ) -> Option<VerifiedExecutableTransactionWithAliases> {
2418 let new_jwks = self
2426 .epoch_store
2427 .get_new_jwks(last_committed_round)
2428 .expect("Unrecoverable error in consensus handler");
2429
2430 if !new_jwks.is_empty() {
2431 let authenticator_state_update_transaction = authenticator_state_update_transaction(
2432 &self.epoch_store,
2433 commit_info.round,
2434 new_jwks,
2435 );
2436 debug!(
2437 "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
2438 authenticator_state_update_transaction.digest(),
2439 authenticator_state_update_transaction,
2440 );
2441
2442 Some(VerifiedExecutableTransactionWithAliases::no_aliases(
2443 authenticator_state_update_transaction,
2444 ))
2445 } else {
2446 None
2447 }
2448 }
2449
2450 #[instrument(level = "trace", skip_all)]
2453 fn filter_consensus_txns(
2454 &mut self,
2455 initial_reconfig_state: ReconfigState,
2456 commit_info: &ConsensusCommitInfo,
2457 block_transactions: ParsedConsensusTransactions,
2458 ) -> FilteredConsensusOutput {
2459 let _scope = monitored_scope("ConsensusCommitHandler::filter_consensus_txns");
2460 let mut transactions = Vec::new();
2461 let mut owned_object_locks = HashMap::new();
2462 let mut dropped_transaction_keys = Vec::new();
2463 let mut status_updates: Vec<(ConsensusPosition, ConsensusTxStatus)> = Vec::new();
2467 let epoch = self.epoch_store.epoch();
2468 let mut num_finalized_user_transactions = vec![0; self.committee.size()];
2469 let mut num_rejected_user_transactions = vec![0; self.committee.size()];
2470
2471 let existing_locks = {
2477 let mut prefetch_refs: Vec<ObjectRef> = Vec::new();
2478 for (_block, parsed_transactions) in &block_transactions {
2479 for parsed in parsed_transactions {
2480 if let ConsensusTransactionKind::UserTransactionV2(tx_with_claims) =
2481 &parsed.transaction.kind
2482 && let Some(refs) = owned_object_refs_to_lock(tx_with_claims)
2483 {
2484 prefetch_refs.extend(refs);
2485 }
2486 }
2487 }
2488 prefetch_refs.sort();
2489 prefetch_refs.dedup();
2490 self.epoch_store
2493 .get_owned_object_locks_map(&prefetch_refs)
2494 .unwrap_or_default()
2495 };
2496
2497 for (block, parsed_transactions) in block_transactions {
2498 let author = block.author.value();
2499 let author_hostname = self.committee.authority(block.author).hostname.as_str();
2500 self.last_consensus_stats.stats.inc_num_messages(author);
2502
2503 status_updates.push((
2505 ConsensusPosition::ping(epoch, block),
2506 ConsensusTxStatus::Finalized,
2507 ));
2508
2509 for (tx_index, parsed) in parsed_transactions.into_iter().enumerate() {
2510 let position = ConsensusPosition {
2511 epoch,
2512 block,
2513 index: tx_index as TransactionIndex,
2514 };
2515
2516 if let Some(tx) = parsed.transaction.kind.as_user_transaction() {
2519 let digest = tx.digest();
2520 if let Some((spam_weight, submitter_client_addrs)) = self
2521 .epoch_store
2522 .submitted_transaction_cache
2523 .increment_submission_count(digest)
2524 {
2525 if let Some(ref traffic_controller) = self.traffic_controller {
2526 debug!(
2527 "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} applied to {} client addresses",
2528 submitter_client_addrs.len()
2529 );
2530
2531 for addr in submitter_client_addrs {
2533 traffic_controller.tally(
2534 TrafficTally::new(Some(addr), None, None, spam_weight.clone())
2535 .with_method(
2536 "consensus_submission_limit_exceeded".to_string(),
2537 ),
2538 );
2539 }
2540 } else {
2541 warn!(
2542 "Transaction {digest} exceeded submission limits, spam_weight: {spam_weight:?} for {} client addresses (traffic controller not configured)",
2543 submitter_client_addrs.len()
2544 );
2545 }
2546 }
2547 }
2548
2549 let kind = classify(&parsed.transaction);
2552 let outcome = if parsed.rejected {
2553 "rejected"
2554 } else {
2555 "accepted"
2556 };
2557 self.metrics
2558 .consensus_handler_processed
2559 .with_label_values(&[kind, outcome])
2560 .inc();
2561 self.metrics
2562 .consensus_handler_transaction_sizes
2563 .with_label_values(&[kind, outcome])
2564 .observe(parsed.serialized_len as f64);
2565 if parsed.transaction.is_user_transaction() {
2569 self.metrics
2570 .consensus_handler_processed_user_transactions
2571 .with_label_values(&[outcome, author_hostname])
2572 .inc();
2573 }
2574
2575 if parsed.rejected {
2576 if parsed.transaction.is_user_transaction() {
2577 status_updates.push((position, ConsensusTxStatus::Rejected));
2578 num_rejected_user_transactions[author] += 1;
2579 }
2580 continue;
2582 }
2583
2584 if parsed.transaction.is_user_transaction() {
2585 self.last_consensus_stats
2586 .stats
2587 .inc_num_user_transactions(author);
2588 }
2589
2590 if !initial_reconfig_state.should_accept_consensus_certs() {
2591 match &parsed.transaction.kind {
2594 ConsensusTransactionKind::UserTransactionV2(_)
2595 | ConsensusTransactionKind::UserTransaction(_)
2597 | ConsensusTransactionKind::CertifiedTransaction(_)
2598 | ConsensusTransactionKind::CapabilityNotification(_)
2599 | ConsensusTransactionKind::CapabilityNotificationV2(_)
2600 | ConsensusTransactionKind::EndOfPublish(_)
2601 | ConsensusTransactionKind::ExecutionTimeObservation(_)
2603 | ConsensusTransactionKind::NewJWKFetched(_, _, _) => {
2604 debug!(
2605 "Ignoring consensus transaction {:?} because of end of epoch",
2606 parsed.transaction.key()
2607 );
2608 continue;
2609 }
2610
2611 ConsensusTransactionKind::CheckpointSignature(_)
2613 | ConsensusTransactionKind::CheckpointSignatureV2(_)
2614 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2615 | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
2616 | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => ()
2617 }
2618 }
2619
2620 if !initial_reconfig_state.should_accept_tx() {
2621 match &parsed.transaction.kind {
2622 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
2623 | ConsensusTransactionKind::RandomnessDkgMessage(_, _) => continue,
2624 _ => {}
2625 }
2626 }
2627
2628 match &parsed.transaction.kind {
2630 ConsensusTransactionKind::CapabilityNotification(_)
2631 | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
2632 | ConsensusTransactionKind::CheckpointSignature(_) => {
2633 debug_fatal!(
2634 "BUG: saw deprecated tx {:?}for commit round {}",
2635 parsed.transaction.key(),
2636 commit_info.round
2637 );
2638 continue;
2639 }
2640 _ => {}
2641 }
2642
2643 if parsed.transaction.is_user_transaction() {
2644 let author_name = self
2645 .epoch_store
2646 .committee()
2647 .authority_by_index(author as u32)
2648 .unwrap();
2649 if self
2650 .epoch_store
2651 .has_received_end_of_publish_from(author_name)
2652 {
2653 warn!(
2657 "Ignoring consensus transaction {:?} from authority {:?}, which already sent EndOfPublish message to consensus",
2658 author_name.concise(),
2659 parsed.transaction.key(),
2660 );
2661 continue;
2662 }
2663 }
2664
2665 if let ConsensusTransactionKind::UserTransactionV2(tx_with_claims) =
2671 &parsed.transaction.kind
2672 {
2673 let tx = tx_with_claims.tx();
2674 let Some(owned_object_refs) = owned_object_refs_to_lock(tx_with_claims) else {
2675 debug_fatal!("Invalid input objects for transaction {}", tx.digest());
2676 continue;
2677 };
2678
2679 match self
2680 .epoch_store
2681 .try_acquire_owned_object_locks_post_consensus(
2682 &owned_object_refs,
2683 *tx.digest(),
2684 &owned_object_locks,
2685 &existing_locks,
2686 ) {
2687 Ok(new_locks) => {
2688 owned_object_locks.extend(new_locks.into_iter());
2689 status_updates.push((position, ConsensusTxStatus::Finalized));
2691 num_finalized_user_transactions[author] += 1;
2692 }
2693 Err(e) => {
2694 debug!("Dropping transaction {}: {}", tx.digest(), e);
2695 self.metrics.consensus_handler_dropped_transactions.inc();
2696 status_updates.push((position, ConsensusTxStatus::Dropped));
2697 self.epoch_store.set_rejection_vote_reason(position, &e);
2698 dropped_transaction_keys.push(parsed.transaction.key());
2699 continue;
2700 }
2701 }
2702 }
2703
2704 let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);
2705 transactions.push((transaction, author as u32));
2706 }
2707 }
2708
2709 self.epoch_store.set_consensus_tx_statuses(status_updates);
2713
2714 for (i, authority) in self.committee.authorities() {
2715 let hostname = &authority.hostname;
2716 self.metrics
2717 .consensus_committed_messages
2718 .with_label_values(&[hostname])
2719 .set(self.last_consensus_stats.stats.get_num_messages(i.value()) as i64);
2720 self.metrics
2721 .consensus_committed_user_transactions
2722 .with_label_values(&[hostname])
2723 .set(
2724 self.last_consensus_stats
2725 .stats
2726 .get_num_user_transactions(i.value()) as i64,
2727 );
2728 self.metrics
2729 .consensus_finalized_user_transactions
2730 .with_label_values(&[hostname])
2731 .add(num_finalized_user_transactions[i.value()] as i64);
2732 self.metrics
2733 .consensus_rejected_user_transactions
2734 .with_label_values(&[hostname])
2735 .add(num_rejected_user_transactions[i.value()] as i64);
2736 }
2737
2738 FilteredConsensusOutput {
2739 transactions,
2740 owned_object_locks,
2741 dropped_transaction_keys,
2742 }
2743 }
2744
    /// Assigns consensus indices to `transactions`, verifies them, and removes
    /// duplicates.
    ///
    /// A transaction is skipped when it fails verification, when its key already
    /// appeared earlier in this commit, when its key is already in `processed_cache`,
    /// or when the epoch store reports the key as already processed. Every surviving
    /// key is recorded as processed in `state.output`.
    ///
    /// As a side effect, `state.occurrence_counts` is filled with the number of times
    /// each user transaction digest appeared in this commit.
    ///
    /// # Panics
    ///
    /// Panics on a "db error" from the epoch store, if an author index is not in the
    /// committee, or if `state.occurrence_counts` is not empty on entry.
    fn deduplicate_consensus_txns(
        &mut self,
        state: &mut CommitHandlerState,
        commit_info: &ConsensusCommitInfo,
        transactions: Vec<(SequencedConsensusTransactionKind, u32)>,
    ) -> Vec<VerifiedSequencedConsensusTransaction> {
        let _scope = monitored_scope("ConsensusCommitHandler::deduplicate_consensus_txns");
        let mut all_transactions = Vec::new();

        // Number of times each key was seen in this commit (verified transactions only).
        let mut occurrence_counts: HashMap<SequencedConsensusTransactionKey, u32> = HashMap::new();
        // Keys that survived deduplication, i.e. are processed for the first time here.
        let mut first_commit_keys: HashSet<SequencedConsensusTransactionKey> = HashSet::new();

        for (seq, (transaction, cert_origin)) in transactions.into_iter().enumerate() {
            // `transaction_index` is 1-based within the commit.
            let current_tx_index = ExecutionIndices {
                last_committed_round: commit_info.round,
                sub_dag_index: commit_info.consensus_commit_ref.index.into(),
                transaction_index: (seq + 1) as u64,
            };

            // The stats index advances for every transaction, including ones skipped below.
            self.last_consensus_stats.index = current_tx_index;

            let certificate_author = *self
                .epoch_store
                .committee()
                .authority_by_index(cert_origin)
                .unwrap();

            let sequenced_transaction = SequencedConsensusTransaction {
                certificate_author_index: cert_origin,
                certificate_author,
                consensus_index: current_tx_index,
                transaction,
            };

            let Some(verified_transaction) = self
                .epoch_store
                .verify_consensus_transaction(sequenced_transaction)
            else {
                continue;
            };

            let key = verified_transaction.0.key();

            if let Some(tx_digest) = key.user_transaction_digest() {
                self.epoch_store
                    .cache_recently_finalized_transaction(tx_digest);
            }

            let count = occurrence_counts.entry(key.clone()).or_insert(0);
            *count += 1;
            let in_commit = *count > 1;

            // `put` runs unconditionally (it is evaluated before the `||` below), so the
            // key is inserted into the cache even for in-commit duplicates.
            // NOTE(review): assumes `put` returns the previous value when the key was
            // already present (as `lru::LruCache::put` does) - confirm the cache type.
            let in_cache = self.processed_cache.put(key.clone(), ()).is_some();
            if in_commit || in_cache {
                self.metrics.skipped_consensus_txns_cache_hit.inc();
                continue;
            }
            if self
                .epoch_store
                .is_consensus_message_processed(&key)
                .expect("db error")
            {
                self.metrics.skipped_consensus_txns.inc();
                continue;
            }

            first_commit_keys.insert(key.clone());

            state.output.record_consensus_message_processed(key);

            all_transactions.push(verified_transaction);
        }

        // Report how often newly processed transactions were duplicated in this commit.
        for key in first_commit_keys {
            if let Some(&count) = occurrence_counts.get(&key)
                && count > 1
            {
                self.metrics
                    .consensus_handler_duplicate_tx_count
                    .observe(count as f64);
            }
        }

        assert!(
            state.occurrence_counts.is_empty(),
            "occurrence_counts should be empty before populating"
        );
        // Only keys that carry a user transaction digest are kept in the state.
        state.occurrence_counts.reserve(occurrence_counts.len());
        state.occurrence_counts.extend(
            occurrence_counts
                .into_iter()
                .filter_map(|(key, count)| key.user_transaction_digest().map(|d| (d, count))),
        );

        all_transactions
    }
2850
    /// Sorts the verified transactions of a commit into the per-kind lists of a
    /// `CommitHandlerInput`.
    ///
    /// `UserTransactionV2` entries are converted into executable transactions for the
    /// current epoch (with their alias versions, if any); every other supported kind is
    /// pushed onto its matching list unchanged.
    ///
    /// # Panics
    ///
    /// Panics (`unreachable!`) on `System` transactions and on the transaction kinds
    /// listed in the final match arm.
    fn build_commit_handler_input(
        &self,
        transactions: Vec<VerifiedSequencedConsensusTransaction>,
    ) -> CommitHandlerInput {
        let _scope = monitored_scope("ConsensusCommitHandler::build_commit_handler_input");
        let epoch = self.epoch_store.epoch();
        let mut commit_handler_input = CommitHandlerInput::default();

        for VerifiedSequencedConsensusTransaction(transaction) in transactions.into_iter() {
            match transaction.transaction {
                SequencedConsensusTransactionKind::External(consensus_transaction) => {
                    match consensus_transaction.kind {
                        ConsensusTransactionKind::UserTransactionV2(tx) => {
                            // The protocol config selects how alias versions are read:
                            // directly via `aliases()`, or from `aliases_v1()` with each
                            // entry's first element replaced by its position in the list.
                            let used_alias_versions = if self
                                .epoch_store
                                .protocol_config()
                                .fix_checkpoint_signature_mapping()
                            {
                                tx.aliases()
                            } else {
                                tx.aliases_v1().map(|a| {
                                    // NOTE(review): `unwrap` assumes the v1 alias list
                                    // is never empty when present - confirm.
                                    NonEmpty::from_vec(
                                        a.into_iter()
                                            .enumerate()
                                            .map(|(idx, (_, seq))| (idx as u8, seq))
                                            .collect(),
                                    )
                                    .unwrap()
                                })
                            };
                            let inner_tx = tx.into_tx();
                            let tx = VerifiedTransaction::new_unchecked(inner_tx);
                            let transaction =
                                VerifiedExecutableTransaction::new_from_consensus(tx, epoch);
                            if let Some(used_alias_versions) = used_alias_versions {
                                commit_handler_input
                                    .user_transactions
                                    .push(WithAliases::new(transaction, used_alias_versions));
                            } else {
                                commit_handler_input.user_transactions.push(
                                    VerifiedExecutableTransactionWithAliases::no_aliases(
                                        transaction,
                                    ),
                                );
                            }
                        }

                        ConsensusTransactionKind::EndOfPublish(authority_public_key_bytes) => {
                            commit_handler_input
                                .end_of_publish_transactions
                                .push(authority_public_key_bytes);
                        }
                        ConsensusTransactionKind::NewJWKFetched(
                            authority_public_key_bytes,
                            jwk_id,
                            jwk,
                        ) => {
                            commit_handler_input.new_jwks.push((
                                authority_public_key_bytes,
                                jwk_id,
                                jwk,
                            ));
                        }
                        ConsensusTransactionKind::RandomnessDkgMessage(
                            authority_public_key_bytes,
                            items,
                        ) => {
                            commit_handler_input
                                .randomness_dkg_messages
                                .push((authority_public_key_bytes, items));
                        }
                        ConsensusTransactionKind::RandomnessDkgConfirmation(
                            authority_public_key_bytes,
                            items,
                        ) => {
                            commit_handler_input
                                .randomness_dkg_confirmations
                                .push((authority_public_key_bytes, items));
                        }
                        ConsensusTransactionKind::CapabilityNotificationV2(
                            authority_capabilities_v2,
                        ) => {
                            commit_handler_input
                                .capability_notifications
                                .push(authority_capabilities_v2);
                        }
                        ConsensusTransactionKind::ExecutionTimeObservation(
                            execution_time_observation,
                        ) => {
                            commit_handler_input
                                .execution_time_observations
                                .push(execution_time_observation);
                        }
                        ConsensusTransactionKind::CheckpointSignatureV2(
                            checkpoint_signature_message,
                        ) => {
                            commit_handler_input
                                .checkpoint_signature_messages
                                .push(*checkpoint_signature_message);
                        }

                        // NOTE(review): these kinds are expected to have been removed
                        // before this point; confirm that every one of them is filtered
                        // upstream, since reaching this arm panics.
                        ConsensusTransactionKind::CheckpointSignature(_)
                        | ConsensusTransactionKind::RandomnessStateUpdate(_, _)
                        | ConsensusTransactionKind::CapabilityNotification(_)
                        | ConsensusTransactionKind::CertifiedTransaction(_)
                        | ConsensusTransactionKind::UserTransaction(_) => {
                            unreachable!("filtered earlier")
                        }
                    }
                }
                SequencedConsensusTransactionKind::System(_verified_envelope) => unreachable!(),
            }
        }

        commit_handler_input
    }
2978
2979 async fn send_end_of_publish_if_needed(&self) {
2980 if self
2981 .epoch_store
2982 .protocol_config()
2983 .timestamp_based_epoch_close()
2984 {
2985 return;
2986 }
2987 if !self.epoch_store.should_send_end_of_publish() {
2988 return;
2989 }
2990
2991 let end_of_publish = ConsensusTransaction::new_end_of_publish(self.epoch_store.name);
2992 if let Err(err) =
2993 self.consensus_adapter
2994 .submit(end_of_publish, None, &self.epoch_store, None, None)
2995 {
2996 warn!(
2997 "Error when sending EndOfPublish message from ConsensusHandler: {:?}",
2998 err
2999 );
3000 } else {
3001 info!(epoch=?self.epoch_store.epoch(), "Sending EndOfPublish message to consensus");
3002 }
3003 }
3004}
3005
/// Message carried by the execution scheduler channel: the schedulables to enqueue,
/// each paired with its assigned versions, plus optional settlement batch info.
pub(crate) type SchedulerMessage = (
    Vec<(Schedulable, AssignedVersions)>,
    Option<SettlementBatchInfo>,
);
3012
/// Cloneable handle for sending `SchedulerMessage`s over an unbounded channel.
#[derive(Clone)]
pub(crate) struct ExecutionSchedulerSender {
    // Sending half of the unbounded `SchedulerMessage` channel.
    sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
}
3017
3018impl ExecutionSchedulerSender {
3019 fn start(
3020 settlement_scheduler: SettlementScheduler,
3021 epoch_store: Arc<AuthorityPerEpochStore>,
3022 ) -> Self {
3023 let (sender, recv) = monitored_mpsc::unbounded_channel("execution_scheduler_sender");
3024 spawn_monitored_task!(Self::run(recv, settlement_scheduler, epoch_store));
3025 Self { sender }
3026 }
3027
3028 pub(crate) fn new_for_testing(
3029 sender: monitored_mpsc::UnboundedSender<SchedulerMessage>,
3030 ) -> Self {
3031 Self { sender }
3032 }
3033
3034 fn send(
3035 &self,
3036 transactions: Vec<(Schedulable, AssignedVersions)>,
3037 settlement: Option<SettlementBatchInfo>,
3038 ) {
3039 let _ = self.sender.send((transactions, settlement));
3040 }
3041
3042 async fn run(
3043 mut recv: monitored_mpsc::UnboundedReceiver<SchedulerMessage>,
3044 settlement_scheduler: SettlementScheduler,
3045 epoch_store: Arc<AuthorityPerEpochStore>,
3046 ) {
3047 while let Some((transactions, settlement)) = recv.recv().await {
3048 let _guard = monitored_scope("ConsensusHandler::enqueue");
3049 let txns = transactions
3050 .into_iter()
3051 .map(|(txn, versions)| (txn, ExecutionEnv::new().with_assigned_versions(versions)))
3052 .collect();
3053 if let Some(settlement) = settlement {
3054 settlement_scheduler.enqueue_v2(txns, settlement, &epoch_store);
3055 } else {
3056 settlement_scheduler.enqueue(txns, &epoch_store);
3057 }
3058 }
3059 }
3060}
3061
/// Capacity passed to the "consensus_deserialized_commits" channel that
/// `MysticetiConsensusHandler::new` creates between its deserializing task and its
/// commit-handling task.
const CONSENSUS_HANDLER_DESERIALIZE_CHANNEL_CAPACITY: usize = 2;
3066
/// The parsed transactions of one commit, grouped by the block they came from.
type ParsedConsensusTransactions = Vec<(BlockRef, Vec<ParsedTransaction>)>;
3071
/// Owns the background tasks that receive committed sub-DAGs and feed them to the
/// consensus handler.
pub(crate) struct MysticetiConsensusHandler {
    // Set of spawned tasks; shut down by `abort`.
    tasks: JoinSet<()>,
}
3076
3077impl MysticetiConsensusHandler {
3078 pub(crate) fn new(
3079 last_processed_commit_at_startup: CommitIndex,
3080 mut consensus_handler: ConsensusHandler<CheckpointService>,
3081 mut commit_receiver: UnboundedReceiver<consensus_core::CommittedSubDag>,
3082 commit_consumer_monitor: Arc<CommitConsumerMonitor>,
3083 ) -> Self {
3084 debug!(
3085 last_processed_commit_at_startup,
3086 "Starting consensus replay"
3087 );
3088 let mut tasks = JoinSet::new();
3089
3090 let (parsed_sender, mut parsed_receiver) = monitored_mpsc::channel(
3096 "consensus_deserialized_commits",
3097 CONSENSUS_HANDLER_DESERIALIZE_CHANNEL_CAPACITY,
3098 );
3099 tasks.spawn(monitored_future!(async move {
3100 while let Some(consensus_commit) = commit_receiver.recv().await {
3101 let transactions: ParsedConsensusTransactions = {
3102 let _scope = monitored_scope("ConsensusCommitHandler::deserialize_worker");
3103 consensus_commit.transactions()
3104 };
3105 if parsed_sender
3109 .send((consensus_commit, transactions))
3110 .await
3111 .is_err()
3112 {
3113 break;
3114 }
3115 }
3116 }));
3117
3118 tasks.spawn(monitored_future!(async move {
3120 while let Some((consensus_commit, transactions)) = parsed_receiver.recv().await {
3122 let commit_index = consensus_commit.commit_ref.index;
3123 if commit_index <= last_processed_commit_at_startup {
3124 consensus_handler.handle_prior_consensus_commit(consensus_commit);
3125 } else {
3126 consensus_handler
3127 .handle_consensus_commit(consensus_commit, transactions)
3128 .await;
3129 }
3130 commit_consumer_monitor.set_highest_handled_commit(commit_index);
3131 }
3132 }));
3133 Self { tasks }
3134 }
3135
3136 pub(crate) async fn abort(&mut self) {
3137 self.tasks.shutdown().await;
3138 }
3139}
3140
3141fn authenticator_state_update_transaction(
3142 epoch_store: &AuthorityPerEpochStore,
3143 round: u64,
3144 mut new_active_jwks: Vec<ActiveJwk>,
3145) -> VerifiedExecutableTransaction {
3146 let epoch = epoch_store.epoch();
3147 new_active_jwks.sort();
3148
3149 info!("creating authenticator state update transaction");
3150 assert!(epoch_store.authenticator_state_enabled());
3151 let transaction = VerifiedTransaction::new_authenticator_state_update(
3152 epoch,
3153 round,
3154 new_active_jwks,
3155 epoch_store
3156 .epoch_start_config()
3157 .authenticator_obj_initial_shared_version()
3158 .expect("authenticator state obj must exist"),
3159 );
3160 VerifiedExecutableTransaction::new_system(transaction, epoch)
3161}
3162
3163fn owned_object_refs_to_lock(
3169 tx_with_claims: &PlainTransactionWithClaims,
3170) -> Option<Vec<ObjectRef>> {
3171 let immutable_object_ids: HashSet<ObjectID> =
3172 tx_with_claims.get_immutable_objects().into_iter().collect();
3173 let input_objects = tx_with_claims
3174 .tx()
3175 .transaction_data()
3176 .input_objects()
3177 .ok()?;
3178 Some(
3179 input_objects
3180 .iter()
3181 .filter_map(|obj| match obj {
3182 InputObjectKind::ImmOrOwnedMoveObject(obj_ref)
3183 if !immutable_object_ids.contains(&obj_ref.0) =>
3184 {
3185 Some(*obj_ref)
3186 }
3187 _ => None,
3188 })
3189 .collect(),
3190 )
3191}
3192
3193pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
3194 match &transaction.kind {
3195 ConsensusTransactionKind::CertifiedTransaction(_) => "_deprecated_certificate",
3197 ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
3198 ConsensusTransactionKind::CheckpointSignatureV2(_) => "checkpoint_signature",
3199 ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
3200 ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
3201 ConsensusTransactionKind::CapabilityNotificationV2(_) => "capability_notification_v2",
3202 ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
3203 ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
3204 ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
3205 ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
3206 ConsensusTransactionKind::UserTransaction(_) => "_deprecated_user_transaction",
3207 ConsensusTransactionKind::UserTransactionV2(tx) => {
3208 if tx.tx().is_consensus_tx() {
3209 "shared_user_transaction_v2"
3210 } else {
3211 "owned_user_transaction_v2"
3212 }
3213 }
3214 ConsensusTransactionKind::ExecutionTimeObservation(_) => "execution_time_observation",
3215 }
3216}
3217
/// A transaction together with the consensus metadata attached to it after
/// sequencing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequencedConsensusTransaction {
    /// Consensus-level index of the authority this transaction is attributed to.
    // NOTE(review): presumably the author of the consensus block that carried
    // the transaction — confirm against the code that constructs this struct.
    pub certificate_author_index: AuthorityIndex,
    /// Name of the same authority; returned by `sender_authority()`.
    pub certificate_author: AuthorityName,
    /// Position of this transaction in the consensus output.
    pub consensus_index: ExecutionIndices,
    /// The payload: either an externally submitted consensus transaction or a
    /// system transaction.
    pub transaction: SequencedConsensusTransactionKind,
}
3225
/// Payload of a [`SequencedConsensusTransaction`].
///
/// `Serialize`/`Deserialize` are implemented by hand for this type: values are
/// converted to and from `SerializableSequencedConsensusTransactionKind`.
#[derive(Debug, Clone)]
// NOTE(review): the size lint is silenced rather than boxing a variant; the
// reason is not visible here — confirm it is still intended.
#[allow(clippy::large_enum_variant)]
pub enum SequencedConsensusTransactionKind {
    /// A `ConsensusTransaction` received through consensus.
    External(ConsensusTransaction),
    /// An executable system transaction.
    System(VerifiedExecutableTransaction),
}
3232
3233impl Serialize for SequencedConsensusTransactionKind {
3234 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
3235 let serializable = SerializableSequencedConsensusTransactionKind::from(self);
3236 serializable.serialize(serializer)
3237 }
3238}
3239
3240impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
3241 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
3242 let serializable =
3243 SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
3244 Ok(serializable.into())
3245 }
3246}
3247
/// Serde-friendly mirror of `SequencedConsensusTransactionKind`.
///
/// It has the same two variants, but the system variant holds a
/// `TrustedExecutableTransaction` instead of a `VerifiedExecutableTransaction`.
/// The hand-written `Serialize`/`Deserialize` impls of
/// `SequencedConsensusTransactionKind` round-trip through this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
enum SerializableSequencedConsensusTransactionKind {
    /// Mirrors `SequencedConsensusTransactionKind::External`.
    External(ConsensusTransaction),
    /// Mirrors `SequencedConsensusTransactionKind::System`.
    System(TrustedExecutableTransaction),
}
3257
3258impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
3259 fn from(kind: &SequencedConsensusTransactionKind) -> Self {
3260 match kind {
3261 SequencedConsensusTransactionKind::External(ext) => {
3262 SerializableSequencedConsensusTransactionKind::External(ext.clone())
3263 }
3264 SequencedConsensusTransactionKind::System(txn) => {
3265 SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
3266 }
3267 }
3268 }
3269}
3270
3271impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
3272 fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
3273 match kind {
3274 SerializableSequencedConsensusTransactionKind::External(ext) => {
3275 SequencedConsensusTransactionKind::External(ext)
3276 }
3277 SerializableSequencedConsensusTransactionKind::System(txn) => {
3278 SequencedConsensusTransactionKind::System(txn.into())
3279 }
3280 }
3281 }
3282}
3283
/// Key identifying a sequenced consensus transaction.
///
/// The derived `Ord`/`PartialOrd` compare by variant declaration order first,
/// so reordering the variants changes the ordering of keys.
#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug, Ord, PartialOrd)]
pub enum SequencedConsensusTransactionKey {
    /// Key of an external transaction, as returned by `ConsensusTransaction::key()`.
    External(ConsensusTransactionKey),
    /// Key of a system transaction: its transaction digest.
    System(TransactionDigest),
}
3289
3290impl SequencedConsensusTransactionKey {
3291 pub fn user_transaction_digest(&self) -> Option<TransactionDigest> {
3292 match self {
3293 SequencedConsensusTransactionKey::External(key) => match key {
3294 ConsensusTransactionKey::Certificate(digest) => Some(*digest),
3295 _ => None,
3296 },
3297 SequencedConsensusTransactionKey::System(_) => None,
3298 }
3299 }
3300}
3301
3302impl SequencedConsensusTransactionKind {
3303 pub fn key(&self) -> SequencedConsensusTransactionKey {
3304 match self {
3305 SequencedConsensusTransactionKind::External(ext) => {
3306 SequencedConsensusTransactionKey::External(ext.key())
3307 }
3308 SequencedConsensusTransactionKind::System(txn) => {
3309 SequencedConsensusTransactionKey::System(*txn.digest())
3310 }
3311 }
3312 }
3313
3314 pub fn get_tracking_id(&self) -> u64 {
3315 match self {
3316 SequencedConsensusTransactionKind::External(ext) => ext.get_tracking_id(),
3317 SequencedConsensusTransactionKind::System(_txn) => 0,
3318 }
3319 }
3320
3321 pub fn is_executable_transaction(&self) -> bool {
3322 match self {
3323 SequencedConsensusTransactionKind::External(ext) => ext.is_user_transaction(),
3324 SequencedConsensusTransactionKind::System(_) => true,
3325 }
3326 }
3327
3328 pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
3329 match self {
3330 SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
3331 ConsensusTransactionKind::UserTransactionV2(txn) => Some(*txn.tx().digest()),
3332 _ => None,
3333 },
3334 SequencedConsensusTransactionKind::System(txn) => Some(*txn.digest()),
3335 }
3336 }
3337
3338 pub fn is_end_of_publish(&self) -> bool {
3339 match self {
3340 SequencedConsensusTransactionKind::External(ext) => {
3341 matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
3342 }
3343 SequencedConsensusTransactionKind::System(_) => false,
3344 }
3345 }
3346}
3347
3348impl SequencedConsensusTransaction {
3349 pub fn sender_authority(&self) -> AuthorityName {
3350 self.certificate_author
3351 }
3352
3353 pub fn key(&self) -> SequencedConsensusTransactionKey {
3354 self.transaction.key()
3355 }
3356
3357 pub fn is_end_of_publish(&self) -> bool {
3358 if let SequencedConsensusTransactionKind::External(ref transaction) = self.transaction {
3359 matches!(transaction.kind, ConsensusTransactionKind::EndOfPublish(..))
3360 } else {
3361 false
3362 }
3363 }
3364
3365 pub fn try_take_execution_time_observation(&mut self) -> Option<ExecutionTimeObservation> {
3366 if let SequencedConsensusTransactionKind::External(ConsensusTransaction {
3367 kind: ConsensusTransactionKind::ExecutionTimeObservation(observation),
3368 ..
3369 }) = &mut self.transaction
3370 {
3371 Some(std::mem::take(observation))
3372 } else {
3373 None
3374 }
3375 }
3376
3377 pub fn is_system(&self) -> bool {
3378 matches!(
3379 self.transaction,
3380 SequencedConsensusTransactionKind::System(_)
3381 )
3382 }
3383
3384 pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
3385 if !randomness_state_enabled {
3386 return false;
3389 }
3390 match &self.transaction {
3391 SequencedConsensusTransactionKind::External(ConsensusTransaction {
3392 kind: ConsensusTransactionKind::UserTransactionV2(txn),
3393 ..
3394 }) => txn.tx().transaction_data().uses_randomness(),
3395 _ => false,
3396 }
3397 }
3398
3399 pub fn as_consensus_txn(&self) -> Option<&SenderSignedData> {
3400 match &self.transaction {
3401 SequencedConsensusTransactionKind::External(ConsensusTransaction {
3402 kind: ConsensusTransactionKind::UserTransactionV2(txn),
3403 ..
3404 }) if txn.tx().is_consensus_tx() => Some(txn.tx().data()),
3405 SequencedConsensusTransactionKind::System(txn) if txn.is_consensus_tx() => {
3406 Some(txn.data())
3407 }
3408 _ => None,
3409 }
3410 }
3411}
3412
/// Newtype wrapper around a [`SequencedConsensusTransaction`].
// NOTE(review): the name suggests the wrapped transaction has already passed
// verification; the code that performs that check is not visible here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
3415
#[cfg(test)]
impl VerifiedSequencedConsensusTransaction {
    /// Test-only constructor: wraps `transaction` using
    /// `SequencedConsensusTransaction::new_test` without any verification.
    pub fn new_test(transaction: ConsensusTransaction) -> Self {
        let sequenced = SequencedConsensusTransaction::new_test(transaction);
        Self(sequenced)
    }
}
3422
3423impl SequencedConsensusTransaction {
3424 pub fn new_test(transaction: ConsensusTransaction) -> Self {
3425 Self {
3426 certificate_author_index: 0,
3427 certificate_author: AuthorityName::ZERO,
3428 consensus_index: Default::default(),
3429 transaction: SequencedConsensusTransactionKind::External(transaction),
3430 }
3431 }
3432}
3433
/// Keeps a window of recent commit timestamps and estimates the interval
/// between commits from them.
#[derive(Serialize, Deserialize)]
pub(crate) struct CommitIntervalObserver {
    /// Commit timestamps in milliseconds, oldest at the front.
    // NOTE(review): the window size is not stored separately; it is implied by
    // the deque's allocated capacity. Confirm that this still holds after a
    // value has been deserialized.
    ring_buffer: VecDeque<u64>,
}
3438
3439impl CommitIntervalObserver {
3440 pub fn new(window_size: u32) -> Self {
3441 Self {
3442 ring_buffer: VecDeque::with_capacity(window_size as usize),
3443 }
3444 }
3445
3446 pub fn observe_commit_time(&mut self, consensus_commit: &impl ConsensusCommitAPI) {
3447 let commit_time = consensus_commit.commit_timestamp_ms();
3448 if self.ring_buffer.len() == self.ring_buffer.capacity() {
3449 self.ring_buffer.pop_front();
3450 }
3451 self.ring_buffer.push_back(commit_time);
3452 }
3453
3454 pub fn commit_interval_estimate(&self) -> Option<Duration> {
3455 if self.ring_buffer.len() <= 1 {
3456 None
3457 } else {
3458 let first = self.ring_buffer.front().unwrap();
3459 let last = self.ring_buffer.back().unwrap();
3460 let duration = last.saturating_sub(*first);
3461 let num_commits = self.ring_buffer.len() as u64;
3462 Some(Duration::from_millis(duration.div_ceil(num_commits)))
3463 }
3464 }
3465}
3466
3467#[cfg(test)]
3468mod tests {
3469 use std::collections::HashSet;
3470
3471 use consensus_core::{
3472 BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock,
3473 };
3474 use futures::pin_mut;
3475 use prometheus::Registry;
3476 use sui_protocol_config::{ConsensusTransactionOrdering, ProtocolConfig};
3477 use sui_types::{
3478 base_types::ExecutionDigests,
3479 base_types::{AuthorityName, FullObjectRef, ObjectID, SuiAddress, random_object_ref},
3480 committee::Committee,
3481 crypto::deterministic_random_account_key,
3482 gas::GasCostSummary,
3483 message_envelope::Message,
3484 messages_checkpoint::{
3485 CheckpointContents, CheckpointSignatureMessage, CheckpointSummary,
3486 SignedCheckpointSummary,
3487 },
3488 messages_consensus::ConsensusTransaction,
3489 object::Object,
3490 transaction::{
3491 CertifiedTransaction, TransactionData, TransactionDataAPI, VerifiedCertificate,
3492 },
3493 };
3494
3495 use super::*;
3496 use crate::{
3497 authority::{
3498 authority_per_epoch_store::ConsensusStatsAPI,
3499 consensus_tx_status_cache::NotifyReadConsensusTxStatusResult,
3500 test_authority_builder::TestAuthorityBuilder,
3501 },
3502 checkpoints::CheckpointServiceNoop,
3503 consensus_adapter::consensus_tests::test_user_transaction,
3504 consensus_test_utils::{
3505 TestConsensusCommit, make_consensus_adapter_for_test,
3506 setup_consensus_handler_for_testing,
3507 },
3508 post_consensus_tx_reorder::PostConsensusTxReorder,
3509 };
3510
    /// Feeds one committed sub-dag of twelve user transactions through a
    /// `ConsensusHandler` and checks that:
    /// - handling the commit does not complete while backpressure is on, and
    ///   completes once it is released;
    /// - the handler's consensus stats reflect the commit;
    /// - every transaction eventually produces effects.
    ///
    /// The runtime starts with time paused (`start_paused = true`), so the
    /// timeouts below do not cost wall-clock time.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_consensus_commit_handler() {
        telemetry_subscribers::init_for_testing();

        // Test objects: 12 gas objects (one per transaction built below),
        // 4 owned objects and 6 shared objects.
        let (sender, keypair) = deterministic_random_account_key();
        let gas_objects: Vec<Object> = (0..12)
            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
            .collect();
        let owned_objects: Vec<Object> = (0..4)
            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
            .collect();
        let shared_objects: Vec<Object> = (0..6)
            .map(|_| Object::shared_for_testing())
            .collect::<Vec<_>>();
        let mut all_objects = gas_objects.clone();
        all_objects.extend(owned_objects.clone());
        all_objects.extend(shared_objects.clone());

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .with_objects(all_objects.clone())
                .build();

        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let new_epoch_start_state = epoch_store.epoch_start_state();
        let consensus_committee = new_epoch_start_state.get_consensus_committee();

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));

        let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());

        // Assemble the handler under test by hand so the test keeps its own
        // handle on the backpressure manager.
        let backpressure_manager = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let settlement_scheduler = SettlementScheduler::new(
            state.execution_scheduler().as_ref().clone(),
            state.get_transaction_cache_reader().clone(),
            state.metrics.clone(),
        );
        let mut consensus_handler = ConsensusHandler::new(
            epoch_store,
            Arc::new(CheckpointServiceNoop {}),
            settlement_scheduler,
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput_calculator),
            backpressure_manager.subscribe(),
            state.traffic_controller.clone(),
            None,
            state.consensus_gasless_counter.clone(),
        );

        // First 8 transactions alternate inputs: even `i` uses owned object
        // `i / 2`, odd `i` uses shared object `i / 2`.
        let mut user_transactions = vec![];
        for (i, gas_object) in gas_objects[0..8].iter().enumerate() {
            let input_object = if i % 2 == 0 {
                owned_objects.get(i / 2).unwrap().clone()
            } else {
                shared_objects.get(i / 2).unwrap().clone()
            };
            let transaction = test_user_transaction(
                &state,
                sender,
                &keypair,
                gas_object.clone(),
                vec![input_object],
            )
            .await;
            user_transactions.push(transaction);
        }

        // Last 4 transactions contend on shared objects: two use
        // `shared_objects[4]` and two use `shared_objects[5]`.
        for (i, gas_object) in gas_objects[8..12].iter().enumerate() {
            let shared_object = if i < 2 {
                shared_objects[4].clone()
            } else {
                shared_objects[5].clone()
            };
            let transaction = test_user_transaction(
                &state,
                sender,
                &keypair,
                gas_object.clone(),
                vec![shared_object],
            )
            .await;
            user_transactions.push(transaction);
        }

        // Wrap each transaction in its own block; the second `TestBlock::new`
        // argument cycles through the committee members.
        let mut blocks = Vec::new();
        for (i, consensus_transaction) in user_transactions
            .iter()
            .cloned()
            .map(|t| ConsensusTransaction::new_user_transaction_v2_message(&state.name, t.into()))
            .enumerate()
        {
            let transaction_bytes = bcs::to_bytes(&consensus_transaction).unwrap();
            let block = VerifiedBlock::new_for_test(
                TestBlock::new(100 + i as u32, (i % consensus_committee.size()) as u32)
                    .set_transactions(vec![Transaction::new(transaction_bytes)])
                    .build(),
            );

            blocks.push(block);
        }

        // One commit (index 10) containing every block, led by the first one.
        let leader_block = blocks[0].clone();
        let committed_sub_dag = CommittedSubDag::new(
            leader_block.reference(),
            blocks.clone(),
            leader_block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
        );

        // Turn backpressure on before handing the commit to the handler.
        backpressure_manager.set_backpressure(true);
        backpressure_manager.update_highest_certified_checkpoint(1);

        {
            let waiter =
                consensus_handler.handle_consensus_commit_for_test(committed_sub_dag.clone());
            pin_mut!(waiter);

            // With backpressure on, the commit must NOT finish: the timeout
            // has to elapse, hence `unwrap_err`.
            tokio::time::timeout(std::time::Duration::from_secs(5), &mut waiter)
                .await
                .unwrap_err();

            backpressure_manager.set_backpressure(false);

            // Once released, the same future must run to completion.
            tokio::time::timeout(std::time::Duration::from_secs(100), waiter)
                .await
                .unwrap();
        }

        // The handler's stats must match the commit that was just processed:
        // sub-dag index 10 comes from the `CommitRef` above and round 100 from
        // the leader block.
        let num_blocks = blocks.len();
        let num_transactions = user_transactions.len();
        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
        assert_eq!(
            last_consensus_stats_1.index.transaction_index,
            num_transactions as u64
        );
        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
        assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64);
        assert_eq!(
            last_consensus_stats_1.stats.get_num_messages(0),
            num_blocks as u64
        );
        assert_eq!(
            last_consensus_stats_1.stats.get_num_user_transactions(0),
            num_transactions as u64
        );

        // Every transaction must produce effects within the timeout.
        for (i, t) in user_transactions.iter().enumerate() {
            let digest = t.tx().digest();
            if tokio::time::timeout(
                std::time::Duration::from_secs(10),
                state.notify_read_effects_for_testing("", *digest),
            )
            .await
            .is_ok()
            {
                // Effects became available: the transaction executed.
            } else {
                panic!("User transaction {} {} did not execute", i, digest);
            }
        }

        // Nothing may be left pending in the execution scheduler.
        state.execution_scheduler().check_empty_for_testing().await;
    }
3703
    /// Two transactions that use the same owned object are sequenced in one
    /// commit. The test checks that exactly one is dropped, that the owned
    /// object's lock belongs to the first one, and that BOTH consensus
    /// messages are recorded as processed.
    #[tokio::test(flavor = "current_thread")]
    async fn test_dropped_owned_object_lock_conflict_is_marked_processed() {
        telemetry_subscribers::init_for_testing();

        // Two gas objects (one per transaction) and a single contended owned
        // object.
        let (sender, keypair) = deterministic_random_account_key();
        let gas_objects: Vec<Object> = (0..2)
            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
            .collect();
        let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), sender);
        let mut all_objects = gas_objects.clone();
        all_objects.push(owned_object.clone());

        let state = TestAuthorityBuilder::new()
            .with_starting_objects(&all_objects)
            .skip_rpc_index_init()
            .skip_genesis_owner_index()
            .build()
            .await;
        let epoch_store = state.epoch_store_for_testing();
        let owned_object_ref = state
            .get_object(&owned_object.id())
            .unwrap()
            .compute_object_reference();

        // Both transactions take `owned_object` as input; they differ only in
        // their gas object.
        let winner = test_user_transaction(
            &state,
            sender,
            &keypair,
            gas_objects[0].clone(),
            vec![owned_object.clone()],
        )
        .await;
        let loser = test_user_transaction(
            &state,
            sender,
            &keypair,
            gas_objects[1].clone(),
            vec![owned_object.clone()],
        )
        .await;

        let winner_digest = *winner.tx().digest();
        let loser_digest = *loser.tx().digest();
        assert_ne!(winner_digest, loser_digest);

        let winner_consensus_tx =
            ConsensusTransaction::new_user_transaction_v2_message(&state.name, winner.into());
        let loser_consensus_tx =
            ConsensusTransaction::new_user_transaction_v2_message(&state.name, loser.into());
        let winner_key = SequencedConsensusTransactionKey::External(winner_consensus_tx.key());
        let loser_key = SequencedConsensusTransactionKey::External(loser_consensus_tx.key());

        // Sequence the winner first and the loser second within one commit.
        let round = 100;
        let commit = TestConsensusCommit::new(
            vec![winner_consensus_tx, loser_consensus_tx],
            round as u64,
            1_000,
            10,
        );
        let mut setup = setup_consensus_handler_for_testing(&state).await;
        setup
            .consensus_handler
            .handle_consensus_commit_for_test(commit)
            .await;
        // Exactly one transaction was dropped by the handler.
        assert_eq!(
            setup
                .consensus_handler
                .metrics
                .consensus_handler_dropped_transactions
                .get(),
            1
        );

        // Status by position: index 0 (winner) is finalized, index 1 (loser)
        // is dropped.
        // NOTE(review): this assumes `TestConsensusCommit` places its
        // transactions in a block with author zero and a default digest.
        let block = BlockRef {
            author: consensus_config::AuthorityIndex::ZERO,
            round,
            digest: Default::default(),
        };
        assert!(matches!(
            epoch_store
                .consensus_tx_status_cache
                .notify_read_transaction_status(ConsensusPosition {
                    epoch: epoch_store.epoch(),
                    block,
                    index: 0,
                })
                .await,
            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
        ));
        assert!(matches!(
            epoch_store
                .consensus_tx_status_cache
                .notify_read_transaction_status(ConsensusPosition {
                    epoch: epoch_store.epoch(),
                    block,
                    index: 1,
                })
                .await,
            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Dropped)
        ));

        // The owned object is locked by the winner, and both messages —
        // including the dropped one — are marked as processed.
        let locks = epoch_store
            .get_owned_object_locks_map(&[owned_object_ref])
            .unwrap();
        assert_eq!(locks.get(&owned_object_ref), Some(&winner_digest));
        assert!(
            epoch_store
                .is_consensus_message_processed(&winner_key)
                .unwrap()
        );
        assert!(
            epoch_store
                .is_consensus_message_processed(&loser_key)
                .unwrap()
        );
    }
3820
3821 fn to_short_strings(txs: Vec<VerifiedExecutableTransactionWithAliases>) -> Vec<String> {
3822 txs.into_iter()
3823 .map(|tx| format!("transaction({})", tx.tx().transaction_data().gas_price()))
3824 .collect()
3825 }
3826
3827 #[test]
3828 fn test_order_by_gas_price() {
3829 let mut v = vec![user_txn(42), user_txn(100)];
3830 PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3831 assert_eq!(
3832 to_short_strings(v),
3833 vec![
3834 "transaction(100)".to_string(),
3835 "transaction(42)".to_string(),
3836 ]
3837 );
3838
3839 let mut v = vec![
3840 user_txn(1200),
3841 user_txn(12),
3842 user_txn(1000),
3843 user_txn(42),
3844 user_txn(100),
3845 user_txn(1000),
3846 ];
3847 PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
3848 assert_eq!(
3849 to_short_strings(v),
3850 vec![
3851 "transaction(1200)".to_string(),
3852 "transaction(1000)".to_string(),
3853 "transaction(1000)".to_string(),
3854 "transaction(100)".to_string(),
3855 "transaction(42)".to_string(),
3856 "transaction(12)".to_string(),
3857 ]
3858 );
3859 }
3860
    /// Commits a block carrying two distinct V2 checkpoint signatures plus an
    /// exact duplicate of the first, and checks that both distinct signatures
    /// are recorded as processed.
    // NOTE(review): the duplicate is only exercised implicitly (handling the
    // commit must not fail); nothing here asserts how it was deduplicated.
    #[tokio::test(flavor = "current_thread")]
    async fn test_checkpoint_signature_dedup() {
        telemetry_subscribers::init_for_testing();

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();

        // Each call signs a summary over freshly randomized contents, so two
        // calls yield different digests (asserted below). The literal `42`
        // reappears in the expected message keys at the end of the test.
        let make_signed = || {
            let epoch = epoch_store.epoch();
            let contents =
                CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
            let summary = CheckpointSummary::new(
                &ProtocolConfig::get_for_max_version_UNSAFE(),
                epoch,
                42,
                10,
                &contents,
                None,
                GasCostSummary::default(),
                None,
                0,
                Vec::new(),
                Vec::new(),
            );
            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name)
        };

        // Signature A, plus a clone kept aside to build the duplicate message.
        let v2_s1 = make_signed();
        let v2_s1_clone = v2_s1.clone();
        let v2_digest_a = v2_s1.data().digest();
        let v2_a =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: v2_s1,
            });

        // Signature B: same sequence number, different contents.
        let v2_s2 = make_signed();
        let v2_digest_b = v2_s2.data().digest();
        let v2_b =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: v2_s2,
            });

        assert_ne!(v2_digest_a, v2_digest_b);

        // The duplicate carries exactly the same digest as signature A.
        assert_eq!(v2_s1_clone.data().digest(), v2_digest_a);
        let v2_dup =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: v2_s1_clone,
            });

        // A single block carrying A, B and the duplicate of A, committed alone.
        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());
        let block = VerifiedBlock::new_for_test(
            TestBlock::new(100, 0)
                .set_transactions(vec![to_tx(&v2_a), to_tx(&v2_b), to_tx(&v2_dup)])
                .build(),
        );
        let commit = CommittedSubDag::new(
            block.reference(),
            vec![block.clone()],
            block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
        );

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
        let backpressure = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let settlement_scheduler = SettlementScheduler::new(
            state.execution_scheduler().as_ref().clone(),
            state.get_transaction_cache_reader().clone(),
            state.metrics.clone(),
        );
        let mut handler = ConsensusHandler::new(
            epoch_store.clone(),
            Arc::new(CheckpointServiceNoop {}),
            settlement_scheduler,
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput),
            backpressure.subscribe(),
            state.traffic_controller.clone(),
            None,
            state.consensus_gasless_counter.clone(),
        );

        handler.handle_consensus_commit_for_test(commit).await;

        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
        use sui_types::messages_consensus::ConsensusTransactionKey as CK;

        // Both distinct signatures must be recorded as processed.
        let v2_key_a = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_a));
        let v2_key_b = SK::External(CK::CheckpointSignatureV2(state.name, 42, v2_digest_b));
        assert!(
            epoch_store
                .is_consensus_message_processed(&v2_key_a)
                .unwrap()
        );
        assert!(
            epoch_store
                .is_consensus_message_processed(&v2_key_b)
                .unwrap()
        );
    }
3977
    /// Commits one block containing `EndOfPublish` and checkpoint-signature
    /// messages, half naming this validator (`state.name`) and half naming an
    /// unrelated authority. Only the messages naming `state.name` may end up
    /// recorded as processed.
    // NOTE(review): the test relies on the block built with
    // `TestBlock::new(100, 0)` being attributed to `state.name`; confirm
    // against the `TestBlock` constructor.
    #[tokio::test(flavor = "current_thread")]
    async fn test_verify_consensus_transaction_filters_mismatched_authorities() {
        telemetry_subscribers::init_for_testing();

        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config, 0)
            .build()
            .await;

        let epoch_store = state.epoch_store_for_testing().clone();
        let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();

        // A freshly generated key pair stands in for the "wrong" authority.
        use fastcrypto::traits::KeyPair;
        let (_, wrong_keypair) = sui_types::crypto::get_authority_key_pair();
        let wrong_authority: AuthorityName = wrong_keypair.public().into();

        // EndOfPublish naming the wrong authority: expected to be filtered.
        let mismatched_eop = ConsensusTransaction::new_end_of_publish(wrong_authority);

        // EndOfPublish naming this validator: expected to be processed.
        let valid_eop = ConsensusTransaction::new_end_of_publish(state.name);

        // One checkpoint summary, signed twice below with different keys. The
        // literal `42` reappears in the expected message keys.
        let epoch = epoch_store.epoch();
        let contents =
            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
        let summary = CheckpointSummary::new(
            &ProtocolConfig::get_for_max_version_UNSAFE(),
            epoch,
            42,
            10,
            &contents,
            None,
            GasCostSummary::default(),
            None,
            0,
            Vec::new(),
            Vec::new(),
        );

        // Signed by the wrong authority: expected to be filtered.
        let mismatched_checkpoint_signed =
            SignedCheckpointSummary::new(epoch, summary.clone(), &wrong_keypair, wrong_authority);
        let mismatched_checkpoint_digest = mismatched_checkpoint_signed.data().digest();
        let mismatched_checkpoint =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: mismatched_checkpoint_signed,
            });

        // Signed by this validator: expected to be processed.
        let valid_checkpoint_signed =
            SignedCheckpointSummary::new(epoch, summary, &*state.secret, state.name);
        let valid_checkpoint_digest = valid_checkpoint_signed.data().digest();
        let valid_checkpoint =
            ConsensusTransaction::new_checkpoint_signature_message_v2(CheckpointSignatureMessage {
                summary: valid_checkpoint_signed,
            });

        let to_tx = |ct: &ConsensusTransaction| Transaction::new(bcs::to_bytes(ct).unwrap());

        // All four messages travel in the same block and commit.
        let block = VerifiedBlock::new_for_test(
            TestBlock::new(100, 0)
                .set_transactions(vec![
                    to_tx(&mismatched_eop),
                    to_tx(&valid_eop),
                    to_tx(&mismatched_checkpoint),
                    to_tx(&valid_checkpoint),
                ])
                .build(),
        );
        let commit = CommittedSubDag::new(
            block.reference(),
            vec![block.clone()],
            block.timestamp_ms(),
            CommitRef::new(10, CommitDigest::MIN),
        );

        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput = ConsensusThroughputCalculator::new(None, metrics.clone());
        let backpressure = BackpressureManager::new_for_tests();
        let consensus_adapter =
            make_consensus_adapter_for_test(state.clone(), HashSet::new(), false, vec![]);
        let settlement_scheduler = SettlementScheduler::new(
            state.execution_scheduler().as_ref().clone(),
            state.get_transaction_cache_reader().clone(),
            state.metrics.clone(),
        );
        let mut handler = ConsensusHandler::new(
            epoch_store.clone(),
            Arc::new(CheckpointServiceNoop {}),
            settlement_scheduler,
            consensus_adapter,
            state.get_object_cache_reader().clone(),
            consensus_committee.clone(),
            metrics,
            Arc::new(throughput),
            backpressure.subscribe(),
            state.traffic_controller.clone(),
            None,
            state.consensus_gasless_counter.clone(),
        );

        handler.handle_consensus_commit_for_test(commit).await;

        use crate::consensus_handler::SequencedConsensusTransactionKey as SK;
        use sui_types::messages_consensus::ConsensusTransactionKey as CK;

        // Messages naming this validator were processed...
        let valid_eop_key = SK::External(CK::EndOfPublish(state.name));
        assert!(
            epoch_store
                .is_consensus_message_processed(&valid_eop_key)
                .unwrap(),
            "Valid EndOfPublish should have been processed"
        );

        let valid_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
            state.name,
            42,
            valid_checkpoint_digest,
        ));
        assert!(
            epoch_store
                .is_consensus_message_processed(&valid_checkpoint_key)
                .unwrap(),
            "Valid CheckpointSignature should have been processed"
        );

        // ...while messages naming the other authority were not.
        let mismatched_eop_key = SK::External(CK::EndOfPublish(wrong_authority));
        assert!(
            !epoch_store
                .is_consensus_message_processed(&mismatched_eop_key)
                .unwrap(),
            "Mismatched EndOfPublish should NOT have been processed (filtered by verify_consensus_transaction)"
        );

        let mismatched_checkpoint_key = SK::External(CK::CheckpointSignatureV2(
            wrong_authority,
            42,
            mismatched_checkpoint_digest,
        ));
        assert!(
            !epoch_store
                .is_consensus_message_processed(&mismatched_checkpoint_key)
                .unwrap(),
            "Mismatched CheckpointSignature should NOT have been processed (filtered by verify_consensus_transaction)"
        );
    }
4131
4132 fn user_txn(gas_price: u64) -> VerifiedExecutableTransactionWithAliases {
4133 let (committee, keypairs) = Committee::new_simple_test_committee();
4134 let (sender, sender_keypair) = deterministic_random_account_key();
4135 let tx = sui_types::transaction::Transaction::from_data_and_signer(
4136 TransactionData::new_transfer(
4137 SuiAddress::default(),
4138 FullObjectRef::from_fastpath_ref(random_object_ref()),
4139 sender,
4140 random_object_ref(),
4141 1000 * gas_price,
4142 gas_price,
4143 ),
4144 vec![&sender_keypair],
4145 );
4146 let tx = VerifiedExecutableTransaction::new_from_certificate(
4147 VerifiedCertificate::new_unchecked(
4148 CertifiedTransaction::new_from_keypairs_for_testing(
4149 tx.into_data(),
4150 &keypairs,
4151 &committee,
4152 ),
4153 ),
4154 );
4155 VerifiedExecutableTransactionWithAliases::no_aliases(tx)
4156 }
4157
4158 mod checkpoint_queue_tests {
4159 use super::*;
4160 use consensus_core::CommitRef;
4161 use sui_types::digests::Digest;
4162
4163 fn make_chunk(tx_count: usize, height: u64) -> Chunk {
4164 Chunk {
4165 schedulables: (0..tx_count)
4166 .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
4167 .collect(),
4168 settlement: None,
4169 height,
4170 }
4171 }
4172
4173 fn make_commit_ref(index: u32) -> CommitRef {
4174 CommitRef {
4175 index,
4176 digest: CommitDigest::MIN,
4177 }
4178 }
4179
4180 fn default_versions() -> HashMap<TransactionKey, AssignedVersions> {
4181 HashMap::new()
4182 }
4183
4184 #[test]
4185 fn test_flush_all_checkpoint_roots() {
4186 let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
4187 let versions = default_versions();
4188
4189 queue.push_chunk(
4190 make_chunk(5, 1),
4191 &versions,
4192 1000,
4193 make_commit_ref(1),
4194 Digest::default(),
4195 );
4196 queue.push_chunk(
4197 make_chunk(3, 2),
4198 &versions,
4199 1000,
4200 make_commit_ref(1),
4201 Digest::default(),
4202 );
4203
4204 let pending = queue.flush(1000, true);
4205
4206 assert!(pending.is_some());
4207 assert!(queue.pending_roots.is_empty());
4208 }
4209
4210 #[test]
4211 fn test_flush_respects_min_checkpoint_interval() {
4212 let min_interval = 200;
4213 let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, 1000, min_interval);
4214 let versions = default_versions();
4215
4216 queue.push_chunk(
4217 make_chunk(5, 1),
4218 &versions,
4219 1000,
4220 make_commit_ref(1),
4221 Digest::default(),
4222 );
4223
4224 let pending = queue.flush(1000 + min_interval - 1, false);
4225 assert!(pending.is_none());
4226 assert_eq!(queue.pending_roots.len(), 1);
4227
4228 let pending = queue.flush(1000 + min_interval, false);
4229 assert!(pending.is_some());
4230 assert!(queue.pending_roots.is_empty());
4231 }
4232
/// Pushing a second chunk that would take the pending transaction count past `max_tx`
/// makes `push_chunk` return one flushed checkpoint, with the new chunk left pending.
#[test]
fn test_push_chunk_flushes_when_exceeds_max() {
    let max_tx = 10;
    // Just over half the limit, so two such chunks cannot share a checkpoint.
    let chunk_size = max_tx / 2 + 1;

    let mut queue = CheckpointQueue::new_for_testing(1000, 0, 0, max_tx, 0);
    let versions = default_versions();

    let first_chunk = make_chunk(chunk_size, 1);
    queue.push_chunk(
        first_chunk,
        &versions,
        1000,
        make_commit_ref(1),
        Digest::default(),
    );

    let second_chunk = make_chunk(chunk_size, 2);
    let flushed = queue.push_chunk(
        second_chunk,
        &versions,
        1000,
        make_commit_ref(2),
        Digest::default(),
    );

    assert_eq!(flushed.len(), 1);
    assert_eq!(queue.pending_roots.len(), 1);
}
4258
/// Three chunks pushed from three different commits end up as three roots of a single
/// checkpoint once `flush(.., true)` is called.
#[test]
fn test_multiple_chunks_merged_into_one_checkpoint() {
    let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 200);
    let versions = default_versions();

    // Chunk `i` sits at height `i` and comes from commit `i`.
    for i in 1..=3u32 {
        queue.push_chunk(
            make_chunk(10, u64::from(i)),
            &versions,
            1000,
            make_commit_ref(i),
            Digest::default(),
        );
    }

    let checkpoint = queue.flush(1000, true).unwrap();

    assert_eq!(checkpoint.roots.len(), 3);
}
4290
/// Two half-limit chunks fit together without flushing; the third one overflows and
/// triggers exactly one flush. No resulting checkpoint holds more than `max_tx` roots.
#[test]
fn test_push_chunk_handles_overflow() {
    let max_tx = 10;
    let half = max_tx / 2;
    let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
    let versions = default_versions();

    // Pushes a half-limit chunk at `height` coming from commit `commit`.
    let mut push_half = |height: u64, commit: u32| {
        queue.push_chunk(
            make_chunk(half, height),
            &versions,
            1000,
            make_commit_ref(commit),
            Digest::default(),
        )
    };

    let flushed1 = push_half(1, 1);
    assert!(flushed1.is_empty());

    let flushed2 = push_half(2, 2);
    assert!(flushed2.is_empty());

    let flushed3 = push_half(3, 3);
    assert_eq!(flushed3.len(), 1);

    let pending = queue.flush(1000, true);

    // Check both the final flush and the overflow-triggered one.
    pending
        .iter()
        .chain(flushed3.iter())
        .for_each(|checkpoint| {
            let tx_count = checkpoint
                .roots
                .iter()
                .map(|root| root.tx_roots.len())
                .sum::<usize>();
            assert!(tx_count <= max_tx);
        });
}
4331
/// When chunks at heights 100 and 200 are merged, the checkpoint reports the height of
/// the most recently pushed chunk.
#[test]
fn test_checkpoint_uses_last_chunk_height() {
    let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
    let versions = default_versions();

    // (chunk height, commit index)
    for (height, commit) in [(100, 1), (200, 2)] {
        queue.push_chunk(
            make_chunk(10, height),
            &versions,
            1000,
            make_commit_ref(commit),
            Digest::default(),
        );
    }

    let checkpoint = queue.flush(1000, true).unwrap();

    assert_eq!(checkpoint.details.checkpoint_height, 200);
}
4356
/// `last_built_timestamp` is untouched by `push_chunk` and takes the timestamp passed to
/// `flush` once a flush happens.
#[test]
fn test_last_built_timestamp_updated_on_flush() {
    let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, 1000, 0);
    let versions = default_versions();

    let chunk = make_chunk(10, 1);
    queue.push_chunk(
        chunk,
        &versions,
        5000,
        make_commit_ref(1),
        Digest::default(),
    );

    // Pushing alone must not move the timestamp.
    assert_eq!(queue.last_built_timestamp, 0);

    // The produced checkpoint itself is irrelevant here.
    drop(queue.flush(5000, true));

    assert_eq!(queue.last_built_timestamp, 5000);
}
4376
/// Verifies that each chunk carrying a settlement is forwarded on the queue's channel
/// together with its settlement info.
///
/// The queue is built with an explicit sender so the test can observe what `push_chunk`
/// emits. Both chunks are far below the transaction limit, so no flush happens between
/// them and both settlements must be tagged with the initial checkpoint sequence.
#[test]
fn test_settlement_info_sent_through_channel() {
    let initial_seq = 5;
    let (sender, mut receiver) = monitored_mpsc::unbounded_channel("test_settlement_info");
    let mut queue =
        CheckpointQueue::new_for_testing_with_sender(0, 0, initial_seq, 1000, 0, sender);
    let versions = default_versions();

    let chunk1 = Chunk {
        schedulables: vec![
            Schedulable::ConsensusCommitPrologue(0, 1, 0),
            Schedulable::ConsensusCommitPrologue(0, 2, 0),
            Schedulable::ConsensusCommitPrologue(0, 3, 0),
        ],
        settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
        height: 1,
    };

    let chunk2 = Chunk {
        schedulables: vec![
            Schedulable::ConsensusCommitPrologue(0, 4, 0),
            Schedulable::ConsensusCommitPrologue(0, 5, 0),
        ],
        settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
        height: 2,
    };

    let flushed1 = queue.push_chunk(
        chunk1,
        &versions,
        1000,
        make_commit_ref(1),
        Digest::default(),
    );
    // Well under the limit: nothing may be flushed yet.
    assert!(flushed1.is_empty());

    let msg1 = receiver
        .try_recv()
        .expect("first chunk should be sent through the channel");
    let settlement1 = msg1
        .1
        .expect("first chunk should carry settlement info");
    assert_eq!(settlement1.checkpoint_seq, initial_seq);

    let flushed2 = queue.push_chunk(
        chunk2,
        &versions,
        1000,
        make_commit_ref(1),
        Digest::default(),
    );
    assert!(flushed2.is_empty());

    let msg2 = receiver
        .try_recv()
        .expect("second chunk should be sent through the channel");
    let settlement2 = msg2
        .1
        .expect("second chunk should carry settlement info");
    // No flush occurred in between, so the second settlement belongs to the same
    // (still pending) checkpoint as the first.
    assert_eq!(settlement2.checkpoint_seq, initial_seq);
}
4416
/// Settlement info sent on the channel carries the sequence number of the checkpoint the
/// chunk will land in, including after an overflow-triggered flush advances the sequence.
#[test]
fn test_settlement_checkpoint_seq_correct_after_flush() {
    let max_tx = 10;
    let initial_seq = 5;
    let (sender, mut receiver) = monitored_mpsc::unbounded_channel("test_settlement_seq");
    let mut queue =
        CheckpointQueue::new_for_testing_with_sender(0, 0, initial_seq, max_tx, 0, sender);
    let versions = default_versions();

    // First chunk: just over half the limit, with a settlement attached.
    let first_chunk = Chunk {
        schedulables: (0..max_tx / 2 + 1)
            .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
            .collect(),
        settlement: Some(Schedulable::AccumulatorSettlement(1, 1)),
        height: 1,
    };
    queue.push_chunk(
        first_chunk,
        &versions,
        1000,
        make_commit_ref(1),
        Digest::default(),
    );

    // The first settlement is tagged with the starting sequence number.
    let settlement1 = receiver.try_recv().unwrap().1.unwrap();
    assert_eq!(settlement1.checkpoint_seq, initial_seq);

    // Second chunk of the same size: together they exceed `max_tx`.
    let second_chunk = Chunk {
        schedulables: (0..max_tx / 2 + 1)
            .map(|_| Schedulable::Transaction(user_txn(1000).into_tx()))
            .collect(),
        settlement: Some(Schedulable::AccumulatorSettlement(1, 2)),
        height: 2,
    };
    let flushed = queue.push_chunk(
        second_chunk,
        &versions,
        1000,
        make_commit_ref(2),
        Digest::default(),
    );
    // The overflow flushes the first chunk as checkpoint `initial_seq`.
    assert_eq!(flushed.len(), 1);
    assert_eq!(flushed[0].details.checkpoint_seq, initial_seq);

    // The second settlement therefore targets the next sequence number.
    let settlement2 = receiver.try_recv().unwrap().1.unwrap();
    assert_eq!(settlement2.checkpoint_seq, initial_seq + 1);

    // The remaining checkpoint must agree with what the settlement was told.
    let remaining = queue.flush_forced().unwrap();
    assert_eq!(remaining.details.checkpoint_seq, settlement2.checkpoint_seq);
}
4476
/// A flush stamps the checkpoint with the queue's current sequence number (10 here) and
/// then advances `current_checkpoint_seq` by one.
#[test]
fn test_checkpoint_seq_increments_on_flush() {
    let mut queue = CheckpointQueue::new_for_testing(0, 0, 10, 1000, 0);
    let versions = default_versions();

    let chunk = make_chunk(5, 1);
    queue.push_chunk(
        chunk,
        &versions,
        1000,
        make_commit_ref(1),
        Digest::default(),
    );

    let checkpoint = queue.flush(1000, true).unwrap();

    assert_eq!(checkpoint.details.checkpoint_seq, 10);
    assert_eq!(queue.current_checkpoint_seq, 11);
}
4495
/// Three over-half-limit chunks in a row: the second and third pushes each flush the
/// chunk before them, giving two flushed checkpoints and one root still pending. No
/// flushed checkpoint exceeds `max_tx` roots.
#[test]
fn test_multiple_chunks_with_overflow() {
    let max_tx = 10;
    let chunk_size = max_tx / 2 + 1;
    let mut queue = CheckpointQueue::new_for_testing(0, 0, 0, max_tx, 0);
    let versions = default_versions();

    // Push chunks at heights 1, 2, 3 (all from commit 1) and gather whatever each push
    // flushes, in push order.
    let all_flushed: Vec<_> = (1..=3)
        .flat_map(|height| {
            queue.push_chunk(
                make_chunk(chunk_size, height),
                &versions,
                1000,
                make_commit_ref(1),
                Digest::default(),
            )
        })
        .collect();

    assert_eq!(all_flushed.len(), 2);
    assert_eq!(queue.pending_roots.len(), 1);

    all_flushed.iter().for_each(|checkpoint| {
        let tx_count = checkpoint
            .roots
            .iter()
            .map(|root| root.tx_roots.len())
            .sum::<usize>();
        assert!(tx_count <= max_tx);
    });
}
4537 }
4538}