1use crate::authority::authority_per_epoch_store::{
5 AuthorityEpochTables, EncG, ExecutionIndicesWithStats, LockDetails, LockDetailsWrapper, PkG,
6};
7use crate::authority::transaction_deferral::DeferralKey;
8use crate::checkpoints::BuilderCheckpointSummary;
9use crate::epoch::randomness::SINGLETON_KEY;
10use dashmap::DashMap;
11use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
12use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
13use moka::policy::EvictionPolicy;
14use moka::sync::SegmentedCache as MokaCache;
15use mysten_common::fatal;
16use mysten_common::random_util::randomize_cache_capacity_in_tests;
17use parking_lot::Mutex;
18use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
19use sui_types::authenticator_state::ActiveJwk;
20use sui_types::base_types::{AuthorityName, ObjectRef, SequenceNumber};
21use sui_types::crypto::RandomnessRound;
22use sui_types::error::SuiResult;
23use sui_types::executable_transaction::{
24 TrustedExecutableTransactionWithAliases, VerifiedExecutableTransactionWithAliases,
25};
26use sui_types::execution::ExecutionTimeObservationKey;
27use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
28use sui_types::messages_consensus::AuthorityIndex;
29use sui_types::{
30 base_types::{ConsensusObjectSequenceKey, ObjectID},
31 digests::TransactionDigest,
32 messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
33 signature::GenericSignature,
34};
35use tracing::{debug, info};
36use typed_store::Map;
37use typed_store::rocks::DBBatch;
38
39use crate::{
40 authority::{
41 authority_per_epoch_store::AuthorityPerEpochStore,
42 shared_object_congestion_tracker::CongestionPerObjectDebt,
43 },
44 checkpoints::{CheckpointHeight, PendingCheckpoint, PendingCheckpointV2},
45 consensus_handler::SequencedConsensusTransactionKey,
46 epoch::{
47 randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
48 reconfiguration::ReconfigState,
49 },
50};
51
52use super::*;
53
54#[derive(Default)]
55#[allow(clippy::type_complexity)]
56pub(crate) struct ConsensusCommitOutput {
57 consensus_round: Round,
59 consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
60 end_of_publish: BTreeSet<AuthorityName>,
61 reconfig_state: Option<ReconfigState>,
62 consensus_commit_stats: Option<ExecutionIndicesWithStats>,
63
64 next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,
66
67 deferred_txns: Vec<(DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>)>,
68 deleted_deferred_txns: BTreeSet<DeferralKey>,
69
70 pending_checkpoints: Vec<PendingCheckpoint>,
72 pending_checkpoints_v2: Vec<PendingCheckpointV2>,
73
74 next_randomness_round: Option<(RandomnessRound, TimestampMs)>,
76
77 dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
78 dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
79 dkg_used_message: Option<VersionedUsedProcessedMessages>,
80 dkg_output: Option<dkg_v1::Output<PkG, EncG>>,
81
82 pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
84 active_jwks: BTreeSet<(u64, (JwkId, JWK))>,
85
86 congestion_control_object_debts: Vec<(ObjectID, u64)>,
88 congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
89 execution_time_observations: Vec<(
90 AuthorityIndex,
91 u64, Vec<(ExecutionTimeObservationKey, Duration)>,
93 )>,
94
95 owned_object_locks: HashMap<ObjectRef, LockDetails>,
97}
98
99impl ConsensusCommitOutput {
100 pub fn new(consensus_round: Round) -> Self {
101 Self {
102 consensus_round,
103 ..Default::default()
104 }
105 }
106
107 pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
108 self.deleted_deferred_txns.iter().cloned()
109 }
110
111 pub fn has_deferred_transactions(&self) -> bool {
112 !self.deferred_txns.is_empty()
113 }
114
115 fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
116 self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
117 }
118
119 fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
120 self.pending_checkpoints.last().map(|cp| cp.height())
121 }
122
123 fn get_pending_checkpoints(
124 &self,
125 last: Option<CheckpointHeight>,
126 ) -> impl Iterator<Item = &PendingCheckpoint> {
127 self.pending_checkpoints.iter().filter(move |cp| {
128 if let Some(last) = last {
129 cp.height() > last
130 } else {
131 true
132 }
133 })
134 }
135
136 fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
137 self.pending_checkpoints
138 .iter()
139 .any(|cp| cp.height() == *index)
140 }
141
142 fn get_pending_checkpoints_v2(
143 &self,
144 last: Option<CheckpointHeight>,
145 ) -> impl Iterator<Item = &PendingCheckpointV2> {
146 self.pending_checkpoints_v2.iter().filter(move |cp| {
147 if let Some(last) = last {
148 cp.height() > last
149 } else {
150 true
151 }
152 })
153 }
154
155 fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
156 self.pending_checkpoints_v2
157 .iter()
158 .any(|cp| cp.height() == *index)
159 }
160
161 fn get_round(&self) -> Option<u64> {
162 self.consensus_commit_stats
163 .as_ref()
164 .map(|stats| stats.index.last_committed_round)
165 }
166
167 pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
168 self.end_of_publish.insert(authority);
169 }
170
171 pub fn insert_execution_time_observation(
172 &mut self,
173 source: AuthorityIndex,
174 generation: u64,
175 estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
176 ) {
177 self.execution_time_observations
178 .push((source, generation, estimates));
179 }
180
181 pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStats) {
182 self.consensus_commit_stats = Some(stats);
183 }
184
185 pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
187 self.record_consensus_commit_stats(Default::default());
188 }
189
190 pub fn store_reconfig_state(&mut self, state: ReconfigState) {
191 self.reconfig_state = Some(state);
192 }
193
194 pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
195 self.consensus_messages_processed.insert(key);
196 }
197
198 pub fn get_consensus_messages_processed(
199 &self,
200 ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
201 self.consensus_messages_processed.iter()
202 }
203
204 pub fn set_next_shared_object_versions(
205 &mut self,
206 next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
207 ) {
208 assert!(self.next_shared_object_versions.is_none());
209 self.next_shared_object_versions = Some(next_versions);
210 }
211
212 pub fn defer_transactions(
213 &mut self,
214 key: DeferralKey,
215 transactions: Vec<VerifiedExecutableTransactionWithAliases>,
216 ) {
217 self.deferred_txns.push((key, transactions));
218 }
219
220 pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
221 self.deleted_deferred_txns
222 .extend(deferral_keys.iter().cloned());
223 }
224
225 pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
226 self.pending_checkpoints.push(checkpoint);
227 }
228
229 pub fn insert_pending_checkpoint_v2(&mut self, checkpoint: PendingCheckpointV2) {
230 self.pending_checkpoints_v2.push(checkpoint);
231 }
232
233 pub fn reserve_next_randomness_round(
234 &mut self,
235 next_randomness_round: RandomnessRound,
236 commit_timestamp: TimestampMs,
237 ) {
238 assert!(self.next_randomness_round.is_none());
239 self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
240 }
241
242 pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
243 self.dkg_confirmations.insert(conf.sender(), conf);
244 }
245
246 pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
247 self.dkg_processed_messages
248 .insert(message.sender(), message);
249 }
250
251 pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
252 self.dkg_used_message = Some(used_messages);
253 }
254
255 pub fn set_dkg_output(&mut self, output: dkg_v1::Output<PkG, EncG>) {
256 self.dkg_output = Some(output);
257 }
258
259 pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
260 self.pending_jwks.insert((authority, id, jwk));
261 }
262
263 pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
264 self.active_jwks.insert((round, key));
265 }
266
267 pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
268 self.congestion_control_object_debts = object_debts;
269 }
270
271 pub fn set_congestion_control_randomness_object_debts(
272 &mut self,
273 object_debts: Vec<(ObjectID, u64)>,
274 ) {
275 self.congestion_control_randomness_object_debts = object_debts;
276 }
277
278 pub fn set_owned_object_locks(&mut self, locks: HashMap<ObjectRef, LockDetails>) {
279 assert!(self.owned_object_locks.is_empty());
280 self.owned_object_locks = locks;
281 }
282
283 pub fn write_to_batch(
284 self,
285 epoch_store: &AuthorityPerEpochStore,
286 batch: &mut DBBatch,
287 ) -> SuiResult {
288 let tables = epoch_store.tables()?;
289 batch.insert_batch(
290 &tables.consensus_message_processed,
291 self.consensus_messages_processed
292 .iter()
293 .map(|key| (key, true)),
294 )?;
295
296 batch.insert_batch(
297 &tables.end_of_publish,
298 self.end_of_publish.iter().map(|authority| (authority, ())),
299 )?;
300
301 if let Some(reconfig_state) = &self.reconfig_state {
302 batch.insert_batch(
303 &tables.reconfig_state,
304 [(RECONFIG_STATE_INDEX, reconfig_state)],
305 )?;
306 }
307
308 let consensus_commit_stats = self
309 .consensus_commit_stats
310 .expect("consensus_commit_stats must be set");
311 let round = consensus_commit_stats.index.last_committed_round;
312
313 batch.insert_batch(
314 &tables.last_consensus_stats,
315 [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
316 )?;
317
318 if let Some(next_versions) = self.next_shared_object_versions {
319 batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
320 }
321
322 if !self.owned_object_locks.is_empty() {
323 batch.insert_batch(
324 &tables.owned_object_locked_transactions,
325 self.owned_object_locks
326 .into_iter()
327 .map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))),
328 )?;
329 }
330
331 batch.delete_batch(
332 &tables.deferred_transactions_v2,
333 &self.deleted_deferred_txns,
334 )?;
335 batch.delete_batch(
336 &tables.deferred_transactions_with_aliases_v2,
337 &self.deleted_deferred_txns,
338 )?;
339
340 batch.insert_batch(
341 &tables.deferred_transactions_with_aliases_v2,
342 self.deferred_txns.into_iter().map(|(key, txs)| {
343 (
344 key,
345 txs.into_iter()
346 .map(|tx| {
347 let tx: TrustedExecutableTransactionWithAliases = tx.serializable();
348 tx
349 })
350 .collect::<Vec<_>>(),
351 )
352 }),
353 )?;
354
355 if let Some((round, commit_timestamp)) = self.next_randomness_round {
356 batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
357 batch.insert_batch(
358 &tables.randomness_last_round_timestamp,
359 [(SINGLETON_KEY, commit_timestamp)],
360 )?;
361 }
362
363 batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
364 batch.insert_batch(
365 &tables.dkg_processed_messages_v2,
366 self.dkg_processed_messages,
367 )?;
368 batch.insert_batch(
369 &tables.dkg_used_messages_v2,
370 self.dkg_used_message
372 .into_iter()
373 .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
374 )?;
375 if let Some(output) = self.dkg_output {
376 batch.insert_batch(&tables.dkg_output, [(SINGLETON_KEY, output)])?;
377 }
378
379 batch.insert_batch(
380 &tables.pending_jwks,
381 self.pending_jwks.into_iter().map(|j| (j, ())),
382 )?;
383 batch.insert_batch(
384 &tables.active_jwks,
385 self.active_jwks.into_iter().map(|j| {
386 assert_eq!(j.0, round);
388 (j, ())
389 }),
390 )?;
391
392 batch.insert_batch(
393 &tables.congestion_control_object_debts,
394 self.congestion_control_object_debts
395 .into_iter()
396 .map(|(object_id, debt)| {
397 (
398 object_id,
399 CongestionPerObjectDebt::new(self.consensus_round, debt),
400 )
401 }),
402 )?;
403 batch.insert_batch(
404 &tables.congestion_control_randomness_object_debts,
405 self.congestion_control_randomness_object_debts
406 .into_iter()
407 .map(|(object_id, debt)| {
408 (
409 object_id,
410 CongestionPerObjectDebt::new(self.consensus_round, debt),
411 )
412 }),
413 )?;
414
415 batch.insert_batch(
416 &tables.execution_time_observations,
417 self.execution_time_observations
418 .into_iter()
419 .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
420 )?;
421
422 Ok(())
423 }
424}
425
426pub(crate) struct ConsensusOutputCache {
431 pub(crate) deferred_transactions:
434 Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>>,
435
436 #[allow(clippy::type_complexity)]
439 pub(crate) user_signatures_for_checkpoints:
440 Mutex<HashMap<TransactionDigest, Vec<(GenericSignature, Option<SequenceNumber>)>>>,
441
442 executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
443 executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
444}
445
446impl ConsensusOutputCache {
447 pub(crate) fn new(tables: &AuthorityEpochTables) -> Self {
448 let deferred_transactions = tables
449 .get_all_deferred_transactions_v2()
450 .expect("load deferred transactions cannot fail");
451
452 let executed_in_epoch_cache_capacity = 50_000;
453
454 Self {
455 deferred_transactions: Mutex::new(deferred_transactions),
456 user_signatures_for_checkpoints: Default::default(),
457 executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
458 executed_in_epoch_cache: MokaCache::builder(8)
459 .max_capacity(randomize_cache_capacity_in_tests(
461 executed_in_epoch_cache_capacity,
462 ))
463 .eviction_policy(EvictionPolicy::lru())
464 .build(),
465 }
466 }
467
468 pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
469 self.executed_in_epoch
470 .read()
471 .contains_key(digest) ||
472 self.executed_in_epoch_cache.get(digest).is_some()
474 }
475
476 pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
478 assert!(
479 self.executed_in_epoch
480 .read()
481 .insert(tx_digest, ())
482 .is_none(),
483 "transaction already executed"
484 );
485 self.executed_in_epoch_cache.insert(tx_digest, ());
486 }
487
488 pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
492 let executed_in_epoch = self.executed_in_epoch.read();
493 for tx_digest in tx_digests {
494 executed_in_epoch.remove(tx_digest);
495 }
496 }
497}
498
499pub(crate) struct ConsensusOutputQuarantine {
502 output_queue: VecDeque<ConsensusCommitOutput>,
504
505 highest_executed_checkpoint: CheckpointSequenceNumber,
507
508 builder_checkpoint_summary:
510 BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,
511
512 builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,
513
514 shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,
516
517 congestion_control_randomness_object_debts:
520 RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
521 congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
522
523 processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,
524
525 owned_object_locks: HashMap<ObjectRef, LockDetails>,
527
528 metrics: Arc<EpochMetrics>,
529}
530
531impl ConsensusOutputQuarantine {
532 pub(super) fn new(
533 highest_executed_checkpoint: CheckpointSequenceNumber,
534 authority_metrics: Arc<EpochMetrics>,
535 ) -> Self {
536 Self {
537 highest_executed_checkpoint,
538
539 output_queue: VecDeque::new(),
540 builder_checkpoint_summary: BTreeMap::new(),
541 builder_digest_to_checkpoint: HashMap::new(),
542 shared_object_next_versions: RefCountedHashMap::new(),
543 processed_consensus_messages: RefCountedHashMap::new(),
544 congestion_control_randomness_object_debts: RefCountedHashMap::new(),
545 congestion_control_object_debts: RefCountedHashMap::new(),
546 owned_object_locks: HashMap::new(),
547 metrics: authority_metrics,
548 }
549 }
550}
551
552impl ConsensusOutputQuarantine {
555 pub(crate) fn push_consensus_output(
557 &mut self,
558 output: ConsensusCommitOutput,
559 epoch_store: &AuthorityPerEpochStore,
560 ) -> SuiResult {
561 self.insert_shared_object_next_versions(&output);
562 self.insert_congestion_control_debts(&output);
563 self.insert_processed_consensus_messages(&output);
564 self.insert_owned_object_locks(&output);
565 self.output_queue.push_back(output);
566
567 self.metrics
568 .consensus_quarantine_queue_size
569 .set(self.output_queue.len() as i64);
570
571 self.commit(epoch_store)
574 }
575
576 pub(super) fn insert_builder_summary(
578 &mut self,
579 sequence_number: CheckpointSequenceNumber,
580 summary: BuilderCheckpointSummary,
581 contents: CheckpointContents,
582 ) {
583 debug!(?sequence_number, "inserting builder summary {:?}", summary);
584 for tx in contents.iter() {
585 self.builder_digest_to_checkpoint
586 .insert(tx.transaction, sequence_number);
587 }
588 self.builder_checkpoint_summary
589 .insert(sequence_number, (summary, contents));
590 }
591}
592
593impl ConsensusOutputQuarantine {
595 pub(super) fn update_highest_executed_checkpoint(
598 &mut self,
599 checkpoint: CheckpointSequenceNumber,
600 epoch_store: &AuthorityPerEpochStore,
601 batch: &mut DBBatch,
602 ) -> SuiResult {
603 self.highest_executed_checkpoint = checkpoint;
604 self.commit_with_batch(epoch_store, batch)
605 }
606
607 pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
608 let mut batch = epoch_store.db_batch()?;
609 self.commit_with_batch(epoch_store, &mut batch)?;
610 batch.write()?;
611 Ok(())
612 }
613
614 fn commit_with_batch(
616 &mut self,
617 epoch_store: &AuthorityPerEpochStore,
618 batch: &mut DBBatch,
619 ) -> SuiResult {
620 let tables = epoch_store.tables()?;
627
628 let mut highest_committed_height = None;
629
630 while self
631 .builder_checkpoint_summary
632 .first_key_value()
633 .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
634 == Some(true)
635 {
636 let (seq, (builder_summary, contents)) =
637 self.builder_checkpoint_summary.pop_first().unwrap();
638
639 for tx in contents.iter() {
640 let digest = &tx.transaction;
641 assert_eq!(
642 self.builder_digest_to_checkpoint
643 .remove(digest)
644 .unwrap_or_else(|| {
645 panic!(
646 "transaction {:?} not found in builder_digest_to_checkpoint",
647 digest
648 )
649 }),
650 seq
651 );
652 }
653
654 batch.insert_batch(
655 &tables.builder_digest_to_checkpoint,
656 contents.iter().map(|tx| (tx.transaction, seq)),
657 )?;
658
659 batch.insert_batch(
660 &tables.builder_checkpoint_summary_v2,
661 [(seq, &builder_summary)],
662 )?;
663
664 let checkpoint_height = builder_summary
665 .checkpoint_height
666 .expect("non-genesis checkpoint must have height");
667 if let Some(highest) = highest_committed_height {
668 assert!(
669 checkpoint_height >= highest,
670 "current checkpoint height {} must be no less than highest committed height {}",
671 checkpoint_height,
672 highest
673 );
674 }
675
676 highest_committed_height = Some(checkpoint_height);
677 }
678
679 let Some(highest_committed_height) = highest_committed_height else {
680 return Ok(());
681 };
682
683 let split_checkpoints_in_consensus_handler = epoch_store
684 .protocol_config()
685 .split_checkpoints_in_consensus_handler();
686
687 while !self.output_queue.is_empty() {
688 let output = self.output_queue.front().unwrap();
692 let highest_in_commit = if split_checkpoints_in_consensus_handler {
693 output
696 .consensus_commit_stats
697 .as_ref()
698 .expect("consensus_commit_stats must be set")
699 .height
700 } else {
701 let Some(h) = output.get_highest_pending_checkpoint_height() else {
703 break;
706 };
707 h
708 };
709
710 if highest_in_commit <= highest_committed_height {
711 info!(
712 "committing output with highest pending checkpoint height {:?}",
713 highest_in_commit
714 );
715 let output = self.output_queue.pop_front().unwrap();
716 self.remove_shared_object_next_versions(&output);
717 self.remove_processed_consensus_messages(&output);
718 self.remove_congestion_control_debts(&output);
719 self.remove_owned_object_locks(&output);
720
721 output.write_to_batch(epoch_store, batch)?;
722 } else {
723 break;
724 }
725 }
726
727 self.metrics
728 .consensus_quarantine_queue_size
729 .set(self.output_queue.len() as i64);
730
731 Ok(())
732 }
733}
734
735impl ConsensusOutputQuarantine {
736 fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
737 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
738 for (object_id, next_version) in next_versions {
739 self.shared_object_next_versions
740 .insert(*object_id, *next_version);
741 }
742 }
743 }
744
745 fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
746 let current_round = output.consensus_round;
747
748 for (object_id, debt) in output.congestion_control_object_debts.iter() {
749 self.congestion_control_object_debts.insert(
750 *object_id,
751 CongestionPerObjectDebt::new(current_round, *debt),
752 );
753 }
754
755 for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
756 self.congestion_control_randomness_object_debts.insert(
757 *object_id,
758 CongestionPerObjectDebt::new(current_round, *debt),
759 );
760 }
761 }
762
763 fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
764 for (object_id, _) in output.congestion_control_object_debts.iter() {
765 self.congestion_control_object_debts.remove(object_id);
766 }
767 for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
768 self.congestion_control_randomness_object_debts
769 .remove(object_id);
770 }
771 }
772
773 fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
774 for tx_key in output.consensus_messages_processed.iter() {
775 self.processed_consensus_messages.insert(tx_key.clone(), ());
776 }
777 }
778
779 fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
780 for tx_key in output.consensus_messages_processed.iter() {
781 self.processed_consensus_messages.remove(tx_key);
782 }
783 }
784
785 fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
786 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
787 for object_id in next_versions.keys() {
788 if !self.shared_object_next_versions.remove(object_id) {
789 fatal!(
790 "Shared object next version not found in quarantine: {:?}",
791 object_id
792 );
793 }
794 }
795 }
796 }
797
798 fn insert_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
799 for (obj_ref, lock) in &output.owned_object_locks {
800 self.owned_object_locks.insert(*obj_ref, *lock);
801 }
802 }
803
804 fn remove_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
805 for obj_ref in output.owned_object_locks.keys() {
806 self.owned_object_locks.remove(obj_ref);
807 }
808 }
809}
810
811impl ConsensusOutputQuarantine {
814 pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
815 self.builder_checkpoint_summary
816 .values()
817 .last()
818 .map(|(summary, _)| summary)
819 }
820
821 pub(super) fn get_built_summary(
822 &self,
823 sequence: CheckpointSequenceNumber,
824 ) -> Option<&BuilderCheckpointSummary> {
825 self.builder_checkpoint_summary
826 .get(&sequence)
827 .map(|(summary, _)| summary)
828 }
829
830 pub(super) fn included_transaction_in_checkpoint(&self, digest: &TransactionDigest) -> bool {
831 self.builder_digest_to_checkpoint.contains_key(digest)
832 }
833
834 pub(super) fn is_consensus_message_processed(
835 &self,
836 key: &SequencedConsensusTransactionKey,
837 ) -> bool {
838 self.processed_consensus_messages.contains_key(key)
839 }
840
841 pub(super) fn is_empty(&self) -> bool {
842 self.output_queue.is_empty()
843 }
844
845 pub(super) fn get_next_shared_object_versions(
846 &self,
847 tables: &AuthorityEpochTables,
848 objects_to_init: &[ConsensusObjectSequenceKey],
849 ) -> SuiResult<Vec<Option<SequenceNumber>>> {
850 Ok(do_fallback_lookup(
851 objects_to_init,
852 |object_key| {
853 if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
854 CacheResult::Hit(Some(*next_version))
855 } else {
856 CacheResult::Miss
857 }
858 },
859 |object_keys| {
860 tables
861 .next_shared_object_versions_v2
862 .multi_get(object_keys)
863 .expect("db error")
864 },
865 ))
866 }
867
868 pub(super) fn get_owned_object_locks(
872 &self,
873 tables: &AuthorityEpochTables,
874 obj_refs: &[ObjectRef],
875 ) -> SuiResult<Vec<Option<LockDetails>>> {
876 Ok(do_fallback_lookup(
877 obj_refs,
878 |obj_ref| {
879 if let Some(lock) = self.owned_object_locks.get(obj_ref) {
880 CacheResult::Hit(Some(*lock))
881 } else {
882 CacheResult::Miss
883 }
884 },
885 |obj_refs| {
886 tables
887 .multi_get_locked_transactions(obj_refs)
888 .expect("db error")
889 },
890 ))
891 }
892
893 pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
894 self.output_queue
895 .back()
896 .and_then(|output| output.get_highest_pending_checkpoint_height())
897 }
898
899 pub(super) fn get_pending_checkpoints(
900 &self,
901 last: Option<CheckpointHeight>,
902 ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
903 let mut checkpoints = Vec::new();
904 for output in &self.output_queue {
905 checkpoints.extend(
906 output
907 .get_pending_checkpoints(last)
908 .map(|cp| (cp.height(), cp.clone())),
909 );
910 }
911 if cfg!(debug_assertions) {
912 let mut prev = None;
913 for (height, _) in &checkpoints {
914 if let Some(prev) = prev {
915 assert!(prev < *height);
916 }
917 prev = Some(*height);
918 }
919 }
920 checkpoints
921 }
922
923 pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
924 self.output_queue
925 .iter()
926 .any(|output| output.pending_checkpoint_exists(index))
927 }
928
929 pub(super) fn get_pending_checkpoints_v2(
930 &self,
931 last: Option<CheckpointHeight>,
932 ) -> Vec<(CheckpointHeight, PendingCheckpointV2)> {
933 let mut checkpoints = Vec::new();
934 for output in &self.output_queue {
935 checkpoints.extend(
936 output
937 .get_pending_checkpoints_v2(last)
938 .map(|cp| (cp.height(), cp.clone())),
939 );
940 }
941 if cfg!(debug_assertions) {
942 let mut prev = None;
943 for (height, _) in &checkpoints {
944 if let Some(prev) = prev {
945 assert!(prev < *height);
946 }
947 prev = Some(*height);
948 }
949 }
950 checkpoints
951 }
952
953 pub(super) fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
954 self.output_queue
955 .iter()
956 .any(|output| output.pending_checkpoint_exists_v2(index))
957 }
958
959 pub(super) fn get_new_jwks(
960 &self,
961 epoch_store: &AuthorityPerEpochStore,
962 round: u64,
963 ) -> SuiResult<Vec<ActiveJwk>> {
964 let epoch = epoch_store.epoch();
965
966 for output in self.output_queue.iter().rev() {
968 let output_round = output.get_round().unwrap();
971 if round == output_round {
972 return Ok(output
973 .active_jwks
974 .iter()
975 .map(|(_, (jwk_id, jwk))| ActiveJwk {
976 jwk_id: jwk_id.clone(),
977 jwk: jwk.clone(),
978 epoch,
979 })
980 .collect());
981 }
982 }
983
984 let empty_jwk_id = JwkId::new(String::new(), String::new());
986 let empty_jwk = JWK {
987 kty: String::new(),
988 e: String::new(),
989 n: String::new(),
990 alg: String::new(),
991 };
992
993 let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
994 let end = (round + 1, (empty_jwk_id, empty_jwk));
995
996 Ok(epoch_store
997 .tables()?
998 .active_jwks
999 .safe_iter_with_bounds(Some(start), Some(end))
1000 .map_ok(|((r, (jwk_id, jwk)), _)| {
1001 debug_assert!(round == r);
1002 ActiveJwk { jwk_id, jwk, epoch }
1003 })
1004 .collect::<Result<Vec<_>, _>>()?)
1005 }
1006
1007 pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
1008 self.output_queue
1009 .iter()
1010 .rev()
1011 .filter_map(|output| output.get_randomness_last_round_timestamp())
1012 .next()
1013 }
1014
1015 pub(crate) fn load_initial_object_debts(
1016 &self,
1017 epoch_store: &AuthorityPerEpochStore,
1018 current_round: Round,
1019 for_randomness: bool,
1020 transactions: &[VerifiedExecutableTransactionWithAliases],
1021 ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
1022 let protocol_config = epoch_store.protocol_config();
1023 let tables = epoch_store.tables()?;
1024 let default_per_commit_budget = protocol_config
1025 .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
1026 .unwrap_or(0);
1027 let (hash_table, db_table, per_commit_budget) = if for_randomness {
1028 (
1029 &self.congestion_control_randomness_object_debts,
1030 &tables.congestion_control_randomness_object_debts,
1031 protocol_config
1032 .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
1033 .unwrap_or(default_per_commit_budget),
1034 )
1035 } else {
1036 (
1037 &self.congestion_control_object_debts,
1038 &tables.congestion_control_object_debts,
1039 default_per_commit_budget,
1040 )
1041 };
1042 let mut shared_input_object_ids: Vec<_> = transactions
1043 .iter()
1044 .flat_map(|tx| tx.tx().shared_input_objects().map(|obj| obj.id))
1045 .collect();
1046 shared_input_object_ids.sort();
1047 shared_input_object_ids.dedup();
1048
1049 let results = do_fallback_lookup(
1050 &shared_input_object_ids,
1051 |object_id| {
1052 if let Some(debt) = hash_table.get(object_id) {
1053 CacheResult::Hit(Some(debt.into_v1()))
1054 } else {
1055 CacheResult::Miss
1056 }
1057 },
1058 |object_ids| {
1059 db_table
1060 .multi_get(object_ids)
1061 .expect("db error")
1062 .into_iter()
1063 .map(|debt| debt.map(|debt| debt.into_v1()))
1064 .collect()
1065 },
1066 );
1067
1068 Ok(results
1069 .into_iter()
1070 .zip(shared_input_object_ids)
1071 .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
1072 .map(move |((round, debt), object_id)| {
1073 assert!(current_round > round);
1077 let num_rounds = current_round - round - 1;
1078 let debt = debt.saturating_sub(per_commit_budget * num_rounds);
1079 (object_id, debt)
1080 }))
1081 }
1082}
1083
/// A hash map whose entries carry a reference count.
///
/// Inserting an existing key bumps its count and overwrites the value; removing a key
/// decrements the count and only drops the entry once the count reaches zero.
#[derive(Debug, Default)]
struct RefCountedHashMap<K, V> {
    /// Key -> (reference count, most recently inserted value).
    map: HashMap<K, (usize, V)>,
}

impl<K, V> RefCountedHashMap<K, V>
where
    K: Eq + std::hash::Hash,
{
    /// Creates an empty map.
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    /// Inserts `value` under `key`.
    ///
    /// If the key is already present its reference count is incremented and the stored value
    /// is replaced; otherwise a new entry with a count of one is created.
    pub fn insert(&mut self, key: K, value: V) {
        match self.map.entry(key) {
            hash_map::Entry::Occupied(mut entry) => {
                let (ref_count, v) = entry.get_mut();
                *ref_count += 1;
                *v = value;
            }
            hash_map::Entry::Vacant(entry) => {
                entry.insert((1, value));
            }
        }
    }

    /// Releases one reference to `key`.
    ///
    /// Returns `true` if the key was present and `false` otherwise. A present key is only
    /// removed from the map when this was its last reference, so `true` does not imply that
    /// the entry is gone.
    pub fn remove(&mut self, key: &K) -> bool {
        // Look up by reference so the key never has to be cloned.
        let Some((ref_count, _)) = self.map.get_mut(key) else {
            return false;
        };
        *ref_count -= 1;
        if *ref_count == 0 {
            self.map.remove(key);
        }
        true
    }

    /// Returns the value most recently inserted under `key`, if the key is present.
    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key).map(|(_, v)| v)
    }

    /// Returns true if `key` currently has at least one reference.
    pub fn contains_key(&self, key: &K) -> bool {
        self.map.contains_key(key)
    }
}