1use crate::authority::authority_per_epoch_store::{
5 AuthorityEpochTables, EncG, ExecutionIndicesWithStatsV2, LockDetails, LockDetailsWrapper, PkG,
6};
7use crate::authority::transaction_deferral::DeferralKey;
8use crate::checkpoints::BuilderCheckpointSummary;
9use crate::epoch::randomness::SINGLETON_KEY;
10use dashmap::DashMap;
11use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
12use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
13use moka::policy::EvictionPolicy;
14use moka::sync::SegmentedCache as MokaCache;
15use mysten_common::fatal;
16use mysten_common::random_util::randomize_cache_capacity_in_tests;
17use parking_lot::Mutex;
18use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
19use sui_types::authenticator_state::ActiveJwk;
20use sui_types::base_types::{AuthorityName, ObjectRef, SequenceNumber};
21use sui_types::crypto::RandomnessRound;
22use sui_types::error::SuiResult;
23use sui_types::executable_transaction::{
24 TrustedExecutableTransactionWithAliases, VerifiedExecutableTransactionWithAliases,
25};
26use sui_types::execution::ExecutionTimeObservationKey;
27use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
28use sui_types::messages_consensus::AuthorityIndex;
29use sui_types::{
30 base_types::{ConsensusObjectSequenceKey, ObjectID},
31 digests::TransactionDigest,
32 messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
33 signature::GenericSignature,
34};
35use tracing::{debug, info};
36use typed_store::Map;
37use typed_store::rocks::DBBatch;
38
39use crate::{
40 authority::{
41 authority_per_epoch_store::AuthorityPerEpochStore,
42 shared_object_congestion_tracker::CongestionPerObjectDebt,
43 },
44 checkpoints::{CheckpointHeight, PendingCheckpoint, PendingCheckpointV2},
45 consensus_handler::SequencedConsensusTransactionKey,
46 epoch::{
47 randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
48 reconfiguration::ReconfigState,
49 },
50};
51
52use super::*;
53
54#[derive(Default)]
55#[allow(clippy::type_complexity)]
56pub(crate) struct ConsensusCommitOutput {
57 consensus_round: Round,
59 consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
60 end_of_publish: BTreeSet<AuthorityName>,
61 reconfig_state: Option<ReconfigState>,
62 consensus_commit_stats: Option<ExecutionIndicesWithStatsV2>,
63
64 next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,
66
67 deferred_txns: Vec<(DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>)>,
68 deleted_deferred_txns: BTreeSet<DeferralKey>,
69
70 pending_checkpoints: Vec<PendingCheckpoint>,
72 pending_checkpoints_v2: Vec<PendingCheckpointV2>,
73
74 next_randomness_round: Option<(RandomnessRound, TimestampMs)>,
76
77 dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
78 dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
79 dkg_used_message: Option<VersionedUsedProcessedMessages>,
80 dkg_output: Option<dkg_v1::Output<PkG, EncG>>,
81
82 pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
84 active_jwks: BTreeSet<(u64, (JwkId, JWK))>,
85
86 congestion_control_object_debts: Vec<(ObjectID, u64)>,
88 congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
89 execution_time_observations: Vec<(
90 AuthorityIndex,
91 u64, Vec<(ExecutionTimeObservationKey, Duration)>,
93 )>,
94
95 owned_object_locks: HashMap<ObjectRef, LockDetails>,
97
98 checkpoint_queue_drained: bool,
101}
102
103impl ConsensusCommitOutput {
104 pub fn new(consensus_round: Round) -> Self {
105 Self {
106 consensus_round,
107 ..Default::default()
108 }
109 }
110
111 pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
112 self.deleted_deferred_txns.iter().cloned()
113 }
114
115 pub fn has_deferred_transactions(&self) -> bool {
116 !self.deferred_txns.is_empty()
117 }
118
119 fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
120 self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
121 }
122
123 fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
124 self.pending_checkpoints.last().map(|cp| cp.height())
125 }
126
127 fn get_pending_checkpoints(
128 &self,
129 last: Option<CheckpointHeight>,
130 ) -> impl Iterator<Item = &PendingCheckpoint> {
131 self.pending_checkpoints.iter().filter(move |cp| {
132 if let Some(last) = last {
133 cp.height() > last
134 } else {
135 true
136 }
137 })
138 }
139
140 fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
141 self.pending_checkpoints
142 .iter()
143 .any(|cp| cp.height() == *index)
144 }
145
146 fn get_pending_checkpoints_v2(
147 &self,
148 last: Option<CheckpointHeight>,
149 ) -> impl Iterator<Item = &PendingCheckpointV2> {
150 self.pending_checkpoints_v2.iter().filter(move |cp| {
151 if let Some(last) = last {
152 cp.height() > last
153 } else {
154 true
155 }
156 })
157 }
158
159 fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
160 self.pending_checkpoints_v2
161 .iter()
162 .any(|cp| cp.height() == *index)
163 }
164
165 fn get_round(&self) -> Option<u64> {
166 self.consensus_commit_stats
167 .as_ref()
168 .map(|stats| stats.index.last_committed_round)
169 }
170
171 pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
172 self.end_of_publish.insert(authority);
173 }
174
175 pub fn insert_execution_time_observation(
176 &mut self,
177 source: AuthorityIndex,
178 generation: u64,
179 estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
180 ) {
181 self.execution_time_observations
182 .push((source, generation, estimates));
183 }
184
185 pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStatsV2) {
186 self.consensus_commit_stats = Some(stats);
187 }
188
189 pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
191 self.record_consensus_commit_stats(Default::default());
192 }
193
194 pub fn store_reconfig_state(&mut self, state: ReconfigState) {
195 self.reconfig_state = Some(state);
196 }
197
198 pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
199 self.consensus_messages_processed.insert(key);
200 }
201
202 pub fn get_consensus_messages_processed(
203 &self,
204 ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
205 self.consensus_messages_processed.iter()
206 }
207
208 pub fn set_next_shared_object_versions(
209 &mut self,
210 next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
211 ) {
212 assert!(self.next_shared_object_versions.is_none());
213 self.next_shared_object_versions = Some(next_versions);
214 }
215
216 pub fn defer_transactions(
217 &mut self,
218 key: DeferralKey,
219 transactions: Vec<VerifiedExecutableTransactionWithAliases>,
220 ) {
221 self.deferred_txns.push((key, transactions));
222 }
223
224 pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
225 self.deleted_deferred_txns
226 .extend(deferral_keys.iter().cloned());
227 }
228
229 pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
230 self.pending_checkpoints.push(checkpoint);
231 }
232
233 pub fn insert_pending_checkpoint_v2(&mut self, checkpoint: PendingCheckpointV2) {
234 self.pending_checkpoints_v2.push(checkpoint);
235 }
236
237 pub fn reserve_next_randomness_round(
238 &mut self,
239 next_randomness_round: RandomnessRound,
240 commit_timestamp: TimestampMs,
241 ) {
242 assert!(self.next_randomness_round.is_none());
243 self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
244 }
245
246 pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
247 self.dkg_confirmations.insert(conf.sender(), conf);
248 }
249
250 pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
251 self.dkg_processed_messages
252 .insert(message.sender(), message);
253 }
254
255 pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
256 self.dkg_used_message = Some(used_messages);
257 }
258
259 pub fn set_dkg_output(&mut self, output: dkg_v1::Output<PkG, EncG>) {
260 self.dkg_output = Some(output);
261 }
262
263 pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
264 self.pending_jwks.insert((authority, id, jwk));
265 }
266
267 pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
268 self.active_jwks.insert((round, key));
269 }
270
271 pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
272 self.congestion_control_object_debts = object_debts;
273 }
274
275 pub fn set_congestion_control_randomness_object_debts(
276 &mut self,
277 object_debts: Vec<(ObjectID, u64)>,
278 ) {
279 self.congestion_control_randomness_object_debts = object_debts;
280 }
281
282 pub fn set_checkpoint_queue_drained(&mut self, drained: bool) {
283 self.checkpoint_queue_drained = drained;
284 }
285
286 pub fn set_owned_object_locks(&mut self, locks: HashMap<ObjectRef, LockDetails>) {
287 assert!(self.owned_object_locks.is_empty());
288 self.owned_object_locks = locks;
289 }
290
291 pub fn write_to_batch(
292 self,
293 epoch_store: &AuthorityPerEpochStore,
294 batch: &mut DBBatch,
295 ) -> SuiResult {
296 let tables = epoch_store.tables()?;
297 batch.insert_batch(
298 &tables.consensus_message_processed,
299 self.consensus_messages_processed
300 .iter()
301 .map(|key| (key, true)),
302 )?;
303
304 batch.insert_batch(
305 &tables.end_of_publish,
306 self.end_of_publish.iter().map(|authority| (authority, ())),
307 )?;
308
309 if let Some(reconfig_state) = &self.reconfig_state {
310 batch.insert_batch(
311 &tables.reconfig_state,
312 [(RECONFIG_STATE_INDEX, reconfig_state)],
313 )?;
314 }
315
316 let consensus_commit_stats = self
317 .consensus_commit_stats
318 .expect("consensus_commit_stats must be set");
319 let round = consensus_commit_stats.index.last_committed_round;
320
321 batch.insert_batch(
322 &tables.last_consensus_stats_v2,
323 [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
324 )?;
325
326 if let Some(next_versions) = self.next_shared_object_versions {
327 batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
328 }
329
330 if !self.owned_object_locks.is_empty() {
331 batch.insert_batch(
332 &tables.owned_object_locked_transactions,
333 self.owned_object_locks
334 .into_iter()
335 .map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))),
336 )?;
337 }
338
339 batch.delete_batch(
340 &tables.deferred_transactions_v2,
341 &self.deleted_deferred_txns,
342 )?;
343 batch.delete_batch(
344 &tables.deferred_transactions_with_aliases_v2,
345 &self.deleted_deferred_txns,
346 )?;
347 batch.delete_batch(
348 &tables.deferred_transactions_with_aliases_v3,
349 &self.deleted_deferred_txns,
350 )?;
351
352 batch.insert_batch(
353 &tables.deferred_transactions_with_aliases_v3,
354 self.deferred_txns.into_iter().map(|(key, txs)| {
355 (
356 key,
357 txs.into_iter()
358 .map(|tx| {
359 let tx: TrustedExecutableTransactionWithAliases = tx.serializable();
360 tx
361 })
362 .collect::<Vec<_>>(),
363 )
364 }),
365 )?;
366
367 if let Some((round, commit_timestamp)) = self.next_randomness_round {
368 batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
369 batch.insert_batch(
370 &tables.randomness_last_round_timestamp,
371 [(SINGLETON_KEY, commit_timestamp)],
372 )?;
373 }
374
375 batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
376 batch.insert_batch(
377 &tables.dkg_processed_messages_v2,
378 self.dkg_processed_messages,
379 )?;
380 batch.insert_batch(
381 &tables.dkg_used_messages_v2,
382 self.dkg_used_message
384 .into_iter()
385 .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
386 )?;
387 if let Some(output) = self.dkg_output {
388 batch.insert_batch(&tables.dkg_output, [(SINGLETON_KEY, output)])?;
389 }
390
391 batch.insert_batch(
392 &tables.pending_jwks,
393 self.pending_jwks.into_iter().map(|j| (j, ())),
394 )?;
395 batch.insert_batch(
396 &tables.active_jwks,
397 self.active_jwks.into_iter().map(|j| {
398 assert_eq!(j.0, round);
400 (j, ())
401 }),
402 )?;
403
404 batch.insert_batch(
405 &tables.congestion_control_object_debts,
406 self.congestion_control_object_debts
407 .into_iter()
408 .map(|(object_id, debt)| {
409 (
410 object_id,
411 CongestionPerObjectDebt::new(self.consensus_round, debt),
412 )
413 }),
414 )?;
415 batch.insert_batch(
416 &tables.congestion_control_randomness_object_debts,
417 self.congestion_control_randomness_object_debts
418 .into_iter()
419 .map(|(object_id, debt)| {
420 (
421 object_id,
422 CongestionPerObjectDebt::new(self.consensus_round, debt),
423 )
424 }),
425 )?;
426
427 batch.insert_batch(
428 &tables.execution_time_observations,
429 self.execution_time_observations
430 .into_iter()
431 .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
432 )?;
433
434 Ok(())
435 }
436}
437
438pub(crate) struct ConsensusOutputCache {
443 pub(crate) deferred_transactions:
446 Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>>,
447
448 #[allow(clippy::type_complexity)]
451 pub(crate) user_signatures_for_checkpoints:
452 Mutex<HashMap<TransactionDigest, Vec<(GenericSignature, Option<SequenceNumber>)>>>,
453
454 executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
455 executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
456}
457
458impl ConsensusOutputCache {
459 pub(crate) fn new(tables: &AuthorityEpochTables) -> Self {
460 let deferred_transactions = tables
461 .get_all_deferred_transactions_v2()
462 .expect("load deferred transactions cannot fail");
463
464 let executed_in_epoch_cache_capacity = 50_000;
465
466 Self {
467 deferred_transactions: Mutex::new(deferred_transactions),
468 user_signatures_for_checkpoints: Default::default(),
469 executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
470 executed_in_epoch_cache: MokaCache::builder(8)
471 .max_capacity(randomize_cache_capacity_in_tests(
473 executed_in_epoch_cache_capacity,
474 ))
475 .eviction_policy(EvictionPolicy::lru())
476 .build(),
477 }
478 }
479
480 pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
481 self.executed_in_epoch
482 .read()
483 .contains_key(digest) ||
484 self.executed_in_epoch_cache.get(digest).is_some()
486 }
487
488 pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
490 assert!(
491 self.executed_in_epoch
492 .read()
493 .insert(tx_digest, ())
494 .is_none(),
495 "transaction already executed"
496 );
497 self.executed_in_epoch_cache.insert(tx_digest, ());
498 }
499
500 pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
504 let executed_in_epoch = self.executed_in_epoch.read();
505 for tx_digest in tx_digests {
506 executed_in_epoch.remove(tx_digest);
507 }
508 }
509}
510
511pub(crate) struct ConsensusOutputQuarantine {
514 output_queue: VecDeque<ConsensusCommitOutput>,
516
517 highest_executed_checkpoint: CheckpointSequenceNumber,
519
520 builder_checkpoint_summary:
522 BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,
523
524 builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,
525
526 shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,
528
529 congestion_control_randomness_object_debts:
532 RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
533 congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
534
535 processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,
536
537 owned_object_locks: HashMap<ObjectRef, LockDetails>,
539
540 metrics: Arc<EpochMetrics>,
541}
542
543impl ConsensusOutputQuarantine {
544 pub(super) fn new(
545 highest_executed_checkpoint: CheckpointSequenceNumber,
546 authority_metrics: Arc<EpochMetrics>,
547 ) -> Self {
548 Self {
549 highest_executed_checkpoint,
550
551 output_queue: VecDeque::new(),
552 builder_checkpoint_summary: BTreeMap::new(),
553 builder_digest_to_checkpoint: HashMap::new(),
554 shared_object_next_versions: RefCountedHashMap::new(),
555 processed_consensus_messages: RefCountedHashMap::new(),
556 congestion_control_randomness_object_debts: RefCountedHashMap::new(),
557 congestion_control_object_debts: RefCountedHashMap::new(),
558 owned_object_locks: HashMap::new(),
559 metrics: authority_metrics,
560 }
561 }
562}
563
564impl ConsensusOutputQuarantine {
567 pub(crate) fn push_consensus_output(
569 &mut self,
570 output: ConsensusCommitOutput,
571 epoch_store: &AuthorityPerEpochStore,
572 ) -> SuiResult {
573 self.insert_shared_object_next_versions(&output);
574 self.insert_congestion_control_debts(&output);
575 self.insert_processed_consensus_messages(&output);
576 self.insert_owned_object_locks(&output);
577 self.output_queue.push_back(output);
578
579 self.metrics
580 .consensus_quarantine_queue_size
581 .set(self.output_queue.len() as i64);
582
583 self.commit(epoch_store)
586 }
587
588 pub(super) fn insert_builder_summary(
590 &mut self,
591 sequence_number: CheckpointSequenceNumber,
592 summary: BuilderCheckpointSummary,
593 contents: CheckpointContents,
594 ) {
595 debug!(?sequence_number, "inserting builder summary {:?}", summary);
596 for tx in contents.iter() {
597 self.builder_digest_to_checkpoint
598 .insert(tx.transaction, sequence_number);
599 }
600 self.builder_checkpoint_summary
601 .insert(sequence_number, (summary, contents));
602 }
603}
604
605impl ConsensusOutputQuarantine {
607 pub(super) fn update_highest_executed_checkpoint(
610 &mut self,
611 checkpoint: CheckpointSequenceNumber,
612 epoch_store: &AuthorityPerEpochStore,
613 batch: &mut DBBatch,
614 ) -> SuiResult {
615 self.highest_executed_checkpoint = checkpoint;
616 self.commit_with_batch(epoch_store, batch)
617 }
618
619 pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
620 let mut batch = epoch_store.db_batch()?;
621 self.commit_with_batch(epoch_store, &mut batch)?;
622 batch.write()?;
623 Ok(())
624 }
625
626 fn commit_with_batch(
628 &mut self,
629 epoch_store: &AuthorityPerEpochStore,
630 batch: &mut DBBatch,
631 ) -> SuiResult {
632 let tables = epoch_store.tables()?;
639
640 let mut highest_committed_height = None;
641
642 while self
643 .builder_checkpoint_summary
644 .first_key_value()
645 .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
646 == Some(true)
647 {
648 let (seq, (builder_summary, contents)) =
649 self.builder_checkpoint_summary.pop_first().unwrap();
650
651 for tx in contents.iter() {
652 let digest = &tx.transaction;
653 assert_eq!(
654 self.builder_digest_to_checkpoint
655 .remove(digest)
656 .unwrap_or_else(|| {
657 panic!(
658 "transaction {:?} not found in builder_digest_to_checkpoint",
659 digest
660 )
661 }),
662 seq
663 );
664 }
665
666 batch.insert_batch(
667 &tables.builder_digest_to_checkpoint,
668 contents.iter().map(|tx| (tx.transaction, seq)),
669 )?;
670
671 batch.insert_batch(
672 &tables.builder_checkpoint_summary_v2,
673 [(seq, &builder_summary)],
674 )?;
675
676 let checkpoint_height = builder_summary
677 .checkpoint_height
678 .expect("non-genesis checkpoint must have height");
679 if let Some(highest) = highest_committed_height {
680 assert!(
681 checkpoint_height >= highest,
682 "current checkpoint height {} must be no less than highest committed height {}",
683 checkpoint_height,
684 highest
685 );
686 }
687
688 highest_committed_height = Some(checkpoint_height);
689 }
690
691 let Some(highest_committed_height) = highest_committed_height else {
692 return Ok(());
693 };
694
695 let split_checkpoints_in_consensus_handler = epoch_store
696 .protocol_config()
697 .split_checkpoints_in_consensus_handler();
698
699 if split_checkpoints_in_consensus_handler {
700 let mut last_drain_idx = None;
706 for (i, output) in self.output_queue.iter().enumerate() {
707 let stats = output
708 .consensus_commit_stats
709 .as_ref()
710 .expect("consensus_commit_stats must be set");
711 if stats.height > highest_committed_height {
712 break;
713 }
714 if output.checkpoint_queue_drained {
715 last_drain_idx = Some(i);
716 }
717 }
718 if let Some(idx) = last_drain_idx {
719 for _ in 0..=idx {
720 let output = self.output_queue.pop_front().unwrap();
721 info!("committing drain-boundary output");
722 self.remove_shared_object_next_versions(&output);
723 self.remove_processed_consensus_messages(&output);
724 self.remove_congestion_control_debts(&output);
725 self.remove_owned_object_locks(&output);
726 output.write_to_batch(epoch_store, batch)?;
727 }
728 }
729 } else {
730 while !self.output_queue.is_empty() {
731 let output = self.output_queue.front().unwrap();
732 let Some(highest_in_commit) = output.get_highest_pending_checkpoint_height() else {
733 break;
734 };
735
736 if highest_in_commit <= highest_committed_height {
737 info!(
738 "committing output with highest pending checkpoint height {:?}",
739 highest_in_commit
740 );
741 let output = self.output_queue.pop_front().unwrap();
742 self.remove_shared_object_next_versions(&output);
743 self.remove_processed_consensus_messages(&output);
744 self.remove_congestion_control_debts(&output);
745 self.remove_owned_object_locks(&output);
746 output.write_to_batch(epoch_store, batch)?;
747 } else {
748 break;
749 }
750 }
751 }
752
753 self.metrics
754 .consensus_quarantine_queue_size
755 .set(self.output_queue.len() as i64);
756
757 Ok(())
758 }
759}
760
761impl ConsensusOutputQuarantine {
762 fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
763 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
764 for (object_id, next_version) in next_versions {
765 self.shared_object_next_versions
766 .insert(*object_id, *next_version);
767 }
768 }
769 }
770
771 fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
772 let current_round = output.consensus_round;
773
774 for (object_id, debt) in output.congestion_control_object_debts.iter() {
775 self.congestion_control_object_debts.insert(
776 *object_id,
777 CongestionPerObjectDebt::new(current_round, *debt),
778 );
779 }
780
781 for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
782 self.congestion_control_randomness_object_debts.insert(
783 *object_id,
784 CongestionPerObjectDebt::new(current_round, *debt),
785 );
786 }
787 }
788
789 fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
790 for (object_id, _) in output.congestion_control_object_debts.iter() {
791 self.congestion_control_object_debts.remove(object_id);
792 }
793 for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
794 self.congestion_control_randomness_object_debts
795 .remove(object_id);
796 }
797 }
798
799 fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
800 for tx_key in output.consensus_messages_processed.iter() {
801 self.processed_consensus_messages.insert(tx_key.clone(), ());
802 }
803 }
804
805 fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
806 for tx_key in output.consensus_messages_processed.iter() {
807 self.processed_consensus_messages.remove(tx_key);
808 }
809 }
810
811 fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
812 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
813 for object_id in next_versions.keys() {
814 if !self.shared_object_next_versions.remove(object_id) {
815 fatal!(
816 "Shared object next version not found in quarantine: {:?}",
817 object_id
818 );
819 }
820 }
821 }
822 }
823
824 fn insert_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
825 for (obj_ref, lock) in &output.owned_object_locks {
826 self.owned_object_locks.insert(*obj_ref, *lock);
827 }
828 }
829
830 fn remove_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
831 for obj_ref in output.owned_object_locks.keys() {
832 self.owned_object_locks.remove(obj_ref);
833 }
834 }
835}
836
837impl ConsensusOutputQuarantine {
840 pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
841 self.builder_checkpoint_summary
842 .values()
843 .last()
844 .map(|(summary, _)| summary)
845 }
846
847 pub(super) fn get_built_summary(
848 &self,
849 sequence: CheckpointSequenceNumber,
850 ) -> Option<&BuilderCheckpointSummary> {
851 self.builder_checkpoint_summary
852 .get(&sequence)
853 .map(|(summary, _)| summary)
854 }
855
856 pub(super) fn included_transaction_in_checkpoint(&self, digest: &TransactionDigest) -> bool {
857 self.builder_digest_to_checkpoint.contains_key(digest)
858 }
859
860 pub(super) fn is_consensus_message_processed(
861 &self,
862 key: &SequencedConsensusTransactionKey,
863 ) -> bool {
864 self.processed_consensus_messages.contains_key(key)
865 }
866
867 pub(super) fn is_empty(&self) -> bool {
868 self.output_queue.is_empty()
869 }
870
871 pub(super) fn get_next_shared_object_versions(
872 &self,
873 tables: &AuthorityEpochTables,
874 objects_to_init: &[ConsensusObjectSequenceKey],
875 ) -> SuiResult<Vec<Option<SequenceNumber>>> {
876 Ok(do_fallback_lookup(
877 objects_to_init,
878 |object_key| {
879 if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
880 CacheResult::Hit(Some(*next_version))
881 } else {
882 CacheResult::Miss
883 }
884 },
885 |object_keys| {
886 tables
887 .next_shared_object_versions_v2
888 .multi_get(object_keys)
889 .expect("db error")
890 },
891 ))
892 }
893
894 pub(super) fn get_owned_object_locks(
898 &self,
899 tables: &AuthorityEpochTables,
900 obj_refs: &[ObjectRef],
901 ) -> SuiResult<Vec<Option<LockDetails>>> {
902 Ok(do_fallback_lookup(
903 obj_refs,
904 |obj_ref| {
905 if let Some(lock) = self.owned_object_locks.get(obj_ref) {
906 CacheResult::Hit(Some(*lock))
907 } else {
908 CacheResult::Miss
909 }
910 },
911 |obj_refs| {
912 tables
913 .multi_get_locked_transactions(obj_refs)
914 .expect("db error")
915 },
916 ))
917 }
918
919 pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
920 self.output_queue
921 .back()
922 .and_then(|output| output.get_highest_pending_checkpoint_height())
923 }
924
925 pub(super) fn get_pending_checkpoints(
926 &self,
927 last: Option<CheckpointHeight>,
928 ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
929 let mut checkpoints = Vec::new();
930 for output in &self.output_queue {
931 checkpoints.extend(
932 output
933 .get_pending_checkpoints(last)
934 .map(|cp| (cp.height(), cp.clone())),
935 );
936 }
937 if cfg!(debug_assertions) {
938 let mut prev = None;
939 for (height, _) in &checkpoints {
940 if let Some(prev) = prev {
941 assert!(prev < *height);
942 }
943 prev = Some(*height);
944 }
945 }
946 checkpoints
947 }
948
949 pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
950 self.output_queue
951 .iter()
952 .any(|output| output.pending_checkpoint_exists(index))
953 }
954
955 pub(super) fn get_pending_checkpoints_v2(
956 &self,
957 last: Option<CheckpointHeight>,
958 ) -> Vec<(CheckpointHeight, PendingCheckpointV2)> {
959 let mut checkpoints = Vec::new();
960 for output in &self.output_queue {
961 checkpoints.extend(
962 output
963 .get_pending_checkpoints_v2(last)
964 .map(|cp| (cp.height(), cp.clone())),
965 );
966 }
967 if cfg!(debug_assertions) {
968 let mut prev = None;
969 for (height, _) in &checkpoints {
970 if let Some(prev) = prev {
971 assert!(prev < *height);
972 }
973 prev = Some(*height);
974 }
975 }
976 checkpoints
977 }
978
979 pub(super) fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
980 self.output_queue
981 .iter()
982 .any(|output| output.pending_checkpoint_exists_v2(index))
983 }
984
985 pub(super) fn get_new_jwks(
986 &self,
987 epoch_store: &AuthorityPerEpochStore,
988 round: u64,
989 ) -> SuiResult<Vec<ActiveJwk>> {
990 let epoch = epoch_store.epoch();
991
992 for output in self.output_queue.iter().rev() {
994 let output_round = output.get_round().unwrap();
997 if round == output_round {
998 return Ok(output
999 .active_jwks
1000 .iter()
1001 .map(|(_, (jwk_id, jwk))| ActiveJwk {
1002 jwk_id: jwk_id.clone(),
1003 jwk: jwk.clone(),
1004 epoch,
1005 })
1006 .collect());
1007 }
1008 }
1009
1010 let empty_jwk_id = JwkId::new(String::new(), String::new());
1012 let empty_jwk = JWK {
1013 kty: String::new(),
1014 e: String::new(),
1015 n: String::new(),
1016 alg: String::new(),
1017 };
1018
1019 let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
1020 let end = (round + 1, (empty_jwk_id, empty_jwk));
1021
1022 Ok(epoch_store
1023 .tables()?
1024 .active_jwks
1025 .safe_iter_with_bounds(Some(start), Some(end))
1026 .map_ok(|((r, (jwk_id, jwk)), _)| {
1027 debug_assert!(round == r);
1028 ActiveJwk { jwk_id, jwk, epoch }
1029 })
1030 .collect::<Result<Vec<_>, _>>()?)
1031 }
1032
1033 pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
1034 self.output_queue
1035 .iter()
1036 .rev()
1037 .filter_map(|output| output.get_randomness_last_round_timestamp())
1038 .next()
1039 }
1040
1041 pub(crate) fn load_initial_object_debts(
1042 &self,
1043 epoch_store: &AuthorityPerEpochStore,
1044 current_round: Round,
1045 for_randomness: bool,
1046 transactions: &[VerifiedExecutableTransactionWithAliases],
1047 ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
1048 let protocol_config = epoch_store.protocol_config();
1049 let tables = epoch_store.tables()?;
1050 let default_per_commit_budget = protocol_config
1051 .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
1052 .unwrap_or(0);
1053 let (hash_table, db_table, per_commit_budget) = if for_randomness {
1054 (
1055 &self.congestion_control_randomness_object_debts,
1056 &tables.congestion_control_randomness_object_debts,
1057 protocol_config
1058 .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
1059 .unwrap_or(default_per_commit_budget),
1060 )
1061 } else {
1062 (
1063 &self.congestion_control_object_debts,
1064 &tables.congestion_control_object_debts,
1065 default_per_commit_budget,
1066 )
1067 };
1068 let mut shared_input_object_ids: Vec<_> = transactions
1069 .iter()
1070 .flat_map(|tx| tx.tx().shared_input_objects().map(|obj| obj.id))
1071 .collect();
1072 shared_input_object_ids.sort();
1073 shared_input_object_ids.dedup();
1074
1075 let results = do_fallback_lookup(
1076 &shared_input_object_ids,
1077 |object_id| {
1078 if let Some(debt) = hash_table.get(object_id) {
1079 CacheResult::Hit(Some(debt.into_v1()))
1080 } else {
1081 CacheResult::Miss
1082 }
1083 },
1084 |object_ids| {
1085 db_table
1086 .multi_get(object_ids)
1087 .expect("db error")
1088 .into_iter()
1089 .map(|debt| debt.map(|debt| debt.into_v1()))
1090 .collect()
1091 },
1092 );
1093
1094 Ok(results
1095 .into_iter()
1096 .zip(shared_input_object_ids)
1097 .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
1098 .map(move |((round, debt), object_id)| {
1099 assert!(current_round > round);
1103 let num_rounds = current_round - round - 1;
1104 let debt = debt.saturating_sub(per_commit_budget * num_rounds);
1105 (object_id, debt)
1106 }))
1107 }
1108}
1109
1110#[derive(Debug, Default)]
1119struct RefCountedHashMap<K, V> {
1120 map: HashMap<K, (usize, V)>,
1121}
1122
1123impl<K, V> RefCountedHashMap<K, V>
1124where
1125 K: Clone + Eq + std::hash::Hash,
1126{
1127 pub fn new() -> Self {
1128 Self {
1129 map: HashMap::new(),
1130 }
1131 }
1132
1133 pub fn insert(&mut self, key: K, value: V) {
1134 let entry = self.map.entry(key);
1135 match entry {
1136 hash_map::Entry::Occupied(mut entry) => {
1137 let (ref_count, v) = entry.get_mut();
1138 *ref_count += 1;
1139 *v = value;
1140 }
1141 hash_map::Entry::Vacant(entry) => {
1142 entry.insert((1, value));
1143 }
1144 }
1145 }
1146
1147 pub fn remove(&mut self, key: &K) -> bool {
1150 let entry = self.map.entry(key.clone());
1151 match entry {
1152 hash_map::Entry::Occupied(mut entry) => {
1153 let (ref_count, _) = entry.get_mut();
1154 *ref_count -= 1;
1155 if *ref_count == 0 {
1156 entry.remove();
1157 }
1158 true
1159 }
1160 hash_map::Entry::Vacant(_) => false,
1161 }
1162 }
1163
1164 pub fn get(&self, key: &K) -> Option<&V> {
1165 self.map.get(key).map(|(_, v)| v)
1166 }
1167
1168 pub fn contains_key(&self, key: &K) -> bool {
1169 self.map.contains_key(key)
1170 }
1171}
1172
1173#[cfg(test)]
1174impl ConsensusOutputQuarantine {
1175 fn output_queue_len_for_testing(&self) -> usize {
1176 self.output_queue.len()
1177 }
1178}
1179
1180#[cfg(test)]
1181mod tests {
1182 use super::*;
1183 use crate::authority::test_authority_builder::TestAuthorityBuilder;
1184 use sui_types::base_types::ExecutionDigests;
1185 use sui_types::gas::GasCostSummary;
1186
1187 fn make_output(height: u64, round: u64, drained: bool) -> ConsensusCommitOutput {
1188 let mut output = ConsensusCommitOutput::new(round);
1189 output.record_consensus_commit_stats(ExecutionIndicesWithStatsV2 {
1190 height,
1191 ..Default::default()
1192 });
1193 output.set_checkpoint_queue_drained(drained);
1194 output
1195 }
1196
1197 fn make_builder_summary(
1198 seq: CheckpointSequenceNumber,
1199 height: CheckpointHeight,
1200 protocol_config: &ProtocolConfig,
1201 ) -> (BuilderCheckpointSummary, CheckpointContents) {
1202 let contents =
1203 CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
1204 let summary = CheckpointSummary::new(
1205 protocol_config,
1206 0,
1207 seq,
1208 0,
1209 &contents,
1210 None,
1211 GasCostSummary::default(),
1212 None,
1213 0,
1214 vec![],
1215 vec![],
1216 );
1217 let builder_summary = BuilderCheckpointSummary {
1218 summary,
1219 checkpoint_height: Some(height),
1220 position_in_commit: 0,
1221 };
1222 (builder_summary, contents)
1223 }
1224
1225 #[tokio::test]
1226 async fn test_drain_boundary_prevents_premature_commit() {
1227 let mut protocol_config =
1228 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
1229 protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(true);
1230 let state = TestAuthorityBuilder::new()
1231 .with_protocol_config(protocol_config)
1232 .build()
1233 .await;
1234 let epoch_store = state.epoch_store_for_testing();
1235
1236 let metrics = epoch_store.metrics.clone();
1237 let mut quarantine = ConsensusOutputQuarantine::new(0, metrics);
1238
1239 let c = make_output(4, 1, false);
1241 quarantine.push_consensus_output(c, &epoch_store).unwrap();
1242
1243 let c2 = make_output(5, 2, true);
1245 quarantine.push_consensus_output(c2, &epoch_store).unwrap();
1246
1247 assert_eq!(quarantine.output_queue_len_for_testing(), 2);
1248
1249 let pc = epoch_store.protocol_config();
1251 for seq in 1..=4 {
1252 let (summary, contents) = make_builder_summary(seq, seq, pc);
1253 quarantine.insert_builder_summary(seq, summary, contents);
1254 }
1255
1256 let mut batch = epoch_store.db_batch_for_test();
1258 quarantine
1259 .update_highest_executed_checkpoint(4, &epoch_store, &mut batch)
1260 .unwrap();
1261 batch.write().unwrap();
1262
1263 assert_eq!(quarantine.output_queue_len_for_testing(), 2);
1267 }
1268
1269 #[tokio::test]
1270 async fn test_drain_boundary_commits_at_safe_point() {
1271 let mut protocol_config =
1272 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
1273 protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(true);
1274 let state = TestAuthorityBuilder::new()
1275 .with_protocol_config(protocol_config)
1276 .build()
1277 .await;
1278 let epoch_store = state.epoch_store_for_testing();
1279
1280 let metrics = epoch_store.metrics.clone();
1281 let mut quarantine = ConsensusOutputQuarantine::new(0, metrics);
1282
1283 let c = make_output(4, 1, false);
1284 quarantine.push_consensus_output(c, &epoch_store).unwrap();
1285
1286 let c2 = make_output(5, 2, true);
1287 quarantine.push_consensus_output(c2, &epoch_store).unwrap();
1288
1289 assert_eq!(quarantine.output_queue_len_for_testing(), 2);
1290
1291 let pc = epoch_store.protocol_config();
1293 for seq in 1..=5 {
1294 let (summary, contents) = make_builder_summary(seq, seq, pc);
1295 quarantine.insert_builder_summary(seq, summary, contents);
1296 }
1297
1298 let mut batch = epoch_store.db_batch_for_test();
1300 quarantine
1301 .update_highest_executed_checkpoint(5, &epoch_store, &mut batch)
1302 .unwrap();
1303 batch.write().unwrap();
1304
1305 assert_eq!(quarantine.output_queue_len_for_testing(), 0);
1309 }
1310}