1use crate::authority::authority_per_epoch_store::{
5 AuthorityEpochTables, EncG, ExecutionIndicesWithStats, LockDetails, LockDetailsWrapper, PkG,
6};
7use crate::authority::transaction_deferral::DeferralKey;
8use crate::checkpoints::BuilderCheckpointSummary;
9use crate::epoch::randomness::SINGLETON_KEY;
10use dashmap::DashMap;
11use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
12use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
13use moka::policy::EvictionPolicy;
14use moka::sync::SegmentedCache as MokaCache;
15use mysten_common::fatal;
16use mysten_common::random_util::randomize_cache_capacity_in_tests;
17use parking_lot::Mutex;
18use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
19use sui_types::authenticator_state::ActiveJwk;
20use sui_types::base_types::{AuthorityName, ObjectRef, SequenceNumber};
21use sui_types::crypto::RandomnessRound;
22use sui_types::error::SuiResult;
23use sui_types::executable_transaction::{
24 TrustedExecutableTransactionWithAliases, VerifiedExecutableTransactionWithAliases,
25};
26use sui_types::execution::ExecutionTimeObservationKey;
27use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
28use sui_types::messages_consensus::AuthorityIndex;
29use sui_types::{
30 base_types::{ConsensusObjectSequenceKey, ObjectID},
31 digests::TransactionDigest,
32 messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
33 signature::GenericSignature,
34};
35use tracing::{debug, info};
36use typed_store::Map;
37use typed_store::rocks::DBBatch;
38
39use crate::{
40 authority::{
41 authority_per_epoch_store::AuthorityPerEpochStore,
42 shared_object_congestion_tracker::CongestionPerObjectDebt,
43 },
44 checkpoints::{CheckpointHeight, PendingCheckpoint, PendingCheckpointV2},
45 consensus_handler::SequencedConsensusTransactionKey,
46 epoch::{
47 randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
48 reconfiguration::ReconfigState,
49 },
50};
51
52use super::*;
53
/// Accumulates everything produced while handling a single consensus commit so that it can
/// later be persisted in one go via `write_to_batch`.
///
/// All fields are private; they are populated through the setter/insert methods on the impl.
#[derive(Default)]
#[allow(clippy::type_complexity)]
pub(crate) struct ConsensusCommitOutput {
    // Round this output belongs to; stamped onto the congestion-control debts when persisted.
    consensus_round: Round,
    // Keys of consensus messages handled in this commit (persisted with the value `true`).
    consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
    // Authorities recorded through `insert_end_of_publish`.
    end_of_publish: BTreeSet<AuthorityName>,
    // New reconfiguration state, if one was stored for this commit.
    reconfig_state: Option<ReconfigState>,
    // Commit stats; `write_to_batch` panics if this was never set.
    consensus_commit_stats: Option<ExecutionIndicesWithStats>,

    // Next versions for shared objects; may only be set once (asserted by the setter).
    next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,

    // Transactions deferred by this commit, grouped by deferral key.
    deferred_txns: Vec<(DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>)>,
    // Deferral keys whose stored transactions must be deleted when this output is written.
    deleted_deferred_txns: BTreeSet<DeferralKey>,

    // Pending checkpoints produced by this commit. `write_to_batch` does not write these;
    // they are only read back through the accessor methods.
    pending_checkpoints: Vec<PendingCheckpoint>,
    // V2 counterpart of `pending_checkpoints`, handled the same way.
    pending_checkpoints_v2: Vec<PendingCheckpointV2>,

    // Reserved next randomness round together with the commit timestamp; set at most once.
    next_randomness_round: Option<(RandomnessRound, TimestampMs)>,

    // DKG confirmations keyed by the sender of each confirmation.
    dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
    // DKG processed messages keyed by the sender of each message.
    dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
    // DKG used messages, stored under a singleton key when present.
    dkg_used_message: Option<VersionedUsedProcessedMessages>,
    // DKG output, stored under a singleton key when present.
    dkg_output: Option<dkg_v1::Output<PkG, EncG>>,

    // JWKs recorded as pending, tagged with the authority passed to `insert_pending_jwk`.
    pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
    // Active JWKs tagged with a round; `write_to_batch` asserts that the round equals the
    // last committed round of the commit stats.
    active_jwks: BTreeSet<(u64, (JwkId, JWK))>,

    // Per-object congestion debts for this commit.
    congestion_control_object_debts: Vec<(ObjectID, u64)>,
    // Per-object congestion debts tracked separately for randomness transactions.
    congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
    // Execution time observations as (source authority, generation, estimates).
    execution_time_observations: Vec<(
        AuthorityIndex,
        // generation
        u64, Vec<(ExecutionTimeObservationKey, Duration)>,
    )>,

    // Owned-object locks acquired in this commit; may only be set once (asserted by the setter).
    owned_object_locks: HashMap<ObjectRef, LockDetails>,
}
98
99impl ConsensusCommitOutput {
100 pub fn new(consensus_round: Round) -> Self {
101 Self {
102 consensus_round,
103 ..Default::default()
104 }
105 }
106
107 pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
108 self.deleted_deferred_txns.iter().cloned()
109 }
110
111 pub fn has_deferred_transactions(&self) -> bool {
112 !self.deferred_txns.is_empty()
113 }
114
115 fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
116 self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
117 }
118
119 fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
120 self.pending_checkpoints.last().map(|cp| cp.height())
121 }
122
123 fn get_pending_checkpoints(
124 &self,
125 last: Option<CheckpointHeight>,
126 ) -> impl Iterator<Item = &PendingCheckpoint> {
127 self.pending_checkpoints.iter().filter(move |cp| {
128 if let Some(last) = last {
129 cp.height() > last
130 } else {
131 true
132 }
133 })
134 }
135
136 fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
137 self.pending_checkpoints
138 .iter()
139 .any(|cp| cp.height() == *index)
140 }
141
142 fn get_pending_checkpoints_v2(
143 &self,
144 last: Option<CheckpointHeight>,
145 ) -> impl Iterator<Item = &PendingCheckpointV2> {
146 self.pending_checkpoints_v2.iter().filter(move |cp| {
147 if let Some(last) = last {
148 cp.height() > last
149 } else {
150 true
151 }
152 })
153 }
154
155 fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
156 self.pending_checkpoints_v2
157 .iter()
158 .any(|cp| cp.height() == *index)
159 }
160
161 fn get_round(&self) -> Option<u64> {
162 self.consensus_commit_stats
163 .as_ref()
164 .map(|stats| stats.index.last_committed_round)
165 }
166
167 pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
168 self.end_of_publish.insert(authority);
169 }
170
171 pub fn insert_execution_time_observation(
172 &mut self,
173 source: AuthorityIndex,
174 generation: u64,
175 estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
176 ) {
177 self.execution_time_observations
178 .push((source, generation, estimates));
179 }
180
181 pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStats) {
182 self.consensus_commit_stats = Some(stats);
183 }
184
185 pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
187 self.record_consensus_commit_stats(Default::default());
188 }
189
190 pub fn store_reconfig_state(&mut self, state: ReconfigState) {
191 self.reconfig_state = Some(state);
192 }
193
194 pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
195 self.consensus_messages_processed.insert(key);
196 }
197
198 pub fn get_consensus_messages_processed(
199 &self,
200 ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
201 self.consensus_messages_processed.iter()
202 }
203
204 pub fn set_next_shared_object_versions(
205 &mut self,
206 next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
207 ) {
208 assert!(self.next_shared_object_versions.is_none());
209 self.next_shared_object_versions = Some(next_versions);
210 }
211
212 pub fn defer_transactions(
213 &mut self,
214 key: DeferralKey,
215 transactions: Vec<VerifiedExecutableTransactionWithAliases>,
216 ) {
217 self.deferred_txns.push((key, transactions));
218 }
219
220 pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
221 self.deleted_deferred_txns
222 .extend(deferral_keys.iter().cloned());
223 }
224
225 pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
226 self.pending_checkpoints.push(checkpoint);
227 }
228
229 pub fn insert_pending_checkpoint_v2(&mut self, checkpoint: PendingCheckpointV2) {
230 self.pending_checkpoints_v2.push(checkpoint);
231 }
232
233 pub fn reserve_next_randomness_round(
234 &mut self,
235 next_randomness_round: RandomnessRound,
236 commit_timestamp: TimestampMs,
237 ) {
238 assert!(self.next_randomness_round.is_none());
239 self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
240 }
241
242 pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
243 self.dkg_confirmations.insert(conf.sender(), conf);
244 }
245
246 pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
247 self.dkg_processed_messages
248 .insert(message.sender(), message);
249 }
250
251 pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
252 self.dkg_used_message = Some(used_messages);
253 }
254
255 pub fn set_dkg_output(&mut self, output: dkg_v1::Output<PkG, EncG>) {
256 self.dkg_output = Some(output);
257 }
258
259 pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
260 self.pending_jwks.insert((authority, id, jwk));
261 }
262
263 pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
264 self.active_jwks.insert((round, key));
265 }
266
267 pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
268 self.congestion_control_object_debts = object_debts;
269 }
270
271 pub fn set_congestion_control_randomness_object_debts(
272 &mut self,
273 object_debts: Vec<(ObjectID, u64)>,
274 ) {
275 self.congestion_control_randomness_object_debts = object_debts;
276 }
277
278 pub fn set_owned_object_locks(&mut self, locks: HashMap<ObjectRef, LockDetails>) {
279 assert!(self.owned_object_locks.is_empty());
280 self.owned_object_locks = locks;
281 }
282
283 pub fn write_to_batch(
284 self,
285 epoch_store: &AuthorityPerEpochStore,
286 batch: &mut DBBatch,
287 ) -> SuiResult {
288 let tables = epoch_store.tables()?;
289 batch.insert_batch(
290 &tables.consensus_message_processed,
291 self.consensus_messages_processed
292 .iter()
293 .map(|key| (key, true)),
294 )?;
295
296 batch.insert_batch(
297 &tables.end_of_publish,
298 self.end_of_publish.iter().map(|authority| (authority, ())),
299 )?;
300
301 if let Some(reconfig_state) = &self.reconfig_state {
302 batch.insert_batch(
303 &tables.reconfig_state,
304 [(RECONFIG_STATE_INDEX, reconfig_state)],
305 )?;
306 }
307
308 let consensus_commit_stats = self
309 .consensus_commit_stats
310 .expect("consensus_commit_stats must be set");
311 let round = consensus_commit_stats.index.last_committed_round;
312
313 batch.insert_batch(
314 &tables.last_consensus_stats,
315 [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
316 )?;
317
318 if let Some(next_versions) = self.next_shared_object_versions {
319 batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
320 }
321
322 if !self.owned_object_locks.is_empty() {
323 batch.insert_batch(
324 &tables.owned_object_locked_transactions,
325 self.owned_object_locks
326 .into_iter()
327 .map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))),
328 )?;
329 }
330
331 batch.delete_batch(
332 &tables.deferred_transactions_v2,
333 &self.deleted_deferred_txns,
334 )?;
335 batch.delete_batch(
336 &tables.deferred_transactions_with_aliases_v2,
337 &self.deleted_deferred_txns,
338 )?;
339 batch.delete_batch(
340 &tables.deferred_transactions_with_aliases_v3,
341 &self.deleted_deferred_txns,
342 )?;
343
344 batch.insert_batch(
345 &tables.deferred_transactions_with_aliases_v3,
346 self.deferred_txns.into_iter().map(|(key, txs)| {
347 (
348 key,
349 txs.into_iter()
350 .map(|tx| {
351 let tx: TrustedExecutableTransactionWithAliases = tx.serializable();
352 tx
353 })
354 .collect::<Vec<_>>(),
355 )
356 }),
357 )?;
358
359 if let Some((round, commit_timestamp)) = self.next_randomness_round {
360 batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
361 batch.insert_batch(
362 &tables.randomness_last_round_timestamp,
363 [(SINGLETON_KEY, commit_timestamp)],
364 )?;
365 }
366
367 batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
368 batch.insert_batch(
369 &tables.dkg_processed_messages_v2,
370 self.dkg_processed_messages,
371 )?;
372 batch.insert_batch(
373 &tables.dkg_used_messages_v2,
374 self.dkg_used_message
376 .into_iter()
377 .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
378 )?;
379 if let Some(output) = self.dkg_output {
380 batch.insert_batch(&tables.dkg_output, [(SINGLETON_KEY, output)])?;
381 }
382
383 batch.insert_batch(
384 &tables.pending_jwks,
385 self.pending_jwks.into_iter().map(|j| (j, ())),
386 )?;
387 batch.insert_batch(
388 &tables.active_jwks,
389 self.active_jwks.into_iter().map(|j| {
390 assert_eq!(j.0, round);
392 (j, ())
393 }),
394 )?;
395
396 batch.insert_batch(
397 &tables.congestion_control_object_debts,
398 self.congestion_control_object_debts
399 .into_iter()
400 .map(|(object_id, debt)| {
401 (
402 object_id,
403 CongestionPerObjectDebt::new(self.consensus_round, debt),
404 )
405 }),
406 )?;
407 batch.insert_batch(
408 &tables.congestion_control_randomness_object_debts,
409 self.congestion_control_randomness_object_debts
410 .into_iter()
411 .map(|(object_id, debt)| {
412 (
413 object_id,
414 CongestionPerObjectDebt::new(self.consensus_round, debt),
415 )
416 }),
417 )?;
418
419 batch.insert_batch(
420 &tables.execution_time_observations,
421 self.execution_time_observations
422 .into_iter()
423 .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
424 )?;
425
426 Ok(())
427 }
428}
429
/// In-memory state derived from consensus output: deferred transactions, user signatures
/// collected for checkpoints, and a record of transactions executed in the current epoch.
pub(crate) struct ConsensusOutputCache {
    // Deferred transactions by deferral key; initialised from the epoch tables in `new`.
    pub(crate) deferred_transactions:
        Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>>,

    // User signatures per transaction digest; starts out empty.
    #[allow(clippy::type_complexity)]
    pub(crate) user_signatures_for_checkpoints:
        Mutex<HashMap<TransactionDigest, Vec<(GenericSignature, Option<SequenceNumber>)>>>,

    // Digests inserted by `insert_executed_in_epoch` and removed by `remove_executed_in_epoch`.
    executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
    // Bounded LRU cache of executed digests; entries are not removed by
    // `remove_executed_in_epoch`, so lookups can still hit here afterwards.
    executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
}
449
450impl ConsensusOutputCache {
451 pub(crate) fn new(tables: &AuthorityEpochTables) -> Self {
452 let deferred_transactions = tables
453 .get_all_deferred_transactions_v2()
454 .expect("load deferred transactions cannot fail");
455
456 let executed_in_epoch_cache_capacity = 50_000;
457
458 Self {
459 deferred_transactions: Mutex::new(deferred_transactions),
460 user_signatures_for_checkpoints: Default::default(),
461 executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
462 executed_in_epoch_cache: MokaCache::builder(8)
463 .max_capacity(randomize_cache_capacity_in_tests(
465 executed_in_epoch_cache_capacity,
466 ))
467 .eviction_policy(EvictionPolicy::lru())
468 .build(),
469 }
470 }
471
472 pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
473 self.executed_in_epoch
474 .read()
475 .contains_key(digest) ||
476 self.executed_in_epoch_cache.get(digest).is_some()
478 }
479
480 pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
482 assert!(
483 self.executed_in_epoch
484 .read()
485 .insert(tx_digest, ())
486 .is_none(),
487 "transaction already executed"
488 );
489 self.executed_in_epoch_cache.insert(tx_digest, ());
490 }
491
492 pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
496 let executed_in_epoch = self.executed_in_epoch.read();
497 for tx_digest in tx_digests {
498 executed_in_epoch.remove(tx_digest);
499 }
500 }
501}
502
/// Holds consensus commit outputs and builder checkpoint summaries in memory until the
/// corresponding checkpoints have been executed, at which point they are written to a batch.
pub(crate) struct ConsensusOutputQuarantine {
    // Outputs not yet written, oldest at the front.
    output_queue: VecDeque<ConsensusCommitOutput>,

    // Highest checkpoint sequence number reported as executed.
    highest_executed_checkpoint: CheckpointSequenceNumber,

    // Builder summaries (with contents) not yet written, ordered by sequence number.
    builder_checkpoint_summary:
        BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,

    // Transaction digest -> sequence number for every transaction in `builder_checkpoint_summary`.
    builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,

    // Latest next-version per shared object across the queued outputs.
    shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,

    // Latest randomness congestion debt per object across the queued outputs.
    congestion_control_randomness_object_debts:
        RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
    // Latest congestion debt per object across the queued outputs.
    congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,

    // Keys of consensus messages processed by the queued outputs.
    processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,

    // Owned-object locks held by the queued outputs (plain map, not reference counted).
    owned_object_locks: HashMap<ObjectRef, LockDetails>,

    // Metrics sink; the queue length is reported here.
    metrics: Arc<EpochMetrics>,
}
534
535impl ConsensusOutputQuarantine {
536 pub(super) fn new(
537 highest_executed_checkpoint: CheckpointSequenceNumber,
538 authority_metrics: Arc<EpochMetrics>,
539 ) -> Self {
540 Self {
541 highest_executed_checkpoint,
542
543 output_queue: VecDeque::new(),
544 builder_checkpoint_summary: BTreeMap::new(),
545 builder_digest_to_checkpoint: HashMap::new(),
546 shared_object_next_versions: RefCountedHashMap::new(),
547 processed_consensus_messages: RefCountedHashMap::new(),
548 congestion_control_randomness_object_debts: RefCountedHashMap::new(),
549 congestion_control_object_debts: RefCountedHashMap::new(),
550 owned_object_locks: HashMap::new(),
551 metrics: authority_metrics,
552 }
553 }
554}
555
556impl ConsensusOutputQuarantine {
559 pub(crate) fn push_consensus_output(
561 &mut self,
562 output: ConsensusCommitOutput,
563 epoch_store: &AuthorityPerEpochStore,
564 ) -> SuiResult {
565 self.insert_shared_object_next_versions(&output);
566 self.insert_congestion_control_debts(&output);
567 self.insert_processed_consensus_messages(&output);
568 self.insert_owned_object_locks(&output);
569 self.output_queue.push_back(output);
570
571 self.metrics
572 .consensus_quarantine_queue_size
573 .set(self.output_queue.len() as i64);
574
575 self.commit(epoch_store)
578 }
579
580 pub(super) fn insert_builder_summary(
582 &mut self,
583 sequence_number: CheckpointSequenceNumber,
584 summary: BuilderCheckpointSummary,
585 contents: CheckpointContents,
586 ) {
587 debug!(?sequence_number, "inserting builder summary {:?}", summary);
588 for tx in contents.iter() {
589 self.builder_digest_to_checkpoint
590 .insert(tx.transaction, sequence_number);
591 }
592 self.builder_checkpoint_summary
593 .insert(sequence_number, (summary, contents));
594 }
595}
596
597impl ConsensusOutputQuarantine {
599 pub(super) fn update_highest_executed_checkpoint(
602 &mut self,
603 checkpoint: CheckpointSequenceNumber,
604 epoch_store: &AuthorityPerEpochStore,
605 batch: &mut DBBatch,
606 ) -> SuiResult {
607 self.highest_executed_checkpoint = checkpoint;
608 self.commit_with_batch(epoch_store, batch)
609 }
610
611 pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
612 let mut batch = epoch_store.db_batch()?;
613 self.commit_with_batch(epoch_store, &mut batch)?;
614 batch.write()?;
615 Ok(())
616 }
617
618 fn commit_with_batch(
620 &mut self,
621 epoch_store: &AuthorityPerEpochStore,
622 batch: &mut DBBatch,
623 ) -> SuiResult {
624 let tables = epoch_store.tables()?;
631
632 let mut highest_committed_height = None;
633
634 while self
635 .builder_checkpoint_summary
636 .first_key_value()
637 .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
638 == Some(true)
639 {
640 let (seq, (builder_summary, contents)) =
641 self.builder_checkpoint_summary.pop_first().unwrap();
642
643 for tx in contents.iter() {
644 let digest = &tx.transaction;
645 assert_eq!(
646 self.builder_digest_to_checkpoint
647 .remove(digest)
648 .unwrap_or_else(|| {
649 panic!(
650 "transaction {:?} not found in builder_digest_to_checkpoint",
651 digest
652 )
653 }),
654 seq
655 );
656 }
657
658 batch.insert_batch(
659 &tables.builder_digest_to_checkpoint,
660 contents.iter().map(|tx| (tx.transaction, seq)),
661 )?;
662
663 batch.insert_batch(
664 &tables.builder_checkpoint_summary_v2,
665 [(seq, &builder_summary)],
666 )?;
667
668 let checkpoint_height = builder_summary
669 .checkpoint_height
670 .expect("non-genesis checkpoint must have height");
671 if let Some(highest) = highest_committed_height {
672 assert!(
673 checkpoint_height >= highest,
674 "current checkpoint height {} must be no less than highest committed height {}",
675 checkpoint_height,
676 highest
677 );
678 }
679
680 highest_committed_height = Some(checkpoint_height);
681 }
682
683 let Some(highest_committed_height) = highest_committed_height else {
684 return Ok(());
685 };
686
687 let split_checkpoints_in_consensus_handler = epoch_store
688 .protocol_config()
689 .split_checkpoints_in_consensus_handler();
690
691 while !self.output_queue.is_empty() {
692 let output = self.output_queue.front().unwrap();
696 let highest_in_commit = if split_checkpoints_in_consensus_handler {
697 output
700 .consensus_commit_stats
701 .as_ref()
702 .expect("consensus_commit_stats must be set")
703 .height
704 } else {
705 let Some(h) = output.get_highest_pending_checkpoint_height() else {
707 break;
710 };
711 h
712 };
713
714 if highest_in_commit <= highest_committed_height {
715 info!(
716 "committing output with highest pending checkpoint height {:?}",
717 highest_in_commit
718 );
719 let output = self.output_queue.pop_front().unwrap();
720 self.remove_shared_object_next_versions(&output);
721 self.remove_processed_consensus_messages(&output);
722 self.remove_congestion_control_debts(&output);
723 self.remove_owned_object_locks(&output);
724
725 output.write_to_batch(epoch_store, batch)?;
726 } else {
727 break;
728 }
729 }
730
731 self.metrics
732 .consensus_quarantine_queue_size
733 .set(self.output_queue.len() as i64);
734
735 Ok(())
736 }
737}
738
739impl ConsensusOutputQuarantine {
740 fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
741 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
742 for (object_id, next_version) in next_versions {
743 self.shared_object_next_versions
744 .insert(*object_id, *next_version);
745 }
746 }
747 }
748
749 fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
750 let current_round = output.consensus_round;
751
752 for (object_id, debt) in output.congestion_control_object_debts.iter() {
753 self.congestion_control_object_debts.insert(
754 *object_id,
755 CongestionPerObjectDebt::new(current_round, *debt),
756 );
757 }
758
759 for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
760 self.congestion_control_randomness_object_debts.insert(
761 *object_id,
762 CongestionPerObjectDebt::new(current_round, *debt),
763 );
764 }
765 }
766
767 fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
768 for (object_id, _) in output.congestion_control_object_debts.iter() {
769 self.congestion_control_object_debts.remove(object_id);
770 }
771 for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
772 self.congestion_control_randomness_object_debts
773 .remove(object_id);
774 }
775 }
776
777 fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
778 for tx_key in output.consensus_messages_processed.iter() {
779 self.processed_consensus_messages.insert(tx_key.clone(), ());
780 }
781 }
782
783 fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
784 for tx_key in output.consensus_messages_processed.iter() {
785 self.processed_consensus_messages.remove(tx_key);
786 }
787 }
788
789 fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
790 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
791 for object_id in next_versions.keys() {
792 if !self.shared_object_next_versions.remove(object_id) {
793 fatal!(
794 "Shared object next version not found in quarantine: {:?}",
795 object_id
796 );
797 }
798 }
799 }
800 }
801
802 fn insert_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
803 for (obj_ref, lock) in &output.owned_object_locks {
804 self.owned_object_locks.insert(*obj_ref, *lock);
805 }
806 }
807
808 fn remove_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
809 for obj_ref in output.owned_object_locks.keys() {
810 self.owned_object_locks.remove(obj_ref);
811 }
812 }
813}
814
815impl ConsensusOutputQuarantine {
818 pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
819 self.builder_checkpoint_summary
820 .values()
821 .last()
822 .map(|(summary, _)| summary)
823 }
824
825 pub(super) fn get_built_summary(
826 &self,
827 sequence: CheckpointSequenceNumber,
828 ) -> Option<&BuilderCheckpointSummary> {
829 self.builder_checkpoint_summary
830 .get(&sequence)
831 .map(|(summary, _)| summary)
832 }
833
834 pub(super) fn included_transaction_in_checkpoint(&self, digest: &TransactionDigest) -> bool {
835 self.builder_digest_to_checkpoint.contains_key(digest)
836 }
837
838 pub(super) fn is_consensus_message_processed(
839 &self,
840 key: &SequencedConsensusTransactionKey,
841 ) -> bool {
842 self.processed_consensus_messages.contains_key(key)
843 }
844
845 pub(super) fn is_empty(&self) -> bool {
846 self.output_queue.is_empty()
847 }
848
849 pub(super) fn get_next_shared_object_versions(
850 &self,
851 tables: &AuthorityEpochTables,
852 objects_to_init: &[ConsensusObjectSequenceKey],
853 ) -> SuiResult<Vec<Option<SequenceNumber>>> {
854 Ok(do_fallback_lookup(
855 objects_to_init,
856 |object_key| {
857 if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
858 CacheResult::Hit(Some(*next_version))
859 } else {
860 CacheResult::Miss
861 }
862 },
863 |object_keys| {
864 tables
865 .next_shared_object_versions_v2
866 .multi_get(object_keys)
867 .expect("db error")
868 },
869 ))
870 }
871
872 pub(super) fn get_owned_object_locks(
876 &self,
877 tables: &AuthorityEpochTables,
878 obj_refs: &[ObjectRef],
879 ) -> SuiResult<Vec<Option<LockDetails>>> {
880 Ok(do_fallback_lookup(
881 obj_refs,
882 |obj_ref| {
883 if let Some(lock) = self.owned_object_locks.get(obj_ref) {
884 CacheResult::Hit(Some(*lock))
885 } else {
886 CacheResult::Miss
887 }
888 },
889 |obj_refs| {
890 tables
891 .multi_get_locked_transactions(obj_refs)
892 .expect("db error")
893 },
894 ))
895 }
896
897 pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
898 self.output_queue
899 .back()
900 .and_then(|output| output.get_highest_pending_checkpoint_height())
901 }
902
903 pub(super) fn get_pending_checkpoints(
904 &self,
905 last: Option<CheckpointHeight>,
906 ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
907 let mut checkpoints = Vec::new();
908 for output in &self.output_queue {
909 checkpoints.extend(
910 output
911 .get_pending_checkpoints(last)
912 .map(|cp| (cp.height(), cp.clone())),
913 );
914 }
915 if cfg!(debug_assertions) {
916 let mut prev = None;
917 for (height, _) in &checkpoints {
918 if let Some(prev) = prev {
919 assert!(prev < *height);
920 }
921 prev = Some(*height);
922 }
923 }
924 checkpoints
925 }
926
927 pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
928 self.output_queue
929 .iter()
930 .any(|output| output.pending_checkpoint_exists(index))
931 }
932
933 pub(super) fn get_pending_checkpoints_v2(
934 &self,
935 last: Option<CheckpointHeight>,
936 ) -> Vec<(CheckpointHeight, PendingCheckpointV2)> {
937 let mut checkpoints = Vec::new();
938 for output in &self.output_queue {
939 checkpoints.extend(
940 output
941 .get_pending_checkpoints_v2(last)
942 .map(|cp| (cp.height(), cp.clone())),
943 );
944 }
945 if cfg!(debug_assertions) {
946 let mut prev = None;
947 for (height, _) in &checkpoints {
948 if let Some(prev) = prev {
949 assert!(prev < *height);
950 }
951 prev = Some(*height);
952 }
953 }
954 checkpoints
955 }
956
957 pub(super) fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
958 self.output_queue
959 .iter()
960 .any(|output| output.pending_checkpoint_exists_v2(index))
961 }
962
963 pub(super) fn get_new_jwks(
964 &self,
965 epoch_store: &AuthorityPerEpochStore,
966 round: u64,
967 ) -> SuiResult<Vec<ActiveJwk>> {
968 let epoch = epoch_store.epoch();
969
970 for output in self.output_queue.iter().rev() {
972 let output_round = output.get_round().unwrap();
975 if round == output_round {
976 return Ok(output
977 .active_jwks
978 .iter()
979 .map(|(_, (jwk_id, jwk))| ActiveJwk {
980 jwk_id: jwk_id.clone(),
981 jwk: jwk.clone(),
982 epoch,
983 })
984 .collect());
985 }
986 }
987
988 let empty_jwk_id = JwkId::new(String::new(), String::new());
990 let empty_jwk = JWK {
991 kty: String::new(),
992 e: String::new(),
993 n: String::new(),
994 alg: String::new(),
995 };
996
997 let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
998 let end = (round + 1, (empty_jwk_id, empty_jwk));
999
1000 Ok(epoch_store
1001 .tables()?
1002 .active_jwks
1003 .safe_iter_with_bounds(Some(start), Some(end))
1004 .map_ok(|((r, (jwk_id, jwk)), _)| {
1005 debug_assert!(round == r);
1006 ActiveJwk { jwk_id, jwk, epoch }
1007 })
1008 .collect::<Result<Vec<_>, _>>()?)
1009 }
1010
1011 pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
1012 self.output_queue
1013 .iter()
1014 .rev()
1015 .filter_map(|output| output.get_randomness_last_round_timestamp())
1016 .next()
1017 }
1018
1019 pub(crate) fn load_initial_object_debts(
1020 &self,
1021 epoch_store: &AuthorityPerEpochStore,
1022 current_round: Round,
1023 for_randomness: bool,
1024 transactions: &[VerifiedExecutableTransactionWithAliases],
1025 ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
1026 let protocol_config = epoch_store.protocol_config();
1027 let tables = epoch_store.tables()?;
1028 let default_per_commit_budget = protocol_config
1029 .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
1030 .unwrap_or(0);
1031 let (hash_table, db_table, per_commit_budget) = if for_randomness {
1032 (
1033 &self.congestion_control_randomness_object_debts,
1034 &tables.congestion_control_randomness_object_debts,
1035 protocol_config
1036 .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
1037 .unwrap_or(default_per_commit_budget),
1038 )
1039 } else {
1040 (
1041 &self.congestion_control_object_debts,
1042 &tables.congestion_control_object_debts,
1043 default_per_commit_budget,
1044 )
1045 };
1046 let mut shared_input_object_ids: Vec<_> = transactions
1047 .iter()
1048 .flat_map(|tx| tx.tx().shared_input_objects().map(|obj| obj.id))
1049 .collect();
1050 shared_input_object_ids.sort();
1051 shared_input_object_ids.dedup();
1052
1053 let results = do_fallback_lookup(
1054 &shared_input_object_ids,
1055 |object_id| {
1056 if let Some(debt) = hash_table.get(object_id) {
1057 CacheResult::Hit(Some(debt.into_v1()))
1058 } else {
1059 CacheResult::Miss
1060 }
1061 },
1062 |object_ids| {
1063 db_table
1064 .multi_get(object_ids)
1065 .expect("db error")
1066 .into_iter()
1067 .map(|debt| debt.map(|debt| debt.into_v1()))
1068 .collect()
1069 },
1070 );
1071
1072 Ok(results
1073 .into_iter()
1074 .zip(shared_input_object_ids)
1075 .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
1076 .map(move |((round, debt), object_id)| {
1077 assert!(current_round > round);
1081 let num_rounds = current_round - round - 1;
1082 let debt = debt.saturating_sub(per_commit_budget * num_rounds);
1083 (object_id, debt)
1084 }))
1085 }
1086}
1087
/// A hash map whose entries carry a reference count.
///
/// Inserting an existing key bumps its count and overwrites the value; removing a key
/// decrements the count and only drops the entry once the count reaches zero.
#[derive(Debug, Default)]
struct RefCountedHashMap<K, V> {
    // key -> (reference count, most recently inserted value)
    map: HashMap<K, (usize, V)>,
}

impl<K, V> RefCountedHashMap<K, V>
where
    K: Clone + Eq + std::hash::Hash,
{
    /// Creates an empty map.
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    /// Inserts `value` under `key`.
    ///
    /// If the key is already present its reference count is incremented and the stored
    /// value is replaced; otherwise a new entry with a count of one is created.
    pub fn insert(&mut self, key: K, value: V) {
        match self.map.entry(key) {
            hash_map::Entry::Occupied(mut occupied) => {
                let (ref_count, stored) = occupied.get_mut();
                *ref_count += 1;
                *stored = value;
            }
            hash_map::Entry::Vacant(vacant) => {
                vacant.insert((1, value));
            }
        }
    }

    /// Releases one reference to `key`.
    ///
    /// Returns `true` if the key was present and `false` otherwise. The entry itself is only
    /// removed once its reference count drops to zero, so the key may still be present after
    /// a successful call.
    pub fn remove(&mut self, key: &K) -> bool {
        // Look the entry up by reference so the key never has to be cloned.
        let Some((ref_count, _)) = self.map.get_mut(key) else {
            return false;
        };
        *ref_count -= 1;
        if *ref_count == 0 {
            self.map.remove(key);
        }
        true
    }

    /// Returns the most recently inserted value for `key`, if present.
    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key).map(|(_, v)| v)
    }

    /// Returns `true` if `key` currently has at least one reference.
    pub fn contains_key(&self, key: &K) -> bool {
        self.map.contains_key(key)
    }
}