1use crate::authority::authority_per_epoch_store::{
5 AuthorityEpochTables, EncG, ExecutionIndicesWithStats, LockDetails, LockDetailsWrapper, PkG,
6};
7use crate::authority::transaction_deferral::DeferralKey;
8use crate::checkpoints::BuilderCheckpointSummary;
9use crate::epoch::randomness::SINGLETON_KEY;
10use dashmap::DashMap;
11use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
12use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
13use moka::policy::EvictionPolicy;
14use moka::sync::SegmentedCache as MokaCache;
15use mysten_common::fatal;
16use mysten_common::random_util::randomize_cache_capacity_in_tests;
17use parking_lot::Mutex;
18use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
19use sui_types::authenticator_state::ActiveJwk;
20use sui_types::base_types::{AuthorityName, ObjectRef, SequenceNumber};
21use sui_types::crypto::RandomnessRound;
22use sui_types::error::SuiResult;
23use sui_types::executable_transaction::{
24 TrustedExecutableTransactionWithAliases, VerifiedExecutableTransactionWithAliases,
25};
26use sui_types::execution::ExecutionTimeObservationKey;
27use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
28use sui_types::messages_consensus::AuthorityIndex;
29use sui_types::{
30 base_types::{ConsensusObjectSequenceKey, ObjectID},
31 digests::TransactionDigest,
32 messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
33 signature::GenericSignature,
34};
35use tracing::{debug, info};
36use typed_store::Map;
37use typed_store::rocks::DBBatch;
38
39use crate::{
40 authority::{
41 authority_per_epoch_store::AuthorityPerEpochStore,
42 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
43 shared_object_congestion_tracker::CongestionPerObjectDebt,
44 },
45 checkpoints::{CheckpointHeight, PendingCheckpoint},
46 consensus_handler::{SequencedConsensusTransactionKey, VerifiedSequencedConsensusTransaction},
47 epoch::{
48 randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
49 reconfiguration::ReconfigState,
50 },
51};
52
53use super::*;
54
55#[derive(Default)]
56#[allow(clippy::type_complexity)]
57pub(crate) struct ConsensusCommitOutput {
58 consensus_round: Round,
60 consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
61 end_of_publish: BTreeSet<AuthorityName>,
62 reconfig_state: Option<ReconfigState>,
63 consensus_commit_stats: Option<ExecutionIndicesWithStats>,
64
65 next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,
67
68 deferred_txns: Vec<(DeferralKey, Vec<VerifiedSequencedConsensusTransaction>)>,
71 deferred_txns_v2: Vec<(DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>)>,
73 deleted_deferred_txns: BTreeSet<DeferralKey>,
75
76 pending_checkpoints: Vec<PendingCheckpoint>,
78
79 next_randomness_round: Option<(RandomnessRound, TimestampMs)>,
81
82 dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
83 dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
84 dkg_used_message: Option<VersionedUsedProcessedMessages>,
85 dkg_output: Option<dkg_v1::Output<PkG, EncG>>,
86
87 pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
89 active_jwks: BTreeSet<(u64, (JwkId, JWK))>,
90
91 congestion_control_object_debts: Vec<(ObjectID, u64)>,
93 congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
94 execution_time_observations: Vec<(
95 AuthorityIndex,
96 u64, Vec<(ExecutionTimeObservationKey, Duration)>,
98 )>,
99
100 owned_object_locks: HashMap<ObjectRef, LockDetails>,
102}
103
104impl ConsensusCommitOutput {
105 pub fn new(consensus_round: Round) -> Self {
106 Self {
107 consensus_round,
108 ..Default::default()
109 }
110 }
111
112 pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
113 self.deleted_deferred_txns.iter().cloned()
114 }
115
116 pub fn has_deferred_transactions(&self) -> bool {
117 !self.deferred_txns.is_empty() || !self.deferred_txns_v2.is_empty()
118 }
119
120 fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
121 self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
122 }
123
124 fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
125 self.pending_checkpoints.last().map(|cp| cp.height())
126 }
127
128 fn get_pending_checkpoints(
129 &self,
130 last: Option<CheckpointHeight>,
131 ) -> impl Iterator<Item = &PendingCheckpoint> {
132 self.pending_checkpoints.iter().filter(move |cp| {
133 if let Some(last) = last {
134 cp.height() > last
135 } else {
136 true
137 }
138 })
139 }
140
141 fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
142 self.pending_checkpoints
143 .iter()
144 .any(|cp| cp.height() == *index)
145 }
146
147 fn get_round(&self) -> Option<u64> {
148 self.consensus_commit_stats
149 .as_ref()
150 .map(|stats| stats.index.last_committed_round)
151 }
152
153 pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
154 self.end_of_publish.insert(authority);
155 }
156
157 pub fn insert_execution_time_observation(
158 &mut self,
159 source: AuthorityIndex,
160 generation: u64,
161 estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
162 ) {
163 self.execution_time_observations
164 .push((source, generation, estimates));
165 }
166
167 pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStats) {
168 self.consensus_commit_stats = Some(stats);
169 }
170
171 pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
173 self.record_consensus_commit_stats(Default::default());
174 }
175
176 pub fn store_reconfig_state(&mut self, state: ReconfigState) {
177 self.reconfig_state = Some(state);
178 }
179
180 pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
181 self.consensus_messages_processed.insert(key);
182 }
183
184 pub fn get_consensus_messages_processed(
185 &self,
186 ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
187 self.consensus_messages_processed.iter()
188 }
189
190 pub fn set_next_shared_object_versions(
191 &mut self,
192 next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
193 ) {
194 assert!(self.next_shared_object_versions.is_none());
195 self.next_shared_object_versions = Some(next_versions);
196 }
197
198 pub fn defer_transactions(
199 &mut self,
200 key: DeferralKey,
201 transactions: Vec<VerifiedExecutableTransactionWithAliases>,
202 ) {
203 self.deferred_txns_v2.push((key, transactions));
204 }
205
206 pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
207 self.deleted_deferred_txns
208 .extend(deferral_keys.iter().cloned());
209 }
210
211 pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
212 self.pending_checkpoints.push(checkpoint);
213 }
214
215 pub fn reserve_next_randomness_round(
216 &mut self,
217 next_randomness_round: RandomnessRound,
218 commit_timestamp: TimestampMs,
219 ) {
220 assert!(self.next_randomness_round.is_none());
221 self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
222 }
223
224 pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
225 self.dkg_confirmations.insert(conf.sender(), conf);
226 }
227
228 pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
229 self.dkg_processed_messages
230 .insert(message.sender(), message);
231 }
232
233 pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
234 self.dkg_used_message = Some(used_messages);
235 }
236
237 pub fn set_dkg_output(&mut self, output: dkg_v1::Output<PkG, EncG>) {
238 self.dkg_output = Some(output);
239 }
240
241 pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
242 self.pending_jwks.insert((authority, id, jwk));
243 }
244
245 pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
246 self.active_jwks.insert((round, key));
247 }
248
249 pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
250 self.congestion_control_object_debts = object_debts;
251 }
252
253 pub fn set_congestion_control_randomness_object_debts(
254 &mut self,
255 object_debts: Vec<(ObjectID, u64)>,
256 ) {
257 self.congestion_control_randomness_object_debts = object_debts;
258 }
259
260 pub fn set_owned_object_locks(&mut self, locks: HashMap<ObjectRef, LockDetails>) {
261 assert!(self.owned_object_locks.is_empty());
262 self.owned_object_locks = locks;
263 }
264
265 pub fn write_to_batch(
266 self,
267 epoch_store: &AuthorityPerEpochStore,
268 batch: &mut DBBatch,
269 ) -> SuiResult {
270 let tables = epoch_store.tables()?;
271 batch.insert_batch(
272 &tables.consensus_message_processed,
273 self.consensus_messages_processed
274 .iter()
275 .map(|key| (key, true)),
276 )?;
277
278 batch.insert_batch(
279 &tables.end_of_publish,
280 self.end_of_publish.iter().map(|authority| (authority, ())),
281 )?;
282
283 if let Some(reconfig_state) = &self.reconfig_state {
284 batch.insert_batch(
285 &tables.reconfig_state,
286 [(RECONFIG_STATE_INDEX, reconfig_state)],
287 )?;
288 }
289
290 let consensus_commit_stats = self
291 .consensus_commit_stats
292 .expect("consensus_commit_stats must be set");
293 let round = consensus_commit_stats.index.last_committed_round;
294
295 batch.insert_batch(
296 &tables.last_consensus_stats,
297 [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
298 )?;
299
300 if let Some(next_versions) = self.next_shared_object_versions {
301 batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
302 }
303
304 if !self.owned_object_locks.is_empty() {
305 batch.insert_batch(
306 &tables.owned_object_locked_transactions,
307 self.owned_object_locks
308 .into_iter()
309 .map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))),
310 )?;
311 }
312
313 batch.delete_batch(
314 &tables.deferred_transactions_v2,
315 &self.deleted_deferred_txns,
316 )?;
317 batch.delete_batch(
318 &tables.deferred_transactions_with_aliases_v2,
319 &self.deleted_deferred_txns,
320 )?;
321
322 batch.insert_batch(
323 &tables.deferred_transactions_with_aliases_v2,
324 self.deferred_txns_v2.into_iter().map(|(key, txs)| {
325 (
326 key,
327 txs.into_iter()
328 .map(|tx| {
329 let tx: TrustedExecutableTransactionWithAliases = tx.serializable();
330 tx
331 })
332 .collect::<Vec<_>>(),
333 )
334 }),
335 )?;
336
337 if let Some((round, commit_timestamp)) = self.next_randomness_round {
338 batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
339 batch.insert_batch(
340 &tables.randomness_last_round_timestamp,
341 [(SINGLETON_KEY, commit_timestamp)],
342 )?;
343 }
344
345 batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
346 batch.insert_batch(
347 &tables.dkg_processed_messages_v2,
348 self.dkg_processed_messages,
349 )?;
350 batch.insert_batch(
351 &tables.dkg_used_messages_v2,
352 self.dkg_used_message
354 .into_iter()
355 .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
356 )?;
357 if let Some(output) = self.dkg_output {
358 batch.insert_batch(&tables.dkg_output, [(SINGLETON_KEY, output)])?;
359 }
360
361 batch.insert_batch(
362 &tables.pending_jwks,
363 self.pending_jwks.into_iter().map(|j| (j, ())),
364 )?;
365 batch.insert_batch(
366 &tables.active_jwks,
367 self.active_jwks.into_iter().map(|j| {
368 assert_eq!(j.0, round);
370 (j, ())
371 }),
372 )?;
373
374 batch.insert_batch(
375 &tables.congestion_control_object_debts,
376 self.congestion_control_object_debts
377 .into_iter()
378 .map(|(object_id, debt)| {
379 (
380 object_id,
381 CongestionPerObjectDebt::new(self.consensus_round, debt),
382 )
383 }),
384 )?;
385 batch.insert_batch(
386 &tables.congestion_control_randomness_object_debts,
387 self.congestion_control_randomness_object_debts
388 .into_iter()
389 .map(|(object_id, debt)| {
390 (
391 object_id,
392 CongestionPerObjectDebt::new(self.consensus_round, debt),
393 )
394 }),
395 )?;
396
397 batch.insert_batch(
398 &tables.execution_time_observations,
399 self.execution_time_observations
400 .into_iter()
401 .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
402 )?;
403
404 Ok(())
405 }
406}
407
408pub(crate) struct ConsensusOutputCache {
413 pub(crate) deferred_transactions_v2:
416 Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>>,
417
418 #[allow(clippy::type_complexity)]
421 pub(crate) user_signatures_for_checkpoints:
422 Mutex<HashMap<TransactionDigest, Vec<(GenericSignature, Option<SequenceNumber>)>>>,
423
424 executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
425 executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
426}
427
428impl ConsensusOutputCache {
429 pub(crate) fn new(
430 epoch_start_configuration: &EpochStartConfiguration,
431 tables: &AuthorityEpochTables,
432 ) -> Self {
433 let deferred_transactions_v2 = tables
434 .get_all_deferred_transactions_v2()
435 .expect("load deferred transactions cannot fail");
436
437 assert!(
438 epoch_start_configuration.is_data_quarantine_active_from_beginning_of_epoch(),
439 "This version of sui-node can only run after data quarantining has been enabled. Please run version 1.45.0 or later to the end of the current epoch and retry"
440 );
441
442 let executed_in_epoch_cache_capacity = 50_000;
443
444 Self {
445 deferred_transactions_v2: Mutex::new(deferred_transactions_v2),
446 user_signatures_for_checkpoints: Default::default(),
447 executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
448 executed_in_epoch_cache: MokaCache::builder(8)
449 .max_capacity(randomize_cache_capacity_in_tests(
451 executed_in_epoch_cache_capacity,
452 ))
453 .eviction_policy(EvictionPolicy::lru())
454 .build(),
455 }
456 }
457
458 pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
459 self.executed_in_epoch
460 .read()
461 .contains_key(digest) ||
462 self.executed_in_epoch_cache.get(digest).is_some()
464 }
465
466 pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
468 assert!(
469 self.executed_in_epoch
470 .read()
471 .insert(tx_digest, ())
472 .is_none(),
473 "transaction already executed"
474 );
475 self.executed_in_epoch_cache.insert(tx_digest, ());
476 }
477
478 pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
482 let executed_in_epoch = self.executed_in_epoch.read();
483 for tx_digest in tx_digests {
484 executed_in_epoch.remove(tx_digest);
485 }
486 }
487}
488
489pub(crate) struct ConsensusOutputQuarantine {
492 output_queue: VecDeque<ConsensusCommitOutput>,
494
495 highest_executed_checkpoint: CheckpointSequenceNumber,
497
498 builder_checkpoint_summary:
500 BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,
501
502 builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,
503
504 shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,
506
507 congestion_control_randomness_object_debts:
510 RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
511 congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
512
513 processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,
514
515 owned_object_locks: HashMap<ObjectRef, LockDetails>,
517
518 metrics: Arc<EpochMetrics>,
519}
520
521impl ConsensusOutputQuarantine {
522 pub(super) fn new(
523 highest_executed_checkpoint: CheckpointSequenceNumber,
524 authority_metrics: Arc<EpochMetrics>,
525 ) -> Self {
526 Self {
527 highest_executed_checkpoint,
528
529 output_queue: VecDeque::new(),
530 builder_checkpoint_summary: BTreeMap::new(),
531 builder_digest_to_checkpoint: HashMap::new(),
532 shared_object_next_versions: RefCountedHashMap::new(),
533 processed_consensus_messages: RefCountedHashMap::new(),
534 congestion_control_randomness_object_debts: RefCountedHashMap::new(),
535 congestion_control_object_debts: RefCountedHashMap::new(),
536 owned_object_locks: HashMap::new(),
537 metrics: authority_metrics,
538 }
539 }
540}
541
542impl ConsensusOutputQuarantine {
545 pub(crate) fn push_consensus_output(
547 &mut self,
548 output: ConsensusCommitOutput,
549 epoch_store: &AuthorityPerEpochStore,
550 ) -> SuiResult {
551 self.insert_shared_object_next_versions(&output);
552 self.insert_congestion_control_debts(&output);
553 self.insert_processed_consensus_messages(&output);
554 self.insert_owned_object_locks(&output);
555 self.output_queue.push_back(output);
556
557 self.metrics
558 .consensus_quarantine_queue_size
559 .set(self.output_queue.len() as i64);
560
561 self.commit(epoch_store)
564 }
565
566 pub(super) fn insert_builder_summary(
568 &mut self,
569 sequence_number: CheckpointSequenceNumber,
570 summary: BuilderCheckpointSummary,
571 contents: CheckpointContents,
572 ) {
573 debug!(?sequence_number, "inserting builder summary {:?}", summary);
574 for tx in contents.iter() {
575 self.builder_digest_to_checkpoint
576 .insert(tx.transaction, sequence_number);
577 }
578 self.builder_checkpoint_summary
579 .insert(sequence_number, (summary, contents));
580 }
581}
582
583impl ConsensusOutputQuarantine {
585 pub(super) fn update_highest_executed_checkpoint(
588 &mut self,
589 checkpoint: CheckpointSequenceNumber,
590 epoch_store: &AuthorityPerEpochStore,
591 batch: &mut DBBatch,
592 ) -> SuiResult {
593 self.highest_executed_checkpoint = checkpoint;
594 self.commit_with_batch(epoch_store, batch)
595 }
596
597 pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
598 let mut batch = epoch_store.db_batch()?;
599 self.commit_with_batch(epoch_store, &mut batch)?;
600 batch.write()?;
601 Ok(())
602 }
603
604 fn commit_with_batch(
606 &mut self,
607 epoch_store: &AuthorityPerEpochStore,
608 batch: &mut DBBatch,
609 ) -> SuiResult {
610 let tables = epoch_store.tables()?;
617
618 let mut highest_committed_height = None;
619
620 while self
621 .builder_checkpoint_summary
622 .first_key_value()
623 .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
624 == Some(true)
625 {
626 let (seq, (builder_summary, contents)) =
627 self.builder_checkpoint_summary.pop_first().unwrap();
628
629 for tx in contents.iter() {
630 let digest = &tx.transaction;
631 assert_eq!(
632 self.builder_digest_to_checkpoint
633 .remove(digest)
634 .unwrap_or_else(|| {
635 panic!(
636 "transaction {:?} not found in builder_digest_to_checkpoint",
637 digest
638 )
639 }),
640 seq
641 );
642 }
643
644 batch.insert_batch(
645 &tables.builder_digest_to_checkpoint,
646 contents.iter().map(|tx| (tx.transaction, seq)),
647 )?;
648
649 batch.insert_batch(
650 &tables.builder_checkpoint_summary_v2,
651 [(seq, &builder_summary)],
652 )?;
653
654 let checkpoint_height = builder_summary
655 .checkpoint_height
656 .expect("non-genesis checkpoint must have height");
657 if let Some(highest) = highest_committed_height {
658 assert!(
659 checkpoint_height >= highest,
660 "current checkpoint height {} must be no less than highest committed height {}",
661 checkpoint_height,
662 highest
663 );
664 }
665
666 highest_committed_height = Some(checkpoint_height);
667 }
668
669 let Some(highest_committed_height) = highest_committed_height else {
670 return Ok(());
671 };
672
673 while !self.output_queue.is_empty() {
674 let Some(highest_in_commit) = self
678 .output_queue
679 .front()
680 .unwrap()
681 .get_highest_pending_checkpoint_height()
682 else {
683 break;
686 };
687
688 if highest_in_commit <= highest_committed_height {
689 info!(
690 "committing output with highest pending checkpoint height {:?}",
691 highest_in_commit
692 );
693 let output = self.output_queue.pop_front().unwrap();
694 self.remove_shared_object_next_versions(&output);
695 self.remove_processed_consensus_messages(&output);
696 self.remove_congestion_control_debts(&output);
697 self.remove_owned_object_locks(&output);
698
699 output.write_to_batch(epoch_store, batch)?;
700 } else {
701 break;
702 }
703 }
704
705 self.metrics
706 .consensus_quarantine_queue_size
707 .set(self.output_queue.len() as i64);
708
709 Ok(())
710 }
711}
712
713impl ConsensusOutputQuarantine {
714 fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
715 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
716 for (object_id, next_version) in next_versions {
717 self.shared_object_next_versions
718 .insert(*object_id, *next_version);
719 }
720 }
721 }
722
723 fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
724 let current_round = output.consensus_round;
725
726 for (object_id, debt) in output.congestion_control_object_debts.iter() {
727 self.congestion_control_object_debts.insert(
728 *object_id,
729 CongestionPerObjectDebt::new(current_round, *debt),
730 );
731 }
732
733 for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
734 self.congestion_control_randomness_object_debts.insert(
735 *object_id,
736 CongestionPerObjectDebt::new(current_round, *debt),
737 );
738 }
739 }
740
741 fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
742 for (object_id, _) in output.congestion_control_object_debts.iter() {
743 self.congestion_control_object_debts.remove(object_id);
744 }
745 for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
746 self.congestion_control_randomness_object_debts
747 .remove(object_id);
748 }
749 }
750
751 fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
752 for tx_key in output.consensus_messages_processed.iter() {
753 self.processed_consensus_messages.insert(tx_key.clone(), ());
754 }
755 }
756
757 fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
758 for tx_key in output.consensus_messages_processed.iter() {
759 self.processed_consensus_messages.remove(tx_key);
760 }
761 }
762
763 fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
764 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
765 for object_id in next_versions.keys() {
766 if !self.shared_object_next_versions.remove(object_id) {
767 fatal!(
768 "Shared object next version not found in quarantine: {:?}",
769 object_id
770 );
771 }
772 }
773 }
774 }
775
776 fn insert_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
777 for (obj_ref, lock) in &output.owned_object_locks {
778 self.owned_object_locks.insert(*obj_ref, *lock);
779 }
780 }
781
782 fn remove_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
783 for obj_ref in output.owned_object_locks.keys() {
784 self.owned_object_locks.remove(obj_ref);
785 }
786 }
787}
788
789impl ConsensusOutputQuarantine {
792 pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
793 self.builder_checkpoint_summary
794 .values()
795 .last()
796 .map(|(summary, _)| summary)
797 }
798
799 pub(super) fn get_built_summary(
800 &self,
801 sequence: CheckpointSequenceNumber,
802 ) -> Option<&BuilderCheckpointSummary> {
803 self.builder_checkpoint_summary
804 .get(&sequence)
805 .map(|(summary, _)| summary)
806 }
807
808 pub(super) fn included_transaction_in_checkpoint(&self, digest: &TransactionDigest) -> bool {
809 self.builder_digest_to_checkpoint.contains_key(digest)
810 }
811
812 pub(super) fn is_consensus_message_processed(
813 &self,
814 key: &SequencedConsensusTransactionKey,
815 ) -> bool {
816 self.processed_consensus_messages.contains_key(key)
817 }
818
819 pub(super) fn is_empty(&self) -> bool {
820 self.output_queue.is_empty()
821 }
822
823 pub(super) fn get_next_shared_object_versions(
824 &self,
825 tables: &AuthorityEpochTables,
826 objects_to_init: &[ConsensusObjectSequenceKey],
827 ) -> SuiResult<Vec<Option<SequenceNumber>>> {
828 Ok(do_fallback_lookup(
829 objects_to_init,
830 |object_key| {
831 if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
832 CacheResult::Hit(Some(*next_version))
833 } else {
834 CacheResult::Miss
835 }
836 },
837 |object_keys| {
838 tables
839 .next_shared_object_versions_v2
840 .multi_get(object_keys)
841 .expect("db error")
842 },
843 ))
844 }
845
846 pub(super) fn get_owned_object_locks(
850 &self,
851 tables: &AuthorityEpochTables,
852 obj_refs: &[ObjectRef],
853 ) -> SuiResult<Vec<Option<LockDetails>>> {
854 Ok(do_fallback_lookup(
855 obj_refs,
856 |obj_ref| {
857 if let Some(lock) = self.owned_object_locks.get(obj_ref) {
858 CacheResult::Hit(Some(*lock))
859 } else {
860 CacheResult::Miss
861 }
862 },
863 |obj_refs| {
864 tables
865 .multi_get_locked_transactions(obj_refs)
866 .expect("db error")
867 },
868 ))
869 }
870
871 pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
872 self.output_queue
873 .back()
874 .and_then(|output| output.get_highest_pending_checkpoint_height())
875 }
876
877 pub(super) fn get_pending_checkpoints(
878 &self,
879 last: Option<CheckpointHeight>,
880 ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
881 let mut checkpoints = Vec::new();
882 for output in &self.output_queue {
883 checkpoints.extend(
884 output
885 .get_pending_checkpoints(last)
886 .map(|cp| (cp.height(), cp.clone())),
887 );
888 }
889 if cfg!(debug_assertions) {
890 let mut prev = None;
891 for (height, _) in &checkpoints {
892 if let Some(prev) = prev {
893 assert!(prev < *height);
894 }
895 prev = Some(*height);
896 }
897 }
898 checkpoints
899 }
900
901 pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
902 self.output_queue
903 .iter()
904 .any(|output| output.pending_checkpoint_exists(index))
905 }
906
907 pub(super) fn get_new_jwks(
908 &self,
909 epoch_store: &AuthorityPerEpochStore,
910 round: u64,
911 ) -> SuiResult<Vec<ActiveJwk>> {
912 let epoch = epoch_store.epoch();
913
914 for output in self.output_queue.iter().rev() {
916 let output_round = output.get_round().unwrap();
919 if round == output_round {
920 return Ok(output
921 .active_jwks
922 .iter()
923 .map(|(_, (jwk_id, jwk))| ActiveJwk {
924 jwk_id: jwk_id.clone(),
925 jwk: jwk.clone(),
926 epoch,
927 })
928 .collect());
929 }
930 }
931
932 let empty_jwk_id = JwkId::new(String::new(), String::new());
934 let empty_jwk = JWK {
935 kty: String::new(),
936 e: String::new(),
937 n: String::new(),
938 alg: String::new(),
939 };
940
941 let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
942 let end = (round + 1, (empty_jwk_id, empty_jwk));
943
944 Ok(epoch_store
945 .tables()?
946 .active_jwks
947 .safe_iter_with_bounds(Some(start), Some(end))
948 .map_ok(|((r, (jwk_id, jwk)), _)| {
949 debug_assert!(round == r);
950 ActiveJwk { jwk_id, jwk, epoch }
951 })
952 .collect::<Result<Vec<_>, _>>()?)
953 }
954
955 pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
956 self.output_queue
957 .iter()
958 .rev()
959 .filter_map(|output| output.get_randomness_last_round_timestamp())
960 .next()
961 }
962
963 pub(crate) fn load_initial_object_debts(
964 &self,
965 epoch_store: &AuthorityPerEpochStore,
966 current_round: Round,
967 for_randomness: bool,
968 transactions: &[VerifiedExecutableTransactionWithAliases],
969 ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
970 let protocol_config = epoch_store.protocol_config();
971 let tables = epoch_store.tables()?;
972 let default_per_commit_budget = protocol_config
973 .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
974 .unwrap_or(0);
975 let (hash_table, db_table, per_commit_budget) = if for_randomness {
976 (
977 &self.congestion_control_randomness_object_debts,
978 &tables.congestion_control_randomness_object_debts,
979 protocol_config
980 .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
981 .unwrap_or(default_per_commit_budget),
982 )
983 } else {
984 (
985 &self.congestion_control_object_debts,
986 &tables.congestion_control_object_debts,
987 default_per_commit_budget,
988 )
989 };
990 let mut shared_input_object_ids: Vec<_> = transactions
991 .iter()
992 .flat_map(|tx| tx.tx().shared_input_objects().map(|obj| obj.id))
993 .collect();
994 shared_input_object_ids.sort();
995 shared_input_object_ids.dedup();
996
997 let results = do_fallback_lookup(
998 &shared_input_object_ids,
999 |object_id| {
1000 if let Some(debt) = hash_table.get(object_id) {
1001 CacheResult::Hit(Some(debt.into_v1()))
1002 } else {
1003 CacheResult::Miss
1004 }
1005 },
1006 |object_ids| {
1007 db_table
1008 .multi_get(object_ids)
1009 .expect("db error")
1010 .into_iter()
1011 .map(|debt| debt.map(|debt| debt.into_v1()))
1012 .collect()
1013 },
1014 );
1015
1016 Ok(results
1017 .into_iter()
1018 .zip(shared_input_object_ids)
1019 .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
1020 .map(move |((round, debt), object_id)| {
1021 assert!(current_round > round);
1025 let num_rounds = current_round - round - 1;
1026 let debt = debt.saturating_sub(per_commit_budget * num_rounds);
1027 (object_id, debt)
1028 }))
1029 }
1030}
1031
/// A hash map whose entries carry a reference count.
///
/// Inserting an existing key bumps its count and overwrites the value; removing a key lowers
/// the count and only drops the entry once the count reaches zero. This lets several owners
/// contribute the same key, with the entry staying visible until the last of them removes it.
#[derive(Debug, Default)]
struct RefCountedHashMap<K, V> {
    /// Key -> (number of outstanding inserts, most recently inserted value).
    map: HashMap<K, (usize, V)>,
}

impl<K, V> RefCountedHashMap<K, V>
where
    K: Eq + std::hash::Hash,
{
    /// Creates an empty map.
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    /// Inserts `value` under `key`.
    ///
    /// If the key is already present its reference count is incremented and the stored value
    /// is replaced; otherwise the entry is created with a count of one.
    pub fn insert(&mut self, key: K, value: V) {
        match self.map.entry(key) {
            hash_map::Entry::Occupied(mut occupied) => {
                let (ref_count, stored) = occupied.get_mut();
                *ref_count += 1;
                *stored = value;
            }
            hash_map::Entry::Vacant(vacant) => {
                vacant.insert((1, value));
            }
        }
    }

    /// Releases one reference to `key`.
    ///
    /// Returns `true` if the key was present and `false` otherwise. A `true` result does not
    /// mean the entry is gone: it is dropped only when its reference count reaches zero.
    pub fn remove(&mut self, key: &K) -> bool {
        // Look the entry up by reference so the key never has to be cloned.
        let Some((ref_count, _)) = self.map.get_mut(key) else {
            return false;
        };
        *ref_count -= 1;
        if *ref_count == 0 {
            self.map.remove(key);
        }
        true
    }

    /// Returns the most recently inserted value for `key`, if the key is present.
    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key).map(|(_, value)| value)
    }

    /// Returns `true` if `key` still has at least one outstanding reference.
    pub fn contains_key(&self, key: &K) -> bool {
        self.map.contains_key(key)
    }
}
1093}