1use crate::authority::authority_per_epoch_store::{
5 AuthorityEpochTables, EncG, ExecutionIndicesWithStatsV2, LockDetails, LockDetailsWrapper, PkG,
6};
7use crate::authority::transaction_deferral::DeferralKey;
8use crate::checkpoints::BuilderCheckpointSummary;
9use crate::epoch::randomness::SINGLETON_KEY;
10use dashmap::DashMap;
11use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
12use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
13use moka::policy::EvictionPolicy;
14use moka::sync::SegmentedCache as MokaCache;
15use mysten_common::ZipDebugEqIteratorExt;
16use mysten_common::fatal;
17use mysten_common::random_util::randomize_cache_capacity_in_tests;
18use parking_lot::Mutex;
19use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
20use sui_types::authenticator_state::ActiveJwk;
21use sui_types::base_types::{AuthorityName, ObjectRef, SequenceNumber};
22use sui_types::crypto::RandomnessRound;
23use sui_types::error::SuiResult;
24use sui_types::executable_transaction::{
25 TrustedExecutableTransactionWithAliases, VerifiedExecutableTransactionWithAliases,
26};
27use sui_types::execution::ExecutionTimeObservationKey;
28use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
29use sui_types::messages_consensus::AuthorityIndex;
30use sui_types::{
31 base_types::{ConsensusObjectSequenceKey, ObjectID},
32 digests::TransactionDigest,
33 messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
34 signature::GenericSignature,
35};
36use tracing::debug;
37use typed_store::Map;
38use typed_store::rocks::DBBatch;
39
40use crate::{
41 authority::{
42 authority_per_epoch_store::AuthorityPerEpochStore,
43 shared_object_congestion_tracker::CongestionPerObjectDebt,
44 },
45 checkpoints::{CheckpointHeight, PendingCheckpoint},
46 consensus_handler::SequencedConsensusTransactionKey,
47 epoch::{
48 randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
49 reconfiguration::ReconfigState,
50 },
51};
52
53use super::*;
54
/// Everything produced while handling one consensus commit that still needs to be persisted
/// to the per-epoch tables.
///
/// Values are accumulated through the `insert_*` / `set_*` / `record_*` methods and staged
/// into a `DBBatch` in one go by `write_to_batch`.
#[derive(Default)]
#[allow(clippy::type_complexity)]
pub(crate) struct ConsensusCommitOutput {
    // --- Consensus and reconfiguration state ---
    // Round passed to `new`; stamped onto every congestion debt entry that is written out.
    consensus_round: Round,
    // Keys of consensus messages handled in this commit; each is stored with value `true`.
    consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
    // Authorities recorded through `insert_end_of_publish`.
    end_of_publish: BTreeSet<AuthorityName>,
    // Written under `RECONFIG_STATE_INDEX` only when set.
    reconfig_state: Option<ReconfigState>,
    // Must be set before the output is written or queued: `write_to_batch` and the
    // quarantine's commit scan both panic when this is `None`.
    consensus_commit_stats: Option<ExecutionIndicesWithStatsV2>,

    // --- Transaction scheduling state ---
    // May be set at most once per output (enforced by an assertion in the setter).
    next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,

    // Newly deferred transactions, grouped by deferral key.
    deferred_txns: Vec<(DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>)>,
    // Deferral keys whose stored transactions are deleted when the output is written.
    deleted_deferred_txns: BTreeSet<DeferralKey>,

    // --- Checkpoint state ---
    // Kept in memory only: `write_to_batch` does not persist these.
    pending_checkpoints: Vec<PendingCheckpoint>,

    // --- Random beacon state ---
    // (next round, commit timestamp); may be reserved at most once per output.
    next_randomness_round: Option<(RandomnessRound, TimestampMs)>,

    // DKG confirmations / processed messages, keyed by the sender's party id.
    dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
    dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
    dkg_used_message: Option<VersionedUsedProcessedMessages>,
    // Outer `Some` means `set_dkg_output` was called; the inner `Option` is stored as-is.
    dkg_output: Option<Option<dkg_v1::Output<PkG, EncG>>>,

    // --- JWK state ---
    pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
    // First tuple element is the round; `write_to_batch` asserts it equals the last
    // committed round of `consensus_commit_stats`.
    active_jwks: BTreeSet<(u64, (JwkId, JWK))>,

    // --- Congestion control state ---
    // (object id, debt) pairs; paired with `consensus_round` when stored.
    congestion_control_object_debts: Vec<(ObjectID, u64)>,
    congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
    // (source authority, generation, estimates); stored keyed by (generation, authority).
    execution_time_observations: Vec<(
        AuthorityIndex,
        u64,
        Vec<(ExecutionTimeObservationKey, Duration)>,
    )>,

    // Owned object locks recorded for this commit; may be set at most once per output.
    owned_object_locks: HashMap<ObjectRef, LockDetails>,

    // Marks this output as a point up to which the quarantine may flush its queue
    // (see `ConsensusOutputQuarantine::commit_with_batch`). Not persisted.
    checkpoint_queue_drained: bool,
}
102
103impl ConsensusCommitOutput {
104 pub fn new(consensus_round: Round) -> Self {
105 Self {
106 consensus_round,
107 ..Default::default()
108 }
109 }
110
111 pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
112 self.deleted_deferred_txns.iter().cloned()
113 }
114
115 pub fn has_deferred_transactions(&self) -> bool {
116 !self.deferred_txns.is_empty()
117 }
118
119 fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
120 self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
121 }
122
123 fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
124 self.pending_checkpoints.last().map(|cp| cp.height())
125 }
126
127 fn get_pending_checkpoints(
128 &self,
129 last: Option<CheckpointHeight>,
130 ) -> impl Iterator<Item = &PendingCheckpoint> {
131 self.pending_checkpoints.iter().filter(move |cp| {
132 if let Some(last) = last {
133 cp.height() > last
134 } else {
135 true
136 }
137 })
138 }
139
140 fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
141 self.pending_checkpoints
142 .iter()
143 .any(|cp| cp.height() == *index)
144 }
145
146 fn get_round(&self) -> Option<u64> {
147 self.consensus_commit_stats
148 .as_ref()
149 .map(|stats| stats.index.last_committed_round)
150 }
151
152 pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
153 self.end_of_publish.insert(authority);
154 }
155
156 pub fn insert_execution_time_observation(
157 &mut self,
158 source: AuthorityIndex,
159 generation: u64,
160 estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
161 ) {
162 self.execution_time_observations
163 .push((source, generation, estimates));
164 }
165
166 pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStatsV2) {
167 self.consensus_commit_stats = Some(stats);
168 }
169
170 pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
172 self.record_consensus_commit_stats(Default::default());
173 }
174
175 pub fn store_reconfig_state(&mut self, state: ReconfigState) {
176 self.reconfig_state = Some(state);
177 }
178
179 pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
180 self.consensus_messages_processed.insert(key);
181 }
182
183 pub fn get_consensus_messages_processed(
184 &self,
185 ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
186 self.consensus_messages_processed.iter()
187 }
188
189 pub fn set_next_shared_object_versions(
190 &mut self,
191 next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
192 ) {
193 assert!(self.next_shared_object_versions.is_none());
194 self.next_shared_object_versions = Some(next_versions);
195 }
196
197 pub fn defer_transactions(
198 &mut self,
199 key: DeferralKey,
200 transactions: Vec<VerifiedExecutableTransactionWithAliases>,
201 ) {
202 self.deferred_txns.push((key, transactions));
203 }
204
205 pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
206 self.deleted_deferred_txns
207 .extend(deferral_keys.iter().cloned());
208 }
209
210 pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
211 self.pending_checkpoints.push(checkpoint);
212 }
213
214 pub fn reserve_next_randomness_round(
215 &mut self,
216 next_randomness_round: RandomnessRound,
217 commit_timestamp: TimestampMs,
218 ) {
219 assert!(self.next_randomness_round.is_none());
220 self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
221 }
222
223 pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
224 self.dkg_confirmations.insert(conf.sender(), conf);
225 }
226
227 pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
228 self.dkg_processed_messages
229 .insert(message.sender(), message);
230 }
231
232 pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
233 self.dkg_used_message = Some(used_messages);
234 }
235
236 pub fn set_dkg_output(&mut self, output: Option<dkg_v1::Output<PkG, EncG>>) {
237 self.dkg_output = Some(output);
238 }
239
240 pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
241 self.pending_jwks.insert((authority, id, jwk));
242 }
243
244 pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
245 self.active_jwks.insert((round, key));
246 }
247
248 pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
249 self.congestion_control_object_debts = object_debts;
250 }
251
252 pub fn set_congestion_control_randomness_object_debts(
253 &mut self,
254 object_debts: Vec<(ObjectID, u64)>,
255 ) {
256 self.congestion_control_randomness_object_debts = object_debts;
257 }
258
259 pub fn set_checkpoint_queue_drained(&mut self, drained: bool) {
260 self.checkpoint_queue_drained = drained;
261 }
262
263 pub fn set_owned_object_locks(&mut self, locks: HashMap<ObjectRef, LockDetails>) {
264 assert!(self.owned_object_locks.is_empty());
265 self.owned_object_locks = locks;
266 }
267
268 pub fn write_to_batch(
269 self,
270 epoch_store: &AuthorityPerEpochStore,
271 batch: &mut DBBatch,
272 ) -> SuiResult {
273 let tables = epoch_store.tables()?;
274 batch.insert_batch(
275 &tables.consensus_message_processed,
276 self.consensus_messages_processed
277 .iter()
278 .map(|key| (key, true)),
279 )?;
280
281 batch.insert_batch(
282 &tables.end_of_publish,
283 self.end_of_publish.iter().map(|authority| (authority, ())),
284 )?;
285
286 if let Some(reconfig_state) = &self.reconfig_state {
287 batch.insert_batch(
288 &tables.reconfig_state,
289 [(RECONFIG_STATE_INDEX, reconfig_state)],
290 )?;
291 }
292
293 let consensus_commit_stats = self
294 .consensus_commit_stats
295 .expect("consensus_commit_stats must be set");
296 let round = consensus_commit_stats.index.last_committed_round;
297
298 batch.insert_batch(
299 &tables.last_consensus_stats_v2,
300 [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
301 )?;
302
303 if let Some(next_versions) = self.next_shared_object_versions {
304 batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
305 }
306
307 if !self.owned_object_locks.is_empty() {
308 batch.insert_batch(
309 &tables.owned_object_locked_transactions,
310 self.owned_object_locks
311 .into_iter()
312 .map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))),
313 )?;
314 }
315
316 batch.delete_batch(
317 &tables.deferred_transactions_v2,
318 &self.deleted_deferred_txns,
319 )?;
320 batch.delete_batch(
321 &tables.deferred_transactions_with_aliases_v2,
322 &self.deleted_deferred_txns,
323 )?;
324 batch.delete_batch(
325 &tables.deferred_transactions_with_aliases_v3,
326 &self.deleted_deferred_txns,
327 )?;
328
329 batch.insert_batch(
330 &tables.deferred_transactions_with_aliases_v3,
331 self.deferred_txns.into_iter().map(|(key, txs)| {
332 (
333 key,
334 txs.into_iter()
335 .map(|tx| {
336 let tx: TrustedExecutableTransactionWithAliases = tx.serializable();
337 tx
338 })
339 .collect::<Vec<_>>(),
340 )
341 }),
342 )?;
343
344 if let Some((round, commit_timestamp)) = self.next_randomness_round {
345 batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
346 batch.insert_batch(
347 &tables.randomness_last_round_timestamp,
348 [(SINGLETON_KEY, commit_timestamp)],
349 )?;
350 }
351
352 batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
353 batch.insert_batch(
354 &tables.dkg_processed_messages_v2,
355 self.dkg_processed_messages,
356 )?;
357 batch.insert_batch(
358 &tables.dkg_used_messages_v2,
359 self.dkg_used_message
361 .into_iter()
362 .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
363 )?;
364 if let Some(output) = self.dkg_output {
365 batch.insert_batch(&tables.dkg_output_v2, [(SINGLETON_KEY, output)])?;
366 }
367
368 batch.insert_batch(
369 &tables.pending_jwks,
370 self.pending_jwks.into_iter().map(|j| (j, ())),
371 )?;
372 batch.insert_batch(
373 &tables.active_jwks,
374 self.active_jwks.into_iter().map(|j| {
375 assert_eq!(j.0, round);
377 (j, ())
378 }),
379 )?;
380
381 batch.insert_batch(
382 &tables.congestion_control_object_debts,
383 self.congestion_control_object_debts
384 .into_iter()
385 .map(|(object_id, debt)| {
386 (
387 object_id,
388 CongestionPerObjectDebt::new(self.consensus_round, debt),
389 )
390 }),
391 )?;
392 batch.insert_batch(
393 &tables.congestion_control_randomness_object_debts,
394 self.congestion_control_randomness_object_debts
395 .into_iter()
396 .map(|(object_id, debt)| {
397 (
398 object_id,
399 CongestionPerObjectDebt::new(self.consensus_round, debt),
400 )
401 }),
402 )?;
403
404 batch.insert_batch(
405 &tables.execution_time_observations,
406 self.execution_time_observations
407 .into_iter()
408 .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
409 )?;
410
411 Ok(())
412 }
413}
414
/// In-memory, per-epoch state that accompanies consensus output handling: deferred
/// transactions, user signatures collected for checkpoints, and the set of transactions
/// executed in the current epoch.
pub(crate) struct ConsensusOutputCache {
    // Deferred transactions grouped by deferral key; seeded from the epoch tables in `new`.
    pub(crate) deferred_transactions:
        Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>>,

    // Signatures per transaction digest, each paired with an optional sequence number.
    // NOTE(review): the meaning of the `Option<SequenceNumber>` is not visible in this
    // file — confirm against the code that fills this map.
    #[allow(clippy::type_complexity)]
    pub(crate) user_signatures_for_checkpoints:
        Mutex<HashMap<TransactionDigest, Vec<(GenericSignature, Option<SequenceNumber>)>>>,

    // Exact set of executed digests: filled by `insert_executed_in_epoch`, pruned by
    // `remove_executed_in_epoch`.
    // NOTE(review): within this file the lock is only ever taken with `read()`.
    executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
    // Bounded LRU cache of executed digests. Entries are inserted together with
    // `executed_in_epoch` but are never explicitly removed in this file, so a digest can
    // still be found here after it was pruned from the exact set.
    executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
}
434
435impl ConsensusOutputCache {
436 pub(crate) fn new(tables: &AuthorityEpochTables) -> Self {
437 let deferred_transactions = tables
438 .get_all_deferred_transactions_v2()
439 .expect("load deferred transactions cannot fail");
440
441 let executed_in_epoch_cache_capacity = 50_000;
442
443 Self {
444 deferred_transactions: Mutex::new(deferred_transactions),
445 user_signatures_for_checkpoints: Default::default(),
446 executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
447 executed_in_epoch_cache: MokaCache::builder(8)
448 .max_capacity(randomize_cache_capacity_in_tests(
450 executed_in_epoch_cache_capacity,
451 ))
452 .eviction_policy(EvictionPolicy::lru())
453 .build(),
454 }
455 }
456
457 pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
458 self.executed_in_epoch
459 .read()
460 .contains_key(digest) ||
461 self.executed_in_epoch_cache.get(digest).is_some()
463 }
464
465 pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
467 assert!(
468 self.executed_in_epoch
469 .read()
470 .insert(tx_digest, ())
471 .is_none(),
472 "transaction already executed"
473 );
474 self.executed_in_epoch_cache.insert(tx_digest, ());
475 }
476
477 pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
481 let executed_in_epoch = self.executed_in_epoch.read();
482 for tx_digest in tx_digests {
483 executed_in_epoch.remove(tx_digest);
484 }
485 }
486}
487
/// Holds consensus output and checkpoint builder state in memory until
/// `commit_with_batch` stages it for the database, which happens once
/// `highest_executed_checkpoint` has caught up with the corresponding builder summaries.
pub(crate) struct ConsensusOutputQuarantine {
    // Outputs in the order they were pushed; flushed from the front.
    output_queue: VecDeque<ConsensusCommitOutput>,

    // Watermark: builder summaries with a sequence number at or below it may be persisted.
    highest_executed_checkpoint: CheckpointSequenceNumber,

    // Built checkpoint summaries (with contents) not yet persisted, keyed by sequence number.
    builder_checkpoint_summary:
        BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,

    // Transaction digest -> checkpoint sequence number for the summaries held above.
    builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,

    // The ref-counted maps below mirror data held by the queued outputs. An entry is added
    // when an output is pushed and released when that output is flushed, so a key shared by
    // several queued outputs stays visible until the last of them is flushed.
    shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,

    congestion_control_randomness_object_debts:
        RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
    congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,

    processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,

    // Owned object locks of queued outputs. This is a plain map (not ref-counted): flushing
    // an output removes its keys unconditionally.
    owned_object_locks: HashMap<ObjectRef, LockDetails>,

    // Used to report the current queue length.
    metrics: Arc<EpochMetrics>,
}
519
520impl ConsensusOutputQuarantine {
521 pub(super) fn new(
522 highest_executed_checkpoint: CheckpointSequenceNumber,
523 authority_metrics: Arc<EpochMetrics>,
524 ) -> Self {
525 Self {
526 highest_executed_checkpoint,
527
528 output_queue: VecDeque::new(),
529 builder_checkpoint_summary: BTreeMap::new(),
530 builder_digest_to_checkpoint: HashMap::new(),
531 shared_object_next_versions: RefCountedHashMap::new(),
532 processed_consensus_messages: RefCountedHashMap::new(),
533 congestion_control_randomness_object_debts: RefCountedHashMap::new(),
534 congestion_control_object_debts: RefCountedHashMap::new(),
535 owned_object_locks: HashMap::new(),
536 metrics: authority_metrics,
537 }
538 }
539}
540
541impl ConsensusOutputQuarantine {
544 pub(crate) fn push_consensus_output(
546 &mut self,
547 output: ConsensusCommitOutput,
548 epoch_store: &AuthorityPerEpochStore,
549 ) -> SuiResult {
550 self.insert_shared_object_next_versions(&output);
551 self.insert_congestion_control_debts(&output);
552 self.insert_processed_consensus_messages(&output);
553 self.insert_owned_object_locks(&output);
554 self.output_queue.push_back(output);
555
556 self.metrics
557 .consensus_quarantine_queue_size
558 .set(self.output_queue.len() as i64);
559
560 self.commit(epoch_store)
563 }
564
565 pub(super) fn insert_builder_summary(
567 &mut self,
568 sequence_number: CheckpointSequenceNumber,
569 summary: BuilderCheckpointSummary,
570 contents: CheckpointContents,
571 ) {
572 debug!(?sequence_number, "inserting builder summary {:?}", summary);
573 for tx in contents.iter() {
574 self.builder_digest_to_checkpoint
575 .insert(tx.transaction, sequence_number);
576 }
577 self.builder_checkpoint_summary
578 .insert(sequence_number, (summary, contents));
579 }
580}
581
impl ConsensusOutputQuarantine {
    /// Moves the watermark to `checkpoint` and stages everything that became eligible into
    /// `batch`.
    ///
    /// The batch is not written here; the caller is expected to write it.
    pub(super) fn update_highest_executed_checkpoint(
        &mut self,
        checkpoint: CheckpointSequenceNumber,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        self.highest_executed_checkpoint = checkpoint;
        self.commit_with_batch(epoch_store, batch)
    }

    /// Stages everything that is currently eligible into a fresh batch and writes it.
    pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
        let mut batch = epoch_store.db_batch()?;
        self.commit_with_batch(epoch_store, &mut batch)?;
        batch.write()?;
        Ok(())
    }

    /// Stages quarantined state that the watermark allows to be persisted.
    ///
    /// Steps:
    /// 1. Pop every builder summary whose sequence number is at or below
    ///    `highest_executed_checkpoint`, stage it together with its digest index, and track
    ///    the checkpoint height of the last one popped.
    /// 2. If no summary was popped, stop: no consensus output is flushed in that case.
    /// 3. Scan the output queue from the front, stopping at the first output whose stats
    ///    height exceeds that checkpoint height, and remember the last scanned output that
    ///    has `checkpoint_queue_drained` set.
    /// 4. Pop and stage all outputs up to and including that one. If no scanned output had
    ///    the flag set, nothing is flushed.
    ///
    /// # Panics
    /// Panics if a popped summary's transaction is missing from (or mapped to a different
    /// sequence number in) `builder_digest_to_checkpoint`, if a popped summary has no
    /// checkpoint height, if popped heights decrease, or if a scanned output has no commit
    /// stats.
    fn commit_with_batch(
        &mut self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        let tables = epoch_store.tables()?;

        // Checkpoint height of the most recently popped builder summary, if any.
        let mut highest_committed_height = None;

        // Step 1: drain builder summaries at or below the watermark, lowest sequence first.
        while self
            .builder_checkpoint_summary
            .first_key_value()
            .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
            == Some(true)
        {
            let (seq, (builder_summary, contents)) =
                self.builder_checkpoint_summary.pop_first().unwrap();

            // Drop the in-memory digest index entries; each must exist and point at `seq`.
            for tx in contents.iter() {
                let digest = &tx.transaction;
                assert_eq!(
                    self.builder_digest_to_checkpoint
                        .remove(digest)
                        .unwrap_or_else(|| {
                            panic!(
                                "transaction {:?} not found in builder_digest_to_checkpoint",
                                digest
                            )
                        }),
                    seq
                );
            }

            batch.insert_batch(
                &tables.builder_digest_to_checkpoint,
                contents.iter().map(|tx| (tx.transaction, seq)),
            )?;

            batch.insert_batch(
                &tables.builder_checkpoint_summary_v2,
                [(seq, &builder_summary)],
            )?;

            let checkpoint_height = builder_summary
                .checkpoint_height
                .expect("non-genesis checkpoint must have height");
            // Heights may repeat across consecutive summaries but must never go backwards.
            if let Some(highest) = highest_committed_height {
                assert!(
                    checkpoint_height >= highest,
                    "current checkpoint height {} must be no less than highest committed height {}",
                    checkpoint_height,
                    highest
                );
            }

            highest_committed_height = Some(checkpoint_height);
        }

        // Step 2: without a newly popped summary there is no height to flush outputs against.
        let Some(highest_committed_height) = highest_committed_height else {
            return Ok(());
        };

        // Step 3: find the last flush boundary among the outputs covered by that height.
        let mut last_drain_idx = None;
        for (i, output) in self.output_queue.iter().enumerate() {
            let stats = output
                .consensus_commit_stats
                .as_ref()
                .expect("consensus_commit_stats must be set");
            if stats.height > highest_committed_height {
                break;
            }
            if output.checkpoint_queue_drained {
                last_drain_idx = Some(i);
            }
        }
        // Step 4: flush outputs 0..=idx. The lookup-map entries of each output are released
        // before the output is consumed by `write_to_batch`.
        if let Some(idx) = last_drain_idx {
            for _ in 0..=idx {
                let output = self.output_queue.pop_front().unwrap();
                self.remove_shared_object_next_versions(&output);
                self.remove_processed_consensus_messages(&output);
                self.remove_congestion_control_debts(&output);
                self.remove_owned_object_locks(&output);
                output.write_to_batch(epoch_store, batch)?;
            }
        }

        self.metrics
            .consensus_quarantine_queue_size
            .set(self.output_queue.len() as i64);

        Ok(())
    }
}
708
709impl ConsensusOutputQuarantine {
710 fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
711 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
712 for (object_id, next_version) in next_versions {
713 self.shared_object_next_versions
714 .insert(*object_id, *next_version);
715 }
716 }
717 }
718
719 fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
720 let current_round = output.consensus_round;
721
722 for (object_id, debt) in output.congestion_control_object_debts.iter() {
723 self.congestion_control_object_debts.insert(
724 *object_id,
725 CongestionPerObjectDebt::new(current_round, *debt),
726 );
727 }
728
729 for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
730 self.congestion_control_randomness_object_debts.insert(
731 *object_id,
732 CongestionPerObjectDebt::new(current_round, *debt),
733 );
734 }
735 }
736
737 fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
738 for (object_id, _) in output.congestion_control_object_debts.iter() {
739 self.congestion_control_object_debts.remove(object_id);
740 }
741 for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
742 self.congestion_control_randomness_object_debts
743 .remove(object_id);
744 }
745 }
746
747 fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
748 for tx_key in output.consensus_messages_processed.iter() {
749 self.processed_consensus_messages.insert(tx_key.clone(), ());
750 }
751 }
752
753 fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
754 for tx_key in output.consensus_messages_processed.iter() {
755 self.processed_consensus_messages.remove(tx_key);
756 }
757 }
758
759 fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
760 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
761 for object_id in next_versions.keys() {
762 if !self.shared_object_next_versions.remove(object_id) {
763 fatal!(
764 "Shared object next version not found in quarantine: {:?}",
765 object_id
766 );
767 }
768 }
769 }
770 }
771
772 fn insert_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
773 for (obj_ref, lock) in &output.owned_object_locks {
774 self.owned_object_locks.insert(*obj_ref, *lock);
775 }
776 }
777
778 fn remove_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
779 for obj_ref in output.owned_object_locks.keys() {
780 self.owned_object_locks.remove(obj_ref);
781 }
782 }
783}
784
785impl ConsensusOutputQuarantine {
788 pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
789 self.builder_checkpoint_summary
790 .values()
791 .last()
792 .map(|(summary, _)| summary)
793 }
794
795 pub(super) fn get_built_summary(
796 &self,
797 sequence: CheckpointSequenceNumber,
798 ) -> Option<&BuilderCheckpointSummary> {
799 self.builder_checkpoint_summary
800 .get(&sequence)
801 .map(|(summary, _)| summary)
802 }
803
804 pub(super) fn is_consensus_message_processed(
805 &self,
806 key: &SequencedConsensusTransactionKey,
807 ) -> bool {
808 self.processed_consensus_messages.contains_key(key)
809 }
810
811 pub(super) fn is_empty(&self) -> bool {
812 self.output_queue.is_empty()
813 }
814
815 pub(super) fn get_next_shared_object_versions(
816 &self,
817 tables: &AuthorityEpochTables,
818 objects_to_init: &[ConsensusObjectSequenceKey],
819 ) -> SuiResult<Vec<Option<SequenceNumber>>> {
820 Ok(do_fallback_lookup(
821 objects_to_init,
822 |object_key| {
823 if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
824 CacheResult::Hit(Some(*next_version))
825 } else {
826 CacheResult::Miss
827 }
828 },
829 |object_keys| {
830 tables
831 .next_shared_object_versions_v2
832 .multi_get(object_keys)
833 .expect("db error")
834 },
835 ))
836 }
837
838 pub(super) fn get_owned_object_locks(
841 &self,
842 tables: &AuthorityEpochTables,
843 obj_refs: &[ObjectRef],
844 ) -> SuiResult<Vec<Option<LockDetails>>> {
845 Ok(do_fallback_lookup(
846 obj_refs,
847 |obj_ref| {
848 if let Some(lock) = self.owned_object_locks.get(obj_ref) {
849 CacheResult::Hit(Some(*lock))
850 } else {
851 CacheResult::Miss
852 }
853 },
854 |obj_refs| {
855 tables
856 .multi_get_locked_transactions(obj_refs)
857 .expect("db error")
858 },
859 ))
860 }
861
862 pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
863 self.output_queue
864 .back()
865 .and_then(|output| output.get_highest_pending_checkpoint_height())
866 }
867
868 pub(super) fn get_pending_checkpoints(
869 &self,
870 last: Option<CheckpointHeight>,
871 ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
872 let mut checkpoints = Vec::new();
873 for output in &self.output_queue {
874 checkpoints.extend(
875 output
876 .get_pending_checkpoints(last)
877 .map(|cp| (cp.height(), cp.clone())),
878 );
879 }
880 if cfg!(debug_assertions) {
881 let mut prev = None;
882 for (height, _) in &checkpoints {
883 if let Some(prev) = prev {
884 assert!(prev < *height);
885 }
886 prev = Some(*height);
887 }
888 }
889 checkpoints
890 }
891
892 pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
893 self.output_queue
894 .iter()
895 .any(|output| output.pending_checkpoint_exists(index))
896 }
897
898 pub(super) fn get_new_jwks(
899 &self,
900 epoch_store: &AuthorityPerEpochStore,
901 round: u64,
902 ) -> SuiResult<Vec<ActiveJwk>> {
903 let epoch = epoch_store.epoch();
904
905 for output in self.output_queue.iter().rev() {
907 let output_round = output.get_round().unwrap();
910 if round == output_round {
911 return Ok(output
912 .active_jwks
913 .iter()
914 .map(|(_, (jwk_id, jwk))| ActiveJwk {
915 jwk_id: jwk_id.clone(),
916 jwk: jwk.clone(),
917 epoch,
918 })
919 .collect());
920 }
921 }
922
923 let empty_jwk_id = JwkId::new(String::new(), String::new());
925 let empty_jwk = JWK {
926 kty: String::new(),
927 e: String::new(),
928 n: String::new(),
929 alg: String::new(),
930 };
931
932 let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
933 let end = (round + 1, (empty_jwk_id, empty_jwk));
934
935 Ok(epoch_store
936 .tables()?
937 .active_jwks
938 .safe_iter_with_bounds(Some(start), Some(end))
939 .map_ok(|((r, (jwk_id, jwk)), _)| {
940 debug_assert!(round == r);
941 ActiveJwk { jwk_id, jwk, epoch }
942 })
943 .collect::<Result<Vec<_>, _>>()?)
944 }
945
946 pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
947 self.output_queue
948 .iter()
949 .rev()
950 .filter_map(|output| output.get_randomness_last_round_timestamp())
951 .next()
952 }
953
954 pub(crate) fn load_initial_object_debts(
955 &self,
956 epoch_store: &AuthorityPerEpochStore,
957 current_round: Round,
958 for_randomness: bool,
959 transactions: &[VerifiedExecutableTransactionWithAliases],
960 ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
961 let protocol_config = epoch_store.protocol_config();
962 let tables = epoch_store.tables()?;
963 let default_per_commit_budget = protocol_config
964 .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
965 .unwrap_or(0);
966 let (hash_table, db_table, per_commit_budget) = if for_randomness {
967 (
968 &self.congestion_control_randomness_object_debts,
969 &tables.congestion_control_randomness_object_debts,
970 protocol_config
971 .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
972 .unwrap_or(default_per_commit_budget),
973 )
974 } else {
975 (
976 &self.congestion_control_object_debts,
977 &tables.congestion_control_object_debts,
978 default_per_commit_budget,
979 )
980 };
981 let mut shared_input_object_ids: Vec<_> = transactions
982 .iter()
983 .flat_map(|tx| tx.tx().shared_input_objects().map(|obj| obj.id))
984 .collect();
985 shared_input_object_ids.sort();
986 shared_input_object_ids.dedup();
987
988 let results = do_fallback_lookup(
989 &shared_input_object_ids,
990 |object_id| {
991 if let Some(debt) = hash_table.get(object_id) {
992 CacheResult::Hit(Some(debt.into_v1()))
993 } else {
994 CacheResult::Miss
995 }
996 },
997 |object_ids| {
998 db_table
999 .multi_get(object_ids)
1000 .expect("db error")
1001 .into_iter()
1002 .map(|debt| debt.map(|debt| debt.into_v1()))
1003 .collect()
1004 },
1005 );
1006
1007 Ok(results
1008 .into_iter()
1009 .zip_debug_eq(shared_input_object_ids)
1010 .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
1011 .map(move |((round, debt), object_id)| {
1012 assert!(current_round > round);
1016 let num_rounds = current_round - round - 1;
1017 let debt = debt.saturating_sub(per_commit_budget * num_rounds);
1018 (object_id, debt)
1019 }))
1020 }
1021}
1022
/// A hash map whose entries carry a reference count.
///
/// Inserting an existing key bumps its count and overwrites the value; removing a key
/// decrements the count and only drops the entry once the count reaches zero. This lets
/// several owners register the same key and have it disappear only after all of them
/// released it.
#[derive(Debug, Default)]
struct RefCountedHashMap<K, V> {
    // key -> (number of outstanding inserts, most recently inserted value)
    map: HashMap<K, (usize, V)>,
}

impl<K, V> RefCountedHashMap<K, V>
where
    K: Clone + Eq + std::hash::Hash,
{
    /// Creates an empty map.
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    /// Registers one more reference to `key` and sets its value to `value`.
    ///
    /// A new key starts with a count of one; an existing key has its count incremented and
    /// its previous value replaced.
    pub fn insert(&mut self, key: K, value: V) {
        match self.map.entry(key) {
            hash_map::Entry::Occupied(mut occupied) => {
                let (ref_count, slot) = occupied.get_mut();
                *ref_count += 1;
                *slot = value;
            }
            hash_map::Entry::Vacant(vacant) => {
                vacant.insert((1, value));
            }
        }
    }

    /// Releases one reference to `key`.
    ///
    /// Returns `true` if the key was present and `false` otherwise. A present key is only
    /// dropped from the map when this was its last reference, so `contains_key` may still
    /// return `true` afterwards.
    pub fn remove(&mut self, key: &K) -> bool {
        // Look the entry up by reference so the key does not have to be cloned.
        match self.map.get_mut(key) {
            None => false,
            Some((ref_count, _)) => {
                *ref_count -= 1;
                if *ref_count == 0 {
                    self.map.remove(key);
                }
                true
            }
        }
    }

    /// Returns the most recently inserted value for `key`, if the key is present.
    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key).map(|(_, v)| v)
    }

    /// Returns `true` if `key` has at least one outstanding reference.
    pub fn contains_key(&self, key: &K) -> bool {
        self.map.contains_key(key)
    }
}
1085
#[cfg(test)]
impl ConsensusOutputQuarantine {
    /// Test-only: number of consensus outputs still waiting in the queue.
    fn output_queue_len_for_testing(&self) -> usize {
        let Self { output_queue, .. } = self;
        output_queue.len()
    }
}
1092
#[cfg(test)]
mod tests {
    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;
    use sui_types::base_types::ExecutionDigests;
    use sui_types::gas::GasCostSummary;

    /// Builds a consensus output for `round` whose commit stats carry `height` and whose
    /// drain flag is `drained`.
    fn make_output(height: u64, round: u64, drained: bool) -> ConsensusCommitOutput {
        let stats = ExecutionIndicesWithStatsV2 {
            height,
            ..Default::default()
        };
        let mut output = ConsensusCommitOutput::new(round);
        output.record_consensus_commit_stats(stats);
        output.set_checkpoint_queue_drained(drained);
        output
    }

    /// Builds a builder summary for checkpoint `seq` at checkpoint height `height`, with
    /// contents consisting of a single random transaction.
    fn make_builder_summary(
        seq: CheckpointSequenceNumber,
        height: CheckpointHeight,
        protocol_config: &ProtocolConfig,
    ) -> (BuilderCheckpointSummary, CheckpointContents) {
        let contents =
            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
        let builder_summary = BuilderCheckpointSummary {
            summary: CheckpointSummary::new(
                protocol_config,
                0,
                seq,
                0,
                &contents,
                None,
                GasCostSummary::default(),
                None,
                0,
                vec![],
                vec![],
            ),
            checkpoint_height: Some(height),
            position_in_commit: 0,
        };
        (builder_summary, contents)
    }

    /// Shared scenario for the drain-boundary tests.
    ///
    /// Queues two outputs (stats height 4 without the drain flag, then stats height 5 with
    /// it), inserts builder summaries `1..=highest_executed` whose height equals their
    /// sequence number, advances the watermark to `highest_executed`, and returns the
    /// number of outputs left in the queue.
    async fn queue_len_after_executing_through(
        highest_executed: CheckpointSequenceNumber,
    ) -> usize {
        let state = TestAuthorityBuilder::new().build().await;
        let epoch_store = state.epoch_store_for_testing();
        let mut quarantine = ConsensusOutputQuarantine::new(0, epoch_store.metrics.clone());

        for (height, round, drained) in [(4, 1, false), (5, 2, true)] {
            quarantine
                .push_consensus_output(make_output(height, round, drained), &epoch_store)
                .unwrap();
        }

        // Nothing has been executed yet, so both outputs must still be queued.
        assert_eq!(quarantine.output_queue_len_for_testing(), 2);

        let protocol_config = epoch_store.protocol_config();
        for seq in 1..=highest_executed {
            let (summary, contents) = make_builder_summary(seq, seq, protocol_config);
            quarantine.insert_builder_summary(seq, summary, contents);
        }

        let mut batch = epoch_store.db_batch_for_test();
        quarantine
            .update_highest_executed_checkpoint(highest_executed, &epoch_store, &mut batch)
            .unwrap();
        batch.write().unwrap();

        quarantine.output_queue_len_for_testing()
    }

    /// Executing through height 4 only covers the first output, which is not a drain
    /// boundary, so nothing may be flushed.
    #[tokio::test]
    async fn test_drain_boundary_prevents_premature_commit() {
        let remaining = queue_len_after_executing_through(4).await;
        assert_eq!(remaining, 2);
    }

    /// Executing through height 5 covers the output carrying the drain flag, so both
    /// outputs are flushed.
    #[tokio::test]
    async fn test_drain_boundary_commits_at_safe_point() {
        let remaining = queue_len_after_executing_through(5).await;
        assert_eq!(remaining, 0);
    }
}