1use crate::authority::authority_per_epoch_store::{
5 AuthorityEpochTables, EncG, ExecutionIndicesWithStatsV2, LockDetails, LockDetailsWrapper, PkG,
6};
7use crate::authority::transaction_deferral::DeferralKey;
8use crate::checkpoints::BuilderCheckpointSummary;
9use crate::epoch::randomness::SINGLETON_KEY;
10use dashmap::DashMap;
11use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
12use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
13use moka::policy::EvictionPolicy;
14use moka::sync::SegmentedCache as MokaCache;
15use mysten_common::ZipDebugEqIteratorExt;
16use mysten_common::fatal;
17use mysten_common::random_util::randomize_cache_capacity_in_tests;
18use parking_lot::Mutex;
19use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
20use sui_types::authenticator_state::ActiveJwk;
21use sui_types::base_types::{AuthorityName, ObjectRef, SequenceNumber};
22use sui_types::crypto::RandomnessRound;
23use sui_types::error::SuiResult;
24use sui_types::executable_transaction::{
25 TrustedExecutableTransactionWithAliases, VerifiedExecutableTransactionWithAliases,
26};
27use sui_types::execution::ExecutionTimeObservationKey;
28use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
29use sui_types::messages_consensus::AuthorityIndex;
30use sui_types::{
31 base_types::{ConsensusObjectSequenceKey, ObjectID},
32 digests::TransactionDigest,
33 messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
34 signature::GenericSignature,
35};
36use tracing::{debug, info};
37use typed_store::Map;
38use typed_store::rocks::DBBatch;
39
40use crate::{
41 authority::{
42 authority_per_epoch_store::AuthorityPerEpochStore,
43 shared_object_congestion_tracker::CongestionPerObjectDebt,
44 },
45 checkpoints::{CheckpointHeight, PendingCheckpoint, PendingCheckpointV2},
46 consensus_handler::SequencedConsensusTransactionKey,
47 epoch::{
48 randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
49 reconfiguration::ReconfigState,
50 },
51};
52
53use super::*;
54
/// Everything produced while handling one consensus commit, accumulated in memory so that it
/// can later be persisted in one go by `write_to_batch`.
#[derive(Default)]
#[allow(clippy::type_complexity)]
pub(crate) struct ConsensusCommitOutput {
    /// Round supplied to `new`; stamped onto every congestion-control debt entry.
    consensus_round: Round,
    /// Keys recorded through `record_consensus_message_processed`.
    consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
    /// Authorities recorded through `insert_end_of_publish`.
    end_of_publish: BTreeSet<AuthorityName>,
    /// Reconfiguration state to persist, if one was stored for this commit.
    reconfig_state: Option<ReconfigState>,
    /// Commit statistics; `write_to_batch` panics if this was never set.
    consensus_commit_stats: Option<ExecutionIndicesWithStatsV2>,

    /// Next versions for shared objects; may be set at most once per output.
    next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,

    /// Transactions deferred by this commit, grouped under their deferral key.
    deferred_txns: Vec<(DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>)>,
    /// Deferral keys whose stored transactions are deleted when the output is written.
    deleted_deferred_txns: BTreeSet<DeferralKey>,

    /// Pending checkpoints created by this commit. Held in memory only: `write_to_batch` does
    /// not persist them.
    pending_checkpoints: Vec<PendingCheckpoint>,
    /// V2 pending checkpoints created by this commit. Also not persisted by `write_to_batch`.
    pending_checkpoints_v2: Vec<PendingCheckpointV2>,

    /// Reserved randomness round together with the commit timestamp it was reserved at; may be
    /// set at most once per output.
    next_randomness_round: Option<(RandomnessRound, TimestampMs)>,

    /// DKG confirmations keyed by the sender of the confirmation.
    dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
    /// Processed DKG messages keyed by the sender of the message.
    dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
    /// Used DKG messages, if recorded for this commit.
    dkg_used_message: Option<VersionedUsedProcessedMessages>,
    /// DKG output, if produced during this commit.
    dkg_output: Option<dkg_v1::Output<PkG, EncG>>,

    /// `(authority, id, jwk)` triples recorded through `insert_pending_jwk`.
    pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
    /// `(round, (id, jwk))` entries; `write_to_batch` asserts that every round equals the last
    /// committed round of `consensus_commit_stats`.
    active_jwks: BTreeSet<(u64, (JwkId, JWK))>,

    /// Per-object congestion debts for this commit.
    congestion_control_object_debts: Vec<(ObjectID, u64)>,
    /// Per-object congestion debts tracked separately for the randomness path.
    congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
    /// `(source authority, generation, estimates)` triples recorded through
    /// `insert_execution_time_observation`.
    execution_time_observations: Vec<(
        AuthorityIndex,
        u64,
        Vec<(ExecutionTimeObservationKey, Duration)>,
    )>,

    /// Owned-object locks produced by this commit; may be set only while still empty.
    owned_object_locks: HashMap<ObjectRef, LockDetails>,

    /// Flag set through `set_checkpoint_queue_drained`. Not persisted by `write_to_batch`.
    checkpoint_queue_drained: bool,
}
103
104impl ConsensusCommitOutput {
105 pub fn new(consensus_round: Round) -> Self {
106 Self {
107 consensus_round,
108 ..Default::default()
109 }
110 }
111
112 pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
113 self.deleted_deferred_txns.iter().cloned()
114 }
115
116 pub fn has_deferred_transactions(&self) -> bool {
117 !self.deferred_txns.is_empty()
118 }
119
120 fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
121 self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
122 }
123
124 fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
125 self.pending_checkpoints.last().map(|cp| cp.height())
126 }
127
128 fn get_pending_checkpoints(
129 &self,
130 last: Option<CheckpointHeight>,
131 ) -> impl Iterator<Item = &PendingCheckpoint> {
132 self.pending_checkpoints.iter().filter(move |cp| {
133 if let Some(last) = last {
134 cp.height() > last
135 } else {
136 true
137 }
138 })
139 }
140
141 fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
142 self.pending_checkpoints
143 .iter()
144 .any(|cp| cp.height() == *index)
145 }
146
147 fn get_pending_checkpoints_v2(
148 &self,
149 last: Option<CheckpointHeight>,
150 ) -> impl Iterator<Item = &PendingCheckpointV2> {
151 self.pending_checkpoints_v2.iter().filter(move |cp| {
152 if let Some(last) = last {
153 cp.height() > last
154 } else {
155 true
156 }
157 })
158 }
159
160 fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
161 self.pending_checkpoints_v2
162 .iter()
163 .any(|cp| cp.height() == *index)
164 }
165
166 fn get_round(&self) -> Option<u64> {
167 self.consensus_commit_stats
168 .as_ref()
169 .map(|stats| stats.index.last_committed_round)
170 }
171
172 pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
173 self.end_of_publish.insert(authority);
174 }
175
176 pub fn insert_execution_time_observation(
177 &mut self,
178 source: AuthorityIndex,
179 generation: u64,
180 estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
181 ) {
182 self.execution_time_observations
183 .push((source, generation, estimates));
184 }
185
186 pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStatsV2) {
187 self.consensus_commit_stats = Some(stats);
188 }
189
190 pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
192 self.record_consensus_commit_stats(Default::default());
193 }
194
195 pub fn store_reconfig_state(&mut self, state: ReconfigState) {
196 self.reconfig_state = Some(state);
197 }
198
199 pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
200 self.consensus_messages_processed.insert(key);
201 }
202
203 pub fn get_consensus_messages_processed(
204 &self,
205 ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
206 self.consensus_messages_processed.iter()
207 }
208
209 pub fn set_next_shared_object_versions(
210 &mut self,
211 next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
212 ) {
213 assert!(self.next_shared_object_versions.is_none());
214 self.next_shared_object_versions = Some(next_versions);
215 }
216
217 pub fn defer_transactions(
218 &mut self,
219 key: DeferralKey,
220 transactions: Vec<VerifiedExecutableTransactionWithAliases>,
221 ) {
222 self.deferred_txns.push((key, transactions));
223 }
224
225 pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
226 self.deleted_deferred_txns
227 .extend(deferral_keys.iter().cloned());
228 }
229
230 pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
231 self.pending_checkpoints.push(checkpoint);
232 }
233
234 pub fn insert_pending_checkpoint_v2(&mut self, checkpoint: PendingCheckpointV2) {
235 self.pending_checkpoints_v2.push(checkpoint);
236 }
237
238 pub fn reserve_next_randomness_round(
239 &mut self,
240 next_randomness_round: RandomnessRound,
241 commit_timestamp: TimestampMs,
242 ) {
243 assert!(self.next_randomness_round.is_none());
244 self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
245 }
246
247 pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
248 self.dkg_confirmations.insert(conf.sender(), conf);
249 }
250
251 pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
252 self.dkg_processed_messages
253 .insert(message.sender(), message);
254 }
255
256 pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
257 self.dkg_used_message = Some(used_messages);
258 }
259
260 pub fn set_dkg_output(&mut self, output: dkg_v1::Output<PkG, EncG>) {
261 self.dkg_output = Some(output);
262 }
263
264 pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
265 self.pending_jwks.insert((authority, id, jwk));
266 }
267
268 pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
269 self.active_jwks.insert((round, key));
270 }
271
272 pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
273 self.congestion_control_object_debts = object_debts;
274 }
275
276 pub fn set_congestion_control_randomness_object_debts(
277 &mut self,
278 object_debts: Vec<(ObjectID, u64)>,
279 ) {
280 self.congestion_control_randomness_object_debts = object_debts;
281 }
282
283 pub fn set_checkpoint_queue_drained(&mut self, drained: bool) {
284 self.checkpoint_queue_drained = drained;
285 }
286
287 pub fn set_owned_object_locks(&mut self, locks: HashMap<ObjectRef, LockDetails>) {
288 assert!(self.owned_object_locks.is_empty());
289 self.owned_object_locks = locks;
290 }
291
292 pub fn write_to_batch(
293 self,
294 epoch_store: &AuthorityPerEpochStore,
295 batch: &mut DBBatch,
296 ) -> SuiResult {
297 let tables = epoch_store.tables()?;
298 batch.insert_batch(
299 &tables.consensus_message_processed,
300 self.consensus_messages_processed
301 .iter()
302 .map(|key| (key, true)),
303 )?;
304
305 batch.insert_batch(
306 &tables.end_of_publish,
307 self.end_of_publish.iter().map(|authority| (authority, ())),
308 )?;
309
310 if let Some(reconfig_state) = &self.reconfig_state {
311 batch.insert_batch(
312 &tables.reconfig_state,
313 [(RECONFIG_STATE_INDEX, reconfig_state)],
314 )?;
315 }
316
317 let consensus_commit_stats = self
318 .consensus_commit_stats
319 .expect("consensus_commit_stats must be set");
320 let round = consensus_commit_stats.index.last_committed_round;
321
322 batch.insert_batch(
323 &tables.last_consensus_stats_v2,
324 [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
325 )?;
326
327 if let Some(next_versions) = self.next_shared_object_versions {
328 batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
329 }
330
331 if !self.owned_object_locks.is_empty() {
332 batch.insert_batch(
333 &tables.owned_object_locked_transactions,
334 self.owned_object_locks
335 .into_iter()
336 .map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))),
337 )?;
338 }
339
340 batch.delete_batch(
341 &tables.deferred_transactions_v2,
342 &self.deleted_deferred_txns,
343 )?;
344 batch.delete_batch(
345 &tables.deferred_transactions_with_aliases_v2,
346 &self.deleted_deferred_txns,
347 )?;
348 batch.delete_batch(
349 &tables.deferred_transactions_with_aliases_v3,
350 &self.deleted_deferred_txns,
351 )?;
352
353 batch.insert_batch(
354 &tables.deferred_transactions_with_aliases_v3,
355 self.deferred_txns.into_iter().map(|(key, txs)| {
356 (
357 key,
358 txs.into_iter()
359 .map(|tx| {
360 let tx: TrustedExecutableTransactionWithAliases = tx.serializable();
361 tx
362 })
363 .collect::<Vec<_>>(),
364 )
365 }),
366 )?;
367
368 if let Some((round, commit_timestamp)) = self.next_randomness_round {
369 batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
370 batch.insert_batch(
371 &tables.randomness_last_round_timestamp,
372 [(SINGLETON_KEY, commit_timestamp)],
373 )?;
374 }
375
376 batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
377 batch.insert_batch(
378 &tables.dkg_processed_messages_v2,
379 self.dkg_processed_messages,
380 )?;
381 batch.insert_batch(
382 &tables.dkg_used_messages_v2,
383 self.dkg_used_message
385 .into_iter()
386 .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
387 )?;
388 if let Some(output) = self.dkg_output {
389 batch.insert_batch(&tables.dkg_output, [(SINGLETON_KEY, output)])?;
390 }
391
392 batch.insert_batch(
393 &tables.pending_jwks,
394 self.pending_jwks.into_iter().map(|j| (j, ())),
395 )?;
396 batch.insert_batch(
397 &tables.active_jwks,
398 self.active_jwks.into_iter().map(|j| {
399 assert_eq!(j.0, round);
401 (j, ())
402 }),
403 )?;
404
405 batch.insert_batch(
406 &tables.congestion_control_object_debts,
407 self.congestion_control_object_debts
408 .into_iter()
409 .map(|(object_id, debt)| {
410 (
411 object_id,
412 CongestionPerObjectDebt::new(self.consensus_round, debt),
413 )
414 }),
415 )?;
416 batch.insert_batch(
417 &tables.congestion_control_randomness_object_debts,
418 self.congestion_control_randomness_object_debts
419 .into_iter()
420 .map(|(object_id, debt)| {
421 (
422 object_id,
423 CongestionPerObjectDebt::new(self.consensus_round, debt),
424 )
425 }),
426 )?;
427
428 batch.insert_batch(
429 &tables.execution_time_observations,
430 self.execution_time_observations
431 .into_iter()
432 .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
433 )?;
434
435 Ok(())
436 }
437}
438
/// In-memory caches of consensus output state for the current epoch.
pub(crate) struct ConsensusOutputCache {
    /// Deferred transactions keyed by deferral key; initialised in `new` from
    /// `AuthorityEpochTables::get_all_deferred_transactions_v2`.
    pub(crate) deferred_transactions:
        Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>>,

    /// Per-transaction list of user signatures, each paired with an optional sequence number.
    /// Starts out empty.
    #[allow(clippy::type_complexity)]
    pub(crate) user_signatures_for_checkpoints:
        Mutex<HashMap<TransactionDigest, Vec<(GenericSignature, Option<SequenceNumber>)>>>,

    /// Digests of transactions executed in this epoch. Every accessor visible in this file takes
    /// only the read side of the `RwLock`, including for inserts and removals on the `DashMap`.
    /// NOTE(review): the write side is presumably taken elsewhere — confirm.
    executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
    /// Bounded LRU cache of executed digests, consulted when a digest is not found in
    /// `executed_in_epoch`.
    executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
}
458
459impl ConsensusOutputCache {
460 pub(crate) fn new(tables: &AuthorityEpochTables) -> Self {
461 let deferred_transactions = tables
462 .get_all_deferred_transactions_v2()
463 .expect("load deferred transactions cannot fail");
464
465 let executed_in_epoch_cache_capacity = 50_000;
466
467 Self {
468 deferred_transactions: Mutex::new(deferred_transactions),
469 user_signatures_for_checkpoints: Default::default(),
470 executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
471 executed_in_epoch_cache: MokaCache::builder(8)
472 .max_capacity(randomize_cache_capacity_in_tests(
474 executed_in_epoch_cache_capacity,
475 ))
476 .eviction_policy(EvictionPolicy::lru())
477 .build(),
478 }
479 }
480
481 pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
482 self.executed_in_epoch
483 .read()
484 .contains_key(digest) ||
485 self.executed_in_epoch_cache.get(digest).is_some()
487 }
488
489 pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
491 assert!(
492 self.executed_in_epoch
493 .read()
494 .insert(tx_digest, ())
495 .is_none(),
496 "transaction already executed"
497 );
498 self.executed_in_epoch_cache.insert(tx_digest, ());
499 }
500
501 pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
505 let executed_in_epoch = self.executed_in_epoch.read();
506 for tx_digest in tx_digests {
507 executed_in_epoch.remove(tx_digest);
508 }
509 }
510}
511
/// Holds consensus commit outputs in memory until the checkpoints built from them have been
/// executed, at which point they are written to the database in `commit_with_batch`.
pub(crate) struct ConsensusOutputQuarantine {
    /// Outputs in arrival order: pushed at the back, committed from the front.
    output_queue: VecDeque<ConsensusCommitOutput>,

    /// Highest checkpoint sequence number reported as executed.
    highest_executed_checkpoint: CheckpointSequenceNumber,

    /// Built checkpoint summaries (with their contents) that have not been committed yet,
    /// ordered by sequence number.
    builder_checkpoint_summary:
        BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,

    /// Maps each transaction digest contained in a quarantined builder summary to the sequence
    /// number of that checkpoint.
    builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,

    /// Latest next-version per shared object across all quarantined outputs.
    shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,

    /// Latest randomness-path congestion debt per object across all quarantined outputs.
    congestion_control_randomness_object_debts:
        RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
    /// Latest congestion debt per object across all quarantined outputs.
    congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,

    /// Keys of consensus messages processed by any quarantined output.
    processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,

    /// Owned-object locks of quarantined outputs. Unlike the maps above this one is not
    /// reference counted.
    owned_object_locks: HashMap<ObjectRef, LockDetails>,

    /// Metrics sink; the queue length is reported through `consensus_quarantine_queue_size`.
    metrics: Arc<EpochMetrics>,
}
543
544impl ConsensusOutputQuarantine {
545 pub(super) fn new(
546 highest_executed_checkpoint: CheckpointSequenceNumber,
547 authority_metrics: Arc<EpochMetrics>,
548 ) -> Self {
549 Self {
550 highest_executed_checkpoint,
551
552 output_queue: VecDeque::new(),
553 builder_checkpoint_summary: BTreeMap::new(),
554 builder_digest_to_checkpoint: HashMap::new(),
555 shared_object_next_versions: RefCountedHashMap::new(),
556 processed_consensus_messages: RefCountedHashMap::new(),
557 congestion_control_randomness_object_debts: RefCountedHashMap::new(),
558 congestion_control_object_debts: RefCountedHashMap::new(),
559 owned_object_locks: HashMap::new(),
560 metrics: authority_metrics,
561 }
562 }
563}
564
565impl ConsensusOutputQuarantine {
568 pub(crate) fn push_consensus_output(
570 &mut self,
571 output: ConsensusCommitOutput,
572 epoch_store: &AuthorityPerEpochStore,
573 ) -> SuiResult {
574 self.insert_shared_object_next_versions(&output);
575 self.insert_congestion_control_debts(&output);
576 self.insert_processed_consensus_messages(&output);
577 self.insert_owned_object_locks(&output);
578 self.output_queue.push_back(output);
579
580 self.metrics
581 .consensus_quarantine_queue_size
582 .set(self.output_queue.len() as i64);
583
584 self.commit(epoch_store)
587 }
588
589 pub(super) fn insert_builder_summary(
591 &mut self,
592 sequence_number: CheckpointSequenceNumber,
593 summary: BuilderCheckpointSummary,
594 contents: CheckpointContents,
595 ) {
596 debug!(?sequence_number, "inserting builder summary {:?}", summary);
597 for tx in contents.iter() {
598 self.builder_digest_to_checkpoint
599 .insert(tx.transaction, sequence_number);
600 }
601 self.builder_checkpoint_summary
602 .insert(sequence_number, (summary, contents));
603 }
604}
605
606impl ConsensusOutputQuarantine {
608 pub(super) fn update_highest_executed_checkpoint(
611 &mut self,
612 checkpoint: CheckpointSequenceNumber,
613 epoch_store: &AuthorityPerEpochStore,
614 batch: &mut DBBatch,
615 ) -> SuiResult {
616 self.highest_executed_checkpoint = checkpoint;
617 self.commit_with_batch(epoch_store, batch)
618 }
619
620 pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
621 let mut batch = epoch_store.db_batch()?;
622 self.commit_with_batch(epoch_store, &mut batch)?;
623 batch.write()?;
624 Ok(())
625 }
626
627 fn commit_with_batch(
629 &mut self,
630 epoch_store: &AuthorityPerEpochStore,
631 batch: &mut DBBatch,
632 ) -> SuiResult {
633 let tables = epoch_store.tables()?;
640
641 let mut highest_committed_height = None;
642
643 while self
644 .builder_checkpoint_summary
645 .first_key_value()
646 .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
647 == Some(true)
648 {
649 let (seq, (builder_summary, contents)) =
650 self.builder_checkpoint_summary.pop_first().unwrap();
651
652 for tx in contents.iter() {
653 let digest = &tx.transaction;
654 assert_eq!(
655 self.builder_digest_to_checkpoint
656 .remove(digest)
657 .unwrap_or_else(|| {
658 panic!(
659 "transaction {:?} not found in builder_digest_to_checkpoint",
660 digest
661 )
662 }),
663 seq
664 );
665 }
666
667 batch.insert_batch(
668 &tables.builder_digest_to_checkpoint,
669 contents.iter().map(|tx| (tx.transaction, seq)),
670 )?;
671
672 batch.insert_batch(
673 &tables.builder_checkpoint_summary_v2,
674 [(seq, &builder_summary)],
675 )?;
676
677 let checkpoint_height = builder_summary
678 .checkpoint_height
679 .expect("non-genesis checkpoint must have height");
680 if let Some(highest) = highest_committed_height {
681 assert!(
682 checkpoint_height >= highest,
683 "current checkpoint height {} must be no less than highest committed height {}",
684 checkpoint_height,
685 highest
686 );
687 }
688
689 highest_committed_height = Some(checkpoint_height);
690 }
691
692 let Some(highest_committed_height) = highest_committed_height else {
693 return Ok(());
694 };
695
696 let split_checkpoints_in_consensus_handler = epoch_store
697 .protocol_config()
698 .split_checkpoints_in_consensus_handler();
699
700 if split_checkpoints_in_consensus_handler {
701 let mut last_drain_idx = None;
707 for (i, output) in self.output_queue.iter().enumerate() {
708 let stats = output
709 .consensus_commit_stats
710 .as_ref()
711 .expect("consensus_commit_stats must be set");
712 if stats.height > highest_committed_height {
713 break;
714 }
715 if output.checkpoint_queue_drained {
716 last_drain_idx = Some(i);
717 }
718 }
719 if let Some(idx) = last_drain_idx {
720 for _ in 0..=idx {
721 let output = self.output_queue.pop_front().unwrap();
722 info!("committing drain-boundary output");
723 self.remove_shared_object_next_versions(&output);
724 self.remove_processed_consensus_messages(&output);
725 self.remove_congestion_control_debts(&output);
726 self.remove_owned_object_locks(&output);
727 output.write_to_batch(epoch_store, batch)?;
728 }
729 }
730 } else {
731 while !self.output_queue.is_empty() {
732 let output = self.output_queue.front().unwrap();
733 let Some(highest_in_commit) = output.get_highest_pending_checkpoint_height() else {
734 break;
735 };
736
737 if highest_in_commit <= highest_committed_height {
738 info!(
739 "committing output with highest pending checkpoint height {:?}",
740 highest_in_commit
741 );
742 let output = self.output_queue.pop_front().unwrap();
743 self.remove_shared_object_next_versions(&output);
744 self.remove_processed_consensus_messages(&output);
745 self.remove_congestion_control_debts(&output);
746 self.remove_owned_object_locks(&output);
747 output.write_to_batch(epoch_store, batch)?;
748 } else {
749 break;
750 }
751 }
752 }
753
754 self.metrics
755 .consensus_quarantine_queue_size
756 .set(self.output_queue.len() as i64);
757
758 Ok(())
759 }
760}
761
762impl ConsensusOutputQuarantine {
763 fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
764 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
765 for (object_id, next_version) in next_versions {
766 self.shared_object_next_versions
767 .insert(*object_id, *next_version);
768 }
769 }
770 }
771
772 fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
773 let current_round = output.consensus_round;
774
775 for (object_id, debt) in output.congestion_control_object_debts.iter() {
776 self.congestion_control_object_debts.insert(
777 *object_id,
778 CongestionPerObjectDebt::new(current_round, *debt),
779 );
780 }
781
782 for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
783 self.congestion_control_randomness_object_debts.insert(
784 *object_id,
785 CongestionPerObjectDebt::new(current_round, *debt),
786 );
787 }
788 }
789
790 fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
791 for (object_id, _) in output.congestion_control_object_debts.iter() {
792 self.congestion_control_object_debts.remove(object_id);
793 }
794 for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
795 self.congestion_control_randomness_object_debts
796 .remove(object_id);
797 }
798 }
799
800 fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
801 for tx_key in output.consensus_messages_processed.iter() {
802 self.processed_consensus_messages.insert(tx_key.clone(), ());
803 }
804 }
805
806 fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
807 for tx_key in output.consensus_messages_processed.iter() {
808 self.processed_consensus_messages.remove(tx_key);
809 }
810 }
811
812 fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
813 if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
814 for object_id in next_versions.keys() {
815 if !self.shared_object_next_versions.remove(object_id) {
816 fatal!(
817 "Shared object next version not found in quarantine: {:?}",
818 object_id
819 );
820 }
821 }
822 }
823 }
824
825 fn insert_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
826 for (obj_ref, lock) in &output.owned_object_locks {
827 self.owned_object_locks.insert(*obj_ref, *lock);
828 }
829 }
830
831 fn remove_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
832 for obj_ref in output.owned_object_locks.keys() {
833 self.owned_object_locks.remove(obj_ref);
834 }
835 }
836}
837
838impl ConsensusOutputQuarantine {
841 pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
842 self.builder_checkpoint_summary
843 .values()
844 .last()
845 .map(|(summary, _)| summary)
846 }
847
848 pub(super) fn get_built_summary(
849 &self,
850 sequence: CheckpointSequenceNumber,
851 ) -> Option<&BuilderCheckpointSummary> {
852 self.builder_checkpoint_summary
853 .get(&sequence)
854 .map(|(summary, _)| summary)
855 }
856
857 pub(super) fn included_transaction_in_checkpoint(&self, digest: &TransactionDigest) -> bool {
858 self.builder_digest_to_checkpoint.contains_key(digest)
859 }
860
861 pub(super) fn is_consensus_message_processed(
862 &self,
863 key: &SequencedConsensusTransactionKey,
864 ) -> bool {
865 self.processed_consensus_messages.contains_key(key)
866 }
867
868 pub(super) fn is_empty(&self) -> bool {
869 self.output_queue.is_empty()
870 }
871
872 pub(super) fn get_next_shared_object_versions(
873 &self,
874 tables: &AuthorityEpochTables,
875 objects_to_init: &[ConsensusObjectSequenceKey],
876 ) -> SuiResult<Vec<Option<SequenceNumber>>> {
877 Ok(do_fallback_lookup(
878 objects_to_init,
879 |object_key| {
880 if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
881 CacheResult::Hit(Some(*next_version))
882 } else {
883 CacheResult::Miss
884 }
885 },
886 |object_keys| {
887 tables
888 .next_shared_object_versions_v2
889 .multi_get(object_keys)
890 .expect("db error")
891 },
892 ))
893 }
894
895 pub(super) fn get_owned_object_locks(
899 &self,
900 tables: &AuthorityEpochTables,
901 obj_refs: &[ObjectRef],
902 ) -> SuiResult<Vec<Option<LockDetails>>> {
903 Ok(do_fallback_lookup(
904 obj_refs,
905 |obj_ref| {
906 if let Some(lock) = self.owned_object_locks.get(obj_ref) {
907 CacheResult::Hit(Some(*lock))
908 } else {
909 CacheResult::Miss
910 }
911 },
912 |obj_refs| {
913 tables
914 .multi_get_locked_transactions(obj_refs)
915 .expect("db error")
916 },
917 ))
918 }
919
920 pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
921 self.output_queue
922 .back()
923 .and_then(|output| output.get_highest_pending_checkpoint_height())
924 }
925
926 pub(super) fn get_pending_checkpoints(
927 &self,
928 last: Option<CheckpointHeight>,
929 ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
930 let mut checkpoints = Vec::new();
931 for output in &self.output_queue {
932 checkpoints.extend(
933 output
934 .get_pending_checkpoints(last)
935 .map(|cp| (cp.height(), cp.clone())),
936 );
937 }
938 if cfg!(debug_assertions) {
939 let mut prev = None;
940 for (height, _) in &checkpoints {
941 if let Some(prev) = prev {
942 assert!(prev < *height);
943 }
944 prev = Some(*height);
945 }
946 }
947 checkpoints
948 }
949
950 pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
951 self.output_queue
952 .iter()
953 .any(|output| output.pending_checkpoint_exists(index))
954 }
955
956 pub(super) fn get_pending_checkpoints_v2(
957 &self,
958 last: Option<CheckpointHeight>,
959 ) -> Vec<(CheckpointHeight, PendingCheckpointV2)> {
960 let mut checkpoints = Vec::new();
961 for output in &self.output_queue {
962 checkpoints.extend(
963 output
964 .get_pending_checkpoints_v2(last)
965 .map(|cp| (cp.height(), cp.clone())),
966 );
967 }
968 if cfg!(debug_assertions) {
969 let mut prev = None;
970 for (height, _) in &checkpoints {
971 if let Some(prev) = prev {
972 assert!(prev < *height);
973 }
974 prev = Some(*height);
975 }
976 }
977 checkpoints
978 }
979
980 pub(super) fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
981 self.output_queue
982 .iter()
983 .any(|output| output.pending_checkpoint_exists_v2(index))
984 }
985
986 pub(super) fn get_new_jwks(
987 &self,
988 epoch_store: &AuthorityPerEpochStore,
989 round: u64,
990 ) -> SuiResult<Vec<ActiveJwk>> {
991 let epoch = epoch_store.epoch();
992
993 for output in self.output_queue.iter().rev() {
995 let output_round = output.get_round().unwrap();
998 if round == output_round {
999 return Ok(output
1000 .active_jwks
1001 .iter()
1002 .map(|(_, (jwk_id, jwk))| ActiveJwk {
1003 jwk_id: jwk_id.clone(),
1004 jwk: jwk.clone(),
1005 epoch,
1006 })
1007 .collect());
1008 }
1009 }
1010
1011 let empty_jwk_id = JwkId::new(String::new(), String::new());
1013 let empty_jwk = JWK {
1014 kty: String::new(),
1015 e: String::new(),
1016 n: String::new(),
1017 alg: String::new(),
1018 };
1019
1020 let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
1021 let end = (round + 1, (empty_jwk_id, empty_jwk));
1022
1023 Ok(epoch_store
1024 .tables()?
1025 .active_jwks
1026 .safe_iter_with_bounds(Some(start), Some(end))
1027 .map_ok(|((r, (jwk_id, jwk)), _)| {
1028 debug_assert!(round == r);
1029 ActiveJwk { jwk_id, jwk, epoch }
1030 })
1031 .collect::<Result<Vec<_>, _>>()?)
1032 }
1033
1034 pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
1035 self.output_queue
1036 .iter()
1037 .rev()
1038 .filter_map(|output| output.get_randomness_last_round_timestamp())
1039 .next()
1040 }
1041
1042 pub(crate) fn load_initial_object_debts(
1043 &self,
1044 epoch_store: &AuthorityPerEpochStore,
1045 current_round: Round,
1046 for_randomness: bool,
1047 transactions: &[VerifiedExecutableTransactionWithAliases],
1048 ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
1049 let protocol_config = epoch_store.protocol_config();
1050 let tables = epoch_store.tables()?;
1051 let default_per_commit_budget = protocol_config
1052 .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
1053 .unwrap_or(0);
1054 let (hash_table, db_table, per_commit_budget) = if for_randomness {
1055 (
1056 &self.congestion_control_randomness_object_debts,
1057 &tables.congestion_control_randomness_object_debts,
1058 protocol_config
1059 .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
1060 .unwrap_or(default_per_commit_budget),
1061 )
1062 } else {
1063 (
1064 &self.congestion_control_object_debts,
1065 &tables.congestion_control_object_debts,
1066 default_per_commit_budget,
1067 )
1068 };
1069 let mut shared_input_object_ids: Vec<_> = transactions
1070 .iter()
1071 .flat_map(|tx| tx.tx().shared_input_objects().map(|obj| obj.id))
1072 .collect();
1073 shared_input_object_ids.sort();
1074 shared_input_object_ids.dedup();
1075
1076 let results = do_fallback_lookup(
1077 &shared_input_object_ids,
1078 |object_id| {
1079 if let Some(debt) = hash_table.get(object_id) {
1080 CacheResult::Hit(Some(debt.into_v1()))
1081 } else {
1082 CacheResult::Miss
1083 }
1084 },
1085 |object_ids| {
1086 db_table
1087 .multi_get(object_ids)
1088 .expect("db error")
1089 .into_iter()
1090 .map(|debt| debt.map(|debt| debt.into_v1()))
1091 .collect()
1092 },
1093 );
1094
1095 Ok(results
1096 .into_iter()
1097 .zip_debug_eq(shared_input_object_ids)
1098 .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
1099 .map(move |((round, debt), object_id)| {
1100 assert!(current_round > round);
1104 let num_rounds = current_round - round - 1;
1105 let debt = debt.saturating_sub(per_commit_budget * num_rounds);
1106 (object_id, debt)
1107 }))
1108 }
1109}
1110
/// A hash map in which every key carries a reference count.
///
/// Inserting an existing key bumps its count and overwrites the value; removing a key
/// decrements the count and only drops the entry once the count reaches zero. A lookup therefore
/// always observes the most recently inserted value for a key that is still referenced.
#[derive(Debug, Default)]
struct RefCountedHashMap<K, V> {
    /// Maps each key to `(reference count, latest value)`. Stored counts are always >= 1.
    map: HashMap<K, (usize, V)>,
}

impl<K, V> RefCountedHashMap<K, V>
where
    K: Eq + std::hash::Hash,
{
    /// Creates an empty map.
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    /// Inserts `value` under `key`.
    ///
    /// If the key is already present its reference count is incremented and the stored value is
    /// replaced; otherwise a new entry with a count of one is created.
    pub fn insert(&mut self, key: K, value: V) {
        match self.map.entry(key) {
            hash_map::Entry::Occupied(mut entry) => {
                let (ref_count, stored) = entry.get_mut();
                *ref_count += 1;
                *stored = value;
            }
            hash_map::Entry::Vacant(entry) => {
                entry.insert((1, value));
            }
        }
    }

    /// Releases one reference to `key`.
    ///
    /// Returns `true` if the key was present and `false` otherwise. A present key is only
    /// removed from the map when this call releases its last reference, so the key may still be
    /// contained in the map after a successful call.
    pub fn remove(&mut self, key: &K) -> bool {
        // Look the entry up by reference so the key never has to be cloned.
        let Some((ref_count, _)) = self.map.get_mut(key) else {
            return false;
        };
        *ref_count -= 1;
        if *ref_count == 0 {
            self.map.remove(key);
        }
        true
    }

    /// Returns the most recently inserted value for `key`, if the key is still referenced.
    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key).map(|(_, value)| value)
    }

    /// Returns `true` if `key` still has at least one reference.
    pub fn contains_key(&self, key: &K) -> bool {
        self.map.contains_key(key)
    }
}
1173
#[cfg(test)]
impl ConsensusOutputQuarantine {
    /// Test-only accessor: number of entries currently held in `output_queue`.
    fn output_queue_len_for_testing(&self) -> usize {
        let Self { output_queue, .. } = self;
        output_queue.len()
    }
}
1180
#[cfg(test)]
mod tests {
    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;
    use sui_types::base_types::ExecutionDigests;
    use sui_types::gas::GasCostSummary;

    /// Builds a `ConsensusCommitOutput` for `round` whose recorded commit stats
    /// carry `height` (all other stats left at their defaults) and whose
    /// checkpoint-queue-drained flag is set to `drained`.
    fn make_output(height: u64, round: u64, drained: bool) -> ConsensusCommitOutput {
        let stats = ExecutionIndicesWithStatsV2 {
            height,
            ..Default::default()
        };
        let mut commit_output = ConsensusCommitOutput::new(round);
        commit_output.record_consensus_commit_stats(stats);
        commit_output.set_checkpoint_queue_drained(drained);
        commit_output
    }

    /// Builds a `BuilderCheckpointSummary` for sequence number `seq` at
    /// checkpoint height `height`, together with the single-random-digest
    /// `CheckpointContents` it summarizes. Every other summary input is a
    /// zero / `None` / empty placeholder.
    fn make_builder_summary(
        seq: CheckpointSequenceNumber,
        height: CheckpointHeight,
        protocol_config: &ProtocolConfig,
    ) -> (BuilderCheckpointSummary, CheckpointContents) {
        let contents =
            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
        let builder_summary = BuilderCheckpointSummary {
            summary: CheckpointSummary::new(
                protocol_config,
                0,
                seq,
                0,
                &contents,
                None,
                GasCostSummary::default(),
                None,
                0,
                vec![],
                vec![],
            ),
            checkpoint_height: Some(height),
            position_in_commit: 0,
        };
        (builder_summary, contents)
    }

    /// With outputs at heights 4 (not drained) and 5 (drained) queued, executing
    /// only up to checkpoint 4 must leave both outputs in the queue.
    #[tokio::test]
    async fn test_drain_boundary_prevents_premature_commit() {
        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(true);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;
        let epoch_store = state.epoch_store_for_testing();

        let mut quarantine = ConsensusOutputQuarantine::new(0, epoch_store.metrics.clone());

        // Height 4 / round 1: checkpoint queue NOT drained.
        quarantine
            .push_consensus_output(make_output(4, 1, false), &epoch_store)
            .unwrap();

        // Height 5 / round 2: checkpoint queue drained.
        quarantine
            .push_consensus_output(make_output(5, 2, true), &epoch_store)
            .unwrap();

        assert_eq!(quarantine.output_queue_len_for_testing(), 2);

        // Register builder summaries only up to checkpoint 4 (height == seq).
        let highest_executed: CheckpointSequenceNumber = 4;
        let config = epoch_store.protocol_config();
        for seq in 1..=highest_executed {
            let (builder_summary, checkpoint_contents) = make_builder_summary(seq, seq, config);
            quarantine.insert_builder_summary(seq, builder_summary, checkpoint_contents);
        }

        let mut batch = epoch_store.db_batch_for_test();
        quarantine
            .update_highest_executed_checkpoint(highest_executed, &epoch_store, &mut batch)
            .unwrap();
        batch.write().unwrap();

        // Nothing may leave the queue yet.
        // NOTE(review): presumably the height-4 output is held back because it
        // is not a drained boundary and the next drained output (height 5) has
        // not been reached; confirm against `update_highest_executed_checkpoint`.
        assert_eq!(quarantine.output_queue_len_for_testing(), 2);
    }

    /// Same setup as the test above, but executing up to checkpoint 5 must
    /// empty the output queue.
    #[tokio::test]
    async fn test_drain_boundary_commits_at_safe_point() {
        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(true);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;
        let epoch_store = state.epoch_store_for_testing();

        let mut quarantine = ConsensusOutputQuarantine::new(0, epoch_store.metrics.clone());

        // Height 4 / round 1: checkpoint queue NOT drained.
        quarantine
            .push_consensus_output(make_output(4, 1, false), &epoch_store)
            .unwrap();

        // Height 5 / round 2: checkpoint queue drained.
        quarantine
            .push_consensus_output(make_output(5, 2, true), &epoch_store)
            .unwrap();

        assert_eq!(quarantine.output_queue_len_for_testing(), 2);

        // Register builder summaries through checkpoint 5 (height == seq).
        let highest_executed: CheckpointSequenceNumber = 5;
        let config = epoch_store.protocol_config();
        for seq in 1..=highest_executed {
            let (builder_summary, checkpoint_contents) = make_builder_summary(seq, seq, config);
            quarantine.insert_builder_summary(seq, builder_summary, checkpoint_contents);
        }

        let mut batch = epoch_store.db_batch_for_test();
        quarantine
            .update_highest_executed_checkpoint(highest_executed, &epoch_store, &mut batch)
            .unwrap();
        batch.write().unwrap();

        // Both queued outputs are expected to have been removed.
        assert_eq!(quarantine.output_queue_len_for_testing(), 0);
    }
}