1use crate::authority::authority_per_epoch_store::{
5 AuthorityEpochTables, EncG, ExecutionIndicesWithStats, PkG,
6};
7use crate::authority::transaction_deferral::DeferralKey;
8use crate::checkpoints::BuilderCheckpointSummary;
9use crate::consensus_handler::SequencedConsensusTransactionKind;
10use crate::epoch::randomness::SINGLETON_KEY;
11use dashmap::DashMap;
12use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
13use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
14use moka::policy::EvictionPolicy;
15use moka::sync::SegmentedCache as MokaCache;
16use mysten_common::fatal;
17use mysten_common::random_util::randomize_cache_capacity_in_tests;
18use parking_lot::Mutex;
19use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
20use sui_types::authenticator_state::ActiveJwk;
21use sui_types::base_types::{AuthorityName, SequenceNumber};
22use sui_types::crypto::RandomnessRound;
23use sui_types::error::SuiResult;
24use sui_types::execution::ExecutionTimeObservationKey;
25use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
26use sui_types::messages_consensus::{
27 AuthorityIndex, ConsensusTransaction, ConsensusTransactionKind,
28};
29use sui_types::{
30 base_types::{ConsensusObjectSequenceKey, ObjectID},
31 digests::TransactionDigest,
32 messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
33 signature::GenericSignature,
34};
35use tracing::{debug, info};
36use typed_store::Map;
37use typed_store::rocks::DBBatch;
38
39use crate::{
40 authority::{
41 authority_per_epoch_store::AuthorityPerEpochStore,
42 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
43 shared_object_congestion_tracker::CongestionPerObjectDebt,
44 },
45 checkpoints::{CheckpointHeight, PendingCheckpoint},
46 consensus_handler::{SequencedConsensusTransactionKey, VerifiedSequencedConsensusTransaction},
47 epoch::{
48 randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
49 reconfiguration::ReconfigState,
50 },
51};
52
53use super::*;
54
55#[derive(Default)]
56#[allow(clippy::type_complexity)]
57pub(crate) struct ConsensusCommitOutput {
58 consensus_round: Round,
60 consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
61 end_of_publish: BTreeSet<AuthorityName>,
62 reconfig_state: Option<ReconfigState>,
63 consensus_commit_stats: Option<ExecutionIndicesWithStats>,
64
65 next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,
67
68 deferred_txns: Vec<(DeferralKey, Vec<VerifiedSequencedConsensusTransaction>)>,
71 deferred_txns_v2: Vec<(DeferralKey, Vec<VerifiedExecutableTransaction>)>,
73 deleted_deferred_txns: BTreeSet<DeferralKey>,
75
76 pending_checkpoints: Vec<PendingCheckpoint>,
78
79 next_randomness_round: Option<(RandomnessRound, TimestampMs)>,
81
82 dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
83 dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
84 dkg_used_message: Option<VersionedUsedProcessedMessages>,
85 dkg_output: Option<dkg_v1::Output<PkG, EncG>>,
86
87 pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
89 active_jwks: BTreeSet<(u64, (JwkId, JWK))>,
90
91 congestion_control_object_debts: Vec<(ObjectID, u64)>,
93 congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
94 execution_time_observations: Vec<(
95 AuthorityIndex,
96 u64, Vec<(ExecutionTimeObservationKey, Duration)>,
98 )>,
99}
100
101impl ConsensusCommitOutput {
102 pub fn new(consensus_round: Round) -> Self {
103 Self {
104 consensus_round,
105 ..Default::default()
106 }
107 }
108
109 pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
110 self.deleted_deferred_txns.iter().cloned()
111 }
112
113 pub fn has_deferred_transactions(&self) -> bool {
114 !self.deferred_txns.is_empty() || !self.deferred_txns_v2.is_empty()
115 }
116
117 fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
118 self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
119 }
120
121 fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
122 self.pending_checkpoints.last().map(|cp| cp.height())
123 }
124
125 fn get_pending_checkpoints(
126 &self,
127 last: Option<CheckpointHeight>,
128 ) -> impl Iterator<Item = &PendingCheckpoint> {
129 self.pending_checkpoints.iter().filter(move |cp| {
130 if let Some(last) = last {
131 cp.height() > last
132 } else {
133 true
134 }
135 })
136 }
137
138 fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
139 self.pending_checkpoints
140 .iter()
141 .any(|cp| cp.height() == *index)
142 }
143
144 fn get_round(&self) -> Option<u64> {
145 self.consensus_commit_stats
146 .as_ref()
147 .map(|stats| stats.index.last_committed_round)
148 }
149
150 pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
151 self.end_of_publish.insert(authority);
152 }
153
154 pub fn insert_execution_time_observation(
155 &mut self,
156 source: AuthorityIndex,
157 generation: u64,
158 estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
159 ) {
160 self.execution_time_observations
161 .push((source, generation, estimates));
162 }
163
164 pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStats) {
165 self.consensus_commit_stats = Some(stats);
166 }
167
168 pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
170 self.record_consensus_commit_stats(Default::default());
171 }
172
173 pub fn store_reconfig_state(&mut self, state: ReconfigState) {
174 self.reconfig_state = Some(state);
175 }
176
177 pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
178 self.consensus_messages_processed.insert(key);
179 }
180
181 pub fn get_consensus_messages_processed(
182 &self,
183 ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
184 self.consensus_messages_processed.iter()
185 }
186
187 pub fn set_next_shared_object_versions(
188 &mut self,
189 next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
190 ) {
191 assert!(self.next_shared_object_versions.is_none());
192 self.next_shared_object_versions = Some(next_versions);
193 }
194
195 pub fn defer_transactions(
196 &mut self,
197 key: DeferralKey,
198 transactions: Vec<VerifiedSequencedConsensusTransaction>,
199 ) {
200 self.deferred_txns.push((key, transactions));
201 }
202
203 pub fn defer_transactions_v2(
204 &mut self,
205 key: DeferralKey,
206 transactions: Vec<VerifiedExecutableTransaction>,
207 ) {
208 self.deferred_txns_v2.push((key, transactions));
209 }
210
211 pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
212 self.deleted_deferred_txns
213 .extend(deferral_keys.iter().cloned());
214 }
215
216 pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
217 self.pending_checkpoints.push(checkpoint);
218 }
219
220 pub fn reserve_next_randomness_round(
221 &mut self,
222 next_randomness_round: RandomnessRound,
223 commit_timestamp: TimestampMs,
224 ) {
225 assert!(self.next_randomness_round.is_none());
226 self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
227 }
228
229 pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
230 self.dkg_confirmations.insert(conf.sender(), conf);
231 }
232
233 pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
234 self.dkg_processed_messages
235 .insert(message.sender(), message);
236 }
237
238 pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
239 self.dkg_used_message = Some(used_messages);
240 }
241
242 pub fn set_dkg_output(&mut self, output: dkg_v1::Output<PkG, EncG>) {
243 self.dkg_output = Some(output);
244 }
245
246 pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
247 self.pending_jwks.insert((authority, id, jwk));
248 }
249
250 pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
251 self.active_jwks.insert((round, key));
252 }
253
254 pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
255 self.congestion_control_object_debts = object_debts;
256 }
257
258 pub fn set_congestion_control_randomness_object_debts(
259 &mut self,
260 object_debts: Vec<(ObjectID, u64)>,
261 ) {
262 self.congestion_control_randomness_object_debts = object_debts;
263 }
264
265 pub fn write_to_batch(
266 self,
267 epoch_store: &AuthorityPerEpochStore,
268 batch: &mut DBBatch,
269 ) -> SuiResult {
270 let tables = epoch_store.tables()?;
271 batch.insert_batch(
272 &tables.consensus_message_processed,
273 self.consensus_messages_processed
274 .iter()
275 .map(|key| (key, true)),
276 )?;
277
278 batch.insert_batch(
279 &tables.end_of_publish,
280 self.end_of_publish.iter().map(|authority| (authority, ())),
281 )?;
282
283 if let Some(reconfig_state) = &self.reconfig_state {
284 batch.insert_batch(
285 &tables.reconfig_state,
286 [(RECONFIG_STATE_INDEX, reconfig_state)],
287 )?;
288 }
289
290 let consensus_commit_stats = self
291 .consensus_commit_stats
292 .expect("consensus_commit_stats must be set");
293 let round = consensus_commit_stats.index.last_committed_round;
294
295 batch.insert_batch(
296 &tables.last_consensus_stats,
297 [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
298 )?;
299
300 if let Some(next_versions) = self.next_shared_object_versions {
301 batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
302 }
303
304 batch.delete_batch(&tables.deferred_transactions, &self.deleted_deferred_txns)?;
306 batch.delete_batch(
307 &tables.deferred_transactions_v2,
308 &self.deleted_deferred_txns,
309 )?;
310
311 batch.insert_batch(&tables.deferred_transactions, self.deferred_txns)?;
312 batch.insert_batch(
313 &tables.deferred_transactions_v2,
314 self.deferred_txns_v2.into_iter().map(|(key, txs)| {
315 (
316 key,
317 txs.into_iter()
318 .map(|tx| {
319 let tx: TrustedExecutableTransaction = tx.serializable();
320 tx
321 })
322 .collect::<Vec<_>>(),
323 )
324 }),
325 )?;
326
327 if let Some((round, commit_timestamp)) = self.next_randomness_round {
328 batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
329 batch.insert_batch(
330 &tables.randomness_last_round_timestamp,
331 [(SINGLETON_KEY, commit_timestamp)],
332 )?;
333 }
334
335 batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
336 batch.insert_batch(
337 &tables.dkg_processed_messages_v2,
338 self.dkg_processed_messages,
339 )?;
340 batch.insert_batch(
341 &tables.dkg_used_messages_v2,
342 self.dkg_used_message
344 .into_iter()
345 .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
346 )?;
347 if let Some(output) = self.dkg_output {
348 batch.insert_batch(&tables.dkg_output, [(SINGLETON_KEY, output)])?;
349 }
350
351 batch.insert_batch(
352 &tables.pending_jwks,
353 self.pending_jwks.into_iter().map(|j| (j, ())),
354 )?;
355 batch.insert_batch(
356 &tables.active_jwks,
357 self.active_jwks.into_iter().map(|j| {
358 assert_eq!(j.0, round);
360 (j, ())
361 }),
362 )?;
363
364 batch.insert_batch(
365 &tables.congestion_control_object_debts,
366 self.congestion_control_object_debts
367 .into_iter()
368 .map(|(object_id, debt)| {
369 (
370 object_id,
371 CongestionPerObjectDebt::new(self.consensus_round, debt),
372 )
373 }),
374 )?;
375 batch.insert_batch(
376 &tables.congestion_control_randomness_object_debts,
377 self.congestion_control_randomness_object_debts
378 .into_iter()
379 .map(|(object_id, debt)| {
380 (
381 object_id,
382 CongestionPerObjectDebt::new(self.consensus_round, debt),
383 )
384 }),
385 )?;
386
387 batch.insert_batch(
388 &tables.execution_time_observations,
389 self.execution_time_observations
390 .into_iter()
391 .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
392 )?;
393
394 Ok(())
395 }
396}
397
398pub(crate) struct ConsensusOutputCache {
403 pub(crate) deferred_transactions:
407 Mutex<BTreeMap<DeferralKey, Vec<VerifiedSequencedConsensusTransaction>>>,
408
409 pub(crate) deferred_transactions_v2:
410 Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransaction>>>,
411
412 pub(crate) user_signatures_for_checkpoints:
415 Mutex<HashMap<TransactionDigest, Vec<GenericSignature>>>,
416
417 executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
418 executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
419}
420
421impl ConsensusOutputCache {
422 pub(crate) fn new(
423 epoch_start_configuration: &EpochStartConfiguration,
424 tables: &AuthorityEpochTables,
425 ) -> Self {
426 let deferred_transactions = tables
427 .get_all_deferred_transactions()
428 .expect("load deferred transactions cannot fail");
429
430 let deferred_transactions_v2 = tables
431 .get_all_deferred_transactions_v2()
432 .expect("load deferred transactions cannot fail");
433
434 assert!(
435 epoch_start_configuration.is_data_quarantine_active_from_beginning_of_epoch(),
436 "This version of sui-node can only run after data quarantining has been enabled. Please run version 1.45.0 or later to the end of the current epoch and retry"
437 );
438
439 let executed_in_epoch_cache_capacity = 50_000;
440
441 Self {
442 deferred_transactions: Mutex::new(deferred_transactions),
443 deferred_transactions_v2: Mutex::new(deferred_transactions_v2),
444 user_signatures_for_checkpoints: Default::default(),
445 executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
446 executed_in_epoch_cache: MokaCache::builder(8)
447 .max_capacity(randomize_cache_capacity_in_tests(
449 executed_in_epoch_cache_capacity,
450 ))
451 .eviction_policy(EvictionPolicy::lru())
452 .build(),
453 }
454 }
455
456 pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
457 self.executed_in_epoch
458 .read()
459 .contains_key(digest) ||
460 self.executed_in_epoch_cache.get(digest).is_some()
462 }
463
464 pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
466 assert!(
467 self.executed_in_epoch
468 .read()
469 .insert(tx_digest, ())
470 .is_none(),
471 "transaction already executed"
472 );
473 self.executed_in_epoch_cache.insert(tx_digest, ());
474 }
475
476 pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
480 let executed_in_epoch = self.executed_in_epoch.read();
481 for tx_digest in tx_digests {
482 executed_in_epoch.remove(tx_digest);
483 }
484 }
485}
486
487pub(crate) struct ConsensusOutputQuarantine {
490 output_queue: VecDeque<ConsensusCommitOutput>,
492
493 highest_executed_checkpoint: CheckpointSequenceNumber,
495
496 builder_checkpoint_summary:
498 BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,
499
500 builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,
501
502 shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,
504
505 congestion_control_randomness_object_debts:
508 RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
509 congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
510
511 processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,
512
513 metrics: Arc<EpochMetrics>,
514}
515
516impl ConsensusOutputQuarantine {
517 pub(super) fn new(
518 highest_executed_checkpoint: CheckpointSequenceNumber,
519 authority_metrics: Arc<EpochMetrics>,
520 ) -> Self {
521 Self {
522 highest_executed_checkpoint,
523
524 output_queue: VecDeque::new(),
525 builder_checkpoint_summary: BTreeMap::new(),
526 builder_digest_to_checkpoint: HashMap::new(),
527 shared_object_next_versions: RefCountedHashMap::new(),
528 processed_consensus_messages: RefCountedHashMap::new(),
529 congestion_control_randomness_object_debts: RefCountedHashMap::new(),
530 congestion_control_object_debts: RefCountedHashMap::new(),
531 metrics: authority_metrics,
532 }
533 }
534}
535
536impl ConsensusOutputQuarantine {
539 pub(crate) fn push_consensus_output(
541 &mut self,
542 output: ConsensusCommitOutput,
543 epoch_store: &AuthorityPerEpochStore,
544 ) -> SuiResult {
545 self.insert_shared_object_next_versions(&output);
546 self.insert_congestion_control_debts(&output);
547 self.insert_processed_consensus_messages(&output);
548 self.output_queue.push_back(output);
549
550 self.metrics
551 .consensus_quarantine_queue_size
552 .set(self.output_queue.len() as i64);
553
554 self.commit(epoch_store)
557 }
558
559 pub(super) fn insert_builder_summary(
561 &mut self,
562 sequence_number: CheckpointSequenceNumber,
563 summary: BuilderCheckpointSummary,
564 contents: CheckpointContents,
565 ) {
566 debug!(?sequence_number, "inserting builder summary {:?}", summary);
567 for tx in contents.iter() {
568 self.builder_digest_to_checkpoint
569 .insert(tx.transaction, sequence_number);
570 }
571 self.builder_checkpoint_summary
572 .insert(sequence_number, (summary, contents));
573 }
574}
575
576impl ConsensusOutputQuarantine {
578 pub(super) fn update_highest_executed_checkpoint(
581 &mut self,
582 checkpoint: CheckpointSequenceNumber,
583 epoch_store: &AuthorityPerEpochStore,
584 batch: &mut DBBatch,
585 ) -> SuiResult {
586 self.highest_executed_checkpoint = checkpoint;
587 self.commit_with_batch(epoch_store, batch)
588 }
589
590 pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
591 let mut batch = epoch_store.db_batch()?;
592 self.commit_with_batch(epoch_store, &mut batch)?;
593 batch.write()?;
594 Ok(())
595 }
596
597 fn commit_with_batch(
599 &mut self,
600 epoch_store: &AuthorityPerEpochStore,
601 batch: &mut DBBatch,
602 ) -> SuiResult {
603 let tables = epoch_store.tables()?;
610
611 let mut highest_committed_height = None;
612
613 while self
614 .builder_checkpoint_summary
615 .first_key_value()
616 .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
617 == Some(true)
618 {
619 let (seq, (builder_summary, contents)) =
620 self.builder_checkpoint_summary.pop_first().unwrap();
621
622 for tx in contents.iter() {
623 let digest = &tx.transaction;
624 assert_eq!(
625 self.builder_digest_to_checkpoint
626 .remove(digest)
627 .unwrap_or_else(|| {
628 panic!(
629 "transaction {:?} not found in builder_digest_to_checkpoint",
630 digest
631 )
632 }),
633 seq
634 );
635 }
636
637 batch.insert_batch(
638 &tables.builder_digest_to_checkpoint,
639 contents.iter().map(|tx| (tx.transaction, seq)),
640 )?;
641
642 batch.insert_batch(
643 &tables.builder_checkpoint_summary_v2,
644 [(seq, &builder_summary)],
645 )?;
646
647 let checkpoint_height = builder_summary
648 .checkpoint_height
649 .expect("non-genesis checkpoint must have height");
650 if let Some(highest) = highest_committed_height {
651 assert!(
652 checkpoint_height >= highest,
653 "current checkpoint height {} must be no less than highest committed height {}",
654 checkpoint_height,
655 highest
656 );
657 }
658
659 highest_committed_height = Some(checkpoint_height);
660 }
661
662 let Some(highest_committed_height) = highest_committed_height else {
663 return Ok(());
664 };
665
666 while !self.output_queue.is_empty() {
667 let Some(highest_in_commit) = self
671 .output_queue
672 .front()
673 .unwrap()
674 .get_highest_pending_checkpoint_height()
675 else {
676 break;
679 };
680
681 if highest_in_commit <= highest_committed_height {
682 info!(
683 "committing output with highest pending checkpoint height {:?}",
684 highest_in_commit
685 );
686 let output = self.output_queue.pop_front().unwrap();
687 self.remove_shared_object_next_versions(&output);
688 self.remove_processed_consensus_messages(&output);
689 self.remove_congestion_control_debts(&output);
690
691 output.write_to_batch(epoch_store, batch)?;
692 } else {
693 break;
694 }
695 }
696
697 self.metrics
698 .consensus_quarantine_queue_size
699 .set(self.output_queue.len() as i64);
700
701 Ok(())
702 }
703}
704
705impl ConsensusOutputQuarantine {
706 fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
707 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
708 for (object_id, next_version) in next_versions {
709 self.shared_object_next_versions
710 .insert(*object_id, *next_version);
711 }
712 }
713 }
714
715 fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
716 let current_round = output.consensus_round;
717
718 for (object_id, debt) in output.congestion_control_object_debts.iter() {
719 self.congestion_control_object_debts.insert(
720 *object_id,
721 CongestionPerObjectDebt::new(current_round, *debt),
722 );
723 }
724
725 for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
726 self.congestion_control_randomness_object_debts.insert(
727 *object_id,
728 CongestionPerObjectDebt::new(current_round, *debt),
729 );
730 }
731 }
732
733 fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
734 for (object_id, _) in output.congestion_control_object_debts.iter() {
735 self.congestion_control_object_debts.remove(object_id);
736 }
737 for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
738 self.congestion_control_randomness_object_debts
739 .remove(object_id);
740 }
741 }
742
743 fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
744 for tx_key in output.consensus_messages_processed.iter() {
745 self.processed_consensus_messages.insert(tx_key.clone(), ());
746 }
747 }
748
749 fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
750 for tx_key in output.consensus_messages_processed.iter() {
751 self.processed_consensus_messages.remove(tx_key);
752 }
753 }
754
755 fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
756 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
757 for object_id in next_versions.keys() {
758 if !self.shared_object_next_versions.remove(object_id) {
759 fatal!(
760 "Shared object next version not found in quarantine: {:?}",
761 object_id
762 );
763 }
764 }
765 }
766 }
767}
768
769impl ConsensusOutputQuarantine {
772 pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
773 self.builder_checkpoint_summary
774 .values()
775 .last()
776 .map(|(summary, _)| summary)
777 }
778
779 pub(super) fn get_built_summary(
780 &self,
781 sequence: CheckpointSequenceNumber,
782 ) -> Option<&BuilderCheckpointSummary> {
783 self.builder_checkpoint_summary
784 .get(&sequence)
785 .map(|(summary, _)| summary)
786 }
787
788 pub(super) fn included_transaction_in_checkpoint(&self, digest: &TransactionDigest) -> bool {
789 self.builder_digest_to_checkpoint.contains_key(digest)
790 }
791
792 pub(super) fn is_consensus_message_processed(
793 &self,
794 key: &SequencedConsensusTransactionKey,
795 ) -> bool {
796 self.processed_consensus_messages.contains_key(key)
797 }
798
799 pub(super) fn is_empty(&self) -> bool {
800 self.output_queue.is_empty()
801 }
802
803 pub(super) fn get_next_shared_object_versions(
804 &self,
805 tables: &AuthorityEpochTables,
806 objects_to_init: &[ConsensusObjectSequenceKey],
807 ) -> SuiResult<Vec<Option<SequenceNumber>>> {
808 Ok(do_fallback_lookup(
809 objects_to_init,
810 |object_key| {
811 if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
812 CacheResult::Hit(Some(*next_version))
813 } else {
814 CacheResult::Miss
815 }
816 },
817 |object_keys| {
818 tables
819 .next_shared_object_versions_v2
820 .multi_get(object_keys)
821 .expect("db error")
822 },
823 ))
824 }
825
826 pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
827 self.output_queue
828 .back()
829 .and_then(|output| output.get_highest_pending_checkpoint_height())
830 }
831
832 pub(super) fn get_pending_checkpoints(
833 &self,
834 last: Option<CheckpointHeight>,
835 ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
836 let mut checkpoints = Vec::new();
837 for output in &self.output_queue {
838 checkpoints.extend(
839 output
840 .get_pending_checkpoints(last)
841 .map(|cp| (cp.height(), cp.clone())),
842 );
843 }
844 if cfg!(debug_assertions) {
845 let mut prev = None;
846 for (height, _) in &checkpoints {
847 if let Some(prev) = prev {
848 assert!(prev < *height);
849 }
850 prev = Some(*height);
851 }
852 }
853 checkpoints
854 }
855
856 pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
857 self.output_queue
858 .iter()
859 .any(|output| output.pending_checkpoint_exists(index))
860 }
861
862 pub(super) fn get_new_jwks(
863 &self,
864 epoch_store: &AuthorityPerEpochStore,
865 round: u64,
866 ) -> SuiResult<Vec<ActiveJwk>> {
867 let epoch = epoch_store.epoch();
868
869 for output in self.output_queue.iter().rev() {
871 let output_round = output.get_round().unwrap();
874 if round == output_round {
875 return Ok(output
876 .active_jwks
877 .iter()
878 .map(|(_, (jwk_id, jwk))| ActiveJwk {
879 jwk_id: jwk_id.clone(),
880 jwk: jwk.clone(),
881 epoch,
882 })
883 .collect());
884 }
885 }
886
887 let empty_jwk_id = JwkId::new(String::new(), String::new());
889 let empty_jwk = JWK {
890 kty: String::new(),
891 e: String::new(),
892 n: String::new(),
893 alg: String::new(),
894 };
895
896 let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
897 let end = (round + 1, (empty_jwk_id, empty_jwk));
898
899 Ok(epoch_store
900 .tables()?
901 .active_jwks
902 .safe_iter_with_bounds(Some(start), Some(end))
903 .map_ok(|((r, (jwk_id, jwk)), _)| {
904 debug_assert!(round == r);
905 ActiveJwk { jwk_id, jwk, epoch }
906 })
907 .collect::<Result<Vec<_>, _>>()?)
908 }
909
910 pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
911 self.output_queue
912 .iter()
913 .rev()
914 .filter_map(|output| output.get_randomness_last_round_timestamp())
915 .next()
916 }
917
918 pub(super) fn load_initial_object_debts(
919 &self,
920 epoch_store: &AuthorityPerEpochStore,
921 current_round: Round,
922 for_randomness: bool,
923 transactions: &[VerifiedSequencedConsensusTransaction],
924 ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
925 let protocol_config = epoch_store.protocol_config();
926 let tables = epoch_store.tables()?;
927 let default_per_commit_budget = protocol_config
928 .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
929 .unwrap_or(0);
930 let (hash_table, db_table, per_commit_budget) = if for_randomness {
931 (
932 &self.congestion_control_randomness_object_debts,
933 &tables.congestion_control_randomness_object_debts,
934 protocol_config
935 .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
936 .unwrap_or(default_per_commit_budget),
937 )
938 } else {
939 (
940 &self.congestion_control_object_debts,
941 &tables.congestion_control_object_debts,
942 default_per_commit_budget,
943 )
944 };
945 let mut shared_input_object_ids: Vec<_> = transactions
946 .iter()
947 .filter_map(|tx| {
948 match &tx.0.transaction {
949 SequencedConsensusTransactionKind::External(ConsensusTransaction {
950 kind: ConsensusTransactionKind::CertifiedTransaction(tx),
951 ..
952 }) => Some(itertools::Either::Left(
953 tx.shared_input_objects().map(|obj| obj.id),
954 )),
955 SequencedConsensusTransactionKind::External(ConsensusTransaction {
956 kind: ConsensusTransactionKind::UserTransaction(tx),
957 ..
958 }) if protocol_config.use_mfp_txns_in_load_initial_object_debts() => Some(
960 itertools::Either::Right(tx.shared_input_objects().map(|obj| obj.id)),
961 ),
962 _ => None,
963 }
964 })
965 .flatten()
966 .collect();
967 shared_input_object_ids.sort();
968 shared_input_object_ids.dedup();
969
970 let results = do_fallback_lookup(
971 &shared_input_object_ids,
972 |object_id| {
973 if let Some(debt) = hash_table.get(object_id) {
974 CacheResult::Hit(Some(debt.into_v1()))
975 } else {
976 CacheResult::Miss
977 }
978 },
979 |object_ids| {
980 db_table
981 .multi_get(object_ids)
982 .expect("db error")
983 .into_iter()
984 .map(|debt| debt.map(|debt| debt.into_v1()))
985 .collect()
986 },
987 );
988
989 Ok(results
990 .into_iter()
991 .zip(shared_input_object_ids)
992 .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
993 .map(move |((round, debt), object_id)| {
994 assert!(current_round > round);
998 let num_rounds = current_round - round - 1;
999 let debt = debt.saturating_sub(per_commit_budget * num_rounds);
1000 (object_id, debt)
1001 }))
1002 }
1003
1004 pub(crate) fn load_initial_object_debts_v2(
1006 &self,
1007 epoch_store: &AuthorityPerEpochStore,
1008 current_round: Round,
1009 for_randomness: bool,
1010 transactions: &[VerifiedExecutableTransaction],
1011 ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
1012 let protocol_config = epoch_store.protocol_config();
1013 let tables = epoch_store.tables()?;
1014 let default_per_commit_budget = protocol_config
1015 .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
1016 .unwrap_or(0);
1017 let (hash_table, db_table, per_commit_budget) = if for_randomness {
1018 (
1019 &self.congestion_control_randomness_object_debts,
1020 &tables.congestion_control_randomness_object_debts,
1021 protocol_config
1022 .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
1023 .unwrap_or(default_per_commit_budget),
1024 )
1025 } else {
1026 (
1027 &self.congestion_control_object_debts,
1028 &tables.congestion_control_object_debts,
1029 default_per_commit_budget,
1030 )
1031 };
1032 let mut shared_input_object_ids: Vec<_> = transactions
1033 .iter()
1034 .flat_map(|tx| tx.shared_input_objects().map(|obj| obj.id))
1035 .collect();
1036 shared_input_object_ids.sort();
1037 shared_input_object_ids.dedup();
1038
1039 let results = do_fallback_lookup(
1040 &shared_input_object_ids,
1041 |object_id| {
1042 if let Some(debt) = hash_table.get(object_id) {
1043 CacheResult::Hit(Some(debt.into_v1()))
1044 } else {
1045 CacheResult::Miss
1046 }
1047 },
1048 |object_ids| {
1049 db_table
1050 .multi_get(object_ids)
1051 .expect("db error")
1052 .into_iter()
1053 .map(|debt| debt.map(|debt| debt.into_v1()))
1054 .collect()
1055 },
1056 );
1057
1058 Ok(results
1059 .into_iter()
1060 .zip(shared_input_object_ids)
1061 .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
1062 .map(move |((round, debt), object_id)| {
1063 assert!(current_round > round);
1067 let num_rounds = current_round - round - 1;
1068 let debt = debt.saturating_sub(per_commit_budget * num_rounds);
1069 (object_id, debt)
1070 }))
1071 }
1072}
1073
1074#[derive(Debug, Default)]
1083struct RefCountedHashMap<K, V> {
1084 map: HashMap<K, (usize, V)>,
1085}
1086
1087impl<K, V> RefCountedHashMap<K, V>
1088where
1089 K: Clone + Eq + std::hash::Hash,
1090{
1091 pub fn new() -> Self {
1092 Self {
1093 map: HashMap::new(),
1094 }
1095 }
1096
1097 pub fn insert(&mut self, key: K, value: V) {
1098 let entry = self.map.entry(key);
1099 match entry {
1100 hash_map::Entry::Occupied(mut entry) => {
1101 let (ref_count, v) = entry.get_mut();
1102 *ref_count += 1;
1103 *v = value;
1104 }
1105 hash_map::Entry::Vacant(entry) => {
1106 entry.insert((1, value));
1107 }
1108 }
1109 }
1110
1111 pub fn remove(&mut self, key: &K) -> bool {
1114 let entry = self.map.entry(key.clone());
1115 match entry {
1116 hash_map::Entry::Occupied(mut entry) => {
1117 let (ref_count, _) = entry.get_mut();
1118 *ref_count -= 1;
1119 if *ref_count == 0 {
1120 entry.remove();
1121 }
1122 true
1123 }
1124 hash_map::Entry::Vacant(_) => false,
1125 }
1126 }
1127
1128 pub fn get(&self, key: &K) -> Option<&V> {
1129 self.map.get(key).map(|(_, v)| v)
1130 }
1131
1132 pub fn contains_key(&self, key: &K) -> bool {
1133 self.map.contains_key(key)
1134 }
1135}