sui_core/authority/consensus_quarantine.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority::authority_per_epoch_store::{
    AuthorityEpochTables, EncG, ExecutionIndicesWithStats, LockDetails, LockDetailsWrapper, PkG,
};
use crate::authority::transaction_deferral::DeferralKey;
use crate::checkpoints::BuilderCheckpointSummary;
use crate::epoch::randomness::SINGLETON_KEY;
use dashmap::DashMap;
use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
use moka::policy::EvictionPolicy;
use moka::sync::SegmentedCache as MokaCache;
use mysten_common::fatal;
use mysten_common::random_util::randomize_cache_capacity_in_tests;
use parking_lot::Mutex;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
use sui_types::authenticator_state::ActiveJwk;
use sui_types::base_types::{AuthorityName, ObjectRef, SequenceNumber};
use sui_types::crypto::RandomnessRound;
use sui_types::error::SuiResult;
use sui_types::executable_transaction::{
    TrustedExecutableTransactionWithAliases, VerifiedExecutableTransactionWithAliases,
};
use sui_types::execution::ExecutionTimeObservationKey;
use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
use sui_types::messages_consensus::AuthorityIndex;
use sui_types::{
    base_types::{ConsensusObjectSequenceKey, ObjectID},
    digests::TransactionDigest,
    messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
    signature::GenericSignature,
};
use tracing::{debug, info};
use typed_store::Map;
use typed_store::rocks::DBBatch;

use crate::{
    authority::{
        authority_per_epoch_store::AuthorityPerEpochStore,
        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
        shared_object_congestion_tracker::CongestionPerObjectDebt,
    },
    checkpoints::{CheckpointHeight, PendingCheckpoint},
    consensus_handler::{SequencedConsensusTransactionKey, VerifiedSequencedConsensusTransaction},
    epoch::{
        randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
        reconfiguration::ReconfigState,
    },
};

use super::*;

#[derive(Default)]
#[allow(clippy::type_complexity)]
pub(crate) struct ConsensusCommitOutput {
    // Consensus and reconfig state
    consensus_round: Round,
    consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
    end_of_publish: BTreeSet<AuthorityName>,
    reconfig_state: Option<ReconfigState>,
    consensus_commit_stats: Option<ExecutionIndicesWithStats>,

    // transaction scheduling state
    next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,

    // TODO: If we delay committing consensus output until after all deferrals have been loaded,
    // we can move deferred_txns to the ConsensusOutputCache and save disk bandwidth.
    deferred_txns: Vec<(DeferralKey, Vec<VerifiedSequencedConsensusTransaction>)>,
    // TODO(commit-handler-rewrite): remove the original once we no longer need to support the old consensus handler
    deferred_txns_v2: Vec<(DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>)>,
    // deferred txns that have been loaded and can be removed
    deleted_deferred_txns: BTreeSet<DeferralKey>,

    // checkpoint state
    pending_checkpoints: Vec<PendingCheckpoint>,

    // random beacon state
    next_randomness_round: Option<(RandomnessRound, TimestampMs)>,

    dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
    dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
    dkg_used_message: Option<VersionedUsedProcessedMessages>,
    dkg_output: Option<dkg_v1::Output<PkG, EncG>>,

    // jwk state
    pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
    active_jwks: BTreeSet<(u64, (JwkId, JWK))>,

    // congestion control state
    congestion_control_object_debts: Vec<(ObjectID, u64)>,
    congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
    execution_time_observations: Vec<(
        AuthorityIndex,
        u64, /* generation */
        Vec<(ExecutionTimeObservationKey, Duration)>,
    )>,

    // Owned object locks acquired post-consensus (when disable_preconsensus_locking=true)
    owned_object_locks: HashMap<ObjectRef, LockDetails>,
}

impl ConsensusCommitOutput {
    pub fn new(consensus_round: Round) -> Self {
        Self {
            consensus_round,
            ..Default::default()
        }
    }

    pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
        self.deleted_deferred_txns.iter().cloned()
    }

    pub fn has_deferred_transactions(&self) -> bool {
        !self.deferred_txns.is_empty() || !self.deferred_txns_v2.is_empty()
    }

    fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
        self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
    }

    fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
        self.pending_checkpoints.last().map(|cp| cp.height())
    }

    fn get_pending_checkpoints(
        &self,
        last: Option<CheckpointHeight>,
    ) -> impl Iterator<Item = &PendingCheckpoint> {
        self.pending_checkpoints.iter().filter(move |cp| {
            if let Some(last) = last {
                cp.height() > last
            } else {
                true
            }
        })
    }

    fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
        self.pending_checkpoints
            .iter()
            .any(|cp| cp.height() == *index)
    }

    fn get_round(&self) -> Option<u64> {
        self.consensus_commit_stats
            .as_ref()
            .map(|stats| stats.index.last_committed_round)
    }

    pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
        self.end_of_publish.insert(authority);
    }

    pub fn insert_execution_time_observation(
        &mut self,
        source: AuthorityIndex,
        generation: u64,
        estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
    ) {
        self.execution_time_observations
            .push((source, generation, estimates));
    }

    pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStats) {
        self.consensus_commit_stats = Some(stats);
    }

    // in testing code we often need to write to the db outside of a consensus commit
    pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
        self.record_consensus_commit_stats(Default::default());
    }

    pub fn store_reconfig_state(&mut self, state: ReconfigState) {
        self.reconfig_state = Some(state);
    }

    pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
        self.consensus_messages_processed.insert(key);
    }

    pub fn get_consensus_messages_processed(
        &self,
    ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
        self.consensus_messages_processed.iter()
    }

    pub fn set_next_shared_object_versions(
        &mut self,
        next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
    ) {
        assert!(self.next_shared_object_versions.is_none());
        self.next_shared_object_versions = Some(next_versions);
    }

    pub fn defer_transactions(
        &mut self,
        key: DeferralKey,
        transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    ) {
        self.deferred_txns_v2.push((key, transactions));
    }

    pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
        self.deleted_deferred_txns
            .extend(deferral_keys.iter().cloned());
    }

    pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
        self.pending_checkpoints.push(checkpoint);
    }

    pub fn reserve_next_randomness_round(
        &mut self,
        next_randomness_round: RandomnessRound,
        commit_timestamp: TimestampMs,
    ) {
        assert!(self.next_randomness_round.is_none());
        self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
    }

    pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
        self.dkg_confirmations.insert(conf.sender(), conf);
    }

    pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
        self.dkg_processed_messages
            .insert(message.sender(), message);
    }

    pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
        self.dkg_used_message = Some(used_messages);
    }

    pub fn set_dkg_output(&mut self, output: dkg_v1::Output<PkG, EncG>) {
        self.dkg_output = Some(output);
    }

    pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
        self.pending_jwks.insert((authority, id, jwk));
    }

    pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
        self.active_jwks.insert((round, key));
    }

    pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
        self.congestion_control_object_debts = object_debts;
    }

    pub fn set_congestion_control_randomness_object_debts(
        &mut self,
        object_debts: Vec<(ObjectID, u64)>,
    ) {
        self.congestion_control_randomness_object_debts = object_debts;
    }

    pub fn set_owned_object_locks(&mut self, locks: HashMap<ObjectRef, LockDetails>) {
        assert!(self.owned_object_locks.is_empty());
        self.owned_object_locks = locks;
    }

    pub fn write_to_batch(
        self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        let tables = epoch_store.tables()?;
        batch.insert_batch(
            &tables.consensus_message_processed,
            self.consensus_messages_processed
                .iter()
                .map(|key| (key, true)),
        )?;

        batch.insert_batch(
            &tables.end_of_publish,
            self.end_of_publish.iter().map(|authority| (authority, ())),
        )?;

        if let Some(reconfig_state) = &self.reconfig_state {
            batch.insert_batch(
                &tables.reconfig_state,
                [(RECONFIG_STATE_INDEX, reconfig_state)],
            )?;
        }

        let consensus_commit_stats = self
            .consensus_commit_stats
            .expect("consensus_commit_stats must be set");
        let round = consensus_commit_stats.index.last_committed_round;

        batch.insert_batch(
            &tables.last_consensus_stats,
            [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
        )?;

        if let Some(next_versions) = self.next_shared_object_versions {
            batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
        }

        if !self.owned_object_locks.is_empty() {
            batch.insert_batch(
                &tables.owned_object_locked_transactions,
                self.owned_object_locks
                    .into_iter()
                    .map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))),
            )?;
        }

        batch.delete_batch(
            &tables.deferred_transactions_v2,
            &self.deleted_deferred_txns,
        )?;
        batch.delete_batch(
            &tables.deferred_transactions_with_aliases_v2,
            &self.deleted_deferred_txns,
        )?;

        batch.insert_batch(
            &tables.deferred_transactions_with_aliases_v2,
            self.deferred_txns_v2.into_iter().map(|(key, txs)| {
                (
                    key,
                    txs.into_iter()
                        .map(|tx| {
                            let tx: TrustedExecutableTransactionWithAliases = tx.serializable();
                            tx
                        })
                        .collect::<Vec<_>>(),
                )
            }),
        )?;

        if let Some((round, commit_timestamp)) = self.next_randomness_round {
            batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
            batch.insert_batch(
                &tables.randomness_last_round_timestamp,
                [(SINGLETON_KEY, commit_timestamp)],
            )?;
        }

        batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
        batch.insert_batch(
            &tables.dkg_processed_messages_v2,
            self.dkg_processed_messages,
        )?;
        batch.insert_batch(
            &tables.dkg_used_messages_v2,
            // using Option as iter
            self.dkg_used_message
                .into_iter()
                .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
        )?;
        if let Some(output) = self.dkg_output {
            batch.insert_batch(&tables.dkg_output, [(SINGLETON_KEY, output)])?;
        }

        batch.insert_batch(
            &tables.pending_jwks,
            self.pending_jwks.into_iter().map(|j| (j, ())),
        )?;
        batch.insert_batch(
            &tables.active_jwks,
            self.active_jwks.into_iter().map(|j| {
                // TODO: we don't need to store the round in this map if it is invariant
                assert_eq!(j.0, round);
                (j, ())
            }),
        )?;

        batch.insert_batch(
            &tables.congestion_control_object_debts,
            self.congestion_control_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;
        batch.insert_batch(
            &tables.congestion_control_randomness_object_debts,
            self.congestion_control_randomness_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;

        batch.insert_batch(
            &tables.execution_time_observations,
            self.execution_time_observations
                .into_iter()
                .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
        )?;

        Ok(())
    }
}
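
// A rough sketch of the intended lifecycle (the call sites below are hypothetical, not taken
// from this file): the consensus handler builds one ConsensusCommitOutput per commit and hands
// it to the quarantine, which holds it in memory until the corresponding checkpoints are
// certified and only then persists it via `write_to_batch`.
//
//     let mut output = ConsensusCommitOutput::new(round);
//     output.record_consensus_commit_stats(stats); // required: write_to_batch panics otherwise
//     // ... populate via the setters above (deferrals, JWKs, DKG state, debts, locks) ...
//     quarantine.push_consensus_output(output, epoch_store)?;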

/// ConsensusOutputCache holds outputs of consensus processing that do not need to be committed to disk.
/// Data quarantining guarantees that all of this data will be used (e.g. for building checkpoints)
/// before the consensus commit from which it originated is marked as processed. Therefore we can rely
/// on replay of consensus commits to recover this data.
pub(crate) struct ConsensusOutputCache {
    // deferred_transactions_v2 is only used by the consensus handler, so there should never be
    // lock contention - hence no need for a DashMap.
    pub(crate) deferred_transactions_v2:
        Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>>,

    // user_signatures_for_checkpoints is written to by the consensus handler and read by the
    // checkpoint builder. The critical sections are small in both cases, so a DashMap is
    // probably not helpful.
    #[allow(clippy::type_complexity)]
    pub(crate) user_signatures_for_checkpoints:
        Mutex<HashMap<TransactionDigest, Vec<(GenericSignature, Option<SequenceNumber>)>>>,

    executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
    executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
}

impl ConsensusOutputCache {
    pub(crate) fn new(
        epoch_start_configuration: &EpochStartConfiguration,
        tables: &AuthorityEpochTables,
    ) -> Self {
        let deferred_transactions_v2 = tables
            .get_all_deferred_transactions_v2()
            .expect("load deferred transactions cannot fail");

        assert!(
            epoch_start_configuration.is_data_quarantine_active_from_beginning_of_epoch(),
            "This version of sui-node can only run after data quarantining has been enabled. Please run version 1.45.0 or later until the end of the current epoch and retry"
        );

        let executed_in_epoch_cache_capacity = 50_000;

        Self {
            deferred_transactions_v2: Mutex::new(deferred_transactions_v2),
            user_signatures_for_checkpoints: Default::default(),
            executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
            executed_in_epoch_cache: MokaCache::builder(8)
                // most queries should be for recent transactions
                .max_capacity(randomize_cache_capacity_in_tests(
                    executed_in_epoch_cache_capacity,
                ))
                .eviction_policy(EvictionPolicy::lru())
                .build(),
        }
    }

    pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
        self.executed_in_epoch
            .read()
            .contains_key(digest) ||
            // we use get instead of contains_key to mark the entry as read
            self.executed_in_epoch_cache.get(digest).is_some()
    }

    // Called by execution
    pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
        assert!(
            self.executed_in_epoch
                .read()
                .insert(tx_digest, ())
                .is_none(),
            "transaction already executed"
        );
        self.executed_in_epoch_cache.insert(tx_digest, ());
    }

    // CheckpointExecutor calls this (indirectly) in order to prune the in-memory cache of executed
    // transactions. By the time this is called, the transaction digests will have been committed to
    // the `executed_transactions_to_checkpoint` table.
    pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
        let executed_in_epoch = self.executed_in_epoch.read();
        for tx_digest in tx_digests {
            executed_in_epoch.remove(tx_digest);
        }
    }
}
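
// A brief usage sketch of the two-tier membership check above (call sites are hypothetical):
// execution records a digest, lookups hit the DashMap first, and the checkpoint executor prunes
// entries once they are persisted to `executed_transactions_to_checkpoint`. Recently inserted
// digests can still be answered from the bounded LRU cache after pruning:
//
//     cache.insert_executed_in_epoch(digest);
//     assert!(cache.executed_in_current_epoch(&digest));
//     cache.remove_executed_in_epoch(&[digest]);
//     // still reported as executed via the LRU cache, since the digest is recent
//     assert!(cache.executed_in_current_epoch(&digest));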

/// ConsensusOutputQuarantine holds outputs of consensus processing in memory until the checkpoints
/// for the commit have been certified.
pub(crate) struct ConsensusOutputQuarantine {
    // Output from consensus handler
    output_queue: VecDeque<ConsensusCommitOutput>,

    // Highest known certified checkpoint sequence number
    highest_executed_checkpoint: CheckpointSequenceNumber,

    // Checkpoint Builder output
    builder_checkpoint_summary:
        BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,

    builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,

    // Any un-committed next versions are stored here.
    shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,

    // The most recent congestion control debts for objects. Uses a ref-count to track
    // which objects still exist in some element of output_queue.
    congestion_control_randomness_object_debts:
        RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
    congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,

    processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,

    // Owned object locks acquired post-consensus.
    owned_object_locks: HashMap<ObjectRef, LockDetails>,

    metrics: Arc<EpochMetrics>,
}

impl ConsensusOutputQuarantine {
    pub(super) fn new(
        highest_executed_checkpoint: CheckpointSequenceNumber,
        authority_metrics: Arc<EpochMetrics>,
    ) -> Self {
        Self {
            highest_executed_checkpoint,

            output_queue: VecDeque::new(),
            builder_checkpoint_summary: BTreeMap::new(),
            builder_digest_to_checkpoint: HashMap::new(),
            shared_object_next_versions: RefCountedHashMap::new(),
            processed_consensus_messages: RefCountedHashMap::new(),
            congestion_control_randomness_object_debts: RefCountedHashMap::new(),
            congestion_control_object_debts: RefCountedHashMap::new(),
            owned_object_locks: HashMap::new(),
            metrics: authority_metrics,
        }
    }
}

// Write methods - all methods in this block insert new data into the quarantine.
// There are only two sources! ConsensusHandler and CheckpointBuilder.
impl ConsensusOutputQuarantine {
    // Push all data gathered from a consensus commit into the quarantine.
    pub(crate) fn push_consensus_output(
        &mut self,
        output: ConsensusCommitOutput,
        epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult {
        self.insert_shared_object_next_versions(&output);
        self.insert_congestion_control_debts(&output);
        self.insert_processed_consensus_messages(&output);
        self.insert_owned_object_locks(&output);
        self.output_queue.push_back(output);

        self.metrics
            .consensus_quarantine_queue_size
            .set(self.output_queue.len() as i64);

        // we may already have observed the certified checkpoint for this round, if state sync is running
        // ahead of consensus, so there may be data to commit right away.
        self.commit(epoch_store)
    }

    // Record a newly built checkpoint.
    pub(super) fn insert_builder_summary(
        &mut self,
        sequence_number: CheckpointSequenceNumber,
        summary: BuilderCheckpointSummary,
        contents: CheckpointContents,
    ) {
        debug!(?sequence_number, "inserting builder summary {:?}", summary);
        for tx in contents.iter() {
            self.builder_digest_to_checkpoint
                .insert(tx.transaction, sequence_number);
        }
        self.builder_checkpoint_summary
            .insert(sequence_number, (summary, contents));
    }
}

// Commit methods.
impl ConsensusOutputQuarantine {
    /// Update the highest executed checkpoint and commit any data which is now
    /// below the watermark.
    pub(super) fn update_highest_executed_checkpoint(
        &mut self,
        checkpoint: CheckpointSequenceNumber,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        self.highest_executed_checkpoint = checkpoint;
        self.commit_with_batch(epoch_store, batch)
    }

    pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
        let mut batch = epoch_store.db_batch()?;
        self.commit_with_batch(epoch_store, &mut batch)?;
        batch.write()?;
        Ok(())
    }

    /// Commit all data below the watermark.
    fn commit_with_batch(
        &mut self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        // The commit algorithm is simple:
        // 1. First commit all checkpoint builder state which is below the watermark.
        // 2. Determine the consensus commit height that corresponds to the highest committed
        //    checkpoint.
        // 3. Commit all consensus output at that height or below.

        let tables = epoch_store.tables()?;

        let mut highest_committed_height = None;

        while self
            .builder_checkpoint_summary
            .first_key_value()
            .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
            == Some(true)
        {
            let (seq, (builder_summary, contents)) =
                self.builder_checkpoint_summary.pop_first().unwrap();

            for tx in contents.iter() {
                let digest = &tx.transaction;
                assert_eq!(
                    self.builder_digest_to_checkpoint
                        .remove(digest)
                        .unwrap_or_else(|| {
                            panic!(
                                "transaction {:?} not found in builder_digest_to_checkpoint",
                                digest
                            )
                        }),
                    seq
                );
            }

            batch.insert_batch(
                &tables.builder_digest_to_checkpoint,
                contents.iter().map(|tx| (tx.transaction, seq)),
            )?;

            batch.insert_batch(
                &tables.builder_checkpoint_summary_v2,
                [(seq, &builder_summary)],
            )?;

            let checkpoint_height = builder_summary
                .checkpoint_height
                .expect("non-genesis checkpoint must have height");
            if let Some(highest) = highest_committed_height {
                assert!(
                    checkpoint_height >= highest,
                    "current checkpoint height {} must be no less than highest committed height {}",
                    checkpoint_height,
                    highest
                );
            }

            highest_committed_height = Some(checkpoint_height);
        }

        let Some(highest_committed_height) = highest_committed_height else {
            return Ok(());
        };

        while !self.output_queue.is_empty() {
            // A consensus commit can have more than one pending checkpoint (a regular one and a randomness one).
            // We can only write the consensus commit if the highest pending checkpoint associated with it has
            // been processed by the builder.
            let Some(highest_in_commit) = self
                .output_queue
                .front()
                .unwrap()
                .get_highest_pending_checkpoint_height()
            else {
                // if highest is none, we have already written the pending checkpoint for the end of
                // the epoch, so there is no more data that needs to be committed.
                break;
            };

            if highest_in_commit <= highest_committed_height {
                info!(
                    "committing output with highest pending checkpoint height {:?}",
                    highest_in_commit
                );
                let output = self.output_queue.pop_front().unwrap();
                self.remove_shared_object_next_versions(&output);
                self.remove_processed_consensus_messages(&output);
                self.remove_congestion_control_debts(&output);
                self.remove_owned_object_locks(&output);

                output.write_to_batch(epoch_store, batch)?;
            } else {
                break;
            }
        }

        self.metrics
            .consensus_quarantine_queue_size
            .set(self.output_queue.len() as i64);

        Ok(())
    }
}

impl ConsensusOutputQuarantine {
    fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
            for (object_id, next_version) in next_versions {
                self.shared_object_next_versions
                    .insert(*object_id, *next_version);
            }
        }
    }

    fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
        let current_round = output.consensus_round;

        for (object_id, debt) in output.congestion_control_object_debts.iter() {
            self.congestion_control_object_debts.insert(
                *object_id,
                CongestionPerObjectDebt::new(current_round, *debt),
            );
        }

        for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
            self.congestion_control_randomness_object_debts.insert(
                *object_id,
                CongestionPerObjectDebt::new(current_round, *debt),
            );
        }
    }

    fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
        for (object_id, _) in output.congestion_control_object_debts.iter() {
            self.congestion_control_object_debts.remove(object_id);
        }
        for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
            self.congestion_control_randomness_object_debts
                .remove(object_id);
        }
    }

    fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
        for tx_key in output.consensus_messages_processed.iter() {
            self.processed_consensus_messages.insert(tx_key.clone(), ());
        }
    }

    fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
        for tx_key in output.consensus_messages_processed.iter() {
            self.processed_consensus_messages.remove(tx_key);
        }
    }

    fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
            for object_id in next_versions.keys() {
                if !self.shared_object_next_versions.remove(object_id) {
                    fatal!(
                        "Shared object next version not found in quarantine: {:?}",
                        object_id
                    );
                }
            }
        }
    }

    fn insert_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
        for (obj_ref, lock) in &output.owned_object_locks {
            self.owned_object_locks.insert(*obj_ref, *lock);
        }
    }

    fn remove_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
        for obj_ref in output.owned_object_locks.keys() {
            self.owned_object_locks.remove(obj_ref);
        }
    }
}

// Read methods - all methods in this block return data from the quarantine which would otherwise
// be found in the database.
impl ConsensusOutputQuarantine {
    pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
        self.builder_checkpoint_summary
            .values()
            .last()
            .map(|(summary, _)| summary)
    }

    pub(super) fn get_built_summary(
        &self,
        sequence: CheckpointSequenceNumber,
    ) -> Option<&BuilderCheckpointSummary> {
        self.builder_checkpoint_summary
            .get(&sequence)
            .map(|(summary, _)| summary)
    }

    pub(super) fn included_transaction_in_checkpoint(&self, digest: &TransactionDigest) -> bool {
        self.builder_digest_to_checkpoint.contains_key(digest)
    }

    pub(super) fn is_consensus_message_processed(
        &self,
        key: &SequencedConsensusTransactionKey,
    ) -> bool {
        self.processed_consensus_messages.contains_key(key)
    }

    pub(super) fn is_empty(&self) -> bool {
        self.output_queue.is_empty()
    }

    pub(super) fn get_next_shared_object_versions(
        &self,
        tables: &AuthorityEpochTables,
        objects_to_init: &[ConsensusObjectSequenceKey],
    ) -> SuiResult<Vec<Option<SequenceNumber>>> {
        Ok(do_fallback_lookup(
            objects_to_init,
            |object_key| {
                if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
                    CacheResult::Hit(Some(*next_version))
                } else {
                    CacheResult::Miss
                }
            },
            |object_keys| {
                tables
                    .next_shared_object_versions_v2
                    .multi_get(object_keys)
                    .expect("db error")
            },
        ))
    }

    /// Gets owned object locks, checking quarantine first then falling back to DB.
    /// Used for post-consensus conflict detection when preconsensus locking is disabled.
    /// After crash recovery, quarantine is empty so we naturally fall back to DB.
    pub(super) fn get_owned_object_locks(
        &self,
        tables: &AuthorityEpochTables,
        obj_refs: &[ObjectRef],
    ) -> SuiResult<Vec<Option<LockDetails>>> {
        Ok(do_fallback_lookup(
            obj_refs,
            |obj_ref| {
                if let Some(lock) = self.owned_object_locks.get(obj_ref) {
                    CacheResult::Hit(Some(*lock))
                } else {
                    CacheResult::Miss
                }
            },
            |obj_refs| {
                tables
                    .multi_get_locked_transactions(obj_refs)
                    .expect("db error")
            },
        ))
    }

    pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
        self.output_queue
            .back()
            .and_then(|output| output.get_highest_pending_checkpoint_height())
    }

    pub(super) fn get_pending_checkpoints(
        &self,
        last: Option<CheckpointHeight>,
    ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
        let mut checkpoints = Vec::new();
        for output in &self.output_queue {
            checkpoints.extend(
                output
                    .get_pending_checkpoints(last)
                    .map(|cp| (cp.height(), cp.clone())),
            );
        }
        if cfg!(debug_assertions) {
            let mut prev = None;
            for (height, _) in &checkpoints {
                if let Some(prev) = prev {
                    assert!(prev < *height);
                }
                prev = Some(*height);
            }
        }
        checkpoints
    }

    pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
        self.output_queue
            .iter()
            .any(|output| output.pending_checkpoint_exists(index))
    }

    pub(super) fn get_new_jwks(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        round: u64,
    ) -> SuiResult<Vec<ActiveJwk>> {
        let epoch = epoch_store.epoch();

        // Check if the requested round is in memory
        for output in self.output_queue.iter().rev() {
            // unwrap safe because output will always have last consensus stats set before being added
            // to the quarantine
            let output_round = output.get_round().unwrap();
            if round == output_round {
                return Ok(output
                    .active_jwks
                    .iter()
                    .map(|(_, (jwk_id, jwk))| ActiveJwk {
                        jwk_id: jwk_id.clone(),
                        jwk: jwk.clone(),
                        epoch,
                    })
                    .collect());
            }
        }

        // Fall back to reading from database
        let empty_jwk_id = JwkId::new(String::new(), String::new());
        let empty_jwk = JWK {
            kty: String::new(),
            e: String::new(),
            n: String::new(),
            alg: String::new(),
        };

        let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
        let end = (round + 1, (empty_jwk_id, empty_jwk));

        Ok(epoch_store
            .tables()?
            .active_jwks
            .safe_iter_with_bounds(Some(start), Some(end))
            .map_ok(|((r, (jwk_id, jwk)), _)| {
                debug_assert!(round == r);
                ActiveJwk { jwk_id, jwk, epoch }
            })
            .collect::<Result<Vec<_>, _>>()?)
    }

    pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
        self.output_queue
            .iter()
            .rev()
            .filter_map(|output| output.get_randomness_last_round_timestamp())
            .next()
    }

    pub(crate) fn load_initial_object_debts(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        current_round: Round,
        for_randomness: bool,
        transactions: &[VerifiedExecutableTransactionWithAliases],
    ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
        let protocol_config = epoch_store.protocol_config();
        let tables = epoch_store.tables()?;
        let default_per_commit_budget = protocol_config
            .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
            .unwrap_or(0);
        let (hash_table, db_table, per_commit_budget) = if for_randomness {
            (
                &self.congestion_control_randomness_object_debts,
                &tables.congestion_control_randomness_object_debts,
                protocol_config
                    .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
                    .unwrap_or(default_per_commit_budget),
            )
        } else {
            (
                &self.congestion_control_object_debts,
                &tables.congestion_control_object_debts,
                default_per_commit_budget,
            )
        };
        let mut shared_input_object_ids: Vec<_> = transactions
            .iter()
            .flat_map(|tx| tx.tx().shared_input_objects().map(|obj| obj.id))
            .collect();
        shared_input_object_ids.sort();
        shared_input_object_ids.dedup();

        let results = do_fallback_lookup(
            &shared_input_object_ids,
            |object_id| {
                if let Some(debt) = hash_table.get(object_id) {
                    CacheResult::Hit(Some(debt.into_v1()))
                } else {
                    CacheResult::Miss
                }
            },
            |object_ids| {
                db_table
                    .multi_get(object_ids)
                    .expect("db error")
                    .into_iter()
                    .map(|debt| debt.map(|debt| debt.into_v1()))
                    .collect()
            },
        );

        Ok(results
            .into_iter()
            .zip(shared_input_object_ids)
            .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
            .map(move |((round, debt), object_id)| {
                // Stored debts already account for the budget of the round in which
                // they were accumulated. Application of budget from future rounds to
                // the debt is handled here.
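                // As an illustrative example (hypothetical numbers): if a debt of 500 was
                // recorded at round 10, per_commit_budget is 100, and current_round is 13,
                // then num_rounds = 13 - 10 - 1 = 2 full rounds of budget have elapsed, so
                // the carried-over debt is 500 - 100 * 2 = 300 (saturating at zero).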
                assert!(current_round > round);
                let num_rounds = current_round - round - 1;
                let debt = debt.saturating_sub(per_commit_budget * num_rounds);
                (object_id, debt)
            }))
    }
}

// A wrapper around HashMap that uses refcounts to keep entries alive until
// they are no longer needed.
//
// If there are N inserts for the same key, the key will not be removed until
// there are N removes.
//
// It is intended to track the *latest* value for a given key, so duplicate
// inserts are intended to overwrite any prior value.
#[derive(Debug, Default)]
struct RefCountedHashMap<K, V> {
    map: HashMap<K, (usize, V)>,
}

impl<K, V> RefCountedHashMap<K, V>
where
    K: Clone + Eq + std::hash::Hash,
{
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    pub fn insert(&mut self, key: K, value: V) {
        let entry = self.map.entry(key);
        match entry {
            hash_map::Entry::Occupied(mut entry) => {
                let (ref_count, v) = entry.get_mut();
                *ref_count += 1;
                *v = value;
            }
            hash_map::Entry::Vacant(entry) => {
                entry.insert((1, value));
            }
        }
    }

    // Returns true if the key was present, false otherwise.
    // Note that the key may not be removed if present, as it may have a refcount > 1.
    pub fn remove(&mut self, key: &K) -> bool {
        let entry = self.map.entry(key.clone());
        match entry {
            hash_map::Entry::Occupied(mut entry) => {
                let (ref_count, _) = entry.get_mut();
                *ref_count -= 1;
                if *ref_count == 0 {
                    entry.remove();
                }
                true
            }
            hash_map::Entry::Vacant(_) => false,
        }
    }

    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key).map(|(_, v)| v)
    }

    pub fn contains_key(&self, key: &K) -> bool {
        self.map.contains_key(key)
    }
}
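
// A minimal sketch (not part of the original file) illustrating the refcount semantics
// documented above: N inserts for a key require N removes before the entry disappears,
// and a duplicate insert overwrites the stored value.
#[cfg(test)]
mod ref_counted_hash_map_tests {
    use super::RefCountedHashMap;

    #[test]
    fn refcount_semantics() {
        let mut map: RefCountedHashMap<&'static str, u64> = RefCountedHashMap::new();

        // Two inserts for the same key: refcount is 2 and the value is overwritten.
        map.insert("obj", 1);
        map.insert("obj", 2);
        assert_eq!(map.get(&"obj"), Some(&2));

        // First remove decrements the refcount but keeps the entry alive.
        assert!(map.remove(&"obj"));
        assert!(map.contains_key(&"obj"));

        // Second remove drops the entry; a further remove reports the key as absent.
        assert!(map.remove(&"obj"));
        assert!(!map.contains_key(&"obj"));
        assert!(!map.remove(&"obj"));
    }
}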