sui_core/authority/consensus_quarantine.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority::authority_per_epoch_store::{
    AuthorityEpochTables, EncG, ExecutionIndicesWithStats, LockDetails, LockDetailsWrapper, PkG,
};
use crate::authority::transaction_deferral::DeferralKey;
use crate::checkpoints::BuilderCheckpointSummary;
use crate::epoch::randomness::SINGLETON_KEY;
use dashmap::DashMap;
use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
use moka::policy::EvictionPolicy;
use moka::sync::SegmentedCache as MokaCache;
use mysten_common::fatal;
use mysten_common::random_util::randomize_cache_capacity_in_tests;
use parking_lot::Mutex;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
use sui_types::authenticator_state::ActiveJwk;
use sui_types::base_types::{AuthorityName, ObjectRef, SequenceNumber};
use sui_types::crypto::RandomnessRound;
use sui_types::error::SuiResult;
use sui_types::executable_transaction::{
    TrustedExecutableTransactionWithAliases, VerifiedExecutableTransactionWithAliases,
};
use sui_types::execution::ExecutionTimeObservationKey;
use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
use sui_types::messages_consensus::AuthorityIndex;
use sui_types::{
    base_types::{ConsensusObjectSequenceKey, ObjectID},
    digests::TransactionDigest,
    messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
    signature::GenericSignature,
};
use tracing::{debug, info};
use typed_store::Map;
use typed_store::rocks::DBBatch;

use crate::{
    authority::{
        authority_per_epoch_store::AuthorityPerEpochStore,
        shared_object_congestion_tracker::CongestionPerObjectDebt,
    },
    checkpoints::{CheckpointHeight, PendingCheckpoint, PendingCheckpointV2},
    consensus_handler::SequencedConsensusTransactionKey,
    epoch::{
        randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
        reconfiguration::ReconfigState,
    },
};

use super::*;

#[derive(Default)]
#[allow(clippy::type_complexity)]
pub(crate) struct ConsensusCommitOutput {
    // Consensus and reconfig state
    consensus_round: Round,
    consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
    end_of_publish: BTreeSet<AuthorityName>,
    reconfig_state: Option<ReconfigState>,
    consensus_commit_stats: Option<ExecutionIndicesWithStats>,

    // transaction scheduling state
    next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,

    deferred_txns: Vec<(DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>)>,
    deleted_deferred_txns: BTreeSet<DeferralKey>,

    // checkpoint state
    pending_checkpoints: Vec<PendingCheckpoint>,
    pending_checkpoints_v2: Vec<PendingCheckpointV2>,

    // random beacon state
    next_randomness_round: Option<(RandomnessRound, TimestampMs)>,

    dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
    dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
    dkg_used_message: Option<VersionedUsedProcessedMessages>,
    dkg_output: Option<dkg_v1::Output<PkG, EncG>>,

    // jwk state
    pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
    active_jwks: BTreeSet<(u64, (JwkId, JWK))>,

    // congestion control state
    congestion_control_object_debts: Vec<(ObjectID, u64)>,
    congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
    execution_time_observations: Vec<(
        AuthorityIndex,
        u64, /* generation */
        Vec<(ExecutionTimeObservationKey, Duration)>,
    )>,

    // Owned object locks acquired post-consensus (when disable_preconsensus_locking=true)
    owned_object_locks: HashMap<ObjectRef, LockDetails>,
}

impl ConsensusCommitOutput {
    pub fn new(consensus_round: Round) -> Self {
        Self {
            consensus_round,
            ..Default::default()
        }
    }

    pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
        self.deleted_deferred_txns.iter().cloned()
    }

    pub fn has_deferred_transactions(&self) -> bool {
        !self.deferred_txns.is_empty()
    }

    fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
        self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
    }

    fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
        self.pending_checkpoints.last().map(|cp| cp.height())
    }

    fn get_pending_checkpoints(
        &self,
        last: Option<CheckpointHeight>,
    ) -> impl Iterator<Item = &PendingCheckpoint> {
        self.pending_checkpoints.iter().filter(move |cp| {
            if let Some(last) = last {
                cp.height() > last
            } else {
                true
            }
        })
    }

    fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
        self.pending_checkpoints
            .iter()
            .any(|cp| cp.height() == *index)
    }

    fn get_pending_checkpoints_v2(
        &self,
        last: Option<CheckpointHeight>,
    ) -> impl Iterator<Item = &PendingCheckpointV2> {
        self.pending_checkpoints_v2.iter().filter(move |cp| {
            if let Some(last) = last {
                cp.height() > last
            } else {
                true
            }
        })
    }

    fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
        self.pending_checkpoints_v2
            .iter()
            .any(|cp| cp.height() == *index)
    }

    fn get_round(&self) -> Option<u64> {
        self.consensus_commit_stats
            .as_ref()
            .map(|stats| stats.index.last_committed_round)
    }

    pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
        self.end_of_publish.insert(authority);
    }

    pub fn insert_execution_time_observation(
        &mut self,
        source: AuthorityIndex,
        generation: u64,
        estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
    ) {
        self.execution_time_observations
            .push((source, generation, estimates));
    }

    pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStats) {
        self.consensus_commit_stats = Some(stats);
    }

    // in testing code we often need to write to the db outside of a consensus commit
    pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
        self.record_consensus_commit_stats(Default::default());
    }

    pub fn store_reconfig_state(&mut self, state: ReconfigState) {
        self.reconfig_state = Some(state);
    }

    pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
        self.consensus_messages_processed.insert(key);
    }

    pub fn get_consensus_messages_processed(
        &self,
    ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
        self.consensus_messages_processed.iter()
    }

    pub fn set_next_shared_object_versions(
        &mut self,
        next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
    ) {
        assert!(self.next_shared_object_versions.is_none());
        self.next_shared_object_versions = Some(next_versions);
    }

    pub fn defer_transactions(
        &mut self,
        key: DeferralKey,
        transactions: Vec<VerifiedExecutableTransactionWithAliases>,
    ) {
        self.deferred_txns.push((key, transactions));
    }

    pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
        self.deleted_deferred_txns
            .extend(deferral_keys.iter().cloned());
    }

    pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
        self.pending_checkpoints.push(checkpoint);
    }

    pub fn insert_pending_checkpoint_v2(&mut self, checkpoint: PendingCheckpointV2) {
        self.pending_checkpoints_v2.push(checkpoint);
    }

    pub fn reserve_next_randomness_round(
        &mut self,
        next_randomness_round: RandomnessRound,
        commit_timestamp: TimestampMs,
    ) {
        assert!(self.next_randomness_round.is_none());
        self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
    }

    pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
        self.dkg_confirmations.insert(conf.sender(), conf);
    }

    pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
        self.dkg_processed_messages
            .insert(message.sender(), message);
    }

    pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
        self.dkg_used_message = Some(used_messages);
    }

    pub fn set_dkg_output(&mut self, output: dkg_v1::Output<PkG, EncG>) {
        self.dkg_output = Some(output);
    }

    pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
        self.pending_jwks.insert((authority, id, jwk));
    }

    pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
        self.active_jwks.insert((round, key));
    }

    pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
        self.congestion_control_object_debts = object_debts;
    }

    pub fn set_congestion_control_randomness_object_debts(
        &mut self,
        object_debts: Vec<(ObjectID, u64)>,
    ) {
        self.congestion_control_randomness_object_debts = object_debts;
    }

    pub fn set_owned_object_locks(&mut self, locks: HashMap<ObjectRef, LockDetails>) {
        assert!(self.owned_object_locks.is_empty());
        self.owned_object_locks = locks;
    }

    pub fn write_to_batch(
        self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        let tables = epoch_store.tables()?;
        batch.insert_batch(
            &tables.consensus_message_processed,
            self.consensus_messages_processed
                .iter()
                .map(|key| (key, true)),
        )?;

        batch.insert_batch(
            &tables.end_of_publish,
            self.end_of_publish.iter().map(|authority| (authority, ())),
        )?;

        if let Some(reconfig_state) = &self.reconfig_state {
            batch.insert_batch(
                &tables.reconfig_state,
                [(RECONFIG_STATE_INDEX, reconfig_state)],
            )?;
        }

        let consensus_commit_stats = self
            .consensus_commit_stats
            .expect("consensus_commit_stats must be set");
        let round = consensus_commit_stats.index.last_committed_round;

        batch.insert_batch(
            &tables.last_consensus_stats,
            [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
        )?;

        if let Some(next_versions) = self.next_shared_object_versions {
            batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
        }

        if !self.owned_object_locks.is_empty() {
            batch.insert_batch(
                &tables.owned_object_locked_transactions,
                self.owned_object_locks
                    .into_iter()
                    .map(|(obj_ref, lock)| (obj_ref, LockDetailsWrapper::from(lock))),
            )?;
        }

        batch.delete_batch(
            &tables.deferred_transactions_v2,
            &self.deleted_deferred_txns,
        )?;
        batch.delete_batch(
            &tables.deferred_transactions_with_aliases_v2,
            &self.deleted_deferred_txns,
        )?;
        batch.delete_batch(
            &tables.deferred_transactions_with_aliases_v3,
            &self.deleted_deferred_txns,
        )?;

        batch.insert_batch(
            &tables.deferred_transactions_with_aliases_v3,
            self.deferred_txns.into_iter().map(|(key, txs)| {
                (
                    key,
                    txs.into_iter()
                        .map(|tx| {
                            let tx: TrustedExecutableTransactionWithAliases = tx.serializable();
                            tx
                        })
                        .collect::<Vec<_>>(),
                )
            }),
        )?;

        if let Some((round, commit_timestamp)) = self.next_randomness_round {
            batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
            batch.insert_batch(
                &tables.randomness_last_round_timestamp,
                [(SINGLETON_KEY, commit_timestamp)],
            )?;
        }

        batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
        batch.insert_batch(
            &tables.dkg_processed_messages_v2,
            self.dkg_processed_messages,
        )?;
        batch.insert_batch(
            &tables.dkg_used_messages_v2,
            // using Option as iter
            self.dkg_used_message
                .into_iter()
                .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
        )?;
        if let Some(output) = self.dkg_output {
            batch.insert_batch(&tables.dkg_output, [(SINGLETON_KEY, output)])?;
        }

        batch.insert_batch(
            &tables.pending_jwks,
            self.pending_jwks.into_iter().map(|j| (j, ())),
        )?;
        batch.insert_batch(
            &tables.active_jwks,
            self.active_jwks.into_iter().map(|j| {
                // TODO: we don't need to store the round in this map if it is invariant
                assert_eq!(j.0, round);
                (j, ())
            }),
        )?;

        batch.insert_batch(
            &tables.congestion_control_object_debts,
            self.congestion_control_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;
        batch.insert_batch(
            &tables.congestion_control_randomness_object_debts,
            self.congestion_control_randomness_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;

        batch.insert_batch(
            &tables.execution_time_observations,
            self.execution_time_observations
                .into_iter()
                .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
        )?;

        Ok(())
    }
}

/// ConsensusOutputCache holds outputs of consensus processing that do not need to be committed to disk.
/// Data quarantining guarantees that all of this data will be used (e.g. for building checkpoints)
/// before the consensus commit from which it originated is marked as processed. Therefore we can rely
/// on replay of consensus commits to recover this data.
pub(crate) struct ConsensusOutputCache {
    // deferred transactions is only used by consensus handler so there should never be lock contention
    // - hence no need for a DashMap.
    pub(crate) deferred_transactions:
        Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransactionWithAliases>>>,

    // user_signatures_for_checkpoints is written to by consensus handler and read from by checkpoint builder
    // The critical sections are small in both cases so a DashMap is probably not helpful.
    #[allow(clippy::type_complexity)]
    pub(crate) user_signatures_for_checkpoints:
        Mutex<HashMap<TransactionDigest, Vec<(GenericSignature, Option<SequenceNumber>)>>>,

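    // `executed_in_epoch` holds every digest executed in the current epoch until it is pruned by
    // `remove_executed_in_epoch` (after the digest has been persisted); `executed_in_epoch_cache`
    // is a bounded LRU over the same digests that keeps serving lookups for recently executed
    // transactions.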
    executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
    executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
}

impl ConsensusOutputCache {
    pub(crate) fn new(tables: &AuthorityEpochTables) -> Self {
        let deferred_transactions = tables
            .get_all_deferred_transactions_v2()
            .expect("load deferred transactions cannot fail");

        let executed_in_epoch_cache_capacity = 50_000;

        Self {
            deferred_transactions: Mutex::new(deferred_transactions),
            user_signatures_for_checkpoints: Default::default(),
            executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
            executed_in_epoch_cache: MokaCache::builder(8)
                // most queries should be for recent transactions
                .max_capacity(randomize_cache_capacity_in_tests(
                    executed_in_epoch_cache_capacity,
                ))
                .eviction_policy(EvictionPolicy::lru())
                .build(),
        }
    }

    pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
        self.executed_in_epoch
            .read()
            .contains_key(digest) ||
            // we use get() instead of contains_key() to mark the entry as recently read
            self.executed_in_epoch_cache.get(digest).is_some()
    }

    // Called by execution
    pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
        assert!(
            self.executed_in_epoch
                .read()
                .insert(tx_digest, ())
                .is_none(),
            "transaction already executed"
        );
        self.executed_in_epoch_cache.insert(tx_digest, ());
    }

    // CheckpointExecutor calls this (indirectly) in order to prune the in-memory cache of executed
    // transactions. By the time this is called, the transaction digests will have been committed to
    // the `executed_transactions_to_checkpoint` table.
    pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
        let executed_in_epoch = self.executed_in_epoch.read();
        for tx_digest in tx_digests {
            executed_in_epoch.remove(tx_digest);
        }
    }
}

/// ConsensusOutputQuarantine holds outputs of consensus processing in memory until the checkpoints
/// for the commit have been certified.
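///
/// Data enters the quarantine from two writers: ConsensusHandler (via `push_consensus_output`)
/// and CheckpointBuilder (via `insert_builder_summary`). Once the corresponding checkpoints fall
/// at or below `highest_executed_checkpoint`, `commit_with_batch` flushes the quarantined data
/// to the database.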
pub(crate) struct ConsensusOutputQuarantine {
    // Output from consensus handler
    output_queue: VecDeque<ConsensusCommitOutput>,

    // Highest known certified checkpoint sequence number
    highest_executed_checkpoint: CheckpointSequenceNumber,

    // Checkpoint Builder output
    builder_checkpoint_summary:
        BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,

    builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,

    // Any un-committed next versions are stored here.
    shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,

    // The most recent congestion control debts for objects. Uses a ref-count to track
    // which objects still exist in some element of output_queue.
    congestion_control_randomness_object_debts:
        RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
    congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,

    processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,

    // Owned object locks acquired post-consensus.
    owned_object_locks: HashMap<ObjectRef, LockDetails>,

    metrics: Arc<EpochMetrics>,
}

impl ConsensusOutputQuarantine {
    pub(super) fn new(
        highest_executed_checkpoint: CheckpointSequenceNumber,
        authority_metrics: Arc<EpochMetrics>,
    ) -> Self {
        Self {
            highest_executed_checkpoint,

            output_queue: VecDeque::new(),
            builder_checkpoint_summary: BTreeMap::new(),
            builder_digest_to_checkpoint: HashMap::new(),
            shared_object_next_versions: RefCountedHashMap::new(),
            processed_consensus_messages: RefCountedHashMap::new(),
            congestion_control_randomness_object_debts: RefCountedHashMap::new(),
            congestion_control_object_debts: RefCountedHashMap::new(),
            owned_object_locks: HashMap::new(),
            metrics: authority_metrics,
        }
    }
}

// Write methods - all methods in this block insert new data into the quarantine.
// There are only two sources! ConsensusHandler and CheckpointBuilder.
impl ConsensusOutputQuarantine {
    // Push all data gathered from a consensus commit into the quarantine.
    pub(crate) fn push_consensus_output(
        &mut self,
        output: ConsensusCommitOutput,
        epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult {
        self.insert_shared_object_next_versions(&output);
        self.insert_congestion_control_debts(&output);
        self.insert_processed_consensus_messages(&output);
        self.insert_owned_object_locks(&output);
        self.output_queue.push_back(output);

        self.metrics
            .consensus_quarantine_queue_size
            .set(self.output_queue.len() as i64);

        // we may already have observed the certified checkpoint for this round, if state sync is running
        // ahead of consensus, so there may be data to commit right away.
        self.commit(epoch_store)
    }

    // Record a newly built checkpoint.
    pub(super) fn insert_builder_summary(
        &mut self,
        sequence_number: CheckpointSequenceNumber,
        summary: BuilderCheckpointSummary,
        contents: CheckpointContents,
    ) {
        debug!(?sequence_number, "inserting builder summary {:?}", summary);
        for tx in contents.iter() {
            self.builder_digest_to_checkpoint
                .insert(tx.transaction, sequence_number);
        }
        self.builder_checkpoint_summary
            .insert(sequence_number, (summary, contents));
    }
}

// Commit methods.
impl ConsensusOutputQuarantine {
    /// Update the highest executed checkpoint and commit any data which is now
    /// below the watermark.
    pub(super) fn update_highest_executed_checkpoint(
        &mut self,
        checkpoint: CheckpointSequenceNumber,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        self.highest_executed_checkpoint = checkpoint;
        self.commit_with_batch(epoch_store, batch)
    }

    pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
        let mut batch = epoch_store.db_batch()?;
        self.commit_with_batch(epoch_store, &mut batch)?;
        batch.write()?;
        Ok(())
    }

    /// Commit all data below the watermark.
    fn commit_with_batch(
        &mut self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        // The commit algorithm is simple:
        // 1. First commit all checkpoint builder state which is below the watermark.
        // 2. Determine the consensus commit height that corresponds to the highest committed
        //    checkpoint.
        // 3. Commit all consensus output at that height or below.

        let tables = epoch_store.tables()?;

        let mut highest_committed_height = None;

        while self
            .builder_checkpoint_summary
            .first_key_value()
            .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
            == Some(true)
        {
            let (seq, (builder_summary, contents)) =
                self.builder_checkpoint_summary.pop_first().unwrap();

            for tx in contents.iter() {
                let digest = &tx.transaction;
                assert_eq!(
                    self.builder_digest_to_checkpoint
                        .remove(digest)
                        .unwrap_or_else(|| {
                            panic!(
                                "transaction {:?} not found in builder_digest_to_checkpoint",
                                digest
                            )
                        }),
                    seq
                );
            }

            batch.insert_batch(
                &tables.builder_digest_to_checkpoint,
                contents.iter().map(|tx| (tx.transaction, seq)),
            )?;

            batch.insert_batch(
                &tables.builder_checkpoint_summary_v2,
                [(seq, &builder_summary)],
            )?;

            let checkpoint_height = builder_summary
                .checkpoint_height
                .expect("non-genesis checkpoint must have height");
            if let Some(highest) = highest_committed_height {
                assert!(
                    checkpoint_height >= highest,
                    "current checkpoint height {} must be no less than highest committed height {}",
                    checkpoint_height,
                    highest
                );
            }

            highest_committed_height = Some(checkpoint_height);
        }

        let Some(highest_committed_height) = highest_committed_height else {
            return Ok(());
        };

        let split_checkpoints_in_consensus_handler = epoch_store
            .protocol_config()
            .split_checkpoints_in_consensus_handler();

        while !self.output_queue.is_empty() {
            // A consensus commit can have more than one pending checkpoint (a regular one and a randomness one).
            // We can only write the consensus commit if the highest pending checkpoint associated with it has
            // been processed by the builder.
            let output = self.output_queue.front().unwrap();
            let highest_in_commit = if split_checkpoints_in_consensus_handler {
                // In V2, use the checkpoint_height from stats which represents the consensus
                // commit height watermark.
                output
                    .consensus_commit_stats
                    .as_ref()
                    .expect("consensus_commit_stats must be set")
                    .height
            } else {
                // In V1, use the highest pending checkpoint height from the output.
                let Some(h) = output.get_highest_pending_checkpoint_height() else {
                    // if highest is none, we have already written the pending checkpoint for the final epoch,
                    // so there is no more data that needs to be committed.
                    break;
                };
                h
            };

            if highest_in_commit <= highest_committed_height {
                info!(
                    "committing output with highest pending checkpoint height {:?}",
                    highest_in_commit
                );
                let output = self.output_queue.pop_front().unwrap();
                self.remove_shared_object_next_versions(&output);
                self.remove_processed_consensus_messages(&output);
                self.remove_congestion_control_debts(&output);
                self.remove_owned_object_locks(&output);

                output.write_to_batch(epoch_store, batch)?;
            } else {
                break;
            }
        }

        self.metrics
            .consensus_quarantine_queue_size
            .set(self.output_queue.len() as i64);

        Ok(())
    }
}

impl ConsensusOutputQuarantine {
    fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
            for (object_id, next_version) in next_versions {
                self.shared_object_next_versions
                    .insert(*object_id, *next_version);
            }
        }
    }

    fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
        let current_round = output.consensus_round;

        for (object_id, debt) in output.congestion_control_object_debts.iter() {
            self.congestion_control_object_debts.insert(
                *object_id,
                CongestionPerObjectDebt::new(current_round, *debt),
            );
        }

        for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
            self.congestion_control_randomness_object_debts.insert(
                *object_id,
                CongestionPerObjectDebt::new(current_round, *debt),
            );
        }
    }

    fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
        for (object_id, _) in output.congestion_control_object_debts.iter() {
            self.congestion_control_object_debts.remove(object_id);
        }
        for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
            self.congestion_control_randomness_object_debts
                .remove(object_id);
        }
    }

    fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
        for tx_key in output.consensus_messages_processed.iter() {
            self.processed_consensus_messages.insert(tx_key.clone(), ());
        }
    }

    fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
        for tx_key in output.consensus_messages_processed.iter() {
            self.processed_consensus_messages.remove(tx_key);
        }
    }

    fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
            for object_id in next_versions.keys() {
                if !self.shared_object_next_versions.remove(object_id) {
                    fatal!(
                        "Shared object next version not found in quarantine: {:?}",
                        object_id
                    );
                }
            }
        }
    }

    fn insert_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
        for (obj_ref, lock) in &output.owned_object_locks {
            self.owned_object_locks.insert(*obj_ref, *lock);
        }
    }

    fn remove_owned_object_locks(&mut self, output: &ConsensusCommitOutput) {
        for obj_ref in output.owned_object_locks.keys() {
            self.owned_object_locks.remove(obj_ref);
        }
    }
}

// Read methods - all methods in this block return data from the quarantine which would otherwise
// be found in the database.
impl ConsensusOutputQuarantine {
    pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
        self.builder_checkpoint_summary
            .values()
            .last()
            .map(|(summary, _)| summary)
    }

    pub(super) fn get_built_summary(
        &self,
        sequence: CheckpointSequenceNumber,
    ) -> Option<&BuilderCheckpointSummary> {
        self.builder_checkpoint_summary
            .get(&sequence)
            .map(|(summary, _)| summary)
    }

    pub(super) fn included_transaction_in_checkpoint(&self, digest: &TransactionDigest) -> bool {
        self.builder_digest_to_checkpoint.contains_key(digest)
    }

    pub(super) fn is_consensus_message_processed(
        &self,
        key: &SequencedConsensusTransactionKey,
    ) -> bool {
        self.processed_consensus_messages.contains_key(key)
    }

    pub(super) fn is_empty(&self) -> bool {
        self.output_queue.is_empty()
    }

    pub(super) fn get_next_shared_object_versions(
        &self,
        tables: &AuthorityEpochTables,
        objects_to_init: &[ConsensusObjectSequenceKey],
    ) -> SuiResult<Vec<Option<SequenceNumber>>> {
        Ok(do_fallback_lookup(
            objects_to_init,
            |object_key| {
                if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
                    CacheResult::Hit(Some(*next_version))
                } else {
                    CacheResult::Miss
                }
            },
            |object_keys| {
                tables
                    .next_shared_object_versions_v2
                    .multi_get(object_keys)
                    .expect("db error")
            },
        ))
    }

    /// Gets owned object locks, checking quarantine first then falling back to DB.
    /// Used for post-consensus conflict detection when preconsensus locking is disabled.
    /// After crash recovery, quarantine is empty so we naturally fall back to DB.
    pub(super) fn get_owned_object_locks(
        &self,
        tables: &AuthorityEpochTables,
        obj_refs: &[ObjectRef],
    ) -> SuiResult<Vec<Option<LockDetails>>> {
        Ok(do_fallback_lookup(
            obj_refs,
            |obj_ref| {
                if let Some(lock) = self.owned_object_locks.get(obj_ref) {
                    CacheResult::Hit(Some(*lock))
                } else {
                    CacheResult::Miss
                }
            },
            |obj_refs| {
                tables
                    .multi_get_locked_transactions(obj_refs)
                    .expect("db error")
            },
        ))
    }

    pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
        self.output_queue
            .back()
            .and_then(|output| output.get_highest_pending_checkpoint_height())
    }

    pub(super) fn get_pending_checkpoints(
        &self,
        last: Option<CheckpointHeight>,
    ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
        let mut checkpoints = Vec::new();
        for output in &self.output_queue {
            checkpoints.extend(
                output
                    .get_pending_checkpoints(last)
                    .map(|cp| (cp.height(), cp.clone())),
            );
        }
        if cfg!(debug_assertions) {
            let mut prev = None;
            for (height, _) in &checkpoints {
                if let Some(prev) = prev {
                    assert!(prev < *height);
                }
                prev = Some(*height);
            }
        }
        checkpoints
    }

    pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
        self.output_queue
            .iter()
            .any(|output| output.pending_checkpoint_exists(index))
    }

    pub(super) fn get_pending_checkpoints_v2(
        &self,
        last: Option<CheckpointHeight>,
    ) -> Vec<(CheckpointHeight, PendingCheckpointV2)> {
        let mut checkpoints = Vec::new();
        for output in &self.output_queue {
            checkpoints.extend(
                output
                    .get_pending_checkpoints_v2(last)
                    .map(|cp| (cp.height(), cp.clone())),
            );
        }
        if cfg!(debug_assertions) {
            let mut prev = None;
            for (height, _) in &checkpoints {
                if let Some(prev) = prev {
                    assert!(prev < *height);
                }
                prev = Some(*height);
            }
        }
        checkpoints
    }

    pub(super) fn pending_checkpoint_exists_v2(&self, index: &CheckpointHeight) -> bool {
        self.output_queue
            .iter()
            .any(|output| output.pending_checkpoint_exists_v2(index))
    }

    pub(super) fn get_new_jwks(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        round: u64,
    ) -> SuiResult<Vec<ActiveJwk>> {
        let epoch = epoch_store.epoch();

        // Check if the requested round is in memory
        for output in self.output_queue.iter().rev() {
            // unwrap safe because output will always have last consensus stats set before being added
            // to the quarantine
            let output_round = output.get_round().unwrap();
            if round == output_round {
                return Ok(output
                    .active_jwks
                    .iter()
                    .map(|(_, (jwk_id, jwk))| ActiveJwk {
                        jwk_id: jwk_id.clone(),
                        jwk: jwk.clone(),
                        epoch,
                    })
                    .collect());
            }
        }

        // Fall back to reading from database
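        // The table key is (round, (JwkId, JWK)); an "empty" JwkId/JWK is used as the minimal
        // second element so that iterating from (round, empty) up to (round + 1, empty) covers
        // exactly the entries recorded for `round`.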
        let empty_jwk_id = JwkId::new(String::new(), String::new());
        let empty_jwk = JWK {
            kty: String::new(),
            e: String::new(),
            n: String::new(),
            alg: String::new(),
        };

        let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
        let end = (round + 1, (empty_jwk_id, empty_jwk));

        Ok(epoch_store
            .tables()?
            .active_jwks
            .safe_iter_with_bounds(Some(start), Some(end))
            .map_ok(|((r, (jwk_id, jwk)), _)| {
                debug_assert!(round == r);
                ActiveJwk { jwk_id, jwk, epoch }
            })
            .collect::<Result<Vec<_>, _>>()?)
    }

    pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
        self.output_queue
            .iter()
            .rev()
            .filter_map(|output| output.get_randomness_last_round_timestamp())
            .next()
    }

    pub(crate) fn load_initial_object_debts(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        current_round: Round,
        for_randomness: bool,
        transactions: &[VerifiedExecutableTransactionWithAliases],
    ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
        let protocol_config = epoch_store.protocol_config();
        let tables = epoch_store.tables()?;
        let default_per_commit_budget = protocol_config
            .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
            .unwrap_or(0);
        let (hash_table, db_table, per_commit_budget) = if for_randomness {
            (
                &self.congestion_control_randomness_object_debts,
                &tables.congestion_control_randomness_object_debts,
                protocol_config
                    .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
                    .unwrap_or(default_per_commit_budget),
            )
        } else {
            (
                &self.congestion_control_object_debts,
                &tables.congestion_control_object_debts,
                default_per_commit_budget,
            )
        };
        let mut shared_input_object_ids: Vec<_> = transactions
            .iter()
            .flat_map(|tx| tx.tx().shared_input_objects().map(|obj| obj.id))
            .collect();
        shared_input_object_ids.sort();
        shared_input_object_ids.dedup();

        let results = do_fallback_lookup(
            &shared_input_object_ids,
            |object_id| {
                if let Some(debt) = hash_table.get(object_id) {
                    CacheResult::Hit(Some(debt.into_v1()))
                } else {
                    CacheResult::Miss
                }
            },
            |object_ids| {
                db_table
                    .multi_get(object_ids)
                    .expect("db error")
                    .into_iter()
                    .map(|debt| debt.map(|debt| debt.into_v1()))
                    .collect()
            },
        );

        Ok(results
            .into_iter()
            .zip(shared_input_object_ids)
            .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
            .map(move |((round, debt), object_id)| {
                // Stored debts already account for the budget of the round in which
                // they were accumulated. Application of budget from future rounds to
                // the debt is handled here.
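                // (Illustrative example: a debt of 100 recorded at round 5, with a
                // per-commit budget of 30 and current_round == 8, spans two intervening
                // commits (rounds 6 and 7) and is therefore reduced to
                // 100 - 2 * 30 = 40 before being applied.)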
                assert!(current_round > round);
                let num_rounds = current_round - round - 1;
                let debt = debt.saturating_sub(per_commit_budget * num_rounds);
                (object_id, debt)
            }))
    }
}

// A wrapper around HashMap that uses refcounts to keep entries alive until
// they are no longer needed.
//
// If there are N inserts for the same key, the key will not be removed until
// there are N removes.
//
// It is intended to track the *latest* value for a given key, so duplicate
// inserts are intended to overwrite any prior value.
#[derive(Debug, Default)]
struct RefCountedHashMap<K, V> {
    map: HashMap<K, (usize, V)>,
}

impl<K, V> RefCountedHashMap<K, V>
where
    K: Clone + Eq + std::hash::Hash,
{
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    pub fn insert(&mut self, key: K, value: V) {
        let entry = self.map.entry(key);
        match entry {
            hash_map::Entry::Occupied(mut entry) => {
                let (ref_count, v) = entry.get_mut();
                *ref_count += 1;
                *v = value;
            }
            hash_map::Entry::Vacant(entry) => {
                entry.insert((1, value));
            }
        }
    }

    // Returns true if the key was present, false otherwise.
    // Note that the key may not be removed if present, as it may have a refcount > 1.
    pub fn remove(&mut self, key: &K) -> bool {
        let entry = self.map.entry(key.clone());
        match entry {
            hash_map::Entry::Occupied(mut entry) => {
                let (ref_count, _) = entry.get_mut();
                *ref_count -= 1;
                if *ref_count == 0 {
                    entry.remove();
                }
                true
            }
            hash_map::Entry::Vacant(_) => false,
        }
    }

    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key).map(|(_, v)| v)
    }

    pub fn contains_key(&self, key: &K) -> bool {
        self.map.contains_key(key)
    }
}
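
// A minimal, illustrative sketch (not part of the original module) of the refcount semantics
// documented above: N inserts of the same key require N removes before the entry disappears,
// and a duplicate insert overwrites the previously stored value. The module and test names
// here are hypothetical.
#[cfg(test)]
mod ref_counted_hash_map_sketch {
    use super::RefCountedHashMap;

    #[test]
    fn insert_twice_requires_two_removes() {
        let mut map = RefCountedHashMap::new();

        // Two inserts for the same key: the latest value wins and the refcount becomes 2.
        map.insert("k", 1u64);
        map.insert("k", 2u64);
        assert_eq!(map.get(&"k"), Some(&2));

        // The first remove decrements the refcount but keeps the entry alive.
        assert!(map.remove(&"k"));
        assert!(map.contains_key(&"k"));

        // The second remove drops the entry; removing a missing key returns false.
        assert!(map.remove(&"k"));
        assert!(!map.contains_key(&"k"));
        assert!(!map.remove(&"k"));
    }
}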