// sui_core/authority/consensus_quarantine.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::authority_per_epoch_store::{
5    AuthorityEpochTables, EncG, ExecutionIndicesWithStatsV2, LockDetails, LockDetailsWrapper, PkG,
6};
7use crate::authority::transaction_deferral::DeferralKey;
8use crate::checkpoints::BuilderCheckpointSummary;
9use crate::epoch::randomness::SINGLETON_KEY;
10use dashmap::DashMap;
11use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
12use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
13use moka::policy::EvictionPolicy;
14use moka::sync::SegmentedCache as MokaCache;
15use mysten_common::fatal;
16use mysten_common::random_util::randomize_cache_capacity_in_tests;
17use parking_lot::Mutex;
18use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
19use sui_types::authenticator_state::ActiveJwk;
20use sui_types::base_types::{AuthorityName, ObjectRef, SequenceNumber};
21use sui_types::crypto::RandomnessRound;
22use sui_types::error::SuiResult;
23use sui_types::executable_transaction::{
24    TrustedExecutableTransactionWithAliases, VerifiedExecutableTransactionWithAliases,
25};
26use sui_types::execution::ExecutionTimeObservationKey;
27use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
28use sui_types::messages_consensus::AuthorityIndex;
29use sui_types::{
30    base_types::{ConsensusObjectSequenceKey, ObjectID},
31    digests::TransactionDigest,
32    messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
33    signature::GenericSignature,
34};
35use tracing::{debug, info};
36use typed_store::Map;
37use typed_store::rocks::DBBatch;
38
39use crate::{
40    authority::{
41        authority_per_epoch_store::AuthorityPerEpochStore,
42        shared_object_congestion_tracker::CongestionPerObjectDebt,
43    },
44    checkpoints::{CheckpointHeight, PendingCheckpoint, PendingCheckpointV2},
45    consensus_handler::SequencedConsensusTransactionKey,
46    epoch::{
47        randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
48        reconfiguration::ReconfigState,
49    },
50};
51
52use super::*;
53
/// In-memory record of all state changes produced while processing a single
/// consensus commit. Instances are held in `ConsensusOutputQuarantine` and only
/// flushed to disk atomically via `write_to_batch` once it is safe to do so.
#[derive(Default)]
#[allow(clippy::type_complexity)]
pub(crate) struct ConsensusCommitOutput {
    // Consensus and reconfig state
    // The consensus round this output was produced from.
    consensus_round: Round,
    // Keys of all consensus messages handled in this commit.
    consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
    // Authorities whose end-of-publish was recorded during this commit.
    end_of_publish: BTreeSet<AuthorityName>,
    // New reconfiguration state, present only if it changed in this commit.
    reconfig_state: Option<ReconfigState>,
    // Commit statistics; must be set before write_to_batch is called.
    consensus_commit_stats: Option<ExecutionIndicesWithStatsV2>,

    // transaction scheduling state
    // Next version to assign for each consensus (shared) object touched here.
    next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,

    // Transactions deferred to a later round, grouped by deferral key.
    deferred_txns: Vec<(DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>)>,
    // Keys of previously-deferred transactions loaded (and thus consumed) by
    // this commit; deleted from the deferred tables at write time.
    deleted_deferred_txns: BTreeSet<DeferralKey>,

    // checkpoint state
    pending_checkpoints: Vec<PendingCheckpoint>,
    pending_checkpoints_v2: Vec<PendingCheckpointV2>,

    // random beacon state
    // Randomness round reserved by this commit, with the commit timestamp.
    next_randomness_round: Option<(RandomnessRound, TimestampMs)>,

    dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
    dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
    dkg_used_message: Option<VersionedUsedProcessedMessages>,
    dkg_output: Option<dkg_v1::Output<PkG, EncG>>,

    // jwk state
    pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
    // (round, jwk) pairs; the round is asserted to match this commit's round
    // when written out (see write_to_batch).
    active_jwks: BTreeSet<(u64, (JwkId, JWK))>,

    // congestion control state
    congestion_control_object_debts: Vec<(ObjectID, u64)>,
    congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
    execution_time_observations: Vec<(
        AuthorityIndex,
        u64, /* generation */
        Vec<(ExecutionTimeObservationKey, Duration)>,
    )>,

    // Owned object locks acquired post-consensus (when disable_preconsensus_locking=true)
    owned_object_locks: HashMap<ObjectRef, LockDetails>,

    // True when the checkpoint queue had no pending roots after this commit's flush.
    // Used by quarantine to determine safe commit boundaries on restart.
    checkpoint_queue_drained: bool,
}
102
impl ConsensusCommitOutput {
    /// Creates an empty output for the given consensus round.
    pub fn new(consensus_round: Round) -> Self {
        Self {
            consensus_round,
            ..Default::default()
        }
    }

    /// Keys of previously-deferred transactions consumed by this commit; these
    /// will be removed from the deferred tables when the output is written.
    pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
        self.deleted_deferred_txns.iter().cloned()
    }

    /// True if this commit deferred any transactions to a later round.
    pub fn has_deferred_transactions(&self) -> bool {
        !self.deferred_txns.is_empty()
    }

    /// Commit timestamp of the randomness round reserved here, if any.
    fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
        self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
    }

    /// Height of the last pending checkpoint recorded by this commit, if any.
    fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
        self.pending_checkpoints.last().map(|cp| cp.height())
    }

    /// Iterates pending checkpoints with height strictly greater than `last`
    /// (all of them when `last` is None).
    fn get_pending_checkpoints(
        &self,
        last: Option<CheckpointHeight>,
    ) -> impl Iterator<Item = &PendingCheckpoint> {
        self.pending_checkpoints.iter().filter(move |cp| {
            if let Some(last) = last {
                cp.height() > last
            } else {
                true
            }
        })
    }

    /// True if a pending checkpoint with exactly this height was recorded.
    fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
        self.pending_checkpoints
            .iter()
            .any(|cp| cp.height() == *index)
    }

    /// V2 variant of `get_pending_checkpoints`; same filtering semantics.
    fn get_pending_checkpoints_v2(
        &self,
        last: Option<CheckpointHeight>,
    ) -> impl Iterator<Item = &PendingCheckpointV2> {
        self.pending_checkpoints_v2.iter().filter(move |cp| {
            if let Some(last) = last {
                cp.height() > last
            } else {
                true
            }
        })
    }

    /// V2 variant of `pending_checkpoint_exists`.
    fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
        self.pending_checkpoints_v2
            .iter()
            .any(|cp| cp.height() == *index)
    }

    /// Last committed consensus round from the commit stats, if stats were set.
    fn get_round(&self) -> Option<u64> {
        self.consensus_commit_stats
            .as_ref()
            .map(|stats| stats.index.last_committed_round)
    }

    pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
        self.end_of_publish.insert(authority);
    }

    /// Records one authority's execution-time estimates for a given generation.
    pub fn insert_execution_time_observation(
        &mut self,
        source: AuthorityIndex,
        generation: u64,
        estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
    ) {
        self.execution_time_observations
            .push((source, generation, estimates));
    }

    pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStatsV2) {
        self.consensus_commit_stats = Some(stats);
    }

    // in testing code we often need to write to the db outside of a consensus commit
    pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
        self.record_consensus_commit_stats(Default::default());
    }

    pub fn store_reconfig_state(&mut self, state: ReconfigState) {
        self.reconfig_state = Some(state);
    }

    pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
        self.consensus_messages_processed.insert(key);
    }

    pub fn get_consensus_messages_processed(
        &self,
    ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
        self.consensus_messages_processed.iter()
    }

    /// Sets the next-version assignments for shared objects touched by this
    /// commit. Panics if called more than once for the same output.
    pub fn set_next_shared_object_versions(
        &mut self,
        next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
    ) {
        assert!(self.next_shared_object_versions.is_none());
        self.next_shared_object_versions = Some(next_versions);
    }

    pub fn defer_transactions(
        &mut self,
        key: DeferralKey,
        transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    ) {
        self.deferred_txns.push((key, transactions));
    }

    /// Marks previously-deferred transactions (now loaded for execution) for
    /// deletion from the deferred tables at write time.
    pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
        self.deleted_deferred_txns
            .extend(deferral_keys.iter().cloned());
    }

    pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
        self.pending_checkpoints.push(checkpoint);
    }

    pub fn insert_pending_checkpoint_v2(&mut self, checkpoint: PendingCheckpointV2) {
        self.pending_checkpoints_v2.push(checkpoint);
    }

    /// Reserves the next randomness round for this commit.
    /// Panics if a round was already reserved on this output.
    pub fn reserve_next_randomness_round(
        &mut self,
        next_randomness_round: RandomnessRound,
        commit_timestamp: TimestampMs,
    ) {
        assert!(self.next_randomness_round.is_none());
        self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
    }

    pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
        self.dkg_confirmations.insert(conf.sender(), conf);
    }

    pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
        self.dkg_processed_messages
            .insert(message.sender(), message);
    }

    pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
        self.dkg_used_message = Some(used_messages);
    }

    pub fn set_dkg_output(&mut self, output: dkg_v1::Output<PkG, EncG>) {
        self.dkg_output = Some(output);
    }

    pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
        self.pending_jwks.insert((authority, id, jwk));
    }

    /// Records a JWK that became active at `round`. write_to_batch asserts that
    /// `round` equals this commit's last committed round.
    pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
        self.active_jwks.insert((round, key));
    }

    pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
        self.congestion_control_object_debts = object_debts;
    }

    pub fn set_congestion_control_randomness_object_debts(
        &mut self,
        object_debts: Vec<(ObjectID, u64)>,
    ) {
        self.congestion_control_randomness_object_debts = object_debts;
    }

    pub fn set_checkpoint_queue_drained(&mut self, drained: bool) {
        self.checkpoint_queue_drained = drained;
    }

    /// Sets the owned-object locks acquired post-consensus for this commit.
    /// Panics if locks were already set on this output.
    pub fn set_owned_object_locks(&mut self, locks: HashMap<ObjectRef, LockDetails>) {
        assert!(self.owned_object_locks.is_empty());
        self.owned_object_locks = locks;
    }

    /// Serializes the entire commit output into `batch` for an atomic write to
    /// the epoch tables. Consumes self.
    ///
    /// Panics if `consensus_commit_stats` was never set.
    pub fn write_to_batch(
        self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        let tables = epoch_store.tables()?;
        batch.insert_batch(
            &tables.consensus_message_processed,
            self.consensus_messages_processed
                .iter()
                .map(|key| (key, true)),
        )?;

        batch.insert_batch(
            &tables.end_of_publish,
            self.end_of_publish.iter().map(|authority| (authority, ())),
        )?;

        // Reconfig state is a singleton row, only written when it changed.
        if let Some(reconfig_state) = &self.reconfig_state {
            batch.insert_batch(
                &tables.reconfig_state,
                [(RECONFIG_STATE_INDEX, reconfig_state)],
            )?;
        }

        let consensus_commit_stats = self
            .consensus_commit_stats
            .expect("consensus_commit_stats must be set");
        let round = consensus_commit_stats.index.last_committed_round;

        batch.insert_batch(
            &tables.last_consensus_stats_v2,
            [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
        )?;

        if let Some(next_versions) = self.next_shared_object_versions {
            batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
        }

        if !self.owned_object_locks.is_empty() {
            batch.insert_batch(
                &tables.owned_object_locked_transactions,
                self.owned_object_locks
                    .into_iter()
                    .map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))),
            )?;
        }

        // Consumed deferrals are deleted from all deferred-transaction table
        // versions (older tables presumably cleared for migration safety —
        // new entries below go only to the v3 table).
        batch.delete_batch(
            &tables.deferred_transactions_v2,
            &self.deleted_deferred_txns,
        )?;
        batch.delete_batch(
            &tables.deferred_transactions_with_aliases_v2,
            &self.deleted_deferred_txns,
        )?;
        batch.delete_batch(
            &tables.deferred_transactions_with_aliases_v3,
            &self.deleted_deferred_txns,
        )?;

        batch.insert_batch(
            &tables.deferred_transactions_with_aliases_v3,
            self.deferred_txns.into_iter().map(|(key, txs)| {
                (
                    key,
                    txs.into_iter()
                        .map(|tx| {
                            // Convert to the serializable (trusted) form for storage.
                            let tx: TrustedExecutableTransactionWithAliases = tx.serializable();
                            tx
                        })
                        .collect::<Vec<_>>(),
                )
            }),
        )?;

        if let Some((round, commit_timestamp)) = self.next_randomness_round {
            batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
            batch.insert_batch(
                &tables.randomness_last_round_timestamp,
                [(SINGLETON_KEY, commit_timestamp)],
            )?;
        }

        batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
        batch.insert_batch(
            &tables.dkg_processed_messages_v2,
            self.dkg_processed_messages,
        )?;
        batch.insert_batch(
            &tables.dkg_used_messages_v2,
            // using Option as iter
            self.dkg_used_message
                .into_iter()
                .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
        )?;
        if let Some(output) = self.dkg_output {
            batch.insert_batch(&tables.dkg_output, [(SINGLETON_KEY, output)])?;
        }

        batch.insert_batch(
            &tables.pending_jwks,
            self.pending_jwks.into_iter().map(|j| (j, ())),
        )?;
        batch.insert_batch(
            &tables.active_jwks,
            self.active_jwks.into_iter().map(|j| {
                // TODO: we don't need to store the round in this map if it is invariant
                assert_eq!(j.0, round);
                (j, ())
            }),
        )?;

        // Debts are tagged with this output's consensus round so staleness can
        // be judged on read.
        batch.insert_batch(
            &tables.congestion_control_object_debts,
            self.congestion_control_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;
        batch.insert_batch(
            &tables.congestion_control_randomness_object_debts,
            self.congestion_control_randomness_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;

        // Table is keyed by (generation, authority) — note the swapped order
        // relative to the in-memory tuple.
        batch.insert_batch(
            &tables.execution_time_observations,
            self.execution_time_observations
                .into_iter()
                .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
        )?;

        Ok(())
    }
}
437
/// ConsensusOutputCache holds outputs of consensus processing that do not need to be committed to disk.
/// Data quarantining guarantees that all of this data will be used (e.g. for building checkpoints)
/// before the consensus commit from which it originated is marked as processed. Therefore we can rely
/// on replay of consensus commits to recover this data.
pub(crate) struct ConsensusOutputCache {
    // deferred transactions is only used by consensus handler so there should never be lock contention
    // - hence no need for a DashMap.
    pub(crate) deferred_transactions:
        Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>>,

    // user_signatures_for_checkpoints is written to by consensus handler and read from by checkpoint builder
    // The critical sections are small in both cases so a DashMap is probably not helpful.
    #[allow(clippy::type_complexity)]
    pub(crate) user_signatures_for_checkpoints:
        Mutex<HashMap<TransactionDigest, Vec<(GenericSignature, Option<SequenceNumber>)>>>,

    // Digests of transactions executed in the current epoch. The DashMap handles
    // concurrent insert/remove under a read lock on the outer RwLock.
    executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
    // Bounded LRU of recently executed digests. Entries stay queryable here even
    // after being pruned from `executed_in_epoch` (see remove_executed_in_epoch).
    executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
}
457
458impl ConsensusOutputCache {
459    pub(crate) fn new(tables: &AuthorityEpochTables) -> Self {
460        let deferred_transactions = tables
461            .get_all_deferred_transactions_v2()
462            .expect("load deferred transactions cannot fail");
463
464        let executed_in_epoch_cache_capacity = 50_000;
465
466        Self {
467            deferred_transactions: Mutex::new(deferred_transactions),
468            user_signatures_for_checkpoints: Default::default(),
469            executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
470            executed_in_epoch_cache: MokaCache::builder(8)
471                // most queries should be for recent transactions
472                .max_capacity(randomize_cache_capacity_in_tests(
473                    executed_in_epoch_cache_capacity,
474                ))
475                .eviction_policy(EvictionPolicy::lru())
476                .build(),
477        }
478    }
479
480    pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
481        self.executed_in_epoch
482            .read()
483            .contains_key(digest) ||
484            // we use get instead of contains key to mark the entry as read
485            self.executed_in_epoch_cache.get(digest).is_some()
486    }
487
488    // Called by execution
489    pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
490        assert!(
491            self.executed_in_epoch
492                .read()
493                .insert(tx_digest, ())
494                .is_none(),
495            "transaction already executed"
496        );
497        self.executed_in_epoch_cache.insert(tx_digest, ());
498    }
499
500    // CheckpointExecutor calls this (indirectly) in order to prune the in-memory cache of executed
501    // transactions. By the time this is called, the transaction digests will have been committed to
502    // the `executed_transactions_to_checkpoint` table.
503    pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
504        let executed_in_epoch = self.executed_in_epoch.read();
505        for tx_digest in tx_digests {
506            executed_in_epoch.remove(tx_digest);
507        }
508    }
509}
510
/// ConsensusOutputQuarantine holds outputs of consensus processing in memory until the checkpoints
/// for the commit have been certified.
pub(crate) struct ConsensusOutputQuarantine {
    // Output from consensus handler
    // Commits are pushed in consensus order and popped from the front once
    // their checkpoints fall below the executed watermark.
    output_queue: VecDeque<ConsensusCommitOutput>,

    // Highest known certified checkpoint sequence number
    highest_executed_checkpoint: CheckpointSequenceNumber,

    // Checkpoint Builder output
    // Built-but-not-yet-committed checkpoint summaries, keyed by sequence number.
    builder_checkpoint_summary:
        BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,

    // Reverse index: which quarantined checkpoint each transaction landed in.
    builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,

    // Any un-committed next versions are stored here.
    // Ref-counted because the same object can appear in multiple queued outputs.
    shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,

    // The most recent congestion control debts for objects. Uses a ref-count to track
    // which objects still exist in some element of output_queue.
    congestion_control_randomness_object_debts:
        RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
    congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,

    // Consensus messages processed by queued (un-committed) outputs.
    processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,

    // Owned object locks acquired post-consensus.
    owned_object_locks: HashMap<ObjectRef, LockDetails>,

    metrics: Arc<EpochMetrics>,
}
542
543impl ConsensusOutputQuarantine {
544    pub(super) fn new(
545        highest_executed_checkpoint: CheckpointSequenceNumber,
546        authority_metrics: Arc<EpochMetrics>,
547    ) -> Self {
548        Self {
549            highest_executed_checkpoint,
550
551            output_queue: VecDeque::new(),
552            builder_checkpoint_summary: BTreeMap::new(),
553            builder_digest_to_checkpoint: HashMap::new(),
554            shared_object_next_versions: RefCountedHashMap::new(),
555            processed_consensus_messages: RefCountedHashMap::new(),
556            congestion_control_randomness_object_debts: RefCountedHashMap::new(),
557            congestion_control_object_debts: RefCountedHashMap::new(),
558            owned_object_locks: HashMap::new(),
559            metrics: authority_metrics,
560        }
561    }
562}
563
// Write methods - all methods in this block insert new data into the quarantine.
// There are only two sources! ConsensusHandler and CheckpointBuilder.
impl ConsensusOutputQuarantine {
    // Push all data gathered from a consensus commit into the quarantine.
    pub(crate) fn push_consensus_output(
        &mut self,
        output: ConsensusCommitOutput,
        epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult {
        // Index the output's contents so reads can be served from memory
        // before the data is persisted.
        self.insert_shared_object_next_versions(&output);
        self.insert_congestion_control_debts(&output);
        self.insert_processed_consensus_messages(&output);
        self.insert_owned_object_locks(&output);
        self.output_queue.push_back(output);

        self.metrics
            .consensus_quarantine_queue_size
            .set(self.output_queue.len() as i64);

        // we may already have observed the certified checkpoint for this round, if state sync is running
        // ahead of consensus, so there may be data to commit right away.
        self.commit(epoch_store)
    }

    // Record a newly built checkpoint.
    pub(super) fn insert_builder_summary(
        &mut self,
        sequence_number: CheckpointSequenceNumber,
        summary: BuilderCheckpointSummary,
        contents: CheckpointContents,
    ) {
        debug!(?sequence_number, "inserting builder summary {:?}", summary);
        // Maintain the digest -> checkpoint reverse index for every transaction
        // included in this checkpoint.
        for tx in contents.iter() {
            self.builder_digest_to_checkpoint
                .insert(tx.transaction, sequence_number);
        }
        self.builder_checkpoint_summary
            .insert(sequence_number, (summary, contents));
    }
}
604
// Commit methods.
impl ConsensusOutputQuarantine {
    /// Update the highest executed checkpoint and commit any data which is now
    /// below the watermark.
    pub(super) fn update_highest_executed_checkpoint(
        &mut self,
        checkpoint: CheckpointSequenceNumber,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        self.highest_executed_checkpoint = checkpoint;
        self.commit_with_batch(epoch_store, batch)
    }

    /// Commits eligible data using a fresh batch and writes it immediately.
    pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
        let mut batch = epoch_store.db_batch()?;
        self.commit_with_batch(epoch_store, &mut batch)?;
        batch.write()?;
        Ok(())
    }

    /// Commit all data below the watermark.
    fn commit_with_batch(
        &mut self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        // The commit algorithm is simple:
        // 1. First commit all checkpoint builder state which is below the watermark.
        // 2. Determine the consensus commit height that corresponds to the highest committed
        //    checkpoint.
        // 3. Commit all consensus output at that height or below.

        let tables = epoch_store.tables()?;

        let mut highest_committed_height = None;

        // Step 1: drain builder summaries whose sequence number is at or below
        // the executed-checkpoint watermark, persisting them as we go.
        while self
            .builder_checkpoint_summary
            .first_key_value()
            .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
            == Some(true)
        {
            let (seq, (builder_summary, contents)) =
                self.builder_checkpoint_summary.pop_first().unwrap();

            // Remove every transaction of this checkpoint from the in-memory
            // reverse index; each must have been indexed to exactly this seq.
            for tx in contents.iter() {
                let digest = &tx.transaction;
                assert_eq!(
                    self.builder_digest_to_checkpoint
                        .remove(digest)
                        .unwrap_or_else(|| {
                            panic!(
                                "transaction {:?} not found in builder_digest_to_checkpoint",
                                digest
                            )
                        }),
                    seq
                );
            }

            batch.insert_batch(
                &tables.builder_digest_to_checkpoint,
                contents.iter().map(|tx| (tx.transaction, seq)),
            )?;

            batch.insert_batch(
                &tables.builder_checkpoint_summary_v2,
                [(seq, &builder_summary)],
            )?;

            // Heights must be non-decreasing across successive checkpoints.
            let checkpoint_height = builder_summary
                .checkpoint_height
                .expect("non-genesis checkpoint must have height");
            if let Some(highest) = highest_committed_height {
                assert!(
                    checkpoint_height >= highest,
                    "current checkpoint height {} must be no less than highest committed height {}",
                    checkpoint_height,
                    highest
                );
            }

            highest_committed_height = Some(checkpoint_height);
        }

        // Nothing crossed the watermark: no consensus output can be released.
        let Some(highest_committed_height) = highest_committed_height else {
            return Ok(());
        };

        let split_checkpoints_in_consensus_handler = epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler();

        if split_checkpoints_in_consensus_handler {
            // V2: only commit outputs up to the last one where the checkpoint queue
            // was fully drained (no pending roots). If the queue is empty after an
            // output, there are no roots that could be lost on restart. Any outputs
            // after the last drain point stay in the quarantine and get full-replayed
            // on restart with correct root reconstruction.
            let mut last_drain_idx = None;
            for (i, output) in self.output_queue.iter().enumerate() {
                let stats = output
                    .consensus_commit_stats
                    .as_ref()
                    .expect("consensus_commit_stats must be set");
                // NOTE(review): assumes stats.height is comparable with builder
                // checkpoint heights (not the consensus round) — confirm.
                if stats.height > highest_committed_height {
                    break;
                }
                if output.checkpoint_queue_drained {
                    last_drain_idx = Some(i);
                }
            }
            if let Some(idx) = last_drain_idx {
                // Pop and persist every output up to (and including) the
                // drain boundary, unwinding the in-memory indexes first.
                for _ in 0..=idx {
                    let output = self.output_queue.pop_front().unwrap();
                    info!("committing drain-boundary output");
                    self.remove_shared_object_next_versions(&output);
                    self.remove_processed_consensus_messages(&output);
                    self.remove_congestion_control_debts(&output);
                    self.remove_owned_object_locks(&output);
                    output.write_to_batch(epoch_store, batch)?;
                }
            }
        } else {
            // V1: commit outputs whose highest pending checkpoint height is at
            // or below the height of the highest committed builder checkpoint.
            while !self.output_queue.is_empty() {
                let output = self.output_queue.front().unwrap();
                let Some(highest_in_commit) = output.get_highest_pending_checkpoint_height() else {
                    break;
                };

                if highest_in_commit <= highest_committed_height {
                    info!(
                        "committing output with highest pending checkpoint height {:?}",
                        highest_in_commit
                    );
                    let output = self.output_queue.pop_front().unwrap();
                    self.remove_shared_object_next_versions(&output);
                    self.remove_processed_consensus_messages(&output);
                    self.remove_congestion_control_debts(&output);
                    self.remove_owned_object_locks(&output);
                    output.write_to_batch(epoch_store, batch)?;
                } else {
                    break;
                }
            }
        }

        self.metrics
            .consensus_quarantine_queue_size
            .set(self.output_queue.len() as i64);

        Ok(())
    }
}
760
761impl ConsensusOutputQuarantine {
762    fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
763        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
764            for (object_id, next_version) in next_versions {
765                self.shared_object_next_versions
766                    .insert(*object_id, *next_version);
767            }
768        }
769    }
770
771    fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
772        let current_round = output.consensus_round;
773
774        for (object_id, debt) in output.congestion_control_object_debts.iter() {
775            self.congestion_control_object_debts.insert(
776                *object_id,
777                CongestionPerObjectDebt::new(current_round, *debt),
778            );
779        }
780
781        for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
782            self.congestion_control_randomness_object_debts.insert(
783                *object_id,
784                CongestionPerObjectDebt::new(current_round, *debt),
785            );
786        }
787    }
788
789    fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
790        for (object_id, _) in output.congestion_control_object_debts.iter() {
791            self.congestion_control_object_debts.remove(object_id);
792        }
793        for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
794            self.congestion_control_randomness_object_debts
795                .remove(object_id);
796        }
797    }
798
799    fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
800        for tx_key in output.consensus_messages_processed.iter() {
801            self.processed_consensus_messages.insert(tx_key.clone(), ());
802        }
803    }
804
805    fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
806        for tx_key in output.consensus_messages_processed.iter() {
807            self.processed_consensus_messages.remove(tx_key);
808        }
809    }
810
811    fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
812        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
813            for object_id in next_versions.keys() {
814                if !self.shared_object_next_versions.remove(object_id) {
815                    fatal!(
816                        "Shared object next version not found in quarantine: {:?}",
817                        object_id
818                    );
819                }
820            }
821        }
822    }
823
824    fn insert_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
825        for (obj_ref, lock) in &output.owned_object_locks {
826            self.owned_object_locks.insert(*obj_ref, *lock);
827        }
828    }
829
830    fn remove_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
831        for obj_ref in output.owned_object_locks.keys() {
832            self.owned_object_locks.remove(obj_ref);
833        }
834    }
835}
836
// Read methods - all methods in this block return data from the quarantine which would otherwise
// be found in the database.
impl ConsensusOutputQuarantine {
    /// Returns the quarantined builder summary with the highest checkpoint
    /// sequence number, if any.
    // NOTE(review): relies on `builder_checkpoint_summary` iterating values in
    // ascending key order (i.e. being an ordered map keyed by sequence number,
    // as `get_built_summary` below suggests) — confirm at the field declaration.
    pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
        self.builder_checkpoint_summary
            .values()
            .last()
            .map(|(summary, _)| summary)
    }

    /// Returns the quarantined builder summary for `sequence`, if present.
    pub(super) fn get_built_summary(
        &self,
        sequence: CheckpointSequenceNumber,
    ) -> Option<&BuilderCheckpointSummary> {
        self.builder_checkpoint_summary
            .get(&sequence)
            .map(|(summary, _)| summary)
    }

    /// True if `digest` has already been included in a checkpoint that is
    /// still quarantined.
    pub(super) fn included_transaction_in_checkpoint(&self, digest: &TransactionDigest) -> bool {
        self.builder_digest_to_checkpoint.contains_key(digest)
    }

    /// True if the consensus message identified by `key` was processed by a
    /// commit that is still quarantined (not yet written to the database).
    pub(super) fn is_consensus_message_processed(
        &self,
        key: &SequencedConsensusTransactionKey,
    ) -> bool {
        self.processed_consensus_messages.contains_key(key)
    }

    /// True if no consensus commit outputs are currently quarantined.
    pub(super) fn is_empty(&self) -> bool {
        self.output_queue.is_empty()
    }

    /// Looks up the next version for each shared-object key, consulting the
    /// quarantine first and falling back to the epoch tables for misses.
    /// Results are in the same order as `objects_to_init`; `None` means the
    /// key is known to neither the quarantine nor the database.
    pub(super) fn get_next_shared_object_versions(
        &self,
        tables: &AuthorityEpochTables,
        objects_to_init: &[ConsensusObjectSequenceKey],
    ) -> SuiResult<Vec<Option<SequenceNumber>>> {
        Ok(do_fallback_lookup(
            objects_to_init,
            |object_key| {
                if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
                    CacheResult::Hit(Some(*next_version))
                } else {
                    CacheResult::Miss
                }
            },
            |object_keys| {
                tables
                    .next_shared_object_versions_v2
                    .multi_get(object_keys)
                    .expect("db error")
            },
        ))
    }

    /// Gets owned object locks, checking quarantine first then falling back to DB.
    /// Used for post-consensus conflict detection when preconsensus locking is disabled.
    /// After crash recovery, quarantine is empty so we naturally fall back to DB.
    /// Results are in the same order as `obj_refs`.
    pub(super) fn get_owned_object_locks(
        &self,
        tables: &AuthorityEpochTables,
        obj_refs: &[ObjectRef],
    ) -> SuiResult<Vec<Option<LockDetails>>> {
        Ok(do_fallback_lookup(
            obj_refs,
            |obj_ref| {
                if let Some(lock) = self.owned_object_locks.get(obj_ref) {
                    CacheResult::Hit(Some(*lock))
                } else {
                    CacheResult::Miss
                }
            },
            |obj_refs| {
                tables
                    .multi_get_locked_transactions(obj_refs)
                    .expect("db error")
            },
        ))
    }

    /// Returns the highest pending checkpoint height recorded by the newest
    /// quarantined output, if any.
    pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
        self.output_queue
            .back()
            .and_then(|output| output.get_highest_pending_checkpoint_height())
    }

    /// Collects pending checkpoints from every quarantined output, filtered by
    /// `last` (filtering semantics are delegated to each output), in queue
    /// order. Debug builds verify the resulting heights are strictly
    /// increasing across outputs.
    pub(super) fn get_pending_checkpoints(
        &self,
        last: Option<CheckpointHeight>,
    ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
        let mut checkpoints = Vec::new();
        for output in &self.output_queue {
            checkpoints.extend(
                output
                    .get_pending_checkpoints(last)
                    .map(|cp| (cp.height(), cp.clone())),
            );
        }
        if cfg!(debug_assertions) {
            // Invariant: heights must be strictly increasing in queue order.
            let mut prev = None;
            for (height, _) in &checkpoints {
                if let Some(prev) = prev {
                    assert!(prev < *height);
                }
                prev = Some(*height);
            }
        }
        checkpoints
    }

    /// True if any quarantined output contains a pending checkpoint at `index`.
    pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
        self.output_queue
            .iter()
            .any(|output| output.pending_checkpoint_exists(index))
    }

    /// V2 counterpart of `get_pending_checkpoints`; same filtering and
    /// strictly-increasing-height invariant.
    pub(super) fn get_pending_checkpoints_v2(
        &self,
        last: Option<CheckpointHeight>,
    ) -> Vec<(CheckpointHeight, PendingCheckpointV2)> {
        let mut checkpoints = Vec::new();
        for output in &self.output_queue {
            checkpoints.extend(
                output
                    .get_pending_checkpoints_v2(last)
                    .map(|cp| (cp.height(), cp.clone())),
            );
        }
        if cfg!(debug_assertions) {
            // Invariant: heights must be strictly increasing in queue order.
            let mut prev = None;
            for (height, _) in &checkpoints {
                if let Some(prev) = prev {
                    assert!(prev < *height);
                }
                prev = Some(*height);
            }
        }
        checkpoints
    }

    /// V2 counterpart of `pending_checkpoint_exists`.
    pub(super) fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
        self.output_queue
            .iter()
            .any(|output| output.pending_checkpoint_exists_v2(index))
    }

    /// Returns the active JWKs recorded in consensus round `round`, checking
    /// quarantined outputs first and falling back to the `active_jwks` table.
    pub(super) fn get_new_jwks(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        round: u64,
    ) -> SuiResult<Vec<ActiveJwk>> {
        let epoch = epoch_store.epoch();

        // Check if the requested round is in memory
        for output in self.output_queue.iter().rev() {
            // unwrap safe because output will always have last consensus stats set before being added
            // to the quarantine
            let output_round = output.get_round().unwrap();
            if round == output_round {
                return Ok(output
                    .active_jwks
                    .iter()
                    .map(|(_, (jwk_id, jwk))| ActiveJwk {
                        jwk_id: jwk_id.clone(),
                        jwk: jwk.clone(),
                        epoch,
                    })
                    .collect());
            }
        }

        // Fall back to reading from database.
        // An all-empty JwkId/JWK is the minimal value of its type, so the
        // half-open range [(round, min), (round + 1, min)) covers exactly the
        // entries stored for `round`.
        // NOTE(review): assumes the table orders keys lexicographically by
        // (round, (jwk_id, jwk)) — confirm against the table's key encoding.
        let empty_jwk_id = JwkId::new(String::new(), String::new());
        let empty_jwk = JWK {
            kty: String::new(),
            e: String::new(),
            n: String::new(),
            alg: String::new(),
        };

        let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
        let end = (round + 1, (empty_jwk_id, empty_jwk));

        Ok(epoch_store
            .tables()?
            .active_jwks
            .safe_iter_with_bounds(Some(start), Some(end))
            .map_ok(|((r, (jwk_id, jwk)), _)| {
                debug_assert!(round == r);
                ActiveJwk { jwk_id, jwk, epoch }
            })
            .collect::<Result<Vec<_>, _>>()?)
    }

    /// Returns the most recently recorded randomness last-round timestamp
    /// among quarantined outputs (searching newest to oldest), if any.
    pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
        self.output_queue
            .iter()
            .rev()
            .filter_map(|output| output.get_randomness_last_round_timestamp())
            .next()
    }

    /// Computes the initial congestion debt for every shared input object of
    /// `transactions`, for use at the start of `current_round`.
    ///
    /// Debts are looked up in the quarantine first, falling back to the epoch
    /// tables. Each stored debt is then decayed by the per-commit budget for
    /// every full round elapsed since it was recorded (the recording round's
    /// own budget is already reflected in the stored value).
    pub(crate) fn load_initial_object_debts(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        current_round: Round,
        for_randomness: bool,
        transactions: &[VerifiedExecutableTransactionWithAliases],
    ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
        let protocol_config = epoch_store.protocol_config();
        let tables = epoch_store.tables()?;
        let default_per_commit_budget = protocol_config
            .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
            .unwrap_or(0);
        // Randomness commits use their own debt map/table and (when configured)
        // their own per-commit budget, defaulting to the regular budget.
        let (hash_table, db_table, per_commit_budget) = if for_randomness {
            (
                &self.congestion_control_randomness_object_debts,
                &tables.congestion_control_randomness_object_debts,
                protocol_config
                    .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
                    .unwrap_or(default_per_commit_budget),
            )
        } else {
            (
                &self.congestion_control_object_debts,
                &tables.congestion_control_object_debts,
                default_per_commit_budget,
            )
        };
        // Collect the distinct shared input object ids across all transactions.
        let mut shared_input_object_ids: Vec<_> = transactions
            .iter()
            .flat_map(|tx| tx.tx().shared_input_objects().map(|obj| obj.id))
            .collect();
        shared_input_object_ids.sort();
        shared_input_object_ids.dedup();

        let results = do_fallback_lookup(
            &shared_input_object_ids,
            |object_id| {
                if let Some(debt) = hash_table.get(object_id) {
                    CacheResult::Hit(Some(debt.into_v1()))
                } else {
                    CacheResult::Miss
                }
            },
            |object_ids| {
                db_table
                    .multi_get(object_ids)
                    .expect("db error")
                    .into_iter()
                    .map(|debt| debt.map(|debt| debt.into_v1()))
                    .collect()
            },
        );

        Ok(results
            .into_iter()
            .zip(shared_input_object_ids)
            .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
            .map(move |((round, debt), object_id)| {
                // Stored debts already account for the budget of the round in which
                // they were accumulated. Application of budget from future rounds to
                // the debt is handled here.
                // Invariant: debts can only have been recorded in a strictly
                // earlier round than the one we are loading for.
                assert!(current_round > round);
                let num_rounds = current_round - round - 1;
                let debt = debt.saturating_sub(per_commit_budget * num_rounds);
                (object_id, debt)
            }))
    }
}
1109
// A wrapper around HashMap that uses refcounts to keep entries alive until
// they are no longer needed.
//
// If there are N inserts for the same key, the key will not be removed until
// there are N removes.
//
// It is intended to track the *latest* value for a given key, so duplicate
// inserts are intended to overwrite any prior value.
#[derive(Debug, Default)]
struct RefCountedHashMap<K, V> {
    // Each entry stores (refcount, latest value).
    map: HashMap<K, (usize, V)>,
}

impl<K, V> RefCountedHashMap<K, V>
where
    K: Clone + Eq + std::hash::Hash,
{
    /// Creates an empty map.
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    /// Inserts `value` for `key`, overwriting any prior value and incrementing
    /// the key's refcount (a fresh key starts at refcount 1).
    pub fn insert(&mut self, key: K, value: V) {
        match self.map.entry(key) {
            hash_map::Entry::Occupied(mut entry) => {
                let (ref_count, v) = entry.get_mut();
                *ref_count += 1;
                *v = value;
            }
            hash_map::Entry::Vacant(entry) => {
                entry.insert((1, value));
            }
        }
    }

    // Returns true if the key was present, false otherwise.
    // Note that the key may not be removed if present, as it may have a refcount > 1.
    pub fn remove(&mut self, key: &K) -> bool {
        // Use get_mut rather than entry(): entry() requires an owned key and
        // previously forced a key.clone() on every call, even for misses.
        let Some((ref_count, _)) = self.map.get_mut(key) else {
            return false;
        };
        *ref_count -= 1;
        if *ref_count == 0 {
            // Last reference dropped; remove the entry entirely.
            self.map.remove(key);
        }
        true
    }

    /// Returns the latest value for `key`, if present.
    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key).map(|(_, v)| v)
    }

    /// True if `key` currently has a nonzero refcount.
    pub fn contains_key(&self, key: &K) -> bool {
        self.map.contains_key(key)
    }
}
1172
#[cfg(test)]
impl ConsensusOutputQuarantine {
    /// Test-only accessor: number of consensus outputs currently quarantined.
    fn output_queue_len_for_testing(&self) -> usize {
        self.output_queue.len()
    }
}
1179
#[cfg(test)]
mod tests {
    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;
    use sui_types::base_types::ExecutionDigests;
    use sui_types::gas::GasCostSummary;

    /// Builds a minimal commit output with the given commit `height` and
    /// consensus `round`, and the checkpoint-queue-drained flag set to `drained`.
    fn make_output(height: u64, round: u64, drained: bool) -> ConsensusCommitOutput {
        let mut output = ConsensusCommitOutput::new(round);
        output.record_consensus_commit_stats(ExecutionIndicesWithStatsV2 {
            height,
            ..Default::default()
        });
        output.set_checkpoint_queue_drained(drained);
        output
    }

    /// Builds a single-transaction builder checkpoint summary (and its
    /// contents) with sequence number `seq` and builder height `height`.
    fn make_builder_summary(
        seq: CheckpointSequenceNumber,
        height: CheckpointHeight,
        protocol_config: &ProtocolConfig,
    ) -> (BuilderCheckpointSummary, CheckpointContents) {
        let contents =
            CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]);
        let summary = CheckpointSummary::new(
            protocol_config,
            0,
            seq,
            0,
            &contents,
            None,
            GasCostSummary::default(),
            None,
            0,
            vec![],
            vec![],
        );
        let builder_summary = BuilderCheckpointSummary {
            summary,
            checkpoint_height: Some(height),
            position_in_commit: 0,
        };
        (builder_summary, contents)
    }

    /// Certifying checkpoints must not drain outputs past an output whose
    /// checkpoint queue was not drained, even if its height is covered.
    #[tokio::test]
    async fn test_drain_boundary_prevents_premature_commit() {
        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(true);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;
        let epoch_store = state.epoch_store_for_testing();

        let metrics = epoch_store.metrics.clone();
        let mut quarantine = ConsensusOutputQuarantine::new(0, metrics);

        // Output C: height=4, not drained
        let c = make_output(4, 1, false);
        quarantine.push_consensus_output(c, &epoch_store).unwrap();

        // Output C2: height=5, drained
        let c2 = make_output(5, 2, true);
        quarantine.push_consensus_output(c2, &epoch_store).unwrap();

        assert_eq!(quarantine.output_queue_len_for_testing(), 2);

        // Insert builder summaries for checkpoints 1-4 with checkpoint_height = seq
        let pc = epoch_store.protocol_config();
        for seq in 1..=4 {
            let (summary, contents) = make_builder_summary(seq, seq, pc);
            quarantine.insert_builder_summary(seq, summary, contents);
        }

        // Certify up to checkpoint 4
        let mut batch = epoch_store.db_batch_for_test();
        quarantine
            .update_highest_executed_checkpoint(4, &epoch_store, &mut batch)
            .unwrap();
        batch.write().unwrap();

        // C has height=4 which is <= 4 but checkpoint_queue_drained=false.
        // C2 has height=5 which is > 4, so it's skipped.
        // No drain boundary found => nothing drained.
        assert_eq!(quarantine.output_queue_len_for_testing(), 2);
    }

    /// Once an output with a drained checkpoint queue is fully certified, it
    /// forms a drain boundary and everything up to it is committed.
    #[tokio::test]
    async fn test_drain_boundary_commits_at_safe_point() {
        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_split_checkpoints_in_consensus_handler_for_testing(true);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;
        let epoch_store = state.epoch_store_for_testing();

        let metrics = epoch_store.metrics.clone();
        let mut quarantine = ConsensusOutputQuarantine::new(0, metrics);

        let c = make_output(4, 1, false);
        quarantine.push_consensus_output(c, &epoch_store).unwrap();

        let c2 = make_output(5, 2, true);
        quarantine.push_consensus_output(c2, &epoch_store).unwrap();

        assert_eq!(quarantine.output_queue_len_for_testing(), 2);

        // Insert builder summaries for checkpoints 1-5 with checkpoint_height = seq
        let pc = epoch_store.protocol_config();
        for seq in 1..=5 {
            let (summary, contents) = make_builder_summary(seq, seq, pc);
            quarantine.insert_builder_summary(seq, summary, contents);
        }

        // Certify up to checkpoint 5
        let mut batch = epoch_store.db_batch_for_test();
        quarantine
            .update_highest_executed_checkpoint(5, &epoch_store, &mut batch)
            .unwrap();
        batch.write().unwrap();

        // C has height=4 <= 5, drained=false.
        // C2 has height=5 <= 5, drained=true => drain boundary at index 1.
        // Both outputs drained.
        assert_eq!(quarantine.output_queue_len_for_testing(), 0);
    }
}