sui_core/authority/consensus_quarantine.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority::authority_per_epoch_store::{
    AuthorityEpochTables, EncG, ExecutionIndicesWithStats, LockDetails, LockDetailsWrapper, PkG,
};
use crate::authority::transaction_deferral::DeferralKey;
use crate::checkpoints::BuilderCheckpointSummary;
use crate::epoch::randomness::SINGLETON_KEY;
use dashmap::DashMap;
use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
use moka::policy::EvictionPolicy;
use moka::sync::SegmentedCache as MokaCache;
use mysten_common::fatal;
use mysten_common::random_util::randomize_cache_capacity_in_tests;
use parking_lot::Mutex;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
use sui_types::authenticator_state::ActiveJwk;
use sui_types::base_types::{AuthorityName, ObjectRef, SequenceNumber};
use sui_types::crypto::RandomnessRound;
use sui_types::error::SuiResult;
use sui_types::executable_transaction::{
    TrustedExecutableTransactionWithAliases, VerifiedExecutableTransactionWithAliases,
};
use sui_types::execution::ExecutionTimeObservationKey;
use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
use sui_types::messages_consensus::AuthorityIndex;
use sui_types::{
    base_types::{ConsensusObjectSequenceKey, ObjectID},
    digests::TransactionDigest,
    messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
    signature::GenericSignature,
};
use tracing::{debug, info};
use typed_store::Map;
use typed_store::rocks::DBBatch;

use crate::{
    authority::{
        authority_per_epoch_store::AuthorityPerEpochStore,
        shared_object_congestion_tracker::CongestionPerObjectDebt,
    },
    checkpoints::{CheckpointHeight, PendingCheckpoint, PendingCheckpointV2},
    consensus_handler::SequencedConsensusTransactionKey,
    epoch::{
        randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
        reconfiguration::ReconfigState,
    },
};

use super::*;

#[derive(Default)]
#[allow(clippy::type_complexity)]
pub(crate) struct ConsensusCommitOutput {
    // Consensus and reconfig state
    consensus_round: Round,
    consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
    end_of_publish: BTreeSet<AuthorityName>,
    reconfig_state: Option<ReconfigState>,
    consensus_commit_stats: Option<ExecutionIndicesWithStats>,

    // transaction scheduling state
    next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,

    deferred_txns: Vec<(DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>)>,
    deleted_deferred_txns: BTreeSet<DeferralKey>,

    // checkpoint state
    pending_checkpoints: Vec<PendingCheckpoint>,
    pending_checkpoints_v2: Vec<PendingCheckpointV2>,

    // random beacon state
    next_randomness_round: Option<(RandomnessRound, TimestampMs)>,

    dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
    dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
    dkg_used_message: Option<VersionedUsedProcessedMessages>,
    dkg_output: Option<dkg_v1::Output<PkG, EncG>>,

    // jwk state
    pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
    active_jwks: BTreeSet<(u64, (JwkId, JWK))>,

    // congestion control state
    congestion_control_object_debts: Vec<(ObjectID, u64)>,
    congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
    execution_time_observations: Vec<(
        AuthorityIndex,
        u64, /* generation */
        Vec<(ExecutionTimeObservationKey, Duration)>,
    )>,

    // Owned object locks acquired post-consensus (when disable_preconsensus_locking=true)
    owned_object_locks: HashMap<ObjectRef, LockDetails>,
}

impl ConsensusCommitOutput {
    pub fn new(consensus_round: Round) -> Self {
        Self {
            consensus_round,
            ..Default::default()
        }
    }

    pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
        self.deleted_deferred_txns.iter().cloned()
    }

    pub fn has_deferred_transactions(&self) -> bool {
        !self.deferred_txns.is_empty()
    }

    fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
        self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
    }

    fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
        self.pending_checkpoints.last().map(|cp| cp.height())
    }

    fn get_pending_checkpoints(
        &self,
        last: Option<CheckpointHeight>,
    ) -> impl Iterator<Item = &PendingCheckpoint> {
        self.pending_checkpoints.iter().filter(move |cp| {
            if let Some(last) = last {
                cp.height() > last
            } else {
                true
            }
        })
    }

    fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
        self.pending_checkpoints
            .iter()
            .any(|cp| cp.height() == *index)
    }

    fn get_pending_checkpoints_v2(
        &self,
        last: Option<CheckpointHeight>,
    ) -> impl Iterator<Item = &PendingCheckpointV2> {
        self.pending_checkpoints_v2.iter().filter(move |cp| {
            if let Some(last) = last {
                cp.height() > last
            } else {
                true
            }
        })
    }

    fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
        self.pending_checkpoints_v2
            .iter()
            .any(|cp| cp.height() == *index)
    }

    fn get_round(&self) -> Option<u64> {
        self.consensus_commit_stats
            .as_ref()
            .map(|stats| stats.index.last_committed_round)
    }

    pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
        self.end_of_publish.insert(authority);
    }

    pub fn insert_execution_time_observation(
        &mut self,
        source: AuthorityIndex,
        generation: u64,
        estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
    ) {
        self.execution_time_observations
            .push((source, generation, estimates));
    }

    pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStats) {
        self.consensus_commit_stats = Some(stats);
    }

    // in testing code we often need to write to the db outside of a consensus commit
    pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
        self.record_consensus_commit_stats(Default::default());
    }

    pub fn store_reconfig_state(&mut self, state: ReconfigState) {
        self.reconfig_state = Some(state);
    }

    pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
        self.consensus_messages_processed.insert(key);
    }

    pub fn get_consensus_messages_processed(
        &self,
    ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
        self.consensus_messages_processed.iter()
    }

    pub fn set_next_shared_object_versions(
        &mut self,
        next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
    ) {
        assert!(self.next_shared_object_versions.is_none());
        self.next_shared_object_versions = Some(next_versions);
    }

    pub fn defer_transactions(
        &mut self,
        key: DeferralKey,
        transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    ) {
        self.deferred_txns.push((key, transactions));
    }

    pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
        self.deleted_deferred_txns
            .extend(deferral_keys.iter().cloned());
    }

    pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
        self.pending_checkpoints.push(checkpoint);
    }

    pub fn insert_pending_checkpoint_v2(&mut self, checkpoint: PendingCheckpointV2) {
        self.pending_checkpoints_v2.push(checkpoint);
    }

    pub fn reserve_next_randomness_round(
        &mut self,
        next_randomness_round: RandomnessRound,
        commit_timestamp: TimestampMs,
    ) {
        assert!(self.next_randomness_round.is_none());
        self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
    }

    pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
        self.dkg_confirmations.insert(conf.sender(), conf);
    }

    pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
        self.dkg_processed_messages
            .insert(message.sender(), message);
    }

    pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
        self.dkg_used_message = Some(used_messages);
    }

    pub fn set_dkg_output(&mut self, output: dkg_v1::Output<PkG, EncG>) {
        self.dkg_output = Some(output);
    }

    pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
        self.pending_jwks.insert((authority, id, jwk));
    }

    pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
        self.active_jwks.insert((round, key));
    }

    pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
        self.congestion_control_object_debts = object_debts;
    }

    pub fn set_congestion_control_randomness_object_debts(
        &mut self,
        object_debts: Vec<(ObjectID, u64)>,
    ) {
        self.congestion_control_randomness_object_debts = object_debts;
    }

    pub fn set_owned_object_locks(&mut self, locks: HashMap<ObjectRef, LockDetails>) {
        assert!(self.owned_object_locks.is_empty());
        self.owned_object_locks = locks;
    }

    pub fn write_to_batch(
        self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        let tables = epoch_store.tables()?;
        batch.insert_batch(
            &tables.consensus_message_processed,
            self.consensus_messages_processed
                .iter()
                .map(|key| (key, true)),
        )?;

        batch.insert_batch(
            &tables.end_of_publish,
            self.end_of_publish.iter().map(|authority| (authority, ())),
        )?;

        if let Some(reconfig_state) = &self.reconfig_state {
            batch.insert_batch(
                &tables.reconfig_state,
                [(RECONFIG_STATE_INDEX, reconfig_state)],
            )?;
        }

        let consensus_commit_stats = self
            .consensus_commit_stats
            .expect("consensus_commit_stats must be set");
        let round = consensus_commit_stats.index.last_committed_round;

        batch.insert_batch(
            &tables.last_consensus_stats,
            [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
        )?;

        if let Some(next_versions) = self.next_shared_object_versions {
            batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
        }

        if !self.owned_object_locks.is_empty() {
            batch.insert_batch(
                &tables.owned_object_locked_transactions,
                self.owned_object_locks
                    .into_iter()
                    .map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))),
            )?;
        }

        batch.delete_batch(
            &tables.deferred_transactions_v2,
            &self.deleted_deferred_txns,
        )?;
        batch.delete_batch(
            &tables.deferred_transactions_with_aliases_v2,
            &self.deleted_deferred_txns,
        )?;

        batch.insert_batch(
            &tables.deferred_transactions_with_aliases_v2,
            self.deferred_txns.into_iter().map(|(key, txs)| {
                (
                    key,
                    txs.into_iter()
                        .map(|tx| {
                            let tx: TrustedExecutableTransactionWithAliases = tx.serializable();
                            tx
                        })
                        .collect::<Vec<_>>(),
                )
            }),
        )?;

        if let Some((round, commit_timestamp)) = self.next_randomness_round {
            batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
            batch.insert_batch(
                &tables.randomness_last_round_timestamp,
                [(SINGLETON_KEY, commit_timestamp)],
            )?;
        }

        batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
        batch.insert_batch(
            &tables.dkg_processed_messages_v2,
            self.dkg_processed_messages,
        )?;
        batch.insert_batch(
            &tables.dkg_used_messages_v2,
            // using Option as iter
            self.dkg_used_message
                .into_iter()
                .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
        )?;
        if let Some(output) = self.dkg_output {
            batch.insert_batch(&tables.dkg_output, [(SINGLETON_KEY, output)])?;
        }

        batch.insert_batch(
            &tables.pending_jwks,
            self.pending_jwks.into_iter().map(|j| (j, ())),
        )?;
        batch.insert_batch(
            &tables.active_jwks,
            self.active_jwks.into_iter().map(|j| {
                // TODO: we don't need to store the round in this map if it is invariant
                assert_eq!(j.0, round);
                (j, ())
            }),
        )?;

        batch.insert_batch(
            &tables.congestion_control_object_debts,
            self.congestion_control_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;
        batch.insert_batch(
            &tables.congestion_control_randomness_object_debts,
            self.congestion_control_randomness_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;

        batch.insert_batch(
            &tables.execution_time_observations,
            self.execution_time_observations
                .into_iter()
                .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
        )?;

        Ok(())
    }
}

/// ConsensusOutputCache holds outputs of consensus processing that do not need to be committed to disk.
/// Data quarantining guarantees that all of this data will be used (e.g. for building checkpoints)
/// before the consensus commit from which it originated is marked as processed. Therefore we can rely
/// on replay of consensus commits to recover this data.
pub(crate) struct ConsensusOutputCache {
    // deferred transactions is only used by consensus handler so there should never be lock contention
    // - hence no need for a DashMap.
    pub(crate) deferred_transactions:
        Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>>,

    // user_signatures_for_checkpoints is written to by consensus handler and read from by checkpoint builder
    // The critical sections are small in both cases so a DashMap is probably not helpful.
    #[allow(clippy::type_complexity)]
    pub(crate) user_signatures_for_checkpoints:
        Mutex<HashMap<TransactionDigest, Vec<(GenericSignature, Option<SequenceNumber>)>>>,

    executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
    executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
}

impl ConsensusOutputCache {
    pub(crate) fn new(tables: &AuthorityEpochTables) -> Self {
        let deferred_transactions = tables
            .get_all_deferred_transactions_v2()
            .expect("load deferred transactions cannot fail");

        let executed_in_epoch_cache_capacity = 50_000;

        Self {
            deferred_transactions: Mutex::new(deferred_transactions),
            user_signatures_for_checkpoints: Default::default(),
            executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
            executed_in_epoch_cache: MokaCache::builder(8)
                // most queries should be for recent transactions
                .max_capacity(randomize_cache_capacity_in_tests(
                    executed_in_epoch_cache_capacity,
                ))
                .eviction_policy(EvictionPolicy::lru())
                .build(),
        }
    }

    pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
        self.executed_in_epoch
            .read()
            .contains_key(digest) ||
            // we use get instead of contains_key to mark the entry as read
            self.executed_in_epoch_cache.get(digest).is_some()
    }

    // Called by execution
    pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
        assert!(
            self.executed_in_epoch
                .read()
                .insert(tx_digest, ())
                .is_none(),
            "transaction already executed"
        );
        self.executed_in_epoch_cache.insert(tx_digest, ());
    }

    // CheckpointExecutor calls this (indirectly) in order to prune the in-memory cache of executed
    // transactions. By the time this is called, the transaction digests will have been committed to
    // the `executed_transactions_to_checkpoint` table.
    pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
        let executed_in_epoch = self.executed_in_epoch.read();
        for tx_digest in tx_digests {
            executed_in_epoch.remove(tx_digest);
        }
    }
}

/// ConsensusOutputQuarantine holds outputs of consensus processing in memory until the checkpoints
/// for the commit have been certified.
pub(crate) struct ConsensusOutputQuarantine {
    // Output from consensus handler
    output_queue: VecDeque<ConsensusCommitOutput>,

    // Highest known certified checkpoint sequence number
    highest_executed_checkpoint: CheckpointSequenceNumber,

    // Checkpoint Builder output
    builder_checkpoint_summary:
        BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,

    builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,

    // Any un-committed next versions are stored here.
    shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,

    // The most recent congestion control debts for objects. Uses a ref-count to track
    // which objects still exist in some element of output_queue.
    congestion_control_randomness_object_debts:
        RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
    congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,

    processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,

    // Owned object locks acquired post-consensus.
    owned_object_locks: HashMap<ObjectRef, LockDetails>,

    metrics: Arc<EpochMetrics>,
}

impl ConsensusOutputQuarantine {
    pub(super) fn new(
        highest_executed_checkpoint: CheckpointSequenceNumber,
        authority_metrics: Arc<EpochMetrics>,
    ) -> Self {
        Self {
            highest_executed_checkpoint,

            output_queue: VecDeque::new(),
            builder_checkpoint_summary: BTreeMap::new(),
            builder_digest_to_checkpoint: HashMap::new(),
            shared_object_next_versions: RefCountedHashMap::new(),
            processed_consensus_messages: RefCountedHashMap::new(),
            congestion_control_randomness_object_debts: RefCountedHashMap::new(),
            congestion_control_object_debts: RefCountedHashMap::new(),
            owned_object_locks: HashMap::new(),
            metrics: authority_metrics,
        }
    }
}

// Write methods - all methods in this block insert new data into the quarantine.
// There are only two sources! ConsensusHandler and CheckpointBuilder.
impl ConsensusOutputQuarantine {
    // Push all data gathered from a consensus commit into the quarantine.
    pub(crate) fn push_consensus_output(
        &mut self,
        output: ConsensusCommitOutput,
        epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult {
        self.insert_shared_object_next_versions(&output);
        self.insert_congestion_control_debts(&output);
        self.insert_processed_consensus_messages(&output);
        self.insert_owned_object_locks(&output);
        self.output_queue.push_back(output);

        self.metrics
            .consensus_quarantine_queue_size
            .set(self.output_queue.len() as i64);

        // we may already have observed the certified checkpoint for this round, if state sync is running
        // ahead of consensus, so there may be data to commit right away.
        self.commit(epoch_store)
    }

    // Record a newly built checkpoint.
    pub(super) fn insert_builder_summary(
        &mut self,
        sequence_number: CheckpointSequenceNumber,
        summary: BuilderCheckpointSummary,
        contents: CheckpointContents,
    ) {
        debug!(?sequence_number, "inserting builder summary {:?}", summary);
        for tx in contents.iter() {
            self.builder_digest_to_checkpoint
                .insert(tx.transaction, sequence_number);
        }
        self.builder_checkpoint_summary
            .insert(sequence_number, (summary, contents));
    }
}

// Commit methods.
impl ConsensusOutputQuarantine {
    /// Update the highest executed checkpoint and commit any data which is now
    /// below the watermark.
    pub(super) fn update_highest_executed_checkpoint(
        &mut self,
        checkpoint: CheckpointSequenceNumber,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        self.highest_executed_checkpoint = checkpoint;
        self.commit_with_batch(epoch_store, batch)
    }

    pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
        let mut batch = epoch_store.db_batch()?;
        self.commit_with_batch(epoch_store, &mut batch)?;
        batch.write()?;
        Ok(())
    }

    /// Commit all data below the watermark.
    fn commit_with_batch(
        &mut self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        // The commit algorithm is simple:
        // 1. First commit all checkpoint builder state which is below the watermark.
        // 2. Determine the consensus commit height that corresponds to the highest committed
        //    checkpoint.
        // 3. Commit all consensus output at that height or below.

        let tables = epoch_store.tables()?;

        let mut highest_committed_height = None;

        while self
            .builder_checkpoint_summary
            .first_key_value()
            .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
            == Some(true)
        {
            let (seq, (builder_summary, contents)) =
                self.builder_checkpoint_summary.pop_first().unwrap();

            for tx in contents.iter() {
                let digest = &tx.transaction;
                assert_eq!(
                    self.builder_digest_to_checkpoint
                        .remove(digest)
                        .unwrap_or_else(|| {
                            panic!(
                                "transaction {:?} not found in builder_digest_to_checkpoint",
                                digest
                            )
                        }),
                    seq
                );
            }

            batch.insert_batch(
                &tables.builder_digest_to_checkpoint,
                contents.iter().map(|tx| (tx.transaction, seq)),
            )?;

            batch.insert_batch(
                &tables.builder_checkpoint_summary_v2,
                [(seq, &builder_summary)],
            )?;

            let checkpoint_height = builder_summary
                .checkpoint_height
                .expect("non-genesis checkpoint must have height");
            if let Some(highest) = highest_committed_height {
                assert!(
                    checkpoint_height >= highest,
                    "current checkpoint height {} must be no less than highest committed height {}",
                    checkpoint_height,
                    highest
                );
            }

            highest_committed_height = Some(checkpoint_height);
        }

        let Some(highest_committed_height) = highest_committed_height else {
            return Ok(());
        };

        let split_checkpoints_in_consensus_handler = epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler();

        while !self.output_queue.is_empty() {
            // A consensus commit can have more than one pending checkpoint (a regular one and a randomness one).
            // We can only write the consensus commit if the highest pending checkpoint associated with it has
            // been processed by the builder.
            let output = self.output_queue.front().unwrap();
            let highest_in_commit = if split_checkpoints_in_consensus_handler {
                // In V2, use the checkpoint_height from stats which represents the consensus
                // commit height watermark.
                output
                    .consensus_commit_stats
                    .as_ref()
                    .expect("consensus_commit_stats must be set")
                    .height
            } else {
                // In V1, use the highest pending checkpoint height from the output.
                let Some(h) = output.get_highest_pending_checkpoint_height() else {
                    // if highest is none, we have already written the pending checkpoint for the final epoch,
                    // so there is no more data that needs to be committed.
                    break;
                };
                h
            };

            if highest_in_commit <= highest_committed_height {
                info!(
                    "committing output with highest pending checkpoint height {:?}",
                    highest_in_commit
                );
                let output = self.output_queue.pop_front().unwrap();
                self.remove_shared_object_next_versions(&output);
                self.remove_processed_consensus_messages(&output);
                self.remove_congestion_control_debts(&output);
                self.remove_owned_object_locks(&output);

                output.write_to_batch(epoch_store, batch)?;
            } else {
                break;
            }
        }

        self.metrics
            .consensus_quarantine_queue_size
            .set(self.output_queue.len() as i64);

        Ok(())
    }
}

impl ConsensusOutputQuarantine {
    fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
            for (object_id, next_version) in next_versions {
                self.shared_object_next_versions
                    .insert(*object_id, *next_version);
            }
        }
    }

    fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
        let current_round = output.consensus_round;

        for (object_id, debt) in output.congestion_control_object_debts.iter() {
            self.congestion_control_object_debts.insert(
                *object_id,
                CongestionPerObjectDebt::new(current_round, *debt),
            );
        }

        for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
            self.congestion_control_randomness_object_debts.insert(
                *object_id,
                CongestionPerObjectDebt::new(current_round, *debt),
            );
        }
    }

    fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
        for (object_id, _) in output.congestion_control_object_debts.iter() {
            self.congestion_control_object_debts.remove(object_id);
        }
        for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
            self.congestion_control_randomness_object_debts
                .remove(object_id);
        }
    }

    fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
        for tx_key in output.consensus_messages_processed.iter() {
            self.processed_consensus_messages.insert(tx_key.clone(), ());
        }
    }

    fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
        for tx_key in output.consensus_messages_processed.iter() {
            self.processed_consensus_messages.remove(tx_key);
        }
    }

    fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
            for object_id in next_versions.keys() {
                if !self.shared_object_next_versions.remove(object_id) {
                    fatal!(
                        "Shared object next version not found in quarantine: {:?}",
                        object_id
                    );
                }
            }
        }
    }

    fn insert_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
        for (obj_ref, lock) in &output.owned_object_locks {
            self.owned_object_locks.insert(*obj_ref, *lock);
        }
    }

    fn remove_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
        for obj_ref in output.owned_object_locks.keys() {
            self.owned_object_locks.remove(obj_ref);
        }
    }
}

// Read methods - all methods in this block return data from the quarantine which would otherwise
// be found in the database.
impl ConsensusOutputQuarantine {
    pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
        self.builder_checkpoint_summary
            .values()
            .last()
            .map(|(summary, _)| summary)
    }

    pub(super) fn get_built_summary(
        &self,
        sequence: CheckpointSequenceNumber,
    ) -> Option<&BuilderCheckpointSummary> {
        self.builder_checkpoint_summary
            .get(&sequence)
            .map(|(summary, _)| summary)
    }

    pub(super) fn included_transaction_in_checkpoint(&self, digest: &TransactionDigest) -> bool {
        self.builder_digest_to_checkpoint.contains_key(digest)
    }

    pub(super) fn is_consensus_message_processed(
        &self,
        key: &SequencedConsensusTransactionKey,
    ) -> bool {
        self.processed_consensus_messages.contains_key(key)
    }

    pub(super) fn is_empty(&self) -> bool {
        self.output_queue.is_empty()
    }

    pub(super) fn get_next_shared_object_versions(
        &self,
        tables: &AuthorityEpochTables,
        objects_to_init: &[ConsensusObjectSequenceKey],
    ) -> SuiResult<Vec<Option<SequenceNumber>>> {
        Ok(do_fallback_lookup(
            objects_to_init,
            |object_key| {
                if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
                    CacheResult::Hit(Some(*next_version))
                } else {
                    CacheResult::Miss
                }
            },
            |object_keys| {
                tables
                    .next_shared_object_versions_v2
                    .multi_get(object_keys)
                    .expect("db error")
            },
        ))
    }

    /// Gets owned object locks, checking quarantine first then falling back to DB.
    /// Used for post-consensus conflict detection when preconsensus locking is disabled.
    /// After crash recovery, quarantine is empty so we naturally fall back to DB.
    pub(super) fn get_owned_object_locks(
        &self,
        tables: &AuthorityEpochTables,
        obj_refs: &[ObjectRef],
    ) -> SuiResult<Vec<Option<LockDetails>>> {
        Ok(do_fallback_lookup(
            obj_refs,
            |obj_ref| {
                if let Some(lock) = self.owned_object_locks.get(obj_ref) {
                    CacheResult::Hit(Some(*lock))
                } else {
                    CacheResult::Miss
                }
            },
            |obj_refs| {
                tables
                    .multi_get_locked_transactions(obj_refs)
                    .expect("db error")
            },
        ))
    }

    pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
        self.output_queue
            .back()
            .and_then(|output| output.get_highest_pending_checkpoint_height())
    }

    pub(super) fn get_pending_checkpoints(
        &self,
        last: Option<CheckpointHeight>,
    ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
        let mut checkpoints = Vec::new();
        for output in &self.output_queue {
            checkpoints.extend(
                output
                    .get_pending_checkpoints(last)
                    .map(|cp| (cp.height(), cp.clone())),
            );
        }
        if cfg!(debug_assertions) {
            let mut prev = None;
            for (height, _) in &checkpoints {
                if let Some(prev) = prev {
                    assert!(prev < *height);
                }
                prev = Some(*height);
            }
        }
        checkpoints
    }

    pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
        self.output_queue
            .iter()
            .any(|output| output.pending_checkpoint_exists(index))
    }

    pub(super) fn get_pending_checkpoints_v2(
        &self,
        last: Option<CheckpointHeight>,
    ) -> Vec<(CheckpointHeight, PendingCheckpointV2)> {
        let mut checkpoints = Vec::new();
        for output in &self.output_queue {
            checkpoints.extend(
                output
                    .get_pending_checkpoints_v2(last)
                    .map(|cp| (cp.height(), cp.clone())),
            );
        }
        if cfg!(debug_assertions) {
            let mut prev = None;
            for (height, _) in &checkpoints {
                if let Some(prev) = prev {
                    assert!(prev < *height);
                }
                prev = Some(*height);
            }
        }
        checkpoints
    }

    pub(super) fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
        self.output_queue
            .iter()
            .any(|output| output.pending_checkpoint_exists_v2(index))
    }

    pub(super) fn get_new_jwks(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        round: u64,
    ) -> SuiResult<Vec<ActiveJwk>> {
        let epoch = epoch_store.epoch();

        // Check if the requested round is in memory
        for output in self.output_queue.iter().rev() {
            // unwrap safe because output will always have last consensus stats set before being added
            // to the quarantine
            let output_round = output.get_round().unwrap();
            if round == output_round {
                return Ok(output
                    .active_jwks
                    .iter()
                    .map(|(_, (jwk_id, jwk))| ActiveJwk {
                        jwk_id: jwk_id.clone(),
                        jwk: jwk.clone(),
                        epoch,
                    })
                    .collect());
            }
        }

        // Fall back to reading from database
        let empty_jwk_id = JwkId::new(String::new(), String::new());
        let empty_jwk = JWK {
            kty: String::new(),
            e: String::new(),
            n: String::new(),
            alg: String::new(),
        };

        let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
        let end = (round + 1, (empty_jwk_id, empty_jwk));

        Ok(epoch_store
            .tables()?
            .active_jwks
            .safe_iter_with_bounds(Some(start), Some(end))
            .map_ok(|((r, (jwk_id, jwk)), _)| {
                debug_assert!(round == r);
                ActiveJwk { jwk_id, jwk, epoch }
            })
            .collect::<Result<Vec<_>, _>>()?)
    }

    pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
        self.output_queue
            .iter()
            .rev()
            .filter_map(|output| output.get_randomness_last_round_timestamp())
            .next()
    }

    pub(crate) fn load_initial_object_debts(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        current_round: Round,
        for_randomness: bool,
        transactions: &[VerifiedExecutableTransactionWithAliases],
    ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
        let protocol_config = epoch_store.protocol_config();
        let tables = epoch_store.tables()?;
        let default_per_commit_budget = protocol_config
            .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
            .unwrap_or(0);
        let (hash_table, db_table, per_commit_budget) = if for_randomness {
            (
                &self.congestion_control_randomness_object_debts,
                &tables.congestion_control_randomness_object_debts,
                protocol_config
                    .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
                    .unwrap_or(default_per_commit_budget),
            )
        } else {
            (
                &self.congestion_control_object_debts,
                &tables.congestion_control_object_debts,
                default_per_commit_budget,
            )
        };
        let mut shared_input_object_ids: Vec<_> = transactions
            .iter()
            .flat_map(|tx| tx.tx().shared_input_objects().map(|obj| obj.id))
            .collect();
        shared_input_object_ids.sort();
        shared_input_object_ids.dedup();

        let results = do_fallback_lookup(
            &shared_input_object_ids,
            |object_id| {
                if let Some(debt) = hash_table.get(object_id) {
                    CacheResult::Hit(Some(debt.into_v1()))
                } else {
                    CacheResult::Miss
                }
            },
            |object_ids| {
                db_table
                    .multi_get(object_ids)
                    .expect("db error")
                    .into_iter()
                    .map(|debt| debt.map(|debt| debt.into_v1()))
                    .collect()
            },
        );

        Ok(results
            .into_iter()
            .zip(shared_input_object_ids)
            .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
            .map(move |((round, debt), object_id)| {
                // Stored debts already account for the budget of the round in which
                // they were accumulated. Application of budget from future rounds to
                // the debt is handled here.
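                // For example (hypothetical numbers): a debt of 100 stored at
                // round 10, read at current round 13 with a per-commit budget
                // of 30, has the budgets of rounds 11 and 12 applied to it,
                // leaving 100 - 2 * 30 = 40.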
                assert!(current_round > round);
                let num_rounds = current_round - round - 1;
                let debt = debt.saturating_sub(per_commit_budget * num_rounds);
                (object_id, debt)
            }))
    }
}

// A wrapper around HashMap that uses refcounts to keep entries alive until
// they are no longer needed.
//
// If there are N inserts for the same key, the key will not be removed until
// there are N removes.
//
// It is intended to track the *latest* value for a given key, so duplicate
// inserts are intended to overwrite any prior value.
#[derive(Debug, Default)]
struct RefCountedHashMap<K, V> {
    map: HashMap<K, (usize, V)>,
}

impl<K, V> RefCountedHashMap<K, V>
where
    K: Clone + Eq + std::hash::Hash,
{
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    pub fn insert(&mut self, key: K, value: V) {
        let entry = self.map.entry(key);
        match entry {
            hash_map::Entry::Occupied(mut entry) => {
                let (ref_count, v) = entry.get_mut();
                *ref_count += 1;
                *v = value;
            }
            hash_map::Entry::Vacant(entry) => {
                entry.insert((1, value));
            }
        }
    }

    // Returns true if the key was present, false otherwise.
    // Note that the key may not be removed if present, as it may have a refcount > 1.
    pub fn remove(&mut self, key: &K) -> bool {
        let entry = self.map.entry(key.clone());
        match entry {
            hash_map::Entry::Occupied(mut entry) => {
                let (ref_count, _) = entry.get_mut();
                *ref_count -= 1;
                if *ref_count == 0 {
                    entry.remove();
                }
                true
            }
            hash_map::Entry::Vacant(_) => false,
        }
    }

    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key).map(|(_, v)| v)
    }

    pub fn contains_key(&self, key: &K) -> bool {
        self.map.contains_key(key)
    }
}
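
// The test below is an illustrative sketch (not part of the original file) of
// the refcount semantics documented on RefCountedHashMap: N inserts for a key
// require N removes before the key disappears, and a duplicate insert
// overwrites the previously stored value.
#[cfg(test)]
mod ref_counted_hash_map_tests {
    use super::RefCountedHashMap;

    #[test]
    fn insert_twice_requires_two_removes() {
        let mut map: RefCountedHashMap<&'static str, u64> = RefCountedHashMap::new();

        // Two inserts for the same key: the refcount becomes 2 and the value
        // is overwritten by the latest insert.
        map.insert("key", 1);
        map.insert("key", 2);
        assert_eq!(map.get(&"key"), Some(&2));

        // The first remove decrements the refcount but keeps the entry alive.
        assert!(map.remove(&"key"));
        assert!(map.contains_key(&"key"));

        // The second remove drops the entry; a further remove reports a miss.
        assert!(map.remove(&"key"));
        assert!(!map.contains_key(&"key"));
        assert!(!map.remove(&"key"));
    }
}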