sui_core/authority/
consensus_quarantine.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::authority_per_epoch_store::{
5    AuthorityEpochTables, EncG, ExecutionIndicesWithStatsV2, LockDetails, LockDetailsWrapper, PkG,
6};
7use crate::authority::transaction_deferral::DeferralKey;
8use crate::checkpoints::BuilderCheckpointSummary;
9use crate::epoch::randomness::SINGLETON_KEY;
10use dashmap::DashMap;
11use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
12use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
13use moka::policy::EvictionPolicy;
14use moka::sync::SegmentedCache as MokaCache;
15use mysten_common::ZipDebugEqIteratorExt;
16use mysten_common::fatal;
17use mysten_common::random_util::randomize_cache_capacity_in_tests;
18use parking_lot::Mutex;
19use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
20use sui_types::authenticator_state::ActiveJwk;
21use sui_types::base_types::{AuthorityName, ObjectRef, SequenceNumber};
22use sui_types::crypto::RandomnessRound;
23use sui_types::error::SuiResult;
24use sui_types::executable_transaction::{
25    TrustedExecutableTransactionWithAliases, VerifiedExecutableTransactionWithAliases,
26};
27use sui_types::execution::ExecutionTimeObservationKey;
28use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
29use sui_types::messages_consensus::AuthorityIndex;
30use sui_types::{
31    base_types::{ConsensusObjectSequenceKey, ObjectID},
32    digests::TransactionDigest,
33    messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
34    signature::GenericSignature,
35};
36use tracing::debug;
37use typed_store::Map;
38use typed_store::rocks::DBBatch;
39
40use crate::{
41    authority::{
42        authority_per_epoch_store::AuthorityPerEpochStore,
43        shared_object_congestion_tracker::CongestionPerObjectDebt,
44    },
45    checkpoints::{CheckpointHeight, PendingCheckpoint},
46    consensus_handler::SequencedConsensusTransactionKey,
47    epoch::{
48        randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
49        reconfiguration::ReconfigState,
50    },
51};
52
53use super::*;
54
55#[derive(Default)]
56#[allow(clippy::type_complexity)]
57pub(crate) struct ConsensusCommitOutput {
58    // Consensus and reconfig state
59    consensus_round: Round,
60    consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
61    end_of_publish: BTreeSet<AuthorityName>,
62    reconfig_state: Option<ReconfigState>,
63    consensus_commit_stats: Option<ExecutionIndicesWithStatsV2>,
64
65    // transaction scheduling state
66    next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,
67
68    deferred_txns: Vec<(DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>)>,
69    deleted_deferred_txns: BTreeSet<DeferralKey>,
70
71    // checkpoint state
72    pending_checkpoints: Vec<PendingCheckpoint>,
73
74    // random beacon state
75    next_randomness_round: Option<(RandomnessRound, TimestampMs)>,
76
77    dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
78    dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
79    dkg_used_message: Option<VersionedUsedProcessedMessages>,
80    dkg_output: Option<Option<dkg_v1::Output<PkG, EncG>>>,
81
82    // jwk state
83    pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
84    active_jwks: BTreeSet<(u64, (JwkId, JWK))>,
85
86    // congestion control state
87    congestion_control_object_debts: Vec<(ObjectID, u64)>,
88    congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
89    execution_time_observations: Vec<(
90        AuthorityIndex,
91        u64, /* generation */
92        Vec<(ExecutionTimeObservationKey, Duration)>,
93    )>,
94
95    // Owned object locks acquired post-consensus.
96    owned_object_locks: HashMap<ObjectRef, LockDetails>,
97
98    // True when the checkpoint queue had no pending roots after this commit's flush.
99    // Used by quarantine to determine safe commit boundaries on restart.
100    checkpoint_queue_drained: bool,
101}
102
impl ConsensusCommitOutput {
    /// Creates an empty output for a commit at `consensus_round`; every other field starts
    /// at its `Default` value.
    pub fn new(consensus_round: Round) -> Self {
        Self {
            consensus_round,
            ..Default::default()
        }
    }

    /// Iterates (by clone) over the deferral keys registered through
    /// `delete_loaded_deferred_transactions`.
    pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
        self.deleted_deferred_txns.iter().cloned()
    }

    /// Returns true if this commit deferred any transactions.
    pub fn has_deferred_transactions(&self) -> bool {
        !self.deferred_txns.is_empty()
    }

    /// Commit timestamp stored alongside the reserved randomness round, if a round was reserved.
    fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
        self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
    }

    /// Height of the most recently inserted pending checkpoint.
    /// NOTE(review): this is the last one pushed; it is the highest only if callers insert in
    /// increasing height order, which is not checked here.
    fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
        self.pending_checkpoints.last().map(|cp| cp.height())
    }

    /// Pending checkpoints whose height is strictly greater than `last`; all of them when
    /// `last` is `None`.
    fn get_pending_checkpoints(
        &self,
        last: Option<CheckpointHeight>,
    ) -> impl Iterator<Item = &PendingCheckpoint> {
        self.pending_checkpoints.iter().filter(move |cp| {
            if let Some(last) = last {
                cp.height() > last
            } else {
                true
            }
        })
    }

    /// Returns true if a pending checkpoint with height `index` was inserted into this output.
    fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
        self.pending_checkpoints
            .iter()
            .any(|cp| cp.height() == *index)
    }

    /// `last_committed_round` from the recorded commit stats, or `None` if none were recorded.
    fn get_round(&self) -> Option<u64> {
        self.consensus_commit_stats
            .as_ref()
            .map(|stats| stats.index.last_committed_round)
    }

    /// Records that an EndOfPublish was received from `authority`.
    pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
        self.end_of_publish.insert(authority);
    }

    /// Queues execution-time estimates reported by `source` for `generation`.
    /// They are written keyed by `(generation, source)`.
    pub fn insert_execution_time_observation(
        &mut self,
        source: AuthorityIndex,
        generation: u64,
        estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
    ) {
        self.execution_time_observations
            .push((source, generation, estimates));
    }

    /// Stores the commit statistics. `write_to_batch` panics if they were never set.
    pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStatsV2) {
        self.consensus_commit_stats = Some(stats);
    }

    // In testing code we often need to write to the db outside of a consensus commit, so this
    // supplies default stats to satisfy `write_to_batch`.
    pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
        self.record_consensus_commit_stats(Default::default());
    }

    /// Stores the reconfiguration state to persist with this commit, replacing any earlier one.
    pub fn store_reconfig_state(&mut self, state: ReconfigState) {
        self.reconfig_state = Some(state);
    }

    /// Marks a consensus message as processed by this commit.
    pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
        self.consensus_messages_processed.insert(key);
    }

    /// Iterates over the consensus message keys recorded as processed by this commit.
    pub fn get_consensus_messages_processed(
        &self,
    ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
        self.consensus_messages_processed.iter()
    }

    /// Sets the next shared-object versions.
    ///
    /// # Panics
    /// Panics if they were already set for this output.
    pub fn set_next_shared_object_versions(
        &mut self,
        next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
    ) {
        assert!(self.next_shared_object_versions.is_none());
        self.next_shared_object_versions = Some(next_versions);
    }

    /// Appends a group of transactions deferred under `key`.
    pub fn defer_transactions(
        &mut self,
        key: DeferralKey,
        transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    ) {
        self.deferred_txns.push((key, transactions));
    }

    /// Marks `deferral_keys` for deletion from the deferred-transaction tables when written.
    pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
        self.deleted_deferred_txns
            .extend(deferral_keys.iter().cloned());
    }

    /// Appends a pending checkpoint.
    pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
        self.pending_checkpoints.push(checkpoint);
    }

    /// Reserves the next randomness round and records the commit timestamp with it.
    ///
    /// # Panics
    /// Panics if a round was already reserved in this output.
    pub fn reserve_next_randomness_round(
        &mut self,
        next_randomness_round: RandomnessRound,
        commit_timestamp: TimestampMs,
    ) {
        assert!(self.next_randomness_round.is_none());
        self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
    }

    /// Stores a DKG confirmation keyed by its sender; a later one from the same sender replaces it.
    pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
        self.dkg_confirmations.insert(conf.sender(), conf);
    }

    /// Stores a DKG processed message keyed by its sender; a later one from the same sender
    /// replaces it.
    pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
        self.dkg_processed_messages
            .insert(message.sender(), message);
    }

    /// Stores the DKG used messages, replacing any previously stored value.
    pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
        self.dkg_used_message = Some(used_messages);
    }

    /// Records the DKG output. The inner `Option` is persisted as-is, so `None` is written too.
    pub fn set_dkg_output(&mut self, output: Option<dkg_v1::Output<PkG, EncG>>) {
        self.dkg_output = Some(output);
    }

    /// Records a pending JWK reported by `authority`.
    pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
        self.pending_jwks.insert((authority, id, jwk));
    }

    /// Records an active JWK for `round`. `write_to_batch` asserts that `round` equals the
    /// commit's `last_committed_round`.
    pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
        self.active_jwks.insert((round, key));
    }

    /// Replaces the per-object congestion debts.
    pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
        self.congestion_control_object_debts = object_debts;
    }

    /// Replaces the per-object randomness congestion debts.
    pub fn set_congestion_control_randomness_object_debts(
        &mut self,
        object_debts: Vec<(ObjectID, u64)>,
    ) {
        self.congestion_control_randomness_object_debts = object_debts;
    }

    /// Records whether the checkpoint queue was drained after this commit's flush.
    pub fn set_checkpoint_queue_drained(&mut self, drained: bool) {
        self.checkpoint_queue_drained = drained;
    }

    /// Sets the owned-object locks acquired by this commit.
    ///
    /// # Panics
    /// Panics if locks were already set (non-empty) on this output.
    pub fn set_owned_object_locks(&mut self, locks: HashMap<ObjectRef, LockDetails>) {
        assert!(self.owned_object_locks.is_empty());
        self.owned_object_locks = locks;
    }

    /// Consumes this output and stages all of its data into `batch` against the epoch tables.
    /// The batch is not written here; the caller is responsible for that.
    ///
    /// # Errors
    /// Propagates errors from `epoch_store.tables()` and from each staged batch operation.
    ///
    /// # Panics
    /// Panics if commit stats were never recorded, or if any active JWK's round differs from
    /// the commit's `last_committed_round`.
    pub fn write_to_batch(
        self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        let tables = epoch_store.tables()?;
        batch.insert_batch(
            &tables.consensus_message_processed,
            self.consensus_messages_processed
                .iter()
                .map(|key| (key, true)),
        )?;

        batch.insert_batch(
            &tables.end_of_publish,
            self.end_of_publish.iter().map(|authority| (authority, ())),
        )?;

        if let Some(reconfig_state) = &self.reconfig_state {
            batch.insert_batch(
                &tables.reconfig_state,
                [(RECONFIG_STATE_INDEX, reconfig_state)],
            )?;
        }

        let consensus_commit_stats = self
            .consensus_commit_stats
            .expect("consensus_commit_stats must be set");
        // The commit's round; the active-JWK assertion below checks against it.
        let round = consensus_commit_stats.index.last_committed_round;

        batch.insert_batch(
            &tables.last_consensus_stats_v2,
            [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
        )?;

        if let Some(next_versions) = self.next_shared_object_versions {
            batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
        }

        if !self.owned_object_locks.is_empty() {
            batch.insert_batch(
                &tables.owned_object_locked_transactions,
                self.owned_object_locks
                    .into_iter()
                    .map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))),
            )?;
        }

        // Deleted deferral keys are removed from every deferred-transaction table version.
        // NOTE(review): these deletes are staged before the v3 inserts below. If a key is
        // both deleted and re-deferred in one commit, the insert presumably wins because the
        // batch applies operations in order. Keep this ordering.
        batch.delete_batch(
            &tables.deferred_transactions_v2,
            &self.deleted_deferred_txns,
        )?;
        batch.delete_batch(
            &tables.deferred_transactions_with_aliases_v2,
            &self.deleted_deferred_txns,
        )?;
        batch.delete_batch(
            &tables.deferred_transactions_with_aliases_v3,
            &self.deleted_deferred_txns,
        )?;

        // Newly deferred transactions are converted to their serializable form and written
        // only to the v3 table.
        batch.insert_batch(
            &tables.deferred_transactions_with_aliases_v3,
            self.deferred_txns.into_iter().map(|(key, txs)| {
                (
                    key,
                    txs.into_iter()
                        .map(|tx| {
                            let tx: TrustedExecutableTransactionWithAliases = tx.serializable();
                            tx
                        })
                        .collect::<Vec<_>>(),
                )
            }),
        )?;

        // NOTE: the `round` bound here is the randomness round and shadows the commit round
        // only inside this block; the active-JWK check further down uses the commit round.
        if let Some((round, commit_timestamp)) = self.next_randomness_round {
            batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
            batch.insert_batch(
                &tables.randomness_last_round_timestamp,
                [(SINGLETON_KEY, commit_timestamp)],
            )?;
        }

        batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
        batch.insert_batch(
            &tables.dkg_processed_messages_v2,
            self.dkg_processed_messages,
        )?;
        batch.insert_batch(
            &tables.dkg_used_messages_v2,
            // using Option as iter
            self.dkg_used_message
                .into_iter()
                .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
        )?;
        if let Some(output) = self.dkg_output {
            batch.insert_batch(&tables.dkg_output_v2, [(SINGLETON_KEY, output)])?;
        }

        batch.insert_batch(
            &tables.pending_jwks,
            self.pending_jwks.into_iter().map(|j| (j, ())),
        )?;
        batch.insert_batch(
            &tables.active_jwks,
            self.active_jwks.into_iter().map(|j| {
                // TODO: we don't need to store the round in this map if it is invariant
                assert_eq!(j.0, round);
                (j, ())
            }),
        )?;

        // Congestion debts are tagged with the round this output was created for.
        batch.insert_batch(
            &tables.congestion_control_object_debts,
            self.congestion_control_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;
        batch.insert_batch(
            &tables.congestion_control_randomness_object_debts,
            self.congestion_control_randomness_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;

        // Observations are keyed by (generation, authority).
        batch.insert_batch(
            &tables.execution_time_observations,
            self.execution_time_observations
                .into_iter()
                .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
        )?;

        Ok(())
    }
}
414
415/// ConsensusOutputCache holds outputs of consensus processing that do not need to be committed to disk.
416/// Data quarantining guarantees that all of this data will be used (e.g. for building checkpoints)
417/// before the consensus commit from which it originated is marked as processed. Therefore we can rely
418/// on replay of consensus commits to recover this data.
419pub(crate) struct ConsensusOutputCache {
420    // deferred transactions is only used by consensus handler so there should never be lock contention
421    // - hence no need for a DashMap.
422    pub(crate) deferred_transactions:
423        Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>>,
424
425    // user_signatures_for_checkpoints is written to by consensus handler and read from by checkpoint builder
426    // The critical sections are small in both cases so a DashMap is probably not helpful.
427    #[allow(clippy::type_complexity)]
428    pub(crate) user_signatures_for_checkpoints:
429        Mutex<HashMap<TransactionDigest, Vec<(GenericSignature, Option<SequenceNumber>)>>>,
430
431    executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
432    executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
433}
434
435impl ConsensusOutputCache {
436    pub(crate) fn new(tables: &AuthorityEpochTables) -> Self {
437        let deferred_transactions = tables
438            .get_all_deferred_transactions_v2()
439            .expect("load deferred transactions cannot fail");
440
441        let executed_in_epoch_cache_capacity = 50_000;
442
443        Self {
444            deferred_transactions: Mutex::new(deferred_transactions),
445            user_signatures_for_checkpoints: Default::default(),
446            executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
447            executed_in_epoch_cache: MokaCache::builder(8)
448                // most queries should be for recent transactions
449                .max_capacity(randomize_cache_capacity_in_tests(
450                    executed_in_epoch_cache_capacity,
451                ))
452                .eviction_policy(EvictionPolicy::lru())
453                .build(),
454        }
455    }
456
457    pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
458        self.executed_in_epoch
459            .read()
460            .contains_key(digest) ||
461            // we use get instead of contains key to mark the entry as read
462            self.executed_in_epoch_cache.get(digest).is_some()
463    }
464
465    // Called by execution
466    pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
467        assert!(
468            self.executed_in_epoch
469                .read()
470                .insert(tx_digest, ())
471                .is_none(),
472            "transaction already executed"
473        );
474        self.executed_in_epoch_cache.insert(tx_digest, ());
475    }
476
477    // CheckpointExecutor calls this (indirectly) in order to prune the in-memory cache of executed
478    // transactions. By the time this is called, the transaction digests will have been committed to
479    // the `executed_transactions_to_checkpoint` table.
480    pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
481        let executed_in_epoch = self.executed_in_epoch.read();
482        for tx_digest in tx_digests {
483            executed_in_epoch.remove(tx_digest);
484        }
485    }
486}
487
488/// ConsensusOutputQuarantine holds outputs of consensus processing in memory until the checkpoints
489/// for the commit have been certified.
490pub(crate) struct ConsensusOutputQuarantine {
491    // Output from consensus handler
492    output_queue: VecDeque<ConsensusCommitOutput>,
493
494    // Highest known certified checkpoint sequence number
495    highest_executed_checkpoint: CheckpointSequenceNumber,
496
497    // Checkpoint Builder output
498    builder_checkpoint_summary:
499        BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,
500
501    builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,
502
503    // Any un-committed next versions are stored here.
504    shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,
505
506    // The most recent congestion control debts for objects. Uses a ref-count to track
507    // which objects still exist in some element of output_queue.
508    congestion_control_randomness_object_debts:
509        RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
510    congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
511
512    processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,
513
514    // Owned object locks acquired post-consensus.
515    owned_object_locks: HashMap<ObjectRef, LockDetails>,
516
517    metrics: Arc<EpochMetrics>,
518}
519
520impl ConsensusOutputQuarantine {
521    pub(super) fn new(
522        highest_executed_checkpoint: CheckpointSequenceNumber,
523        authority_metrics: Arc<EpochMetrics>,
524    ) -> Self {
525        Self {
526            highest_executed_checkpoint,
527
528            output_queue: VecDeque::new(),
529            builder_checkpoint_summary: BTreeMap::new(),
530            builder_digest_to_checkpoint: HashMap::new(),
531            shared_object_next_versions: RefCountedHashMap::new(),
532            processed_consensus_messages: RefCountedHashMap::new(),
533            congestion_control_randomness_object_debts: RefCountedHashMap::new(),
534            congestion_control_object_debts: RefCountedHashMap::new(),
535            owned_object_locks: HashMap::new(),
536            metrics: authority_metrics,
537        }
538    }
539}
540
541// Write methods - all methods in this block insert new data into the quarantine.
542// There are only two sources! ConsensusHandler and CheckpointBuilder.
543impl ConsensusOutputQuarantine {
544    // Push all data gathered from a consensus commit into the quarantine.
545    pub(crate) fn push_consensus_output(
546        &mut self,
547        output: ConsensusCommitOutput,
548        epoch_store: &AuthorityPerEpochStore,
549    ) -> SuiResult {
550        self.insert_shared_object_next_versions(&output);
551        self.insert_congestion_control_debts(&output);
552        self.insert_processed_consensus_messages(&output);
553        self.insert_owned_object_locks(&output);
554        self.output_queue.push_back(output);
555
556        self.metrics
557            .consensus_quarantine_queue_size
558            .set(self.output_queue.len() as i64);
559
560        // we may already have observed the certified checkpoint for this round, if state sync is running
561        // ahead of consensus, so there may be data to commit right away.
562        self.commit(epoch_store)
563    }
564
565    // Record a newly built checkpoint.
566    pub(super) fn insert_builder_summary(
567        &mut self,
568        sequence_number: CheckpointSequenceNumber,
569        summary: BuilderCheckpointSummary,
570        contents: CheckpointContents,
571    ) {
572        debug!(?sequence_number, "inserting builder summary {:?}", summary);
573        for tx in contents.iter() {
574            self.builder_digest_to_checkpoint
575                .insert(tx.transaction, sequence_number);
576        }
577        self.builder_checkpoint_summary
578            .insert(sequence_number, (summary, contents));
579    }
580}
581
582// Commit methods.
583impl ConsensusOutputQuarantine {
584    /// Update the highest executed checkpoint and commit any data which is now
585    /// below the watermark.
586    pub(super) fn update_highest_executed_checkpoint(
587        &mut self,
588        checkpoint: CheckpointSequenceNumber,
589        epoch_store: &AuthorityPerEpochStore,
590        batch: &mut DBBatch,
591    ) -> SuiResult {
592        self.highest_executed_checkpoint = checkpoint;
593        self.commit_with_batch(epoch_store, batch)
594    }
595
596    pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
597        let mut batch = epoch_store.db_batch()?;
598        self.commit_with_batch(epoch_store, &mut batch)?;
599        batch.write()?;
600        Ok(())
601    }
602
603    /// Commit all data below the watermark.
604    fn commit_with_batch(
605        &mut self,
606        epoch_store: &AuthorityPerEpochStore,
607        batch: &mut DBBatch,
608    ) -> SuiResult {
609        // The commit algorithm is simple:
610        // 1. First commit all checkpoint builder state which is below the watermark.
611        // 2. Determine the consensus commit height that corresponds to the highest committed
612        //    checkpoint.
613        // 3. Commit all consensus output at that height or below.
614
615        let tables = epoch_store.tables()?;
616
617        let mut highest_committed_height = None;
618
619        while self
620            .builder_checkpoint_summary
621            .first_key_value()
622            .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
623            == Some(true)
624        {
625            let (seq, (builder_summary, contents)) =
626                self.builder_checkpoint_summary.pop_first().unwrap();
627
628            for tx in contents.iter() {
629                let digest = &tx.transaction;
630                assert_eq!(
631                    self.builder_digest_to_checkpoint
632                        .remove(digest)
633                        .unwrap_or_else(|| {
634                            panic!(
635                                "transaction {:?} not found in builder_digest_to_checkpoint",
636                                digest
637                            )
638                        }),
639                    seq
640                );
641            }
642
643            batch.insert_batch(
644                &tables.builder_digest_to_checkpoint,
645                contents.iter().map(|tx| (tx.transaction, seq)),
646            )?;
647
648            batch.insert_batch(
649                &tables.builder_checkpoint_summary_v2,
650                [(seq, &builder_summary)],
651            )?;
652
653            let checkpoint_height = builder_summary
654                .checkpoint_height
655                .expect("non-genesis checkpoint must have height");
656            if let Some(highest) = highest_committed_height {
657                assert!(
658                    checkpoint_height >= highest,
659                    "current checkpoint height {} must be no less than highest committed height {}",
660                    checkpoint_height,
661                    highest
662                );
663            }
664
665            highest_committed_height = Some(checkpoint_height);
666        }
667
668        let Some(highest_committed_height) = highest_committed_height else {
669            return Ok(());
670        };
671
672        // Only commit outputs up to the last one where the checkpoint queue
673        // was fully drained (no pending roots). If the queue is empty after an
674        // output, there are no roots that could be lost on restart. Any outputs
675        // after the last drain point stay in the quarantine and get full-replayed
676        // on restart with correct root reconstruction.
677        let mut last_drain_idx = None;
678        for (i, output) in self.output_queue.iter().enumerate() {
679            let stats = output
680                .consensus_commit_stats
681                .as_ref()
682                .expect("consensus_commit_stats must be set");
683            if stats.height > highest_committed_height {
684                break;
685            }
686            if output.checkpoint_queue_drained {
687                last_drain_idx = Some(i);
688            }
689        }
690        if let Some(idx) = last_drain_idx {
691            for _ in 0..=idx {
692                let output = self.output_queue.pop_front().unwrap();
693                self.remove_shared_object_next_versions(&output);
694                self.remove_processed_consensus_messages(&output);
695                self.remove_congestion_control_debts(&output);
696                self.remove_owned_object_locks(&output);
697                output.write_to_batch(epoch_store, batch)?;
698            }
699        }
700
701        self.metrics
702            .consensus_quarantine_queue_size
703            .set(self.output_queue.len() as i64);
704
705        Ok(())
706    }
707}
708
709impl ConsensusOutputQuarantine {
710    fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
711        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
712            for (object_id, next_version) in next_versions {
713                self.shared_object_next_versions
714                    .insert(*object_id, *next_version);
715            }
716        }
717    }
718
719    fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
720        let current_round = output.consensus_round;
721
722        for (object_id, debt) in output.congestion_control_object_debts.iter() {
723            self.congestion_control_object_debts.insert(
724                *object_id,
725                CongestionPerObjectDebt::new(current_round, *debt),
726            );
727        }
728
729        for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
730            self.congestion_control_randomness_object_debts.insert(
731                *object_id,
732                CongestionPerObjectDebt::new(current_round, *debt),
733            );
734        }
735    }
736
737    fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
738        for (object_id, _) in output.congestion_control_object_debts.iter() {
739            self.congestion_control_object_debts.remove(object_id);
740        }
741        for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
742            self.congestion_control_randomness_object_debts
743                .remove(object_id);
744        }
745    }
746
747    fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
748        for tx_key in output.consensus_messages_processed.iter() {
749            self.processed_consensus_messages.insert(tx_key.clone(), ());
750        }
751    }
752
753    fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
754        for tx_key in output.consensus_messages_processed.iter() {
755            self.processed_consensus_messages.remove(tx_key);
756        }
757    }
758
759    fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
760        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
761            for object_id in next_versions.keys() {
762                if !self.shared_object_next_versions.remove(object_id) {
763                    fatal!(
764                        "Shared object next version not found in quarantine: {:?}",
765                        object_id
766                    );
767                }
768            }
769        }
770    }
771
772    fn insert_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
773        for (obj_ref, lock) in &output.owned_object_locks {
774            self.owned_object_locks.insert(*obj_ref, *lock);
775        }
776    }
777
778    fn remove_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
779        for obj_ref in output.owned_object_locks.keys() {
780            self.owned_object_locks.remove(obj_ref);
781        }
782    }
783}
784
785// Read methods - all methods in this block return data from the quarantine which would otherwise
786// be found in the database.
787impl ConsensusOutputQuarantine {
788    pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
789        self.builder_checkpoint_summary
790            .values()
791            .last()
792            .map(|(summary, _)| summary)
793    }
794
795    pub(super) fn get_built_summary(
796        &self,
797        sequence: CheckpointSequenceNumber,
798    ) -> Option<&BuilderCheckpointSummary> {
799        self.builder_checkpoint_summary
800            .get(&sequence)
801            .map(|(summary, _)| summary)
802    }
803
804    pub(super) fn is_consensus_message_processed(
805        &self,
806        key: &SequencedConsensusTransactionKey,
807    ) -> bool {
808        self.processed_consensus_messages.contains_key(key)
809    }
810
811    pub(super) fn is_empty(&self) -> bool {
812        self.output_queue.is_empty()
813    }
814
815    pub(super) fn get_next_shared_object_versions(
816        &self,
817        tables: &AuthorityEpochTables,
818        objects_to_init: &[ConsensusObjectSequenceKey],
819    ) -> SuiResult<Vec<Option<SequenceNumber>>> {
820        Ok(do_fallback_lookup(
821            objects_to_init,
822            |object_key| {
823                if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
824                    CacheResult::Hit(Some(*next_version))
825                } else {
826                    CacheResult::Miss
827                }
828            },
829            |object_keys| {
830                tables
831                    .next_shared_object_versions_v2
832                    .multi_get(object_keys)
833                    .expect("db error")
834            },
835        ))
836    }
837
838    /// Gets owned object locks, checking quarantine first then falling back to DB.
839    /// After crash recovery, quarantine is empty so we naturally fall back to DB.
840    pub(super) fn get_owned_object_locks(
841        &self,
842        tables: &AuthorityEpochTables,
843        obj_refs: &[ObjectRef],
844    ) -> SuiResult<Vec<Option<LockDetails>>> {
845        Ok(do_fallback_lookup(
846            obj_refs,
847            |obj_ref| {
848                if let Some(lock) = self.owned_object_locks.get(obj_ref) {
849                    CacheResult::Hit(Some(*lock))
850                } else {
851                    CacheResult::Miss
852                }
853            },
854            |obj_refs| {
855                tables
856                    .multi_get_locked_transactions(obj_refs)
857                    .expect("db error")
858            },
859        ))
860    }
861
862    pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
863        self.output_queue
864            .back()
865            .and_then(|output| output.get_highest_pending_checkpoint_height())
866    }
867
868    pub(super) fn get_pending_checkpoints(
869        &self,
870        last: Option<CheckpointHeight>,
871    ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
872        let mut checkpoints = Vec::new();
873        for output in &self.output_queue {
874            checkpoints.extend(
875                output
876                    .get_pending_checkpoints(last)
877                    .map(|cp| (cp.height(), cp.clone())),
878            );
879        }
880        if cfg!(debug_assertions) {
881            let mut prev = None;
882            for (height, _) in &checkpoints {
883                if let Some(prev) = prev {
884                    assert!(prev < *height);
885                }
886                prev = Some(*height);
887            }
888        }
889        checkpoints
890    }
891
892    pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
893        self.output_queue
894            .iter()
895            .any(|output| output.pending_checkpoint_exists(index))
896    }
897
898    pub(super) fn get_new_jwks(
899        &self,
900        epoch_store: &AuthorityPerEpochStore,
901        round: u64,
902    ) -> SuiResult<Vec<ActiveJwk>> {
903        let epoch = epoch_store.epoch();
904
905        // Check if the requested round is in memory
906        for output in self.output_queue.iter().rev() {
907            // unwrap safe because output will always have last consensus stats set before being added
908            // to the quarantine
909            let output_round = output.get_round().unwrap();
910            if round == output_round {
911                return Ok(output
912                    .active_jwks
913                    .iter()
914                    .map(|(_, (jwk_id, jwk))| ActiveJwk {
915                        jwk_id: jwk_id.clone(),
916                        jwk: jwk.clone(),
917                        epoch,
918                    })
919                    .collect());
920            }
921        }
922
923        // Fall back to reading from database
924        let empty_jwk_id = JwkId::new(String::new(), String::new());
925        let empty_jwk = JWK {
926            kty: String::new(),
927            e: String::new(),
928            n: String::new(),
929            alg: String::new(),
930        };
931
932        let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
933        let end = (round + 1, (empty_jwk_id, empty_jwk));
934
935        Ok(epoch_store
936            .tables()?
937            .active_jwks
938            .safe_iter_with_bounds(Some(start), Some(end))
939            .map_ok(|((r, (jwk_id, jwk)), _)| {
940                debug_assert!(round == r);
941                ActiveJwk { jwk_id, jwk, epoch }
942            })
943            .collect::<Result<Vec<_>, _>>()?)
944    }
945
946    pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
947        self.output_queue
948            .iter()
949            .rev()
950            .filter_map(|output| output.get_randomness_last_round_timestamp())
951            .next()
952    }
953
954    pub(crate) fn load_initial_object_debts(
955        &self,
956        epoch_store: &AuthorityPerEpochStore,
957        current_round: Round,
958        for_randomness: bool,
959        transactions: &[VerifiedExecutableTransactionWithAliases],
960    ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
961        let protocol_config = epoch_store.protocol_config();
962        let tables = epoch_store.tables()?;
963        let default_per_commit_budget = protocol_config
964            .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
965            .unwrap_or(0);
966        let (hash_table, db_table, per_commit_budget) = if for_randomness {
967            (
968                &self.congestion_control_randomness_object_debts,
969                &tables.congestion_control_randomness_object_debts,
970                protocol_config
971                    .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
972                    .unwrap_or(default_per_commit_budget),
973            )
974        } else {
975            (
976                &self.congestion_control_object_debts,
977                &tables.congestion_control_object_debts,
978                default_per_commit_budget,
979            )
980        };
981        let mut shared_input_object_ids: Vec<_> = transactions
982            .iter()
983            .flat_map(|tx| tx.tx().shared_input_objects().map(|obj| obj.id))
984            .collect();
985        shared_input_object_ids.sort();
986        shared_input_object_ids.dedup();
987
988        let results = do_fallback_lookup(
989            &shared_input_object_ids,
990            |object_id| {
991                if let Some(debt) = hash_table.get(object_id) {
992                    CacheResult::Hit(Some(debt.into_v1()))
993                } else {
994                    CacheResult::Miss
995                }
996            },
997            |object_ids| {
998                db_table
999                    .multi_get(object_ids)
1000                    .expect("db error")
1001                    .into_iter()
1002                    .map(|debt| debt.map(|debt| debt.into_v1()))
1003                    .collect()
1004            },
1005        );
1006
1007        Ok(results
1008            .into_iter()
1009            .zip_debug_eq(shared_input_object_ids)
1010            .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
1011            .map(move |((round, debt), object_id)| {
1012                // Stored debts already account for the budget of the round in which
1013                // they were accumulated. Application of budget from future rounds to
1014                // the debt is handled here.
1015                assert!(current_round > round);
1016                let num_rounds = current_round - round - 1;
1017                let debt = debt.saturating_sub(per_commit_budget * num_rounds);
1018                (object_id, debt)
1019            }))
1020    }
1021}
1022
// A `HashMap` whose entries are reference counted, so that an entry stays alive
// until every insert of its key has been matched by a remove.
//
// Inserting the same key N times means the key is only dropped after N removes.
//
// Only the *latest* value for a key is kept: inserting a key that is already
// present bumps its count and overwrites the stored value.
#[derive(Debug, Default)]
struct RefCountedHashMap<K, V> {
    // key -> (number of inserts not yet matched by a remove, latest inserted value)
    map: HashMap<K, (usize, V)>,
}

impl<K, V> RefCountedHashMap<K, V>
where
    K: Clone + Eq + std::hash::Hash,
{
    /// Creates an empty map.
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    /// Takes one more reference on `key` and makes `value` its current value.
    pub fn insert(&mut self, key: K, value: V) {
        match self.map.entry(key) {
            hash_map::Entry::Vacant(slot) => {
                slot.insert((1, value));
            }
            hash_map::Entry::Occupied(mut slot) => {
                let (count, current) = slot.get_mut();
                *count += 1;
                *current = value;
            }
        }
    }

    /// Releases one reference on `key`; the entry is dropped once no references remain.
    ///
    /// Returns whether `key` was present before the call. A `true` result does not mean
    /// the key is gone, because other references to it may still be outstanding.
    pub fn remove(&mut self, key: &K) -> bool {
        let hash_map::Entry::Occupied(mut slot) = self.map.entry(key.clone()) else {
            return false;
        };
        let remaining = {
            let (count, _) = slot.get_mut();
            *count -= 1;
            *count
        };
        if remaining == 0 {
            slot.remove();
        }
        true
    }

    /// Returns the latest value inserted for `key`, if the key is still referenced.
    pub fn get(&self, key: &K) -> Option<&V> {
        let (_, value) = self.map.get(key)?;
        Some(value)
    }

    /// Returns true if `key` still has at least one outstanding reference.
    pub fn contains_key(&self, key: &K) -> bool {
        self.get(key).is_some()
    }
}
1085
#[cfg(test)]
impl ConsensusOutputQuarantine {
    /// Test-only: number of consensus commit outputs currently waiting in the queue.
    fn output_queue_len_for_testing(&self) -> usize {
        let Self { output_queue, .. } = self;
        output_queue.len()
    }
}
1092
#[cfg(test)]
mod tests {
    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;
    use sui_types::base_types::ExecutionDigests;
    use sui_types::gas::GasCostSummary;

    /// Builds a commit output for consensus `round` whose recorded commit stats carry
    /// checkpoint `height`, with the checkpoint-queue-drained flag set to `drained`.
    fn make_output(height: u64, round: u64, drained: bool) -> ConsensusCommitOutput {
        let stats = ExecutionIndicesWithStatsV2 {
            height,
            ..Default::default()
        };
        let mut output = ConsensusCommitOutput::new(round);
        output.record_consensus_commit_stats(stats);
        output.set_checkpoint_queue_drained(drained);
        output
    }

    /// Builds a builder summary for checkpoint `seq` at checkpoint height `height`, together
    /// with the single-digest contents it summarizes.
    fn make_builder_summary(
        seq: CheckpointSequenceNumber,
        height: CheckpointHeight,
        protocol_config: &ProtocolConfig,
    ) -> (BuilderCheckpointSummary, CheckpointContents) {
        let contents =
            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
        let builder_summary = BuilderCheckpointSummary {
            summary: CheckpointSummary::new(
                protocol_config,
                0,
                seq,
                0,
                &contents,
                None,
                GasCostSummary::default(),
                None,
                0,
                vec![],
                vec![],
            ),
            checkpoint_height: Some(height),
            position_in_commit: 0,
        };
        (builder_summary, contents)
    }

    /// Registers builder summaries for checkpoints `1..=last`, using each checkpoint's
    /// sequence number as its checkpoint height.
    fn insert_builder_summaries(
        quarantine: &mut ConsensusOutputQuarantine,
        protocol_config: &ProtocolConfig,
        last: CheckpointSequenceNumber,
    ) {
        for seq in 1..=last {
            let (summary, contents) = make_builder_summary(seq, seq, protocol_config);
            quarantine.insert_builder_summary(seq, summary, contents);
        }
    }

    /// Shared scenario for the drain-boundary tests.
    ///
    /// Queues two outputs: C (height 4, checkpoint queue not drained) and C2 (height 5,
    /// drained). It then registers builder summaries for checkpoints `1..=last_checkpoint`,
    /// certifies up to `last_checkpoint`, and returns how many outputs remain queued.
    async fn queue_len_after_certifying(last_checkpoint: CheckpointSequenceNumber) -> usize {
        let state = TestAuthorityBuilder::new().build().await;
        let epoch_store = state.epoch_store_for_testing();
        let mut quarantine = ConsensusOutputQuarantine::new(0, epoch_store.metrics.clone());

        for output in [make_output(4, 1, false), make_output(5, 2, true)] {
            quarantine.push_consensus_output(output, &epoch_store).unwrap();
        }
        assert_eq!(quarantine.output_queue_len_for_testing(), 2);

        insert_builder_summaries(&mut quarantine, epoch_store.protocol_config(), last_checkpoint);

        let mut batch = epoch_store.db_batch_for_test();
        quarantine
            .update_highest_executed_checkpoint(last_checkpoint, &epoch_store, &mut batch)
            .unwrap();
        batch.write().unwrap();

        quarantine.output_queue_len_for_testing()
    }

    #[tokio::test]
    async fn test_drain_boundary_prevents_premature_commit() {
        // Certified through checkpoint 4. C (height 4) is covered but did not drain the
        // checkpoint queue, and C2 (height 5) lies beyond the certified range. With no drain
        // boundary, nothing may be committed, so both outputs stay queued.
        assert_eq!(queue_len_after_certifying(4).await, 2);
    }

    #[tokio::test]
    async fn test_drain_boundary_commits_at_safe_point() {
        // Certified through checkpoint 5. C2 (height 5) is covered and drained the checkpoint
        // queue, giving a drain boundary at queue index 1, so both outputs are committed.
        assert_eq!(queue_len_after_certifying(5).await, 0);
    }
}