1use crate::authority::authority_per_epoch_store::{
5 AuthorityEpochTables, EncG, ExecutionIndicesWithStats, LockDetails, LockDetailsWrapper, PkG,
6};
7use crate::authority::transaction_deferral::DeferralKey;
8use crate::checkpoints::BuilderCheckpointSummary;
9use crate::epoch::randomness::SINGLETON_KEY;
10use dashmap::DashMap;
11use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
12use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
13use moka::policy::EvictionPolicy;
14use moka::sync::SegmentedCache as MokaCache;
15use mysten_common::fatal;
16use mysten_common::random_util::randomize_cache_capacity_in_tests;
17use parking_lot::Mutex;
18use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
19use sui_types::authenticator_state::ActiveJwk;
20use sui_types::base_types::{AuthorityName, ObjectRef, SequenceNumber};
21use sui_types::crypto::RandomnessRound;
22use sui_types::error::SuiResult;
23use sui_types::executable_transaction::{
24 TrustedExecutableTransactionWithAliases, VerifiedExecutableTransactionWithAliases,
25};
26use sui_types::execution::ExecutionTimeObservationKey;
27use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
28use sui_types::messages_consensus::AuthorityIndex;
29use sui_types::{
30 base_types::{ConsensusObjectSequenceKey, ObjectID},
31 digests::TransactionDigest,
32 messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
33 signature::GenericSignature,
34};
35use tracing::{debug, info};
36use typed_store::Map;
37use typed_store::rocks::DBBatch;
38
39use crate::{
40 authority::{
41 authority_per_epoch_store::AuthorityPerEpochStore,
42 shared_object_congestion_tracker::CongestionPerObjectDebt,
43 },
44 checkpoints::{CheckpointHeight, PendingCheckpoint},
45 consensus_handler::SequencedConsensusTransactionKey,
46 epoch::{
47 randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
48 reconfiguration::ReconfigState,
49 },
50};
51
52use super::*;
53
54#[derive(Default)]
55#[allow(clippy::type_complexity)]
56pub(crate) struct ConsensusCommitOutput {
57 consensus_round: Round,
59 consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
60 end_of_publish: BTreeSet<AuthorityName>,
61 reconfig_state: Option<ReconfigState>,
62 consensus_commit_stats: Option<ExecutionIndicesWithStats>,
63
64 next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,
66
67 deferred_txns: Vec<(DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>)>,
68 deleted_deferred_txns: BTreeSet<DeferralKey>,
69
70 pending_checkpoints: Vec<PendingCheckpoint>,
72
73 next_randomness_round: Option<(RandomnessRound, TimestampMs)>,
75
76 dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
77 dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
78 dkg_used_message: Option<VersionedUsedProcessedMessages>,
79 dkg_output: Option<dkg_v1::Output<PkG, EncG>>,
80
81 pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
83 active_jwks: BTreeSet<(u64, (JwkId, JWK))>,
84
85 congestion_control_object_debts: Vec<(ObjectID, u64)>,
87 congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
88 execution_time_observations: Vec<(
89 AuthorityIndex,
90 u64, Vec<(ExecutionTimeObservationKey, Duration)>,
92 )>,
93
94 owned_object_locks: HashMap<ObjectRef, LockDetails>,
96}
97
98impl ConsensusCommitOutput {
99 pub fn new(consensus_round: Round) -> Self {
100 Self {
101 consensus_round,
102 ..Default::default()
103 }
104 }
105
106 pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
107 self.deleted_deferred_txns.iter().cloned()
108 }
109
110 pub fn has_deferred_transactions(&self) -> bool {
111 !self.deferred_txns.is_empty()
112 }
113
114 fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
115 self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
116 }
117
118 fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
119 self.pending_checkpoints.last().map(|cp| cp.height())
120 }
121
122 fn get_pending_checkpoints(
123 &self,
124 last: Option<CheckpointHeight>,
125 ) -> impl Iterator<Item = &PendingCheckpoint> {
126 self.pending_checkpoints.iter().filter(move |cp| {
127 if let Some(last) = last {
128 cp.height() > last
129 } else {
130 true
131 }
132 })
133 }
134
135 fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
136 self.pending_checkpoints
137 .iter()
138 .any(|cp| cp.height() == *index)
139 }
140
141 fn get_round(&self) -> Option<u64> {
142 self.consensus_commit_stats
143 .as_ref()
144 .map(|stats| stats.index.last_committed_round)
145 }
146
147 pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
148 self.end_of_publish.insert(authority);
149 }
150
151 pub fn insert_execution_time_observation(
152 &mut self,
153 source: AuthorityIndex,
154 generation: u64,
155 estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
156 ) {
157 self.execution_time_observations
158 .push((source, generation, estimates));
159 }
160
161 pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStats) {
162 self.consensus_commit_stats = Some(stats);
163 }
164
165 pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
167 self.record_consensus_commit_stats(Default::default());
168 }
169
170 pub fn store_reconfig_state(&mut self, state: ReconfigState) {
171 self.reconfig_state = Some(state);
172 }
173
174 pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
175 self.consensus_messages_processed.insert(key);
176 }
177
178 pub fn get_consensus_messages_processed(
179 &self,
180 ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
181 self.consensus_messages_processed.iter()
182 }
183
184 pub fn set_next_shared_object_versions(
185 &mut self,
186 next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
187 ) {
188 assert!(self.next_shared_object_versions.is_none());
189 self.next_shared_object_versions = Some(next_versions);
190 }
191
192 pub fn defer_transactions(
193 &mut self,
194 key: DeferralKey,
195 transactions: Vec<VerifiedExecutableTransactionWithAliases>,
196 ) {
197 self.deferred_txns.push((key, transactions));
198 }
199
200 pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
201 self.deleted_deferred_txns
202 .extend(deferral_keys.iter().cloned());
203 }
204
205 pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
206 self.pending_checkpoints.push(checkpoint);
207 }
208
209 pub fn reserve_next_randomness_round(
210 &mut self,
211 next_randomness_round: RandomnessRound,
212 commit_timestamp: TimestampMs,
213 ) {
214 assert!(self.next_randomness_round.is_none());
215 self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
216 }
217
218 pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
219 self.dkg_confirmations.insert(conf.sender(), conf);
220 }
221
222 pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
223 self.dkg_processed_messages
224 .insert(message.sender(), message);
225 }
226
227 pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
228 self.dkg_used_message = Some(used_messages);
229 }
230
231 pub fn set_dkg_output(&mut self, output: dkg_v1::Output<PkG, EncG>) {
232 self.dkg_output = Some(output);
233 }
234
235 pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
236 self.pending_jwks.insert((authority, id, jwk));
237 }
238
239 pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
240 self.active_jwks.insert((round, key));
241 }
242
243 pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
244 self.congestion_control_object_debts = object_debts;
245 }
246
247 pub fn set_congestion_control_randomness_object_debts(
248 &mut self,
249 object_debts: Vec<(ObjectID, u64)>,
250 ) {
251 self.congestion_control_randomness_object_debts = object_debts;
252 }
253
254 pub fn set_owned_object_locks(&mut self, locks: HashMap<ObjectRef, LockDetails>) {
255 assert!(self.owned_object_locks.is_empty());
256 self.owned_object_locks = locks;
257 }
258
259 pub fn write_to_batch(
260 self,
261 epoch_store: &AuthorityPerEpochStore,
262 batch: &mut DBBatch,
263 ) -> SuiResult {
264 let tables = epoch_store.tables()?;
265 batch.insert_batch(
266 &tables.consensus_message_processed,
267 self.consensus_messages_processed
268 .iter()
269 .map(|key| (key, true)),
270 )?;
271
272 batch.insert_batch(
273 &tables.end_of_publish,
274 self.end_of_publish.iter().map(|authority| (authority, ())),
275 )?;
276
277 if let Some(reconfig_state) = &self.reconfig_state {
278 batch.insert_batch(
279 &tables.reconfig_state,
280 [(RECONFIG_STATE_INDEX, reconfig_state)],
281 )?;
282 }
283
284 let consensus_commit_stats = self
285 .consensus_commit_stats
286 .expect("consensus_commit_stats must be set");
287 let round = consensus_commit_stats.index.last_committed_round;
288
289 batch.insert_batch(
290 &tables.last_consensus_stats,
291 [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
292 )?;
293
294 if let Some(next_versions) = self.next_shared_object_versions {
295 batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
296 }
297
298 if !self.owned_object_locks.is_empty() {
299 batch.insert_batch(
300 &tables.owned_object_locked_transactions,
301 self.owned_object_locks
302 .into_iter()
303 .map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))),
304 )?;
305 }
306
307 batch.delete_batch(
308 &tables.deferred_transactions_v2,
309 &self.deleted_deferred_txns,
310 )?;
311 batch.delete_batch(
312 &tables.deferred_transactions_with_aliases_v2,
313 &self.deleted_deferred_txns,
314 )?;
315
316 batch.insert_batch(
317 &tables.deferred_transactions_with_aliases_v2,
318 self.deferred_txns.into_iter().map(|(key, txs)| {
319 (
320 key,
321 txs.into_iter()
322 .map(|tx| {
323 let tx: TrustedExecutableTransactionWithAliases = tx.serializable();
324 tx
325 })
326 .collect::<Vec<_>>(),
327 )
328 }),
329 )?;
330
331 if let Some((round, commit_timestamp)) = self.next_randomness_round {
332 batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
333 batch.insert_batch(
334 &tables.randomness_last_round_timestamp,
335 [(SINGLETON_KEY, commit_timestamp)],
336 )?;
337 }
338
339 batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
340 batch.insert_batch(
341 &tables.dkg_processed_messages_v2,
342 self.dkg_processed_messages,
343 )?;
344 batch.insert_batch(
345 &tables.dkg_used_messages_v2,
346 self.dkg_used_message
348 .into_iter()
349 .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
350 )?;
351 if let Some(output) = self.dkg_output {
352 batch.insert_batch(&tables.dkg_output, [(SINGLETON_KEY, output)])?;
353 }
354
355 batch.insert_batch(
356 &tables.pending_jwks,
357 self.pending_jwks.into_iter().map(|j| (j, ())),
358 )?;
359 batch.insert_batch(
360 &tables.active_jwks,
361 self.active_jwks.into_iter().map(|j| {
362 assert_eq!(j.0, round);
364 (j, ())
365 }),
366 )?;
367
368 batch.insert_batch(
369 &tables.congestion_control_object_debts,
370 self.congestion_control_object_debts
371 .into_iter()
372 .map(|(object_id, debt)| {
373 (
374 object_id,
375 CongestionPerObjectDebt::new(self.consensus_round, debt),
376 )
377 }),
378 )?;
379 batch.insert_batch(
380 &tables.congestion_control_randomness_object_debts,
381 self.congestion_control_randomness_object_debts
382 .into_iter()
383 .map(|(object_id, debt)| {
384 (
385 object_id,
386 CongestionPerObjectDebt::new(self.consensus_round, debt),
387 )
388 }),
389 )?;
390
391 batch.insert_batch(
392 &tables.execution_time_observations,
393 self.execution_time_observations
394 .into_iter()
395 .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
396 )?;
397
398 Ok(())
399 }
400}
401
402pub(crate) struct ConsensusOutputCache {
407 pub(crate) deferred_transactions:
410 Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>>,
411
412 #[allow(clippy::type_complexity)]
415 pub(crate) user_signatures_for_checkpoints:
416 Mutex<HashMap<TransactionDigest, Vec<(GenericSignature, Option<SequenceNumber>)>>>,
417
418 executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
419 executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
420}
421
422impl ConsensusOutputCache {
423 pub(crate) fn new(tables: &AuthorityEpochTables) -> Self {
424 let deferred_transactions = tables
425 .get_all_deferred_transactions_v2()
426 .expect("load deferred transactions cannot fail");
427
428 let executed_in_epoch_cache_capacity = 50_000;
429
430 Self {
431 deferred_transactions: Mutex::new(deferred_transactions),
432 user_signatures_for_checkpoints: Default::default(),
433 executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
434 executed_in_epoch_cache: MokaCache::builder(8)
435 .max_capacity(randomize_cache_capacity_in_tests(
437 executed_in_epoch_cache_capacity,
438 ))
439 .eviction_policy(EvictionPolicy::lru())
440 .build(),
441 }
442 }
443
444 pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
445 self.executed_in_epoch
446 .read()
447 .contains_key(digest) ||
448 self.executed_in_epoch_cache.get(digest).is_some()
450 }
451
452 pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
454 assert!(
455 self.executed_in_epoch
456 .read()
457 .insert(tx_digest, ())
458 .is_none(),
459 "transaction already executed"
460 );
461 self.executed_in_epoch_cache.insert(tx_digest, ());
462 }
463
464 pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
468 let executed_in_epoch = self.executed_in_epoch.read();
469 for tx_digest in tx_digests {
470 executed_in_epoch.remove(tx_digest);
471 }
472 }
473}
474
475pub(crate) struct ConsensusOutputQuarantine {
478 output_queue: VecDeque<ConsensusCommitOutput>,
480
481 highest_executed_checkpoint: CheckpointSequenceNumber,
483
484 builder_checkpoint_summary:
486 BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,
487
488 builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,
489
490 shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,
492
493 congestion_control_randomness_object_debts:
496 RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
497 congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
498
499 processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,
500
501 owned_object_locks: HashMap<ObjectRef, LockDetails>,
503
504 metrics: Arc<EpochMetrics>,
505}
506
507impl ConsensusOutputQuarantine {
508 pub(super) fn new(
509 highest_executed_checkpoint: CheckpointSequenceNumber,
510 authority_metrics: Arc<EpochMetrics>,
511 ) -> Self {
512 Self {
513 highest_executed_checkpoint,
514
515 output_queue: VecDeque::new(),
516 builder_checkpoint_summary: BTreeMap::new(),
517 builder_digest_to_checkpoint: HashMap::new(),
518 shared_object_next_versions: RefCountedHashMap::new(),
519 processed_consensus_messages: RefCountedHashMap::new(),
520 congestion_control_randomness_object_debts: RefCountedHashMap::new(),
521 congestion_control_object_debts: RefCountedHashMap::new(),
522 owned_object_locks: HashMap::new(),
523 metrics: authority_metrics,
524 }
525 }
526}
527
528impl ConsensusOutputQuarantine {
531 pub(crate) fn push_consensus_output(
533 &mut self,
534 output: ConsensusCommitOutput,
535 epoch_store: &AuthorityPerEpochStore,
536 ) -> SuiResult {
537 self.insert_shared_object_next_versions(&output);
538 self.insert_congestion_control_debts(&output);
539 self.insert_processed_consensus_messages(&output);
540 self.insert_owned_object_locks(&output);
541 self.output_queue.push_back(output);
542
543 self.metrics
544 .consensus_quarantine_queue_size
545 .set(self.output_queue.len() as i64);
546
547 self.commit(epoch_store)
550 }
551
552 pub(super) fn insert_builder_summary(
554 &mut self,
555 sequence_number: CheckpointSequenceNumber,
556 summary: BuilderCheckpointSummary,
557 contents: CheckpointContents,
558 ) {
559 debug!(?sequence_number, "inserting builder summary {:?}", summary);
560 for tx in contents.iter() {
561 self.builder_digest_to_checkpoint
562 .insert(tx.transaction, sequence_number);
563 }
564 self.builder_checkpoint_summary
565 .insert(sequence_number, (summary, contents));
566 }
567}
568
569impl ConsensusOutputQuarantine {
571 pub(super) fn update_highest_executed_checkpoint(
574 &mut self,
575 checkpoint: CheckpointSequenceNumber,
576 epoch_store: &AuthorityPerEpochStore,
577 batch: &mut DBBatch,
578 ) -> SuiResult {
579 self.highest_executed_checkpoint = checkpoint;
580 self.commit_with_batch(epoch_store, batch)
581 }
582
583 pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
584 let mut batch = epoch_store.db_batch()?;
585 self.commit_with_batch(epoch_store, &mut batch)?;
586 batch.write()?;
587 Ok(())
588 }
589
590 fn commit_with_batch(
592 &mut self,
593 epoch_store: &AuthorityPerEpochStore,
594 batch: &mut DBBatch,
595 ) -> SuiResult {
596 let tables = epoch_store.tables()?;
603
604 let mut highest_committed_height = None;
605
606 while self
607 .builder_checkpoint_summary
608 .first_key_value()
609 .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
610 == Some(true)
611 {
612 let (seq, (builder_summary, contents)) =
613 self.builder_checkpoint_summary.pop_first().unwrap();
614
615 for tx in contents.iter() {
616 let digest = &tx.transaction;
617 assert_eq!(
618 self.builder_digest_to_checkpoint
619 .remove(digest)
620 .unwrap_or_else(|| {
621 panic!(
622 "transaction {:?} not found in builder_digest_to_checkpoint",
623 digest
624 )
625 }),
626 seq
627 );
628 }
629
630 batch.insert_batch(
631 &tables.builder_digest_to_checkpoint,
632 contents.iter().map(|tx| (tx.transaction, seq)),
633 )?;
634
635 batch.insert_batch(
636 &tables.builder_checkpoint_summary_v2,
637 [(seq, &builder_summary)],
638 )?;
639
640 let checkpoint_height = builder_summary
641 .checkpoint_height
642 .expect("non-genesis checkpoint must have height");
643 if let Some(highest) = highest_committed_height {
644 assert!(
645 checkpoint_height >= highest,
646 "current checkpoint height {} must be no less than highest committed height {}",
647 checkpoint_height,
648 highest
649 );
650 }
651
652 highest_committed_height = Some(checkpoint_height);
653 }
654
655 let Some(highest_committed_height) = highest_committed_height else {
656 return Ok(());
657 };
658
659 while !self.output_queue.is_empty() {
660 let Some(highest_in_commit) = self
664 .output_queue
665 .front()
666 .unwrap()
667 .get_highest_pending_checkpoint_height()
668 else {
669 break;
672 };
673
674 if highest_in_commit <= highest_committed_height {
675 info!(
676 "committing output with highest pending checkpoint height {:?}",
677 highest_in_commit
678 );
679 let output = self.output_queue.pop_front().unwrap();
680 self.remove_shared_object_next_versions(&output);
681 self.remove_processed_consensus_messages(&output);
682 self.remove_congestion_control_debts(&output);
683 self.remove_owned_object_locks(&output);
684
685 output.write_to_batch(epoch_store, batch)?;
686 } else {
687 break;
688 }
689 }
690
691 self.metrics
692 .consensus_quarantine_queue_size
693 .set(self.output_queue.len() as i64);
694
695 Ok(())
696 }
697}
698
699impl ConsensusOutputQuarantine {
700 fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
701 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
702 for (object_id, next_version) in next_versions {
703 self.shared_object_next_versions
704 .insert(*object_id, *next_version);
705 }
706 }
707 }
708
709 fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
710 let current_round = output.consensus_round;
711
712 for (object_id, debt) in output.congestion_control_object_debts.iter() {
713 self.congestion_control_object_debts.insert(
714 *object_id,
715 CongestionPerObjectDebt::new(current_round, *debt),
716 );
717 }
718
719 for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
720 self.congestion_control_randomness_object_debts.insert(
721 *object_id,
722 CongestionPerObjectDebt::new(current_round, *debt),
723 );
724 }
725 }
726
727 fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
728 for (object_id, _) in output.congestion_control_object_debts.iter() {
729 self.congestion_control_object_debts.remove(object_id);
730 }
731 for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
732 self.congestion_control_randomness_object_debts
733 .remove(object_id);
734 }
735 }
736
737 fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
738 for tx_key in output.consensus_messages_processed.iter() {
739 self.processed_consensus_messages.insert(tx_key.clone(), ());
740 }
741 }
742
743 fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
744 for tx_key in output.consensus_messages_processed.iter() {
745 self.processed_consensus_messages.remove(tx_key);
746 }
747 }
748
749 fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
750 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
751 for object_id in next_versions.keys() {
752 if !self.shared_object_next_versions.remove(object_id) {
753 fatal!(
754 "Shared object next version not found in quarantine: {:?}",
755 object_id
756 );
757 }
758 }
759 }
760 }
761
762 fn insert_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
763 for (obj_ref, lock) in &output.owned_object_locks {
764 self.owned_object_locks.insert(*obj_ref, *lock);
765 }
766 }
767
768 fn remove_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
769 for obj_ref in output.owned_object_locks.keys() {
770 self.owned_object_locks.remove(obj_ref);
771 }
772 }
773}
774
775impl ConsensusOutputQuarantine {
778 pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
779 self.builder_checkpoint_summary
780 .values()
781 .last()
782 .map(|(summary, _)| summary)
783 }
784
785 pub(super) fn get_built_summary(
786 &self,
787 sequence: CheckpointSequenceNumber,
788 ) -> Option<&BuilderCheckpointSummary> {
789 self.builder_checkpoint_summary
790 .get(&sequence)
791 .map(|(summary, _)| summary)
792 }
793
794 pub(super) fn included_transaction_in_checkpoint(&self, digest: &TransactionDigest) -> bool {
795 self.builder_digest_to_checkpoint.contains_key(digest)
796 }
797
798 pub(super) fn is_consensus_message_processed(
799 &self,
800 key: &SequencedConsensusTransactionKey,
801 ) -> bool {
802 self.processed_consensus_messages.contains_key(key)
803 }
804
805 pub(super) fn is_empty(&self) -> bool {
806 self.output_queue.is_empty()
807 }
808
809 pub(super) fn get_next_shared_object_versions(
810 &self,
811 tables: &AuthorityEpochTables,
812 objects_to_init: &[ConsensusObjectSequenceKey],
813 ) -> SuiResult<Vec<Option<SequenceNumber>>> {
814 Ok(do_fallback_lookup(
815 objects_to_init,
816 |object_key| {
817 if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
818 CacheResult::Hit(Some(*next_version))
819 } else {
820 CacheResult::Miss
821 }
822 },
823 |object_keys| {
824 tables
825 .next_shared_object_versions_v2
826 .multi_get(object_keys)
827 .expect("db error")
828 },
829 ))
830 }
831
832 pub(super) fn get_owned_object_locks(
836 &self,
837 tables: &AuthorityEpochTables,
838 obj_refs: &[ObjectRef],
839 ) -> SuiResult<Vec<Option<LockDetails>>> {
840 Ok(do_fallback_lookup(
841 obj_refs,
842 |obj_ref| {
843 if let Some(lock) = self.owned_object_locks.get(obj_ref) {
844 CacheResult::Hit(Some(*lock))
845 } else {
846 CacheResult::Miss
847 }
848 },
849 |obj_refs| {
850 tables
851 .multi_get_locked_transactions(obj_refs)
852 .expect("db error")
853 },
854 ))
855 }
856
857 pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
858 self.output_queue
859 .back()
860 .and_then(|output| output.get_highest_pending_checkpoint_height())
861 }
862
863 pub(super) fn get_pending_checkpoints(
864 &self,
865 last: Option<CheckpointHeight>,
866 ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
867 let mut checkpoints = Vec::new();
868 for output in &self.output_queue {
869 checkpoints.extend(
870 output
871 .get_pending_checkpoints(last)
872 .map(|cp| (cp.height(), cp.clone())),
873 );
874 }
875 if cfg!(debug_assertions) {
876 let mut prev = None;
877 for (height, _) in &checkpoints {
878 if let Some(prev) = prev {
879 assert!(prev < *height);
880 }
881 prev = Some(*height);
882 }
883 }
884 checkpoints
885 }
886
887 pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
888 self.output_queue
889 .iter()
890 .any(|output| output.pending_checkpoint_exists(index))
891 }
892
893 pub(super) fn get_new_jwks(
894 &self,
895 epoch_store: &AuthorityPerEpochStore,
896 round: u64,
897 ) -> SuiResult<Vec<ActiveJwk>> {
898 let epoch = epoch_store.epoch();
899
900 for output in self.output_queue.iter().rev() {
902 let output_round = output.get_round().unwrap();
905 if round == output_round {
906 return Ok(output
907 .active_jwks
908 .iter()
909 .map(|(_, (jwk_id, jwk))| ActiveJwk {
910 jwk_id: jwk_id.clone(),
911 jwk: jwk.clone(),
912 epoch,
913 })
914 .collect());
915 }
916 }
917
918 let empty_jwk_id = JwkId::new(String::new(), String::new());
920 let empty_jwk = JWK {
921 kty: String::new(),
922 e: String::new(),
923 n: String::new(),
924 alg: String::new(),
925 };
926
927 let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
928 let end = (round + 1, (empty_jwk_id, empty_jwk));
929
930 Ok(epoch_store
931 .tables()?
932 .active_jwks
933 .safe_iter_with_bounds(Some(start), Some(end))
934 .map_ok(|((r, (jwk_id, jwk)), _)| {
935 debug_assert!(round == r);
936 ActiveJwk { jwk_id, jwk, epoch }
937 })
938 .collect::<Result<Vec<_>, _>>()?)
939 }
940
941 pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
942 self.output_queue
943 .iter()
944 .rev()
945 .filter_map(|output| output.get_randomness_last_round_timestamp())
946 .next()
947 }
948
949 pub(crate) fn load_initial_object_debts(
950 &self,
951 epoch_store: &AuthorityPerEpochStore,
952 current_round: Round,
953 for_randomness: bool,
954 transactions: &[VerifiedExecutableTransactionWithAliases],
955 ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
956 let protocol_config = epoch_store.protocol_config();
957 let tables = epoch_store.tables()?;
958 let default_per_commit_budget = protocol_config
959 .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
960 .unwrap_or(0);
961 let (hash_table, db_table, per_commit_budget) = if for_randomness {
962 (
963 &self.congestion_control_randomness_object_debts,
964 &tables.congestion_control_randomness_object_debts,
965 protocol_config
966 .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
967 .unwrap_or(default_per_commit_budget),
968 )
969 } else {
970 (
971 &self.congestion_control_object_debts,
972 &tables.congestion_control_object_debts,
973 default_per_commit_budget,
974 )
975 };
976 let mut shared_input_object_ids: Vec<_> = transactions
977 .iter()
978 .flat_map(|tx| tx.tx().shared_input_objects().map(|obj| obj.id))
979 .collect();
980 shared_input_object_ids.sort();
981 shared_input_object_ids.dedup();
982
983 let results = do_fallback_lookup(
984 &shared_input_object_ids,
985 |object_id| {
986 if let Some(debt) = hash_table.get(object_id) {
987 CacheResult::Hit(Some(debt.into_v1()))
988 } else {
989 CacheResult::Miss
990 }
991 },
992 |object_ids| {
993 db_table
994 .multi_get(object_ids)
995 .expect("db error")
996 .into_iter()
997 .map(|debt| debt.map(|debt| debt.into_v1()))
998 .collect()
999 },
1000 );
1001
1002 Ok(results
1003 .into_iter()
1004 .zip(shared_input_object_ids)
1005 .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
1006 .map(move |((round, debt), object_id)| {
1007 assert!(current_round > round);
1011 let num_rounds = current_round - round - 1;
1012 let debt = debt.saturating_sub(per_commit_budget * num_rounds);
1013 (object_id, debt)
1014 }))
1015 }
1016}
1017
1018#[derive(Debug, Default)]
1027struct RefCountedHashMap<K, V> {
1028 map: HashMap<K, (usize, V)>,
1029}
1030
1031impl<K, V> RefCountedHashMap<K, V>
1032where
1033 K: Clone + Eq + std::hash::Hash,
1034{
1035 pub fn new() -> Self {
1036 Self {
1037 map: HashMap::new(),
1038 }
1039 }
1040
1041 pub fn insert(&mut self, key: K, value: V) {
1042 let entry = self.map.entry(key);
1043 match entry {
1044 hash_map::Entry::Occupied(mut entry) => {
1045 let (ref_count, v) = entry.get_mut();
1046 *ref_count += 1;
1047 *v = value;
1048 }
1049 hash_map::Entry::Vacant(entry) => {
1050 entry.insert((1, value));
1051 }
1052 }
1053 }
1054
1055 pub fn remove(&mut self, key: &K) -> bool {
1058 let entry = self.map.entry(key.clone());
1059 match entry {
1060 hash_map::Entry::Occupied(mut entry) => {
1061 let (ref_count, _) = entry.get_mut();
1062 *ref_count -= 1;
1063 if *ref_count == 0 {
1064 entry.remove();
1065 }
1066 true
1067 }
1068 hash_map::Entry::Vacant(_) => false,
1069 }
1070 }
1071
1072 pub fn get(&self, key: &K) -> Option<&V> {
1073 self.map.get(key).map(|(_, v)| v)
1074 }
1075
1076 pub fn contains_key(&self, key: &K) -> bool {
1077 self.map.contains_key(key)
1078 }
1079}