sui_core/authority/consensus_quarantine.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority::authority_per_epoch_store::{
    AuthorityEpochTables, EncG, ExecutionIndicesWithStats, LockDetails, LockDetailsWrapper, PkG,
};
use crate::authority::transaction_deferral::DeferralKey;
use crate::checkpoints::BuilderCheckpointSummary;
use crate::epoch::randomness::SINGLETON_KEY;
use dashmap::DashMap;
use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
use moka::policy::EvictionPolicy;
use moka::sync::SegmentedCache as MokaCache;
use mysten_common::fatal;
use mysten_common::random_util::randomize_cache_capacity_in_tests;
use parking_lot::Mutex;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
use sui_types::authenticator_state::ActiveJwk;
use sui_types::base_types::{AuthorityName, ObjectRef, SequenceNumber};
use sui_types::crypto::RandomnessRound;
use sui_types::error::SuiResult;
use sui_types::executable_transaction::{
    TrustedExecutableTransactionWithAliases, VerifiedExecutableTransactionWithAliases,
};
use sui_types::execution::ExecutionTimeObservationKey;
use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
use sui_types::messages_consensus::AuthorityIndex;
use sui_types::{
    base_types::{ConsensusObjectSequenceKey, ObjectID},
    digests::TransactionDigest,
    messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
    signature::GenericSignature,
};
use tracing::{debug, info};
use typed_store::Map;
use typed_store::rocks::DBBatch;

use crate::{
    authority::{
        authority_per_epoch_store::AuthorityPerEpochStore,
        shared_object_congestion_tracker::CongestionPerObjectDebt,
    },
    checkpoints::{CheckpointHeight, PendingCheckpoint},
    consensus_handler::SequencedConsensusTransactionKey,
    epoch::{
        randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
        reconfiguration::ReconfigState,
    },
};

use super::*;

#[derive(Default)]
#[allow(clippy::type_complexity)]
pub(crate) struct ConsensusCommitOutput {
    // Consensus and reconfig state
    consensus_round: Round,
    consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
    end_of_publish: BTreeSet<AuthorityName>,
    reconfig_state: Option<ReconfigState>,
    consensus_commit_stats: Option<ExecutionIndicesWithStats>,

    // transaction scheduling state
    next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,

    deferred_txns: Vec<(DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>)>,
    deleted_deferred_txns: BTreeSet<DeferralKey>,

    // checkpoint state
    pending_checkpoints: Vec<PendingCheckpoint>,

    // random beacon state
    next_randomness_round: Option<(RandomnessRound, TimestampMs)>,

    dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
    dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
    dkg_used_message: Option<VersionedUsedProcessedMessages>,
    dkg_output: Option<dkg_v1::Output<PkG, EncG>>,

    // jwk state
    pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
    active_jwks: BTreeSet<(u64, (JwkId, JWK))>,

    // congestion control state
    congestion_control_object_debts: Vec<(ObjectID, u64)>,
    congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
    execution_time_observations: Vec<(
        AuthorityIndex,
        u64, /* generation */
        Vec<(ExecutionTimeObservationKey, Duration)>,
    )>,

    // Owned object locks acquired post-consensus (when disable_preconsensus_locking=true)
    owned_object_locks: HashMap<ObjectRef, LockDetails>,
}

impl ConsensusCommitOutput {
    pub fn new(consensus_round: Round) -> Self {
        Self {
            consensus_round,
            ..Default::default()
        }
    }

    pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
        self.deleted_deferred_txns.iter().cloned()
    }

    pub fn has_deferred_transactions(&self) -> bool {
        !self.deferred_txns.is_empty()
    }

    fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
        self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
    }

    fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
        self.pending_checkpoints.last().map(|cp| cp.height())
    }

    fn get_pending_checkpoints(
        &self,
        last: Option<CheckpointHeight>,
    ) -> impl Iterator<Item = &PendingCheckpoint> {
        self.pending_checkpoints.iter().filter(move |cp| {
            if let Some(last) = last {
                cp.height() > last
            } else {
                true
            }
        })
    }

    fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
        self.pending_checkpoints
            .iter()
            .any(|cp| cp.height() == *index)
    }

    fn get_round(&self) -> Option<u64> {
        self.consensus_commit_stats
            .as_ref()
            .map(|stats| stats.index.last_committed_round)
    }

    pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
        self.end_of_publish.insert(authority);
    }

    pub fn insert_execution_time_observation(
        &mut self,
        source: AuthorityIndex,
        generation: u64,
        estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
    ) {
        self.execution_time_observations
            .push((source, generation, estimates));
    }

    pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStats) {
        self.consensus_commit_stats = Some(stats);
    }

    // in testing code we often need to write to the db outside of a consensus commit
    pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
        self.record_consensus_commit_stats(Default::default());
    }

    pub fn store_reconfig_state(&mut self, state: ReconfigState) {
        self.reconfig_state = Some(state);
    }

    pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
        self.consensus_messages_processed.insert(key);
    }

    pub fn get_consensus_messages_processed(
        &self,
    ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
        self.consensus_messages_processed.iter()
    }

    pub fn set_next_shared_object_versions(
        &mut self,
        next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
    ) {
        assert!(self.next_shared_object_versions.is_none());
        self.next_shared_object_versions = Some(next_versions);
    }

    pub fn defer_transactions(
        &mut self,
        key: DeferralKey,
        transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    ) {
        self.deferred_txns.push((key, transactions));
    }

    pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
        self.deleted_deferred_txns
            .extend(deferral_keys.iter().cloned());
    }

    pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
        self.pending_checkpoints.push(checkpoint);
    }

    pub fn reserve_next_randomness_round(
        &mut self,
        next_randomness_round: RandomnessRound,
        commit_timestamp: TimestampMs,
    ) {
        assert!(self.next_randomness_round.is_none());
        self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
    }

    pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
        self.dkg_confirmations.insert(conf.sender(), conf);
    }

    pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
        self.dkg_processed_messages
            .insert(message.sender(), message);
    }

    pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
        self.dkg_used_message = Some(used_messages);
    }

    pub fn set_dkg_output(&mut self, output: dkg_v1::Output<PkG, EncG>) {
        self.dkg_output = Some(output);
    }

    pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
        self.pending_jwks.insert((authority, id, jwk));
    }

    pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
        self.active_jwks.insert((round, key));
    }

    pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
        self.congestion_control_object_debts = object_debts;
    }

    pub fn set_congestion_control_randomness_object_debts(
        &mut self,
        object_debts: Vec<(ObjectID, u64)>,
    ) {
        self.congestion_control_randomness_object_debts = object_debts;
    }

    pub fn set_owned_object_locks(&mut self, locks: HashMap<ObjectRef, LockDetails>) {
        assert!(self.owned_object_locks.is_empty());
        self.owned_object_locks = locks;
    }

    pub fn write_to_batch(
        self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        let tables = epoch_store.tables()?;
        batch.insert_batch(
            &tables.consensus_message_processed,
            self.consensus_messages_processed
                .iter()
                .map(|key| (key, true)),
        )?;

        batch.insert_batch(
            &tables.end_of_publish,
            self.end_of_publish.iter().map(|authority| (authority, ())),
        )?;

        if let Some(reconfig_state) = &self.reconfig_state {
            batch.insert_batch(
                &tables.reconfig_state,
                [(RECONFIG_STATE_INDEX, reconfig_state)],
            )?;
        }

        let consensus_commit_stats = self
            .consensus_commit_stats
            .expect("consensus_commit_stats must be set");
        let round = consensus_commit_stats.index.last_committed_round;

        batch.insert_batch(
            &tables.last_consensus_stats,
            [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
        )?;

        if let Some(next_versions) = self.next_shared_object_versions {
            batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
        }

        if !self.owned_object_locks.is_empty() {
            batch.insert_batch(
                &tables.owned_object_locked_transactions,
                self.owned_object_locks
                    .into_iter()
                    .map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))),
            )?;
        }

        batch.delete_batch(
            &tables.deferred_transactions_v2,
            &self.deleted_deferred_txns,
        )?;
        batch.delete_batch(
            &tables.deferred_transactions_with_aliases_v2,
            &self.deleted_deferred_txns,
        )?;

        batch.insert_batch(
            &tables.deferred_transactions_with_aliases_v2,
            self.deferred_txns.into_iter().map(|(key, txs)| {
                (
                    key,
                    txs.into_iter()
                        .map(|tx| {
                            let tx: TrustedExecutableTransactionWithAliases = tx.serializable();
                            tx
                        })
                        .collect::<Vec<_>>(),
                )
            }),
        )?;

        if let Some((round, commit_timestamp)) = self.next_randomness_round {
            batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
            batch.insert_batch(
                &tables.randomness_last_round_timestamp,
                [(SINGLETON_KEY, commit_timestamp)],
            )?;
        }

        batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
        batch.insert_batch(
            &tables.dkg_processed_messages_v2,
            self.dkg_processed_messages,
        )?;
        batch.insert_batch(
            &tables.dkg_used_messages_v2,
            // using Option as iter
            self.dkg_used_message
                .into_iter()
                .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
        )?;
        if let Some(output) = self.dkg_output {
            batch.insert_batch(&tables.dkg_output, [(SINGLETON_KEY, output)])?;
        }

        batch.insert_batch(
            &tables.pending_jwks,
            self.pending_jwks.into_iter().map(|j| (j, ())),
        )?;
        batch.insert_batch(
            &tables.active_jwks,
            self.active_jwks.into_iter().map(|j| {
                // TODO: we don't need to store the round in this map if it is invariant
                assert_eq!(j.0, round);
                (j, ())
            }),
        )?;

        batch.insert_batch(
            &tables.congestion_control_object_debts,
            self.congestion_control_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;
        batch.insert_batch(
            &tables.congestion_control_randomness_object_debts,
            self.congestion_control_randomness_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;

        batch.insert_batch(
            &tables.execution_time_observations,
            self.execution_time_observations
                .into_iter()
                .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
        )?;

        Ok(())
    }
}

/// ConsensusOutputCache holds outputs of consensus processing that do not need to be committed to disk.
/// Data quarantining guarantees that all of this data will be used (e.g. for building checkpoints)
/// before the consensus commit from which it originated is marked as processed. Therefore we can rely
/// on replay of consensus commits to recover this data.
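///
/// A sketch of the `executed_in_epoch` flow (illustrative only; in production the
/// cache is constructed from the real epoch tables):
///
/// ```ignore
/// // Execution marks a transaction as executed in the current epoch.
/// cache.insert_executed_in_epoch(digest);
/// assert!(cache.executed_in_current_epoch(&digest));
/// // Once CheckpointExecutor has persisted the digest, the strongly-held entry
/// // is pruned; recent digests still hit the bounded LRU cache.
/// cache.remove_executed_in_epoch(&[digest]);
/// ```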
pub(crate) struct ConsensusOutputCache {
    // deferred_transactions is only used by the consensus handler, so there should never be lock
    // contention - hence no need for a DashMap.
    pub(crate) deferred_transactions:
        Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>>,

    // user_signatures_for_checkpoints is written to by the consensus handler and read by the
    // checkpoint builder. The critical sections are small in both cases, so a DashMap is
    // probably not helpful.
    #[allow(clippy::type_complexity)]
    pub(crate) user_signatures_for_checkpoints:
        Mutex<HashMap<TransactionDigest, Vec<(GenericSignature, Option<SequenceNumber>)>>>,

    executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
    executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
}

impl ConsensusOutputCache {
    pub(crate) fn new(tables: &AuthorityEpochTables) -> Self {
        let deferred_transactions = tables
            .get_all_deferred_transactions_v2()
            .expect("load deferred transactions cannot fail");

        let executed_in_epoch_cache_capacity = 50_000;

        Self {
            deferred_transactions: Mutex::new(deferred_transactions),
            user_signatures_for_checkpoints: Default::default(),
            executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
            executed_in_epoch_cache: MokaCache::builder(8)
                // most queries should be for recent transactions
                .max_capacity(randomize_cache_capacity_in_tests(
                    executed_in_epoch_cache_capacity,
                ))
                .eviction_policy(EvictionPolicy::lru())
                .build(),
        }
    }

    pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
        self.executed_in_epoch
            .read()
            .contains_key(digest) ||
            // we use `get` instead of `contains_key` so that the read refreshes the
            // entry's position in the LRU cache
            self.executed_in_epoch_cache.get(digest).is_some()
    }

    // Called by execution
    pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
        assert!(
            self.executed_in_epoch
                .read()
                .insert(tx_digest, ())
                .is_none(),
            "transaction already executed"
        );
        self.executed_in_epoch_cache.insert(tx_digest, ());
    }

    // CheckpointExecutor calls this (indirectly) in order to prune the in-memory cache of executed
    // transactions. By the time this is called, the transaction digests will have been committed to
    // the `executed_transactions_to_checkpoint` table.
    pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
        let executed_in_epoch = self.executed_in_epoch.read();
        for tx_digest in tx_digests {
            executed_in_epoch.remove(tx_digest);
        }
    }
}

/// ConsensusOutputQuarantine holds outputs of consensus processing in memory until the checkpoints
/// for the commit have been certified.
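///
/// The flow, roughly (an illustrative sketch; all methods are defined in this file):
///
/// ```ignore
/// // ConsensusHandler pushes each commit's output into the quarantine.
/// quarantine.push_consensus_output(output, &epoch_store)?;
/// // CheckpointBuilder records the checkpoints built from that output.
/// quarantine.insert_builder_summary(seq, summary, contents);
/// // Once the checkpoint watermark advances, everything at or below the
/// // corresponding commit height is flushed to the database.
/// quarantine.update_highest_executed_checkpoint(seq, &epoch_store, &mut batch)?;
/// ```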
pub(crate) struct ConsensusOutputQuarantine {
    // Output from consensus handler
    output_queue: VecDeque<ConsensusCommitOutput>,

    // Highest known certified checkpoint sequence number
    highest_executed_checkpoint: CheckpointSequenceNumber,

    // Checkpoint Builder output
    builder_checkpoint_summary:
        BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,

    builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,

    // Any un-committed next versions are stored here.
    shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,

    // The most recent congestion control debts for objects. Uses a ref-count to track
    // which objects still exist in some element of output_queue.
    congestion_control_randomness_object_debts:
        RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
    congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,

    processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,

    // Owned object locks acquired post-consensus.
    owned_object_locks: HashMap<ObjectRef, LockDetails>,

    metrics: Arc<EpochMetrics>,
}

impl ConsensusOutputQuarantine {
    pub(super) fn new(
        highest_executed_checkpoint: CheckpointSequenceNumber,
        authority_metrics: Arc<EpochMetrics>,
    ) -> Self {
        Self {
            highest_executed_checkpoint,

            output_queue: VecDeque::new(),
            builder_checkpoint_summary: BTreeMap::new(),
            builder_digest_to_checkpoint: HashMap::new(),
            shared_object_next_versions: RefCountedHashMap::new(),
            processed_consensus_messages: RefCountedHashMap::new(),
            congestion_control_randomness_object_debts: RefCountedHashMap::new(),
            congestion_control_object_debts: RefCountedHashMap::new(),
            owned_object_locks: HashMap::new(),
            metrics: authority_metrics,
        }
    }
}

// Write methods - all methods in this block insert new data into the quarantine.
// There are only two sources! ConsensusHandler and CheckpointBuilder.
impl ConsensusOutputQuarantine {
    // Push all data gathered from a consensus commit into the quarantine.
    pub(crate) fn push_consensus_output(
        &mut self,
        output: ConsensusCommitOutput,
        epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult {
        self.insert_shared_object_next_versions(&output);
        self.insert_congestion_control_debts(&output);
        self.insert_processed_consensus_messages(&output);
        self.insert_owned_object_locks(&output);
        self.output_queue.push_back(output);

        self.metrics
            .consensus_quarantine_queue_size
            .set(self.output_queue.len() as i64);

        // we may already have observed the certified checkpoint for this round, if state sync is running
        // ahead of consensus, so there may be data to commit right away.
        self.commit(epoch_store)
    }

    // Record a newly built checkpoint.
    pub(super) fn insert_builder_summary(
        &mut self,
        sequence_number: CheckpointSequenceNumber,
        summary: BuilderCheckpointSummary,
        contents: CheckpointContents,
    ) {
        debug!(?sequence_number, "inserting builder summary {:?}", summary);
        for tx in contents.iter() {
            self.builder_digest_to_checkpoint
                .insert(tx.transaction, sequence_number);
        }
        self.builder_checkpoint_summary
            .insert(sequence_number, (summary, contents));
    }
}

// Commit methods.
impl ConsensusOutputQuarantine {
    /// Update the highest executed checkpoint and commit any data which is now
    /// below the watermark.
    pub(super) fn update_highest_executed_checkpoint(
        &mut self,
        checkpoint: CheckpointSequenceNumber,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        self.highest_executed_checkpoint = checkpoint;
        self.commit_with_batch(epoch_store, batch)
    }

    pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
        let mut batch = epoch_store.db_batch()?;
        self.commit_with_batch(epoch_store, &mut batch)?;
        batch.write()?;
        Ok(())
    }

    /// Commit all data below the watermark.
    fn commit_with_batch(
        &mut self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        // The commit algorithm is simple:
        // 1. First commit all checkpoint builder state which is below the watermark.
        // 2. Determine the consensus commit height that corresponds to the highest committed
        //    checkpoint.
        // 3. Commit all consensus output at that height or below.
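        //
        // For example (illustrative numbers): if checkpoints up to sequence 12 are now at
        // or below the executed watermark, and checkpoint 12 was built at commit height 40,
        // then every queued ConsensusCommitOutput whose highest pending checkpoint height
        // is <= 40 is written out and dropped from the quarantine.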

        let tables = epoch_store.tables()?;

        let mut highest_committed_height = None;

        while self
            .builder_checkpoint_summary
            .first_key_value()
            .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
            == Some(true)
        {
            let (seq, (builder_summary, contents)) =
                self.builder_checkpoint_summary.pop_first().unwrap();

            for tx in contents.iter() {
                let digest = &tx.transaction;
                assert_eq!(
                    self.builder_digest_to_checkpoint
                        .remove(digest)
                        .unwrap_or_else(|| {
                            panic!(
                                "transaction {:?} not found in builder_digest_to_checkpoint",
                                digest
                            )
                        }),
                    seq
                );
            }

            batch.insert_batch(
                &tables.builder_digest_to_checkpoint,
                contents.iter().map(|tx| (tx.transaction, seq)),
            )?;

            batch.insert_batch(
                &tables.builder_checkpoint_summary_v2,
                [(seq, &builder_summary)],
            )?;

            let checkpoint_height = builder_summary
                .checkpoint_height
                .expect("non-genesis checkpoint must have height");
            if let Some(highest) = highest_committed_height {
                assert!(
                    checkpoint_height >= highest,
                    "current checkpoint height {} must be no less than highest committed height {}",
                    checkpoint_height,
                    highest
                );
            }

            highest_committed_height = Some(checkpoint_height);
        }

        let Some(highest_committed_height) = highest_committed_height else {
            return Ok(());
        };

        while !self.output_queue.is_empty() {
            // A consensus commit can have more than one pending checkpoint (a regular one and a randomness one).
            // We can only write the consensus commit if the highest pending checkpoint associated with it has
            // been processed by the builder.
            let Some(highest_in_commit) = self
                .output_queue
                .front()
                .unwrap()
                .get_highest_pending_checkpoint_height()
            else {
                // if highest is none, we have already written the pending checkpoint for the final epoch,
                // so there is no more data that needs to be committed.
                break;
            };

            if highest_in_commit <= highest_committed_height {
                info!(
                    "committing output with highest pending checkpoint height {:?}",
                    highest_in_commit
                );
                let output = self.output_queue.pop_front().unwrap();
                self.remove_shared_object_next_versions(&output);
                self.remove_processed_consensus_messages(&output);
                self.remove_congestion_control_debts(&output);
                self.remove_owned_object_locks(&output);

                output.write_to_batch(epoch_store, batch)?;
            } else {
                break;
            }
        }

        self.metrics
            .consensus_quarantine_queue_size
            .set(self.output_queue.len() as i64);

        Ok(())
    }
}

impl ConsensusOutputQuarantine {
    fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
            for (object_id, next_version) in next_versions {
                self.shared_object_next_versions
                    .insert(*object_id, *next_version);
            }
        }
    }

    fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
        let current_round = output.consensus_round;

        for (object_id, debt) in output.congestion_control_object_debts.iter() {
            self.congestion_control_object_debts.insert(
                *object_id,
                CongestionPerObjectDebt::new(current_round, *debt),
            );
        }

        for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
            self.congestion_control_randomness_object_debts.insert(
                *object_id,
                CongestionPerObjectDebt::new(current_round, *debt),
            );
        }
    }

    fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
        for (object_id, _) in output.congestion_control_object_debts.iter() {
            self.congestion_control_object_debts.remove(object_id);
        }
        for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
            self.congestion_control_randomness_object_debts
                .remove(object_id);
        }
    }

    fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
        for tx_key in output.consensus_messages_processed.iter() {
            self.processed_consensus_messages.insert(tx_key.clone(), ());
        }
    }

    fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
        for tx_key in output.consensus_messages_processed.iter() {
            self.processed_consensus_messages.remove(tx_key);
        }
    }

    fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
            for object_id in next_versions.keys() {
                if !self.shared_object_next_versions.remove(object_id) {
                    fatal!(
                        "Shared object next version not found in quarantine: {:?}",
                        object_id
                    );
                }
            }
        }
    }

    fn insert_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
        for (obj_ref, lock) in &output.owned_object_locks {
            self.owned_object_locks.insert(*obj_ref, *lock);
        }
    }

    fn remove_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
        for obj_ref in output.owned_object_locks.keys() {
            self.owned_object_locks.remove(obj_ref);
        }
    }
}

// Read methods - all methods in this block return data from the quarantine which would otherwise
// be found in the database.
impl ConsensusOutputQuarantine {
    pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
        self.builder_checkpoint_summary
            .values()
            .last()
            .map(|(summary, _)| summary)
    }

    pub(super) fn get_built_summary(
        &self,
        sequence: CheckpointSequenceNumber,
    ) -> Option<&BuilderCheckpointSummary> {
        self.builder_checkpoint_summary
            .get(&sequence)
            .map(|(summary, _)| summary)
    }

    pub(super) fn included_transaction_in_checkpoint(&self, digest: &TransactionDigest) -> bool {
        self.builder_digest_to_checkpoint.contains_key(digest)
    }

    pub(super) fn is_consensus_message_processed(
        &self,
        key: &SequencedConsensusTransactionKey,
    ) -> bool {
        self.processed_consensus_messages.contains_key(key)
    }

    pub(super) fn is_empty(&self) -> bool {
        self.output_queue.is_empty()
    }

    pub(super) fn get_next_shared_object_versions(
        &self,
        tables: &AuthorityEpochTables,
        objects_to_init: &[ConsensusObjectSequenceKey],
    ) -> SuiResult<Vec<Option<SequenceNumber>>> {
        Ok(do_fallback_lookup(
            objects_to_init,
            |object_key| {
                if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
                    CacheResult::Hit(Some(*next_version))
                } else {
                    CacheResult::Miss
                }
            },
            |object_keys| {
                tables
                    .next_shared_object_versions_v2
                    .multi_get(object_keys)
                    .expect("db error")
            },
        ))
    }

    /// Gets owned object locks, checking quarantine first then falling back to DB.
    /// Used for post-consensus conflict detection when preconsensus locking is disabled.
    /// After crash recovery, quarantine is empty so we naturally fall back to DB.
    pub(super) fn get_owned_object_locks(
        &self,
        tables: &AuthorityEpochTables,
        obj_refs: &[ObjectRef],
    ) -> SuiResult<Vec<Option<LockDetails>>> {
        Ok(do_fallback_lookup(
            obj_refs,
            |obj_ref| {
                if let Some(lock) = self.owned_object_locks.get(obj_ref) {
                    CacheResult::Hit(Some(*lock))
                } else {
                    CacheResult::Miss
                }
            },
            |obj_refs| {
                tables
                    .multi_get_locked_transactions(obj_refs)
                    .expect("db error")
            },
        ))
    }

    pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
        self.output_queue
            .back()
            .and_then(|output| output.get_highest_pending_checkpoint_height())
    }

    pub(super) fn get_pending_checkpoints(
        &self,
        last: Option<CheckpointHeight>,
    ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
        let mut checkpoints = Vec::new();
        for output in &self.output_queue {
            checkpoints.extend(
                output
                    .get_pending_checkpoints(last)
                    .map(|cp| (cp.height(), cp.clone())),
            );
        }
        if cfg!(debug_assertions) {
            let mut prev = None;
            for (height, _) in &checkpoints {
                if let Some(prev) = prev {
                    assert!(prev < *height);
                }
                prev = Some(*height);
            }
        }
        checkpoints
    }

    pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
        self.output_queue
            .iter()
            .any(|output| output.pending_checkpoint_exists(index))
    }

    pub(super) fn get_new_jwks(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        round: u64,
    ) -> SuiResult<Vec<ActiveJwk>> {
        let epoch = epoch_store.epoch();

        // Check if the requested round is in memory
        for output in self.output_queue.iter().rev() {
            // unwrap safe because output will always have last consensus stats set before being added
            // to the quarantine
            let output_round = output.get_round().unwrap();
            if round == output_round {
                return Ok(output
                    .active_jwks
                    .iter()
                    .map(|(_, (jwk_id, jwk))| ActiveJwk {
                        jwk_id: jwk_id.clone(),
                        jwk: jwk.clone(),
                        epoch,
                    })
                    .collect());
            }
        }

        // Fall back to reading from database
        let empty_jwk_id = JwkId::new(String::new(), String::new());
        let empty_jwk = JWK {
            kty: String::new(),
            e: String::new(),
            n: String::new(),
            alg: String::new(),
        };

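        // Note (added for clarity): the active_jwks table is keyed by (round, (JwkId, JWK)),
        // and empty strings are minimal in the serialized key ordering, so the half-open
        // range [start, end) below is intended to cover exactly the entries for `round`.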
        let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
        let end = (round + 1, (empty_jwk_id, empty_jwk));

        Ok(epoch_store
            .tables()?
            .active_jwks
            .safe_iter_with_bounds(Some(start), Some(end))
            .map_ok(|((r, (jwk_id, jwk)), _)| {
                debug_assert!(round == r);
                ActiveJwk { jwk_id, jwk, epoch }
            })
            .collect::<Result<Vec<_>, _>>()?)
    }

    pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
        self.output_queue
            .iter()
            .rev()
            .filter_map(|output| output.get_randomness_last_round_timestamp())
            .next()
    }

    pub(crate) fn load_initial_object_debts(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        current_round: Round,
        for_randomness: bool,
        transactions: &[VerifiedExecutableTransactionWithAliases],
    ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
        let protocol_config = epoch_store.protocol_config();
        let tables = epoch_store.tables()?;
        let default_per_commit_budget = protocol_config
            .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
            .unwrap_or(0);
        let (hash_table, db_table, per_commit_budget) = if for_randomness {
            (
                &self.congestion_control_randomness_object_debts,
                &tables.congestion_control_randomness_object_debts,
                protocol_config
                    .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
                    .unwrap_or(default_per_commit_budget),
            )
        } else {
            (
                &self.congestion_control_object_debts,
                &tables.congestion_control_object_debts,
                default_per_commit_budget,
            )
        };
        let mut shared_input_object_ids: Vec<_> = transactions
            .iter()
            .flat_map(|tx| tx.tx().shared_input_objects().map(|obj| obj.id))
            .collect();
        shared_input_object_ids.sort();
        shared_input_object_ids.dedup();

        let results = do_fallback_lookup(
            &shared_input_object_ids,
            |object_id| {
                if let Some(debt) = hash_table.get(object_id) {
                    CacheResult::Hit(Some(debt.into_v1()))
                } else {
                    CacheResult::Miss
                }
            },
            |object_ids| {
                db_table
                    .multi_get(object_ids)
                    .expect("db error")
                    .into_iter()
                    .map(|debt| debt.map(|debt| debt.into_v1()))
                    .collect()
            },
        );

        Ok(results
            .into_iter()
            .zip(shared_input_object_ids)
            .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
            .map(move |((round, debt), object_id)| {
                // Stored debts already account for the budget of the round in which
                // they were accumulated. Application of budget from future rounds to
                // the debt is handled here.
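                // Illustrative numbers: a debt of 100 accumulated in round 5, read in
                // round 8 with a per-commit budget of 30, decays by (8 - 5 - 1) * 30 = 60,
                // leaving 40.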
                assert!(current_round > round);
                let num_rounds = current_round - round - 1;
                let debt = debt.saturating_sub(per_commit_budget * num_rounds);
                (object_id, debt)
            }))
    }
}
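
#[cfg(test)]
mod fallback_lookup_sketch {
    // A minimal, self-contained model (test-only; not used by the code above) of the
    // two-tier lookup pattern behind `get_next_shared_object_versions`,
    // `get_owned_object_locks` and `load_initial_object_debts`: consult the in-memory
    // quarantine first, then issue one batched read for the misses, mirroring
    // `multi_get` on the underlying table. The real helper is `do_fallback_lookup`
    // (brought in via `use super::*;` above); this sketch only illustrates the idea.
    use std::collections::HashMap;

    fn fallback_lookup(
        keys: &[u64],
        cache: &HashMap<u64, u64>,
        db: &HashMap<u64, u64>,
    ) -> Vec<Option<u64>> {
        let mut results = vec![None; keys.len()];
        let mut misses = Vec::new();
        for (i, key) in keys.iter().enumerate() {
            match cache.get(key) {
                Some(v) => results[i] = Some(*v),
                None => misses.push(i),
            }
        }
        // One batched fallback read covering every miss.
        for i in misses {
            results[i] = db.get(&keys[i]).copied();
        }
        results
    }

    #[test]
    fn quarantine_shadows_db() {
        let cache = HashMap::from([(1u64, 10u64)]);
        let db = HashMap::from([(1u64, 99u64), (2u64, 20u64)]);
        // Key 1 resolves from the quarantine (shadowing the stale db value),
        // key 2 falls back to the db, and key 3 is found in neither.
        assert_eq!(
            fallback_lookup(&[1, 2, 3], &cache, &db),
            vec![Some(10), Some(20), None]
        );
    }
}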

// A wrapper around HashMap that uses refcounts to keep entries alive until
// they are no longer needed.
//
// If there are N inserts for the same key, the key will not be removed until
// there are N removes.
//
// It is intended to track the *latest* value for a given key, so duplicate
// inserts are intended to overwrite any prior value.
#[derive(Debug, Default)]
struct RefCountedHashMap<K, V> {
    map: HashMap<K, (usize, V)>,
}

impl<K, V> RefCountedHashMap<K, V>
where
    K: Clone + Eq + std::hash::Hash,
{
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    pub fn insert(&mut self, key: K, value: V) {
        let entry = self.map.entry(key);
        match entry {
            hash_map::Entry::Occupied(mut entry) => {
                let (ref_count, v) = entry.get_mut();
                *ref_count += 1;
                *v = value;
            }
            hash_map::Entry::Vacant(entry) => {
                entry.insert((1, value));
            }
        }
    }

    // Returns true if the key was present, false otherwise.
    // Note that the key may not be removed if present, as it may have a refcount > 1.
    pub fn remove(&mut self, key: &K) -> bool {
        let entry = self.map.entry(key.clone());
        match entry {
            hash_map::Entry::Occupied(mut entry) => {
                let (ref_count, _) = entry.get_mut();
                *ref_count -= 1;
                if *ref_count == 0 {
                    entry.remove();
                }
                true
            }
            hash_map::Entry::Vacant(_) => false,
        }
    }

    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key).map(|(_, v)| v)
    }

    pub fn contains_key(&self, key: &K) -> bool {
        self.map.contains_key(key)
    }
}
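
#[cfg(test)]
mod ref_counted_hash_map_tests {
    use super::*;

    // Illustrative test of the refcount semantics described above: N inserts of the
    // same key require N removes before the entry disappears, and each insert
    // overwrites the stored value with the latest one.
    #[test]
    fn insert_twice_requires_two_removes() {
        let mut map = RefCountedHashMap::new();
        map.insert("k", 1);
        map.insert("k", 2); // refcount is now 2; value overwritten with 2

        assert!(map.remove(&"k")); // refcount drops to 1; entry survives
        assert_eq!(map.get(&"k"), Some(&2));

        assert!(map.remove(&"k")); // refcount hits 0; entry is removed
        assert!(!map.contains_key(&"k"));
        assert!(!map.remove(&"k")); // removing an absent key reports false
    }
}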