sui_core/authority/
consensus_quarantine.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::authority_per_epoch_store::{
5    AuthorityEpochTables, EncG, ExecutionIndicesWithStatsV2, LockDetails, LockDetailsWrapper, PkG,
6};
7use crate::authority::transaction_deferral::DeferralKey;
8use crate::checkpoints::BuilderCheckpointSummary;
9use crate::epoch::randomness::SINGLETON_KEY;
10use dashmap::DashMap;
11use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
12use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
13use moka::policy::EvictionPolicy;
14use moka::sync::SegmentedCache as MokaCache;
15use mysten_common::ZipDebugEqIteratorExt;
16use mysten_common::fatal;
17use mysten_common::random_util::randomize_cache_capacity_in_tests;
18use parking_lot::Mutex;
19use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
20use sui_types::authenticator_state::ActiveJwk;
21use sui_types::base_types::{AuthorityName, ObjectRef, SequenceNumber};
22use sui_types::crypto::RandomnessRound;
23use sui_types::error::SuiResult;
24use sui_types::executable_transaction::{
25    TrustedExecutableTransactionWithAliases, VerifiedExecutableTransactionWithAliases,
26};
27use sui_types::execution::ExecutionTimeObservationKey;
28use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
29use sui_types::messages_consensus::AuthorityIndex;
30use sui_types::{
31    base_types::{ConsensusObjectSequenceKey, ObjectID},
32    digests::TransactionDigest,
33    messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
34    signature::GenericSignature,
35};
36use tracing::{debug, info};
37use typed_store::Map;
38use typed_store::rocks::DBBatch;
39
40use crate::{
41    authority::{
42        authority_per_epoch_store::AuthorityPerEpochStore,
43        shared_object_congestion_tracker::CongestionPerObjectDebt,
44    },
45    checkpoints::{CheckpointHeight, PendingCheckpoint, PendingCheckpointV2},
46    consensus_handler::SequencedConsensusTransactionKey,
47    epoch::{
48        randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
49        reconfiguration::ReconfigState,
50    },
51};
52
53use super::*;
54
/// In-memory accumulator for all state changes produced while processing a
/// single consensus commit. Nothing here touches the database directly; the
/// whole struct is flushed atomically via `write_to_batch` once the commit is
/// safe to persist (see `ConsensusOutputQuarantine`).
#[derive(Default)]
#[allow(clippy::type_complexity)]
pub(crate) struct ConsensusCommitOutput {
    // Consensus and reconfig state
    // Round of the consensus commit this output was produced from.
    consensus_round: Round,
    // Keys of all consensus messages handled while processing this commit.
    consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
    // Authorities whose EndOfPublish messages arrived in this commit.
    end_of_publish: BTreeSet<AuthorityName>,
    // Set only when reconfig state changed during this commit.
    reconfig_state: Option<ReconfigState>,
    // Must be set before `write_to_batch` is called (it panics otherwise).
    consensus_commit_stats: Option<ExecutionIndicesWithStatsV2>,

    // transaction scheduling state
    // Next versions to assign for consensus-managed objects; set at most once.
    next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,

    // Transactions newly deferred by this commit, and the keys of previously
    // deferred transactions that this commit consumed.
    deferred_txns: Vec<(DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>)>,
    deleted_deferred_txns: BTreeSet<DeferralKey>,

    // checkpoint state
    pending_checkpoints: Vec<PendingCheckpoint>,
    pending_checkpoints_v2: Vec<PendingCheckpointV2>,

    // random beacon state
    // Randomness round reserved by this commit plus the commit timestamp.
    next_randomness_round: Option<(RandomnessRound, TimestampMs)>,

    dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
    dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
    dkg_used_message: Option<VersionedUsedProcessedMessages>,
    dkg_output: Option<dkg_v1::Output<PkG, EncG>>,

    // jwk state
    pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
    active_jwks: BTreeSet<(u64, (JwkId, JWK))>,

    // congestion control state
    congestion_control_object_debts: Vec<(ObjectID, u64)>,
    congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
    execution_time_observations: Vec<(
        AuthorityIndex,
        u64, /* generation */
        Vec<(ExecutionTimeObservationKey, Duration)>,
    )>,

    // Owned object locks acquired post-consensus (when disable_preconsensus_locking=true)
    owned_object_locks: HashMap<ObjectRef, LockDetails>,

    // True when the checkpoint queue had no pending roots after this commit's flush.
    // Used by quarantine to determine safe commit boundaries on restart.
    checkpoint_queue_drained: bool,
}
103
104impl ConsensusCommitOutput {
105    pub fn new(consensus_round: Round) -> Self {
106        Self {
107            consensus_round,
108            ..Default::default()
109        }
110    }
111
112    pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
113        self.deleted_deferred_txns.iter().cloned()
114    }
115
116    pub fn has_deferred_transactions(&self) -> bool {
117        !self.deferred_txns.is_empty()
118    }
119
120    fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
121        self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
122    }
123
124    fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
125        self.pending_checkpoints.last().map(|cp| cp.height())
126    }
127
128    fn get_pending_checkpoints(
129        &self,
130        last: Option<CheckpointHeight>,
131    ) -> impl Iterator<Item = &PendingCheckpoint> {
132        self.pending_checkpoints.iter().filter(move |cp| {
133            if let Some(last) = last {
134                cp.height() > last
135            } else {
136                true
137            }
138        })
139    }
140
141    fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
142        self.pending_checkpoints
143            .iter()
144            .any(|cp| cp.height() == *index)
145    }
146
147    fn get_pending_checkpoints_v2(
148        &self,
149        last: Option<CheckpointHeight>,
150    ) -> impl Iterator<Item = &PendingCheckpointV2> {
151        self.pending_checkpoints_v2.iter().filter(move |cp| {
152            if let Some(last) = last {
153                cp.height() > last
154            } else {
155                true
156            }
157        })
158    }
159
160    fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
161        self.pending_checkpoints_v2
162            .iter()
163            .any(|cp| cp.height() == *index)
164    }
165
166    fn get_round(&self) -> Option<u64> {
167        self.consensus_commit_stats
168            .as_ref()
169            .map(|stats| stats.index.last_committed_round)
170    }
171
172    pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
173        self.end_of_publish.insert(authority);
174    }
175
176    pub fn insert_execution_time_observation(
177        &mut self,
178        source: AuthorityIndex,
179        generation: u64,
180        estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
181    ) {
182        self.execution_time_observations
183            .push((source, generation, estimates));
184    }
185
186    pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStatsV2) {
187        self.consensus_commit_stats = Some(stats);
188    }
189
190    // in testing code we often need to write to the db outside of a consensus commit
191    pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
192        self.record_consensus_commit_stats(Default::default());
193    }
194
195    pub fn store_reconfig_state(&mut self, state: ReconfigState) {
196        self.reconfig_state = Some(state);
197    }
198
199    pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
200        self.consensus_messages_processed.insert(key);
201    }
202
203    pub fn get_consensus_messages_processed(
204        &self,
205    ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
206        self.consensus_messages_processed.iter()
207    }
208
209    pub fn set_next_shared_object_versions(
210        &mut self,
211        next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
212    ) {
213        assert!(self.next_shared_object_versions.is_none());
214        self.next_shared_object_versions = Some(next_versions);
215    }
216
217    pub fn defer_transactions(
218        &mut self,
219        key: DeferralKey,
220        transactions: Vec<VerifiedExecutableTransactionWithAliases>,
221    ) {
222        self.deferred_txns.push((key, transactions));
223    }
224
225    pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
226        self.deleted_deferred_txns
227            .extend(deferral_keys.iter().cloned());
228    }
229
230    pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
231        self.pending_checkpoints.push(checkpoint);
232    }
233
234    pub fn insert_pending_checkpoint_v2(&mut self, checkpoint: PendingCheckpointV2) {
235        self.pending_checkpoints_v2.push(checkpoint);
236    }
237
238    pub fn reserve_next_randomness_round(
239        &mut self,
240        next_randomness_round: RandomnessRound,
241        commit_timestamp: TimestampMs,
242    ) {
243        assert!(self.next_randomness_round.is_none());
244        self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
245    }
246
247    pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
248        self.dkg_confirmations.insert(conf.sender(), conf);
249    }
250
251    pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
252        self.dkg_processed_messages
253            .insert(message.sender(), message);
254    }
255
256    pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
257        self.dkg_used_message = Some(used_messages);
258    }
259
260    pub fn set_dkg_output(&mut self, output: dkg_v1::Output<PkG, EncG>) {
261        self.dkg_output = Some(output);
262    }
263
264    pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
265        self.pending_jwks.insert((authority, id, jwk));
266    }
267
268    pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
269        self.active_jwks.insert((round, key));
270    }
271
272    pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
273        self.congestion_control_object_debts = object_debts;
274    }
275
276    pub fn set_congestion_control_randomness_object_debts(
277        &mut self,
278        object_debts: Vec<(ObjectID, u64)>,
279    ) {
280        self.congestion_control_randomness_object_debts = object_debts;
281    }
282
283    pub fn set_checkpoint_queue_drained(&mut self, drained: bool) {
284        self.checkpoint_queue_drained = drained;
285    }
286
287    pub fn set_owned_object_locks(&mut self, locks: HashMap<ObjectRef, LockDetails>) {
288        assert!(self.owned_object_locks.is_empty());
289        self.owned_object_locks = locks;
290    }
291
    /// Consumes the output and stages every accumulated change into `batch`.
    /// Called once the commit's checkpoints are certified; the caller is
    /// responsible for actually writing the batch.
    ///
    /// Panics if `consensus_commit_stats` was never set.
    pub fn write_to_batch(
        self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        let tables = epoch_store.tables()?;
        batch.insert_batch(
            &tables.consensus_message_processed,
            self.consensus_messages_processed
                .iter()
                .map(|key| (key, true)),
        )?;

        batch.insert_batch(
            &tables.end_of_publish,
            self.end_of_publish.iter().map(|authority| (authority, ())),
        )?;

        if let Some(reconfig_state) = &self.reconfig_state {
            batch.insert_batch(
                &tables.reconfig_state,
                [(RECONFIG_STATE_INDEX, reconfig_state)],
            )?;
        }

        let consensus_commit_stats = self
            .consensus_commit_stats
            .expect("consensus_commit_stats must be set");
        // Used below to sanity-check that active JWK rounds match this commit.
        let round = consensus_commit_stats.index.last_committed_round;

        batch.insert_batch(
            &tables.last_consensus_stats_v2,
            [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
        )?;

        if let Some(next_versions) = self.next_shared_object_versions {
            batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
        }

        if !self.owned_object_locks.is_empty() {
            batch.insert_batch(
                &tables.owned_object_locked_transactions,
                self.owned_object_locks
                    .into_iter()
                    .map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))),
            )?;
        }

        // Deletions are applied to all deferred-transaction tables (the keys
        // may have been loaded from any of them); new deferrals are written
        // only to the latest (v3, with-aliases) table.
        batch.delete_batch(
            &tables.deferred_transactions_v2,
            &self.deleted_deferred_txns,
        )?;
        batch.delete_batch(
            &tables.deferred_transactions_with_aliases_v2,
            &self.deleted_deferred_txns,
        )?;
        batch.delete_batch(
            &tables.deferred_transactions_with_aliases_v3,
            &self.deleted_deferred_txns,
        )?;

        batch.insert_batch(
            &tables.deferred_transactions_with_aliases_v3,
            self.deferred_txns.into_iter().map(|(key, txs)| {
                (
                    key,
                    txs.into_iter()
                        .map(|tx| {
                            // Convert to the serializable (trusted) form for storage.
                            let tx: TrustedExecutableTransactionWithAliases = tx.serializable();
                            tx
                        })
                        .collect::<Vec<_>>(),
                )
            }),
        )?;

        if let Some((round, commit_timestamp)) = self.next_randomness_round {
            batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
            batch.insert_batch(
                &tables.randomness_last_round_timestamp,
                [(SINGLETON_KEY, commit_timestamp)],
            )?;
        }

        batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
        batch.insert_batch(
            &tables.dkg_processed_messages_v2,
            self.dkg_processed_messages,
        )?;
        batch.insert_batch(
            &tables.dkg_used_messages_v2,
            // using Option as iter
            self.dkg_used_message
                .into_iter()
                .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
        )?;
        if let Some(output) = self.dkg_output {
            batch.insert_batch(&tables.dkg_output, [(SINGLETON_KEY, output)])?;
        }

        batch.insert_batch(
            &tables.pending_jwks,
            self.pending_jwks.into_iter().map(|j| (j, ())),
        )?;
        batch.insert_batch(
            &tables.active_jwks,
            self.active_jwks.into_iter().map(|j| {
                // TODO: we don't need to store the round in this map if it is invariant
                assert_eq!(j.0, round);
                (j, ())
            }),
        )?;

        // Congestion debts are persisted tagged with this output's consensus
        // round so staleness can be determined on load.
        batch.insert_batch(
            &tables.congestion_control_object_debts,
            self.congestion_control_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;
        batch.insert_batch(
            &tables.congestion_control_randomness_object_debts,
            self.congestion_control_randomness_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;

        batch.insert_batch(
            &tables.execution_time_observations,
            self.execution_time_observations
                .into_iter()
                .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
        )?;

        Ok(())
    }
437}
438
/// ConsensusOutputCache holds outputs of consensus processing that do not need to be committed to disk.
/// Data quarantining guarantees that all of this data will be used (e.g. for building checkpoints)
/// before the consensus commit from which it originated is marked as processed. Therefore we can rely
/// on replay of consensus commits to recover this data.
pub(crate) struct ConsensusOutputCache {
    // deferred transactions is only used by consensus handler so there should never be lock contention
    // - hence no need for a DashMap.
    pub(crate) deferred_transactions:
        Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>>,

    // user_signatures_for_checkpoints is written to by consensus handler and read from by checkpoint builder
    // The critical sections are small in both cases so a DashMap is probably not helpful.
    #[allow(clippy::type_complexity)]
    pub(crate) user_signatures_for_checkpoints:
        Mutex<HashMap<TransactionDigest, Vec<(GenericSignature, Option<SequenceNumber>)>>>,

    // Digests of transactions executed in this epoch; entries are removed once
    // checkpoint execution commits them (see `remove_executed_in_epoch`).
    executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
    // Bounded LRU cache of recently executed digests; entries here can outlive
    // their removal from `executed_in_epoch`.
    executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
}
458
459impl ConsensusOutputCache {
460    pub(crate) fn new(tables: &AuthorityEpochTables) -> Self {
461        let deferred_transactions = tables
462            .get_all_deferred_transactions_v2()
463            .expect("load deferred transactions cannot fail");
464
465        let executed_in_epoch_cache_capacity = 50_000;
466
467        Self {
468            deferred_transactions: Mutex::new(deferred_transactions),
469            user_signatures_for_checkpoints: Default::default(),
470            executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
471            executed_in_epoch_cache: MokaCache::builder(8)
472                // most queries should be for recent transactions
473                .max_capacity(randomize_cache_capacity_in_tests(
474                    executed_in_epoch_cache_capacity,
475                ))
476                .eviction_policy(EvictionPolicy::lru())
477                .build(),
478        }
479    }
480
481    pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
482        self.executed_in_epoch
483            .read()
484            .contains_key(digest) ||
485            // we use get instead of contains key to mark the entry as read
486            self.executed_in_epoch_cache.get(digest).is_some()
487    }
488
489    // Called by execution
490    pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
491        assert!(
492            self.executed_in_epoch
493                .read()
494                .insert(tx_digest, ())
495                .is_none(),
496            "transaction already executed"
497        );
498        self.executed_in_epoch_cache.insert(tx_digest, ());
499    }
500
501    // CheckpointExecutor calls this (indirectly) in order to prune the in-memory cache of executed
502    // transactions. By the time this is called, the transaction digests will have been committed to
503    // the `executed_transactions_to_checkpoint` table.
504    pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
505        let executed_in_epoch = self.executed_in_epoch.read();
506        for tx_digest in tx_digests {
507            executed_in_epoch.remove(tx_digest);
508        }
509    }
510}
511
/// ConsensusOutputQuarantine holds outputs of consensus processing in memory until the checkpoints
/// for the commit have been certified.
pub(crate) struct ConsensusOutputQuarantine {
    // Output from consensus handler
    output_queue: VecDeque<ConsensusCommitOutput>,

    // Highest known certified checkpoint sequence number
    highest_executed_checkpoint: CheckpointSequenceNumber,

    // Checkpoint Builder output
    builder_checkpoint_summary:
        BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,

    // Index from transaction digest to the sequence number of the built (but
    // not yet committed) checkpoint that contains it.
    builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,

    // Any un-committed next versions are stored here.
    shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,

    // The most recent congestion control debts for objects. Uses a ref-count to track
    // which objects still exist in some element of output_queue.
    congestion_control_randomness_object_debts:
        RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
    congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,

    // Keys of consensus messages processed by outputs still in the queue;
    // ref-counted because a key may appear in multiple queued outputs.
    processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,

    // Owned object locks acquired post-consensus.
    owned_object_locks: HashMap<ObjectRef, LockDetails>,

    // Exports the quarantine queue depth as a metric.
    metrics: Arc<EpochMetrics>,
}
543
544impl ConsensusOutputQuarantine {
545    pub(super) fn new(
546        highest_executed_checkpoint: CheckpointSequenceNumber,
547        authority_metrics: Arc<EpochMetrics>,
548    ) -> Self {
549        Self {
550            highest_executed_checkpoint,
551
552            output_queue: VecDeque::new(),
553            builder_checkpoint_summary: BTreeMap::new(),
554            builder_digest_to_checkpoint: HashMap::new(),
555            shared_object_next_versions: RefCountedHashMap::new(),
556            processed_consensus_messages: RefCountedHashMap::new(),
557            congestion_control_randomness_object_debts: RefCountedHashMap::new(),
558            congestion_control_object_debts: RefCountedHashMap::new(),
559            owned_object_locks: HashMap::new(),
560            metrics: authority_metrics,
561        }
562    }
563}
564
565// Write methods - all methods in this block insert new data into the quarantine.
566// There are only two sources! ConsensusHandler and CheckpointBuilder.
567impl ConsensusOutputQuarantine {
    // Push all data gathered from a consensus commit into the quarantine.
    //
    // The output is indexed into the read-through maps *before* being queued so
    // that reads can be served from the quarantine until the data is persisted.
    pub(crate) fn push_consensus_output(
        &mut self,
        output: ConsensusCommitOutput,
        epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult {
        self.insert_shared_object_next_versions(&output);
        self.insert_congestion_control_debts(&output);
        self.insert_processed_consensus_messages(&output);
        self.insert_owned_object_locks(&output);
        self.output_queue.push_back(output);

        self.metrics
            .consensus_quarantine_queue_size
            .set(self.output_queue.len() as i64);

        // we may already have observed the certified checkpoint for this round, if state sync is running
        // ahead of consensus, so there may be data to commit right away.
        self.commit(epoch_store)
    }
588
589    // Record a newly built checkpoint.
590    pub(super) fn insert_builder_summary(
591        &mut self,
592        sequence_number: CheckpointSequenceNumber,
593        summary: BuilderCheckpointSummary,
594        contents: CheckpointContents,
595    ) {
596        debug!(?sequence_number, "inserting builder summary {:?}", summary);
597        for tx in contents.iter() {
598            self.builder_digest_to_checkpoint
599                .insert(tx.transaction, sequence_number);
600        }
601        self.builder_checkpoint_summary
602            .insert(sequence_number, (summary, contents));
603    }
604}
605
606// Commit methods.
607impl ConsensusOutputQuarantine {
608    /// Update the highest executed checkpoint and commit any data which is now
609    /// below the watermark.
610    pub(super) fn update_highest_executed_checkpoint(
611        &mut self,
612        checkpoint: CheckpointSequenceNumber,
613        epoch_store: &AuthorityPerEpochStore,
614        batch: &mut DBBatch,
615    ) -> SuiResult {
616        self.highest_executed_checkpoint = checkpoint;
617        self.commit_with_batch(epoch_store, batch)
618    }
619
620    pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
621        let mut batch = epoch_store.db_batch()?;
622        self.commit_with_batch(epoch_store, &mut batch)?;
623        batch.write()?;
624        Ok(())
625    }
626
    /// Commit all data below the watermark.
    fn commit_with_batch(
        &mut self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        // The commit algorithm is simple:
        // 1. First commit all checkpoint builder state which is below the watermark.
        // 2. Determine the consensus commit height that corresponds to the highest committed
        //    checkpoint.
        // 3. Commit all consensus output at that height or below.

        let tables = epoch_store.tables()?;

        let mut highest_committed_height = None;

        // Step 1: drain builder summaries whose sequence number is at or below
        // the executed-checkpoint watermark.
        while self
            .builder_checkpoint_summary
            .first_key_value()
            .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
            == Some(true)
        {
            let (seq, (builder_summary, contents)) =
                self.builder_checkpoint_summary.pop_first().unwrap();

            // Remove the digest index entries for this checkpoint, verifying
            // that each one pointed at this same sequence number.
            for tx in contents.iter() {
                let digest = &tx.transaction;
                assert_eq!(
                    self.builder_digest_to_checkpoint
                        .remove(digest)
                        .unwrap_or_else(|| {
                            panic!(
                                "transaction {:?} not found in builder_digest_to_checkpoint",
                                digest
                            )
                        }),
                    seq
                );
            }

            batch.insert_batch(
                &tables.builder_digest_to_checkpoint,
                contents.iter().map(|tx| (tx.transaction, seq)),
            )?;

            batch.insert_batch(
                &tables.builder_checkpoint_summary_v2,
                [(seq, &builder_summary)],
            )?;

            // Step 2: track the highest checkpoint height committed so far;
            // heights must be non-decreasing across summaries.
            let checkpoint_height = builder_summary
                .checkpoint_height
                .expect("non-genesis checkpoint must have height");
            if let Some(highest) = highest_committed_height {
                assert!(
                    checkpoint_height >= highest,
                    "current checkpoint height {} must be no less than highest committed height {}",
                    checkpoint_height,
                    highest
                );
            }

            highest_committed_height = Some(checkpoint_height);
        }

        // Nothing was drained, so no consensus output can be released either.
        let Some(highest_committed_height) = highest_committed_height else {
            return Ok(());
        };

        let split_checkpoints_in_consensus_handler = epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler();

        // Step 3: release consensus outputs covered by the committed height.
        if split_checkpoints_in_consensus_handler {
            // V2: only commit outputs up to the last one where the checkpoint queue
            // was fully drained (no pending roots). If the queue is empty after an
            // output, there are no roots that could be lost on restart. Any outputs
            // after the last drain point stay in the quarantine and get full-replayed
            // on restart with correct root reconstruction.
            let mut last_drain_idx = None;
            for (i, output) in self.output_queue.iter().enumerate() {
                let stats = output
                    .consensus_commit_stats
                    .as_ref()
                    .expect("consensus_commit_stats must be set");
                if stats.height > highest_committed_height {
                    break;
                }
                if output.checkpoint_queue_drained {
                    last_drain_idx = Some(i);
                }
            }
            if let Some(idx) = last_drain_idx {
                for _ in 0..=idx {
                    let output = self.output_queue.pop_front().unwrap();
                    info!("committing drain-boundary output");
                    // Un-index before persisting; reads are now served from db.
                    self.remove_shared_object_next_versions(&output);
                    self.remove_processed_consensus_messages(&output);
                    self.remove_congestion_control_debts(&output);
                    self.remove_owned_object_locks(&output);
                    output.write_to_batch(epoch_store, batch)?;
                }
            }
        } else {
            // V1: commit queued outputs whose highest pending checkpoint height
            // is covered by the committed builder summaries.
            while !self.output_queue.is_empty() {
                let output = self.output_queue.front().unwrap();
                let Some(highest_in_commit) = output.get_highest_pending_checkpoint_height() else {
                    break;
                };

                if highest_in_commit <= highest_committed_height {
                    info!(
                        "committing output with highest pending checkpoint height {:?}",
                        highest_in_commit
                    );
                    let output = self.output_queue.pop_front().unwrap();
                    self.remove_shared_object_next_versions(&output);
                    self.remove_processed_consensus_messages(&output);
                    self.remove_congestion_control_debts(&output);
                    self.remove_owned_object_locks(&output);
                    output.write_to_batch(epoch_store, batch)?;
                } else {
                    break;
                }
            }
        }

        self.metrics
            .consensus_quarantine_queue_size
            .set(self.output_queue.len() as i64);

        Ok(())
    }
760}
761
762impl ConsensusOutputQuarantine {
763    fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
764        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
765            for (object_id, next_version) in next_versions {
766                self.shared_object_next_versions
767                    .insert(*object_id, *next_version);
768            }
769        }
770    }
771
772    fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
773        let current_round = output.consensus_round;
774
775        for (object_id, debt) in output.congestion_control_object_debts.iter() {
776            self.congestion_control_object_debts.insert(
777                *object_id,
778                CongestionPerObjectDebt::new(current_round, *debt),
779            );
780        }
781
782        for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
783            self.congestion_control_randomness_object_debts.insert(
784                *object_id,
785                CongestionPerObjectDebt::new(current_round, *debt),
786            );
787        }
788    }
789
790    fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
791        for (object_id, _) in output.congestion_control_object_debts.iter() {
792            self.congestion_control_object_debts.remove(object_id);
793        }
794        for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
795            self.congestion_control_randomness_object_debts
796                .remove(object_id);
797        }
798    }
799
800    fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
801        for tx_key in output.consensus_messages_processed.iter() {
802            self.processed_consensus_messages.insert(tx_key.clone(), ());
803        }
804    }
805
806    fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
807        for tx_key in output.consensus_messages_processed.iter() {
808            self.processed_consensus_messages.remove(tx_key);
809        }
810    }
811
812    fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
813        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
814            for object_id in next_versions.keys() {
815                if !self.shared_object_next_versions.remove(object_id) {
816                    fatal!(
817                        "Shared object next version not found in quarantine: {:?}",
818                        object_id
819                    );
820                }
821            }
822        }
823    }
824
825    fn insert_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
826        for (obj_ref, lock) in &output.owned_object_locks {
827            self.owned_object_locks.insert(*obj_ref, *lock);
828        }
829    }
830
831    fn remove_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
832        for obj_ref in output.owned_object_locks.keys() {
833            self.owned_object_locks.remove(obj_ref);
834        }
835    }
836}
837
838// Read methods - all methods in this block return data from the quarantine which would otherwise
839// be found in the database.
impl ConsensusOutputQuarantine {
    /// Returns the most recently built checkpoint summary still held in
    /// quarantine, if any.
    // NOTE(review): `.values().last()` assumes the map iterates in key
    // (sequence-number) order, i.e. an ordered map — confirm field type.
    pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
        self.builder_checkpoint_summary
            .values()
            .last()
            .map(|(summary, _)| summary)
    }

    /// Returns the quarantined builder summary for checkpoint `sequence`, if present.
    pub(super) fn get_built_summary(
        &self,
        sequence: CheckpointSequenceNumber,
    ) -> Option<&BuilderCheckpointSummary> {
        self.builder_checkpoint_summary
            .get(&sequence)
            .map(|(summary, _)| summary)
    }

    /// Returns true if `digest` has already been included in a quarantined checkpoint.
    pub(super) fn included_transaction_in_checkpoint(&self, digest: &TransactionDigest) -> bool {
        self.builder_digest_to_checkpoint.contains_key(digest)
    }

    /// Returns true if the given consensus message was processed by a commit
    /// that is still quarantined.
    pub(super) fn is_consensus_message_processed(
        &self,
        key: &SequencedConsensusTransactionKey,
    ) -> bool {
        self.processed_consensus_messages.contains_key(key)
    }

    /// Returns true if no consensus outputs are currently quarantined.
    pub(super) fn is_empty(&self) -> bool {
        self.output_queue.is_empty()
    }

    /// Looks up the next shared-object versions for `objects_to_init`, serving
    /// hits from quarantine and falling back to the epoch tables for the rest.
    pub(super) fn get_next_shared_object_versions(
        &self,
        tables: &AuthorityEpochTables,
        objects_to_init: &[ConsensusObjectSequenceKey],
    ) -> SuiResult<Vec<Option<SequenceNumber>>> {
        Ok(do_fallback_lookup(
            objects_to_init,
            |object_key| {
                if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
                    CacheResult::Hit(Some(*next_version))
                } else {
                    CacheResult::Miss
                }
            },
            |object_keys| {
                tables
                    .next_shared_object_versions_v2
                    .multi_get(object_keys)
                    .expect("db error")
            },
        ))
    }

    /// Gets owned object locks, checking quarantine first then falling back to DB.
    /// Used for post-consensus conflict detection when preconsensus locking is disabled.
    /// After crash recovery, quarantine is empty so we naturally fall back to DB.
    pub(super) fn get_owned_object_locks(
        &self,
        tables: &AuthorityEpochTables,
        obj_refs: &[ObjectRef],
    ) -> SuiResult<Vec<Option<LockDetails>>> {
        Ok(do_fallback_lookup(
            obj_refs,
            |obj_ref| {
                if let Some(lock) = self.owned_object_locks.get(obj_ref) {
                    CacheResult::Hit(Some(*lock))
                } else {
                    CacheResult::Miss
                }
            },
            |obj_refs| {
                tables
                    .multi_get_locked_transactions(obj_refs)
                    .expect("db error")
            },
        ))
    }

    /// Returns the pending checkpoint height of the newest quarantined output,
    /// if any output (and height) exists.
    pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
        self.output_queue
            .back()
            .and_then(|output| output.get_highest_pending_checkpoint_height())
    }

    /// Collects all quarantined pending checkpoints with height greater than
    /// `last` (or all of them when `last` is None), in queue order.
    pub(super) fn get_pending_checkpoints(
        &self,
        last: Option<CheckpointHeight>,
    ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
        let mut checkpoints = Vec::new();
        for output in &self.output_queue {
            checkpoints.extend(
                output
                    .get_pending_checkpoints(last)
                    .map(|cp| (cp.height(), cp.clone())),
            );
        }
        // Debug-only invariant: results must be strictly increasing in height.
        if cfg!(debug_assertions) {
            let mut prev = None;
            for (height, _) in &checkpoints {
                if let Some(prev) = prev {
                    assert!(prev < *height);
                }
                prev = Some(*height);
            }
        }
        checkpoints
    }

    /// Returns true if any quarantined output contains a pending checkpoint at `index`.
    pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
        self.output_queue
            .iter()
            .any(|output| output.pending_checkpoint_exists(index))
    }

    /// V2 variant of `get_pending_checkpoints`; same contract, different
    /// checkpoint representation.
    pub(super) fn get_pending_checkpoints_v2(
        &self,
        last: Option<CheckpointHeight>,
    ) -> Vec<(CheckpointHeight, PendingCheckpointV2)> {
        let mut checkpoints = Vec::new();
        for output in &self.output_queue {
            checkpoints.extend(
                output
                    .get_pending_checkpoints_v2(last)
                    .map(|cp| (cp.height(), cp.clone())),
            );
        }
        // Debug-only invariant: results must be strictly increasing in height.
        if cfg!(debug_assertions) {
            let mut prev = None;
            for (height, _) in &checkpoints {
                if let Some(prev) = prev {
                    assert!(prev < *height);
                }
                prev = Some(*height);
            }
        }
        checkpoints
    }

    /// V2 variant of `pending_checkpoint_exists`.
    pub(super) fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
        self.output_queue
            .iter()
            .any(|output| output.pending_checkpoint_exists_v2(index))
    }

    /// Returns the active JWKs recorded in consensus round `round`: served from
    /// the quarantined output for that round when present, otherwise read back
    /// from the `active_jwks` table via a `[round, round+1)` range scan.
    pub(super) fn get_new_jwks(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        round: u64,
    ) -> SuiResult<Vec<ActiveJwk>> {
        let epoch = epoch_store.epoch();

        // Check if the requested round is in memory
        for output in self.output_queue.iter().rev() {
            // unwrap safe because output will always have last consensus stats set before being added
            // to the quarantine
            let output_round = output.get_round().unwrap();
            if round == output_round {
                return Ok(output
                    .active_jwks
                    .iter()
                    .map(|(_, (jwk_id, jwk))| ActiveJwk {
                        jwk_id: jwk_id.clone(),
                        jwk: jwk.clone(),
                        epoch,
                    })
                    .collect());
            }
        }

        // Fall back to reading from database
        // Empty JWK id/contents sort before any real entry, so `(round, empty)`
        // and `(round + 1, empty)` bound exactly the entries for `round`.
        let empty_jwk_id = JwkId::new(String::new(), String::new());
        let empty_jwk = JWK {
            kty: String::new(),
            e: String::new(),
            n: String::new(),
            alg: String::new(),
        };

        let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
        let end = (round + 1, (empty_jwk_id, empty_jwk));

        Ok(epoch_store
            .tables()?
            .active_jwks
            .safe_iter_with_bounds(Some(start), Some(end))
            .map_ok(|((r, (jwk_id, jwk)), _)| {
                debug_assert!(round == r);
                ActiveJwk { jwk_id, jwk, epoch }
            })
            .collect::<Result<Vec<_>, _>>()?)
    }

    /// Returns the most recent randomness last-round timestamp among the
    /// quarantined outputs (scanned newest-first), if any output recorded one.
    pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
        self.output_queue
            .iter()
            .rev()
            .filter_map(|output| output.get_randomness_last_round_timestamp())
            .next()
    }

    /// Returns the initial congestion debt for each shared input object of
    /// `transactions`, checking quarantined debts first and falling back to the
    /// epoch tables. Stored debts are decayed by `per_commit_budget` for every
    /// full round elapsed since they were recorded (the recording round's own
    /// budget is already accounted for at storage time).
    pub(crate) fn load_initial_object_debts(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        current_round: Round,
        for_randomness: bool,
        transactions: &[VerifiedExecutableTransactionWithAliases],
    ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
        let protocol_config = epoch_store.protocol_config();
        let tables = epoch_store.tables()?;
        let default_per_commit_budget = protocol_config
            .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
            .unwrap_or(0);
        // Select the in-memory table, DB table, and per-commit budget depending
        // on whether these debts are for randomness-using transactions.
        let (hash_table, db_table, per_commit_budget) = if for_randomness {
            (
                &self.congestion_control_randomness_object_debts,
                &tables.congestion_control_randomness_object_debts,
                protocol_config
                    .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
                    .unwrap_or(default_per_commit_budget),
            )
        } else {
            (
                &self.congestion_control_object_debts,
                &tables.congestion_control_object_debts,
                default_per_commit_budget,
            )
        };
        // Deduplicated, sorted list of all shared input object ids across the batch.
        let mut shared_input_object_ids: Vec<_> = transactions
            .iter()
            .flat_map(|tx| tx.tx().shared_input_objects().map(|obj| obj.id))
            .collect();
        shared_input_object_ids.sort();
        shared_input_object_ids.dedup();

        let results = do_fallback_lookup(
            &shared_input_object_ids,
            |object_id| {
                if let Some(debt) = hash_table.get(object_id) {
                    CacheResult::Hit(Some(debt.into_v1()))
                } else {
                    CacheResult::Miss
                }
            },
            |object_ids| {
                db_table
                    .multi_get(object_ids)
                    .expect("db error")
                    .into_iter()
                    .map(|debt| debt.map(|debt| debt.into_v1()))
                    .collect()
            },
        );

        Ok(results
            .into_iter()
            .zip_debug_eq(shared_input_object_ids)
            .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
            .map(move |((round, debt), object_id)| {
                // Stored debts already account for the budget of the round in which
                // they were accumulated. Application of budget from future rounds to
                // the debt is handled here.
                assert!(current_round > round);
                let num_rounds = current_round - round - 1;
                let debt = debt.saturating_sub(per_commit_budget * num_rounds);
                (object_id, debt)
            }))
    }
}
1110
1111// A wrapper around HashMap that uses refcounts to keep entries alive until
1112// they are no longer needed.
1113//
1114// If there are N inserts for the same key, the key will not be removed until
1115// there are N removes.
1116//
1117// It is intended to track the *latest* value for a given key, so duplicate
1118// inserts are intended to overwrite any prior value.
#[derive(Debug, Default)]
struct RefCountedHashMap<K, V> {
    // Value tuple is (refcount, latest value). Invariant: an entry that is
    // present in the map always has refcount >= 1.
    map: HashMap<K, (usize, V)>,
}

impl<K, V> RefCountedHashMap<K, V>
where
    K: Clone + Eq + std::hash::Hash,
{
    /// Creates an empty map.
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    /// Inserts `value` under `key`, overwriting any prior value. A fresh key
    /// starts at refcount 1; re-inserting an existing key bumps its refcount.
    pub fn insert(&mut self, key: K, value: V) {
        match self.map.entry(key) {
            hash_map::Entry::Occupied(mut entry) => {
                let (ref_count, v) = entry.get_mut();
                *ref_count += 1;
                *v = value;
            }
            hash_map::Entry::Vacant(entry) => {
                entry.insert((1, value));
            }
        }
    }

    // Returns true if the key was present, false otherwise.
    // Note that the key may not be removed if present, as it may have a refcount > 1.
    pub fn remove(&mut self, key: &K) -> bool {
        // Look up by reference rather than `entry(key.clone())`: the entry API
        // forced an unconditional key clone on every call, even when the key
        // was absent or the refcount was merely being decremented.
        match self.map.get_mut(key) {
            Some((ref_count, _)) => {
                // Cannot underflow: present entries always have refcount >= 1.
                *ref_count -= 1;
                if *ref_count == 0 {
                    self.map.remove(key);
                }
                true
            }
            None => false,
        }
    }

    /// Returns the latest value stored under `key`, if present.
    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key).map(|(_, v)| v)
    }

    /// Returns true if `key` currently has a live (refcount >= 1) entry.
    pub fn contains_key(&self, key: &K) -> bool {
        self.map.contains_key(key)
    }
}
1173
#[cfg(test)]
impl ConsensusOutputQuarantine {
    /// Test-only accessor: number of consensus outputs currently quarantined.
    fn output_queue_len_for_testing(&self) -> usize {
        let queue = &self.output_queue;
        queue.len()
    }
}
1180
#[cfg(test)]
mod tests {
    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;
    use sui_types::base_types::ExecutionDigests;
    use sui_types::gas::GasCostSummary;

    /// Builds a ConsensusCommitOutput for consensus `round` whose commit stats
    /// record `height`, with the checkpoint-queue-drained flag set to `drained`.
    fn make_output(height: u64, round: u64, drained: bool) -> ConsensusCommitOutput {
        let mut output = ConsensusCommitOutput::new(round);
        output.record_consensus_commit_stats(ExecutionIndicesWithStatsV2 {
            height,
            ..Default::default()
        });
        output.set_checkpoint_queue_drained(drained);
        output
    }

    /// Builds a single-transaction builder summary for checkpoint `seq` at the
    /// given checkpoint `height`, along with its contents.
    fn make_builder_summary(
        seq: CheckpointSequenceNumber,
        height: CheckpointHeight,
        protocol_config: &ProtocolConfig,
    ) -> (BuilderCheckpointSummary, CheckpointContents) {
        let contents =
            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
        let summary = CheckpointSummary::new(
            protocol_config,
            0,
            seq,
            0,
            &contents,
            None,
            GasCostSummary::default(),
            None,
            0,
            vec![],
            vec![],
        );
        let builder_summary = BuilderCheckpointSummary {
            summary,
            checkpoint_height: Some(height),
            position_in_commit: 0,
        };
        (builder_summary, contents)
    }

    /// An output whose checkpoint queue was NOT drained must not be committed,
    /// even when its height is covered by the certified checkpoint.
    #[tokio::test]
    async fn test_drain_boundary_prevents_premature_commit() {
        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(true);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;
        let epoch_store = state.epoch_store_for_testing();

        let metrics = epoch_store.metrics.clone();
        let mut quarantine = ConsensusOutputQuarantine::new(0, metrics);

        // Output C: height=4, not drained
        let c = make_output(4, 1, false);
        quarantine.push_consensus_output(c, &epoch_store).unwrap();

        // Output C2: height=5, drained
        let c2 = make_output(5, 2, true);
        quarantine.push_consensus_output(c2, &epoch_store).unwrap();

        assert_eq!(quarantine.output_queue_len_for_testing(), 2);

        // Insert builder summaries for checkpoints 1-4 with checkpoint_height = seq
        let pc = epoch_store.protocol_config();
        for seq in 1..=4 {
            let (summary, contents) = make_builder_summary(seq, seq, pc);
            quarantine.insert_builder_summary(seq, summary, contents);
        }

        // Certify up to checkpoint 4
        let mut batch = epoch_store.db_batch_for_test();
        quarantine
            .update_highest_executed_checkpoint(4, &epoch_store, &mut batch)
            .unwrap();
        batch.write().unwrap();

        // C has height=4 which is <= 4 but checkpoint_queue_drained=false.
        // C2 has height=5 which is > 4, so it's skipped.
        // No drain boundary found => nothing drained.
        assert_eq!(quarantine.output_queue_len_for_testing(), 2);
    }

    /// Once a drained output's height is covered by the certified checkpoint,
    /// everything up to and including that output is committed.
    #[tokio::test]
    async fn test_drain_boundary_commits_at_safe_point() {
        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(true);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;
        let epoch_store = state.epoch_store_for_testing();

        let metrics = epoch_store.metrics.clone();
        let mut quarantine = ConsensusOutputQuarantine::new(0, metrics);

        // Same queue shape as the premature-commit test: C (not drained), C2 (drained).
        let c = make_output(4, 1, false);
        quarantine.push_consensus_output(c, &epoch_store).unwrap();

        let c2 = make_output(5, 2, true);
        quarantine.push_consensus_output(c2, &epoch_store).unwrap();

        assert_eq!(quarantine.output_queue_len_for_testing(), 2);

        // Insert builder summaries for checkpoints 1-5 with checkpoint_height = seq
        let pc = epoch_store.protocol_config();
        for seq in 1..=5 {
            let (summary, contents) = make_builder_summary(seq, seq, pc);
            quarantine.insert_builder_summary(seq, summary, contents);
        }

        // Certify up to checkpoint 5
        let mut batch = epoch_store.db_batch_for_test();
        quarantine
            .update_highest_executed_checkpoint(5, &epoch_store, &mut batch)
            .unwrap();
        batch.write().unwrap();

        // C has height=4 <= 5, drained=false.
        // C2 has height=5 <= 5, drained=true => drain boundary at index 1.
        // Both outputs drained.
        assert_eq!(quarantine.output_queue_len_for_testing(), 0);
    }
}
1311}