sui_core/authority/consensus_quarantine.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority::authority_per_epoch_store::{
    AuthorityEpochTables, EncG, ExecutionIndicesWithStats, PkG,
};
use crate::authority::transaction_deferral::DeferralKey;
use crate::checkpoints::BuilderCheckpointSummary;
use crate::consensus_handler::SequencedConsensusTransactionKind;
use crate::epoch::randomness::SINGLETON_KEY;
use dashmap::DashMap;
use fastcrypto_tbls::{dkg_v1, nodes::PartyId};
use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
use moka::policy::EvictionPolicy;
use moka::sync::SegmentedCache as MokaCache;
use mysten_common::fatal;
use mysten_common::random_util::randomize_cache_capacity_in_tests;
use parking_lot::Mutex;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque, hash_map};
use sui_types::authenticator_state::ActiveJwk;
use sui_types::base_types::{AuthorityName, SequenceNumber};
use sui_types::crypto::RandomnessRound;
use sui_types::error::SuiResult;
use sui_types::execution::ExecutionTimeObservationKey;
use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber};
use sui_types::messages_consensus::{
    AuthorityIndex, ConsensusTransaction, ConsensusTransactionKind,
};
use sui_types::{
    base_types::{ConsensusObjectSequenceKey, ObjectID},
    digests::TransactionDigest,
    messages_consensus::{Round, TimestampMs, VersionedDkgConfirmation},
    signature::GenericSignature,
};
use tracing::{debug, info};
use typed_store::Map;
use typed_store::rocks::DBBatch;

use crate::{
    authority::{
        authority_per_epoch_store::AuthorityPerEpochStore,
        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
        shared_object_congestion_tracker::CongestionPerObjectDebt,
    },
    checkpoints::{CheckpointHeight, PendingCheckpoint},
    consensus_handler::{SequencedConsensusTransactionKey, VerifiedSequencedConsensusTransaction},
    epoch::{
        randomness::{VersionedProcessedMessage, VersionedUsedProcessedMessages},
        reconfiguration::ReconfigState,
    },
};

use super::*;

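/// Accumulates all state produced while processing a single consensus commit. The output is
/// held in memory (see `ConsensusOutputQuarantine`) until the checkpoints built from the commit
/// have been certified, at which point it is persisted via `write_to_batch`.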
#[derive(Default)]
#[allow(clippy::type_complexity)]
pub(crate) struct ConsensusCommitOutput {
    // Consensus and reconfig state
    consensus_round: Round,
    consensus_messages_processed: BTreeSet<SequencedConsensusTransactionKey>,
    end_of_publish: BTreeSet<AuthorityName>,
    reconfig_state: Option<ReconfigState>,
    consensus_commit_stats: Option<ExecutionIndicesWithStats>,

    // transaction scheduling state
    next_shared_object_versions: Option<HashMap<ConsensusObjectSequenceKey, SequenceNumber>>,

    // TODO: If we delay committing consensus output until after all deferrals have been loaded,
    // we can move deferred_txns to the ConsensusOutputCache and save disk bandwidth.
    deferred_txns: Vec<(DeferralKey, Vec<VerifiedSequencedConsensusTransaction>)>,
    // TODO(commit-handler-rewrite): remove the original once we no longer need to support the old consensus handler
    deferred_txns_v2: Vec<(DeferralKey, Vec<VerifiedExecutableTransaction>)>,
    // deferred txns that have been loaded and can be removed
    deleted_deferred_txns: BTreeSet<DeferralKey>,

    // checkpoint state
    pending_checkpoints: Vec<PendingCheckpoint>,

    // random beacon state
    next_randomness_round: Option<(RandomnessRound, TimestampMs)>,

    dkg_confirmations: BTreeMap<PartyId, VersionedDkgConfirmation>,
    dkg_processed_messages: BTreeMap<PartyId, VersionedProcessedMessage>,
    dkg_used_message: Option<VersionedUsedProcessedMessages>,
    dkg_output: Option<dkg_v1::Output<PkG, EncG>>,

    // jwk state
    pending_jwks: BTreeSet<(AuthorityName, JwkId, JWK)>,
    active_jwks: BTreeSet<(u64, (JwkId, JWK))>,

    // congestion control state
    congestion_control_object_debts: Vec<(ObjectID, u64)>,
    congestion_control_randomness_object_debts: Vec<(ObjectID, u64)>,
    execution_time_observations: Vec<(
        AuthorityIndex,
        u64, /* generation */
        Vec<(ExecutionTimeObservationKey, Duration)>,
    )>,
}

impl ConsensusCommitOutput {
    pub fn new(consensus_round: Round) -> Self {
        Self {
            consensus_round,
            ..Default::default()
        }
    }

    pub fn get_deleted_deferred_txn_keys(&self) -> impl Iterator<Item = DeferralKey> + use<'_> {
        self.deleted_deferred_txns.iter().cloned()
    }

    pub fn has_deferred_transactions(&self) -> bool {
        !self.deferred_txns.is_empty() || !self.deferred_txns_v2.is_empty()
    }

    fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
        self.next_randomness_round.as_ref().map(|(_, ts)| *ts)
    }

    fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
        self.pending_checkpoints.last().map(|cp| cp.height())
    }

    fn get_pending_checkpoints(
        &self,
        last: Option<CheckpointHeight>,
    ) -> impl Iterator<Item = &PendingCheckpoint> {
        self.pending_checkpoints.iter().filter(move |cp| {
            if let Some(last) = last {
                cp.height() > last
            } else {
                true
            }
        })
    }

    fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
        self.pending_checkpoints
            .iter()
            .any(|cp| cp.height() == *index)
    }

    fn get_round(&self) -> Option<u64> {
        self.consensus_commit_stats
            .as_ref()
            .map(|stats| stats.index.last_committed_round)
    }

    pub fn insert_end_of_publish(&mut self, authority: AuthorityName) {
        self.end_of_publish.insert(authority);
    }

    pub fn insert_execution_time_observation(
        &mut self,
        source: AuthorityIndex,
        generation: u64,
        estimates: Vec<(ExecutionTimeObservationKey, Duration)>,
    ) {
        self.execution_time_observations
            .push((source, generation, estimates));
    }

    pub(crate) fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStats) {
        self.consensus_commit_stats = Some(stats);
    }

    // in testing code we often need to write to the db outside of a consensus commit
    pub(crate) fn set_default_commit_stats_for_testing(&mut self) {
        self.record_consensus_commit_stats(Default::default());
    }

    pub fn store_reconfig_state(&mut self, state: ReconfigState) {
        self.reconfig_state = Some(state);
    }

    pub fn record_consensus_message_processed(&mut self, key: SequencedConsensusTransactionKey) {
        self.consensus_messages_processed.insert(key);
    }

    pub fn get_consensus_messages_processed(
        &self,
    ) -> impl Iterator<Item = &SequencedConsensusTransactionKey> {
        self.consensus_messages_processed.iter()
    }

    pub fn set_next_shared_object_versions(
        &mut self,
        next_versions: HashMap<ConsensusObjectSequenceKey, SequenceNumber>,
    ) {
        assert!(self.next_shared_object_versions.is_none());
        self.next_shared_object_versions = Some(next_versions);
    }

    pub fn defer_transactions(
        &mut self,
        key: DeferralKey,
        transactions: Vec<VerifiedSequencedConsensusTransaction>,
    ) {
        self.deferred_txns.push((key, transactions));
    }

    pub fn defer_transactions_v2(
        &mut self,
        key: DeferralKey,
        transactions: Vec<VerifiedExecutableTransaction>,
    ) {
        self.deferred_txns_v2.push((key, transactions));
    }

    pub fn delete_loaded_deferred_transactions(&mut self, deferral_keys: &[DeferralKey]) {
        self.deleted_deferred_txns
            .extend(deferral_keys.iter().cloned());
    }

    pub fn insert_pending_checkpoint(&mut self, checkpoint: PendingCheckpoint) {
        self.pending_checkpoints.push(checkpoint);
    }

    pub fn reserve_next_randomness_round(
        &mut self,
        next_randomness_round: RandomnessRound,
        commit_timestamp: TimestampMs,
    ) {
        assert!(self.next_randomness_round.is_none());
        self.next_randomness_round = Some((next_randomness_round, commit_timestamp));
    }

    pub fn insert_dkg_confirmation(&mut self, conf: VersionedDkgConfirmation) {
        self.dkg_confirmations.insert(conf.sender(), conf);
    }

    pub fn insert_dkg_processed_message(&mut self, message: VersionedProcessedMessage) {
        self.dkg_processed_messages
            .insert(message.sender(), message);
    }

    pub fn insert_dkg_used_messages(&mut self, used_messages: VersionedUsedProcessedMessages) {
        self.dkg_used_message = Some(used_messages);
    }

    pub fn set_dkg_output(&mut self, output: dkg_v1::Output<PkG, EncG>) {
        self.dkg_output = Some(output);
    }

    pub fn insert_pending_jwk(&mut self, authority: AuthorityName, id: JwkId, jwk: JWK) {
        self.pending_jwks.insert((authority, id, jwk));
    }

    pub fn insert_active_jwk(&mut self, round: u64, key: (JwkId, JWK)) {
        self.active_jwks.insert((round, key));
    }

    pub fn set_congestion_control_object_debts(&mut self, object_debts: Vec<(ObjectID, u64)>) {
        self.congestion_control_object_debts = object_debts;
    }

    pub fn set_congestion_control_randomness_object_debts(
        &mut self,
        object_debts: Vec<(ObjectID, u64)>,
    ) {
        self.congestion_control_randomness_object_debts = object_debts;
    }

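    /// Writes all accumulated output for this commit into `batch`. In the normal flow this is
    /// called by `ConsensusOutputQuarantine::commit_with_batch` once the checkpoints built from
    /// the commit have been certified.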
    pub fn write_to_batch(
        self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        let tables = epoch_store.tables()?;
        batch.insert_batch(
            &tables.consensus_message_processed,
            self.consensus_messages_processed
                .iter()
                .map(|key| (key, true)),
        )?;

        batch.insert_batch(
            &tables.end_of_publish,
            self.end_of_publish.iter().map(|authority| (authority, ())),
        )?;

        if let Some(reconfig_state) = &self.reconfig_state {
            batch.insert_batch(
                &tables.reconfig_state,
                [(RECONFIG_STATE_INDEX, reconfig_state)],
            )?;
        }

        let consensus_commit_stats = self
            .consensus_commit_stats
            .expect("consensus_commit_stats must be set");
        let round = consensus_commit_stats.index.last_committed_round;

        batch.insert_batch(
            &tables.last_consensus_stats,
            [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)],
        )?;

        if let Some(next_versions) = self.next_shared_object_versions {
            batch.insert_batch(&tables.next_shared_object_versions_v2, next_versions)?;
        }

        // TODO(consensus-handler-rewrite): delete the old structures once commit handler rewrite is complete
        batch.delete_batch(&tables.deferred_transactions, &self.deleted_deferred_txns)?;
        batch.delete_batch(
            &tables.deferred_transactions_v2,
            &self.deleted_deferred_txns,
        )?;

        batch.insert_batch(&tables.deferred_transactions, self.deferred_txns)?;
        batch.insert_batch(
            &tables.deferred_transactions_v2,
            self.deferred_txns_v2.into_iter().map(|(key, txs)| {
                (
                    key,
                    txs.into_iter()
                        .map(|tx| {
                            let tx: TrustedExecutableTransaction = tx.serializable();
                            tx
                        })
                        .collect::<Vec<_>>(),
                )
            }),
        )?;

        if let Some((round, commit_timestamp)) = self.next_randomness_round {
            batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?;
            batch.insert_batch(
                &tables.randomness_last_round_timestamp,
                [(SINGLETON_KEY, commit_timestamp)],
            )?;
        }

        batch.insert_batch(&tables.dkg_confirmations_v2, self.dkg_confirmations)?;
        batch.insert_batch(
            &tables.dkg_processed_messages_v2,
            self.dkg_processed_messages,
        )?;
        batch.insert_batch(
            &tables.dkg_used_messages_v2,
            // using Option as iter
            self.dkg_used_message
                .into_iter()
                .map(|used_msgs| (SINGLETON_KEY, used_msgs)),
        )?;
        if let Some(output) = self.dkg_output {
            batch.insert_batch(&tables.dkg_output, [(SINGLETON_KEY, output)])?;
        }

        batch.insert_batch(
            &tables.pending_jwks,
            self.pending_jwks.into_iter().map(|j| (j, ())),
        )?;
        batch.insert_batch(
            &tables.active_jwks,
            self.active_jwks.into_iter().map(|j| {
                // TODO: we don't need to store the round in this map if it is invariant
                assert_eq!(j.0, round);
                (j, ())
            }),
        )?;

        batch.insert_batch(
            &tables.congestion_control_object_debts,
            self.congestion_control_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;
        batch.insert_batch(
            &tables.congestion_control_randomness_object_debts,
            self.congestion_control_randomness_object_debts
                .into_iter()
                .map(|(object_id, debt)| {
                    (
                        object_id,
                        CongestionPerObjectDebt::new(self.consensus_round, debt),
                    )
                }),
        )?;

        batch.insert_batch(
            &tables.execution_time_observations,
            self.execution_time_observations
                .into_iter()
                .map(|(authority, generation, estimates)| ((generation, authority), estimates)),
        )?;

        Ok(())
    }
}

/// ConsensusOutputCache holds outputs of consensus processing that do not need to be committed to disk.
/// Data quarantining guarantees that all of this data will be used (e.g. for building checkpoints)
/// before the consensus commit from which it originated is marked as processed. Therefore we can rely
/// on replay of consensus commits to recover this data.
pub(crate) struct ConsensusOutputCache {
    // deferred transactions is only used by consensus handler so there should never be lock contention
    // - hence no need for a DashMap.
    // TODO(consensus-handler-rewrite): remove this once we no longer need to support the old consensus handler
    pub(crate) deferred_transactions:
        Mutex<BTreeMap<DeferralKey, Vec<VerifiedSequencedConsensusTransaction>>>,

    pub(crate) deferred_transactions_v2:
        Mutex<BTreeMap<DeferralKey, Vec<VerifiedExecutableTransaction>>>,

    // user_signatures_for_checkpoints is written to by consensus handler and read from by checkpoint builder
    // The critical sections are small in both cases so a DashMap is probably not helpful.
    pub(crate) user_signatures_for_checkpoints:
        Mutex<HashMap<TransactionDigest, Vec<GenericSignature>>>,

    executed_in_epoch: RwLock<DashMap<TransactionDigest, ()>>,
    executed_in_epoch_cache: MokaCache<TransactionDigest, ()>,
}

impl ConsensusOutputCache {
    pub(crate) fn new(
        epoch_start_configuration: &EpochStartConfiguration,
        tables: &AuthorityEpochTables,
    ) -> Self {
        let deferred_transactions = tables
            .get_all_deferred_transactions()
            .expect("load deferred transactions cannot fail");

        let deferred_transactions_v2 = tables
            .get_all_deferred_transactions_v2()
            .expect("load deferred transactions cannot fail");

        assert!(
            epoch_start_configuration.is_data_quarantine_active_from_beginning_of_epoch(),
            "This version of sui-node can only run after data quarantining has been enabled. Please run version 1.45.0 or later to the end of the current epoch and retry"
        );

        let executed_in_epoch_cache_capacity = 50_000;

        Self {
            deferred_transactions: Mutex::new(deferred_transactions),
            deferred_transactions_v2: Mutex::new(deferred_transactions_v2),
            user_signatures_for_checkpoints: Default::default(),
            executed_in_epoch: RwLock::new(DashMap::with_shard_amount(2048)),
            executed_in_epoch_cache: MokaCache::builder(8)
                // most queries should be for recent transactions
                .max_capacity(randomize_cache_capacity_in_tests(
                    executed_in_epoch_cache_capacity,
                ))
                .eviction_policy(EvictionPolicy::lru())
                .build(),
        }
    }

    pub fn executed_in_current_epoch(&self, digest: &TransactionDigest) -> bool {
        self.executed_in_epoch
            .read()
            .contains_key(digest) ||
            // we use get instead of contains key to mark the entry as read
            self.executed_in_epoch_cache.get(digest).is_some()
    }

    // Called by execution
    pub fn insert_executed_in_epoch(&self, tx_digest: TransactionDigest) {
        assert!(
            self.executed_in_epoch
                .read()
                .insert(tx_digest, ())
                .is_none(),
            "transaction already executed"
        );
        self.executed_in_epoch_cache.insert(tx_digest, ());
    }

    // CheckpointExecutor calls this (indirectly) in order to prune the in-memory cache of executed
    // transactions. By the time this is called, the transaction digests will have been committed to
    // the `executed_transactions_to_checkpoint` table.
    pub fn remove_executed_in_epoch(&self, tx_digests: &[TransactionDigest]) {
        let executed_in_epoch = self.executed_in_epoch.read();
        for tx_digest in tx_digests {
            executed_in_epoch.remove(tx_digest);
        }
    }
}

/// ConsensusOutputQuarantine holds outputs of consensus processing in memory until the checkpoints
/// for the commit have been certified.
pub(crate) struct ConsensusOutputQuarantine {
    // Output from consensus handler
    output_queue: VecDeque<ConsensusCommitOutput>,

    // Highest known certified checkpoint sequence number
    highest_executed_checkpoint: CheckpointSequenceNumber,

    // Checkpoint Builder output
    builder_checkpoint_summary:
        BTreeMap<CheckpointSequenceNumber, (BuilderCheckpointSummary, CheckpointContents)>,

    builder_digest_to_checkpoint: HashMap<TransactionDigest, CheckpointSequenceNumber>,

    // Any un-committed next versions are stored here.
    shared_object_next_versions: RefCountedHashMap<ConsensusObjectSequenceKey, SequenceNumber>,

    // The most recent congestion control debts for objects. Uses a ref-count to track
    // which objects still exist in some element of output_queue.
    congestion_control_randomness_object_debts:
        RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,
    congestion_control_object_debts: RefCountedHashMap<ObjectID, CongestionPerObjectDebt>,

    processed_consensus_messages: RefCountedHashMap<SequencedConsensusTransactionKey, ()>,

    metrics: Arc<EpochMetrics>,
}

impl ConsensusOutputQuarantine {
    pub(super) fn new(
        highest_executed_checkpoint: CheckpointSequenceNumber,
        authority_metrics: Arc<EpochMetrics>,
    ) -> Self {
        Self {
            highest_executed_checkpoint,

            output_queue: VecDeque::new(),
            builder_checkpoint_summary: BTreeMap::new(),
            builder_digest_to_checkpoint: HashMap::new(),
            shared_object_next_versions: RefCountedHashMap::new(),
            processed_consensus_messages: RefCountedHashMap::new(),
            congestion_control_randomness_object_debts: RefCountedHashMap::new(),
            congestion_control_object_debts: RefCountedHashMap::new(),
            metrics: authority_metrics,
        }
    }
}

// Write methods - all methods in this block insert new data into the quarantine.
// There are only two sources! ConsensusHandler and CheckpointBuilder.
impl ConsensusOutputQuarantine {
    // Push all data gathered from a consensus commit into the quarantine.
    pub(crate) fn push_consensus_output(
        &mut self,
        output: ConsensusCommitOutput,
        epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult {
        self.insert_shared_object_next_versions(&output);
        self.insert_congestion_control_debts(&output);
        self.insert_processed_consensus_messages(&output);
        self.output_queue.push_back(output);

        self.metrics
            .consensus_quarantine_queue_size
            .set(self.output_queue.len() as i64);

        // we may already have observed the certified checkpoint for this round, if state sync is running
        // ahead of consensus, so there may be data to commit right away.
        self.commit(epoch_store)
    }

    // Record a newly built checkpoint.
    pub(super) fn insert_builder_summary(
        &mut self,
        sequence_number: CheckpointSequenceNumber,
        summary: BuilderCheckpointSummary,
        contents: CheckpointContents,
    ) {
        debug!(?sequence_number, "inserting builder summary {:?}", summary);
        for tx in contents.iter() {
            self.builder_digest_to_checkpoint
                .insert(tx.transaction, sequence_number);
        }
        self.builder_checkpoint_summary
            .insert(sequence_number, (summary, contents));
    }
}

// Commit methods.
impl ConsensusOutputQuarantine {
    /// Update the highest executed checkpoint and commit any data which is now
    /// below the watermark.
    pub(super) fn update_highest_executed_checkpoint(
        &mut self,
        checkpoint: CheckpointSequenceNumber,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        self.highest_executed_checkpoint = checkpoint;
        self.commit_with_batch(epoch_store, batch)
    }

    pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult {
        let mut batch = epoch_store.db_batch()?;
        self.commit_with_batch(epoch_store, &mut batch)?;
        batch.write()?;
        Ok(())
    }

    /// Commit all data below the watermark.
    fn commit_with_batch(
        &mut self,
        epoch_store: &AuthorityPerEpochStore,
        batch: &mut DBBatch,
    ) -> SuiResult {
        // The commit algorithm is simple:
        // 1. First commit all checkpoint builder state which is below the watermark.
        // 2. Determine the consensus commit height that corresponds to the highest committed
        //    checkpoint.
        // 3. Commit all consensus output at that height or below.

        let tables = epoch_store.tables()?;

        let mut highest_committed_height = None;

        while self
            .builder_checkpoint_summary
            .first_key_value()
            .map(|(seq, _)| *seq <= self.highest_executed_checkpoint)
            == Some(true)
        {
            let (seq, (builder_summary, contents)) =
                self.builder_checkpoint_summary.pop_first().unwrap();

            for tx in contents.iter() {
                let digest = &tx.transaction;
                assert_eq!(
                    self.builder_digest_to_checkpoint
                        .remove(digest)
                        .unwrap_or_else(|| {
                            panic!(
                                "transaction {:?} not found in builder_digest_to_checkpoint",
                                digest
                            )
                        }),
                    seq
                );
            }

            batch.insert_batch(
                &tables.builder_digest_to_checkpoint,
                contents.iter().map(|tx| (tx.transaction, seq)),
            )?;

            batch.insert_batch(
                &tables.builder_checkpoint_summary_v2,
                [(seq, &builder_summary)],
            )?;

            let checkpoint_height = builder_summary
                .checkpoint_height
                .expect("non-genesis checkpoint must have height");
            if let Some(highest) = highest_committed_height {
                assert!(
                    checkpoint_height >= highest,
                    "current checkpoint height {} must be no less than highest committed height {}",
                    checkpoint_height,
                    highest
                );
            }

            highest_committed_height = Some(checkpoint_height);
        }

        let Some(highest_committed_height) = highest_committed_height else {
            return Ok(());
        };

        while !self.output_queue.is_empty() {
            // A consensus commit can have more than one pending checkpoint (a regular one and a randomness one).
            // We can only write the consensus commit if the highest pending checkpoint associated with it has
            // been processed by the builder.
            let Some(highest_in_commit) = self
                .output_queue
                .front()
                .unwrap()
                .get_highest_pending_checkpoint_height()
            else {
                // if highest is none, we have already written the pending checkpoint for the final epoch,
                // so there is no more data that needs to be committed.
                break;
            };

            if highest_in_commit <= highest_committed_height {
                info!(
                    "committing output with highest pending checkpoint height {:?}",
                    highest_in_commit
                );
                let output = self.output_queue.pop_front().unwrap();
                self.remove_shared_object_next_versions(&output);
                self.remove_processed_consensus_messages(&output);
                self.remove_congestion_control_debts(&output);

                output.write_to_batch(epoch_store, batch)?;
            } else {
                break;
            }
        }

        self.metrics
            .consensus_quarantine_queue_size
            .set(self.output_queue.len() as i64);

        Ok(())
    }
}

impl ConsensusOutputQuarantine {
    fn insert_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
            for (object_id, next_version) in next_versions {
                self.shared_object_next_versions
                    .insert(*object_id, *next_version);
            }
        }
    }

    fn insert_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
        let current_round = output.consensus_round;

        for (object_id, debt) in output.congestion_control_object_debts.iter() {
            self.congestion_control_object_debts.insert(
                *object_id,
                CongestionPerObjectDebt::new(current_round, *debt),
            );
        }

        for (object_id, debt) in output.congestion_control_randomness_object_debts.iter() {
            self.congestion_control_randomness_object_debts.insert(
                *object_id,
                CongestionPerObjectDebt::new(current_round, *debt),
            );
        }
    }

    fn remove_congestion_control_debts(&mut self, output: &ConsensusCommitOutput) {
        for (object_id, _) in output.congestion_control_object_debts.iter() {
            self.congestion_control_object_debts.remove(object_id);
        }
        for (object_id, _) in output.congestion_control_randomness_object_debts.iter() {
            self.congestion_control_randomness_object_debts
                .remove(object_id);
        }
    }

    fn insert_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
        for tx_key in output.consensus_messages_processed.iter() {
            self.processed_consensus_messages.insert(tx_key.clone(), ());
        }
    }

    fn remove_processed_consensus_messages(&mut self, output: &ConsensusCommitOutput) {
        for tx_key in output.consensus_messages_processed.iter() {
            self.processed_consensus_messages.remove(tx_key);
        }
    }

    fn remove_shared_object_next_versions(&mut self, output: &ConsensusCommitOutput) {
        if let Some(next_versions) = output.next_shared_object_versions.as_ref() {
            for object_id in next_versions.keys() {
                if !self.shared_object_next_versions.remove(object_id) {
                    fatal!(
                        "Shared object next version not found in quarantine: {:?}",
                        object_id
                    );
                }
            }
        }
    }
}

// Read methods - all methods in this block return data from the quarantine which would otherwise
// be found in the database.
impl ConsensusOutputQuarantine {
    pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> {
        self.builder_checkpoint_summary
            .values()
            .last()
            .map(|(summary, _)| summary)
    }

    pub(super) fn get_built_summary(
        &self,
        sequence: CheckpointSequenceNumber,
    ) -> Option<&BuilderCheckpointSummary> {
        self.builder_checkpoint_summary
            .get(&sequence)
            .map(|(summary, _)| summary)
    }

    pub(super) fn included_transaction_in_checkpoint(&self, digest: &TransactionDigest) -> bool {
        self.builder_digest_to_checkpoint.contains_key(digest)
    }

    pub(super) fn is_consensus_message_processed(
        &self,
        key: &SequencedConsensusTransactionKey,
    ) -> bool {
        self.processed_consensus_messages.contains_key(key)
    }

    pub(super) fn is_empty(&self) -> bool {
        self.output_queue.is_empty()
    }

    pub(super) fn get_next_shared_object_versions(
        &self,
        tables: &AuthorityEpochTables,
        objects_to_init: &[ConsensusObjectSequenceKey],
    ) -> SuiResult<Vec<Option<SequenceNumber>>> {
        Ok(do_fallback_lookup(
            objects_to_init,
            |object_key| {
                if let Some(next_version) = self.shared_object_next_versions.get(object_key) {
                    CacheResult::Hit(Some(*next_version))
                } else {
                    CacheResult::Miss
                }
            },
            |object_keys| {
                tables
                    .next_shared_object_versions_v2
                    .multi_get(object_keys)
                    .expect("db error")
            },
        ))
    }

    pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option<CheckpointHeight> {
        self.output_queue
            .back()
            .and_then(|output| output.get_highest_pending_checkpoint_height())
    }

    pub(super) fn get_pending_checkpoints(
        &self,
        last: Option<CheckpointHeight>,
    ) -> Vec<(CheckpointHeight, PendingCheckpoint)> {
        let mut checkpoints = Vec::new();
        for output in &self.output_queue {
            checkpoints.extend(
                output
                    .get_pending_checkpoints(last)
                    .map(|cp| (cp.height(), cp.clone())),
            );
        }
        if cfg!(debug_assertions) {
            let mut prev = None;
            for (height, _) in &checkpoints {
                if let Some(prev) = prev {
                    assert!(prev < *height);
                }
                prev = Some(*height);
            }
        }
        checkpoints
    }

    pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool {
        self.output_queue
            .iter()
            .any(|output| output.pending_checkpoint_exists(index))
    }

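    /// Returns the JWKs that became active at `round`, serving the result from quarantined
    /// output when the round is still in memory and falling back to the `active_jwks` table
    /// otherwise.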
    pub(super) fn get_new_jwks(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        round: u64,
    ) -> SuiResult<Vec<ActiveJwk>> {
        let epoch = epoch_store.epoch();

        // Check if the requested round is in memory
        for output in self.output_queue.iter().rev() {
            // unwrap safe because output will always have last consensus stats set before being added
            // to the quarantine
            let output_round = output.get_round().unwrap();
            if round == output_round {
                return Ok(output
                    .active_jwks
                    .iter()
                    .map(|(_, (jwk_id, jwk))| ActiveJwk {
                        jwk_id: jwk_id.clone(),
                        jwk: jwk.clone(),
                        epoch,
                    })
                    .collect());
            }
        }

        // Fall back to reading from database
        let empty_jwk_id = JwkId::new(String::new(), String::new());
        let empty_jwk = JWK {
            kty: String::new(),
            e: String::new(),
            n: String::new(),
            alg: String::new(),
        };

        let start = (round, (empty_jwk_id.clone(), empty_jwk.clone()));
        let end = (round + 1, (empty_jwk_id, empty_jwk));

        Ok(epoch_store
            .tables()?
            .active_jwks
            .safe_iter_with_bounds(Some(start), Some(end))
            .map_ok(|((r, (jwk_id, jwk)), _)| {
                debug_assert!(round == r);
                ActiveJwk { jwk_id, jwk, epoch }
            })
            .collect::<Result<Vec<_>, _>>()?)
    }

    pub(super) fn get_randomness_last_round_timestamp(&self) -> Option<TimestampMs> {
        self.output_queue
            .iter()
            .rev()
            .filter_map(|output| output.get_randomness_last_round_timestamp())
            .next()
    }

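    /// Loads the accumulated congestion-control debts for the shared objects referenced by
    /// `transactions`, preferring quarantined values over the database and discounting each
    /// stored debt by the per-commit budget for every round elapsed since it was recorded.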
    pub(super) fn load_initial_object_debts(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        current_round: Round,
        for_randomness: bool,
        transactions: &[VerifiedSequencedConsensusTransaction],
    ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
        let protocol_config = epoch_store.protocol_config();
        let tables = epoch_store.tables()?;
        let default_per_commit_budget = protocol_config
            .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
            .unwrap_or(0);
        let (hash_table, db_table, per_commit_budget) = if for_randomness {
            (
                &self.congestion_control_randomness_object_debts,
                &tables.congestion_control_randomness_object_debts,
                protocol_config
                    .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
                    .unwrap_or(default_per_commit_budget),
            )
        } else {
            (
                &self.congestion_control_object_debts,
                &tables.congestion_control_object_debts,
                default_per_commit_budget,
            )
        };
        let mut shared_input_object_ids: Vec<_> = transactions
            .iter()
            .filter_map(|tx| {
                match &tx.0.transaction {
                    SequencedConsensusTransactionKind::External(ConsensusTransaction {
                        kind: ConsensusTransactionKind::CertifiedTransaction(tx),
                        ..
                    }) => Some(itertools::Either::Left(
                        tx.shared_input_objects().map(|obj| obj.id),
                    )),
                    SequencedConsensusTransactionKind::External(ConsensusTransaction {
                        kind: ConsensusTransactionKind::UserTransaction(tx),
                        ..
                    // Bug fix that required a protocol flag.
                    }) if protocol_config.use_mfp_txns_in_load_initial_object_debts() => Some(
                        itertools::Either::Right(tx.shared_input_objects().map(|obj| obj.id)),
                    ),
                    _ => None,
                }
            })
            .flatten()
            .collect();
        shared_input_object_ids.sort();
        shared_input_object_ids.dedup();

        let results = do_fallback_lookup(
            &shared_input_object_ids,
            |object_id| {
                if let Some(debt) = hash_table.get(object_id) {
                    CacheResult::Hit(Some(debt.into_v1()))
                } else {
                    CacheResult::Miss
                }
            },
            |object_ids| {
                db_table
                    .multi_get(object_ids)
                    .expect("db error")
                    .into_iter()
                    .map(|debt| debt.map(|debt| debt.into_v1()))
                    .collect()
            },
        );

        Ok(results
            .into_iter()
            .zip(shared_input_object_ids)
            .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
            .map(move |((round, debt), object_id)| {
                // Stored debts already account for the budget of the round in which
                // they were accumulated. Application of budget from future rounds to
                // the debt is handled here.
                assert!(current_round > round);
                let num_rounds = current_round - round - 1;
                let debt = debt.saturating_sub(per_commit_budget * num_rounds);
                (object_id, debt)
            }))
    }

    // TODO: Remove the above version and rename this without _v2
    pub(crate) fn load_initial_object_debts_v2(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        current_round: Round,
        for_randomness: bool,
        transactions: &[VerifiedExecutableTransaction],
    ) -> SuiResult<impl IntoIterator<Item = (ObjectID, u64)>> {
        let protocol_config = epoch_store.protocol_config();
        let tables = epoch_store.tables()?;
        let default_per_commit_budget = protocol_config
            .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option()
            .unwrap_or(0);
        let (hash_table, db_table, per_commit_budget) = if for_randomness {
            (
                &self.congestion_control_randomness_object_debts,
                &tables.congestion_control_randomness_object_debts,
                protocol_config
                    .max_accumulated_randomness_txn_cost_per_object_in_mysticeti_commit_as_option()
                    .unwrap_or(default_per_commit_budget),
            )
        } else {
            (
                &self.congestion_control_object_debts,
                &tables.congestion_control_object_debts,
                default_per_commit_budget,
            )
        };
        let mut shared_input_object_ids: Vec<_> = transactions
            .iter()
            .flat_map(|tx| tx.shared_input_objects().map(|obj| obj.id))
            .collect();
        shared_input_object_ids.sort();
        shared_input_object_ids.dedup();

        let results = do_fallback_lookup(
            &shared_input_object_ids,
            |object_id| {
                if let Some(debt) = hash_table.get(object_id) {
                    CacheResult::Hit(Some(debt.into_v1()))
                } else {
                    CacheResult::Miss
                }
            },
            |object_ids| {
                db_table
                    .multi_get(object_ids)
                    .expect("db error")
                    .into_iter()
                    .map(|debt| debt.map(|debt| debt.into_v1()))
                    .collect()
            },
        );

        Ok(results
            .into_iter()
            .zip(shared_input_object_ids)
            .filter_map(|(debt, object_id)| debt.map(|debt| (debt, object_id)))
            .map(move |((round, debt), object_id)| {
                // Stored debts already account for the budget of the round in which
                // they were accumulated. Application of budget from future rounds to
                // the debt is handled here.
                assert!(current_round > round);
                let num_rounds = current_round - round - 1;
                let debt = debt.saturating_sub(per_commit_budget * num_rounds);
                (object_id, debt)
            }))
    }
}

// A wrapper around HashMap that uses refcounts to keep entries alive until
// they are no longer needed.
//
// If there are N inserts for the same key, the key will not be removed until
// there are N removes.
//
// It is intended to track the *latest* value for a given key, so duplicate
// inserts are intended to overwrite any prior value.
#[derive(Debug, Default)]
struct RefCountedHashMap<K, V> {
    map: HashMap<K, (usize, V)>,
}

impl<K, V> RefCountedHashMap<K, V>
where
    K: Clone + Eq + std::hash::Hash,
{
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    pub fn insert(&mut self, key: K, value: V) {
        let entry = self.map.entry(key);
        match entry {
            hash_map::Entry::Occupied(mut entry) => {
                let (ref_count, v) = entry.get_mut();
                *ref_count += 1;
                *v = value;
            }
            hash_map::Entry::Vacant(entry) => {
                entry.insert((1, value));
            }
        }
    }

    // Returns true if the key was present, false otherwise.
    // Note that the key may not be removed if present, as it may have a refcount > 1.
    pub fn remove(&mut self, key: &K) -> bool {
        let entry = self.map.entry(key.clone());
        match entry {
            hash_map::Entry::Occupied(mut entry) => {
                let (ref_count, _) = entry.get_mut();
                *ref_count -= 1;
                if *ref_count == 0 {
                    entry.remove();
                }
                true
            }
            hash_map::Entry::Vacant(_) => false,
        }
    }

    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key).map(|(_, v)| v)
    }

    pub fn contains_key(&self, key: &K) -> bool {
        self.map.contains_key(key)
    }
}
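
// Minimal illustrative test of the refcount semantics documented above: N inserts for a key
// require N removes before the entry disappears, and later inserts overwrite the stored value.
// The module and test names are illustrative only, not part of the original file.
#[cfg(test)]
mod ref_counted_hash_map_tests {
    use super::RefCountedHashMap;

    #[test]
    fn insert_remove_refcounting() {
        let mut map: RefCountedHashMap<&'static str, u64> = RefCountedHashMap::new();

        // Two inserts for the same key: the value is overwritten and the refcount becomes 2.
        map.insert("obj", 1);
        map.insert("obj", 2);
        assert_eq!(map.get(&"obj"), Some(&2));

        // The first remove decrements the refcount but keeps the entry alive.
        assert!(map.remove(&"obj"));
        assert!(map.contains_key(&"obj"));

        // The second remove drops the entry; removing a missing key returns false.
        assert!(map.remove(&"obj"));
        assert!(!map.contains_key(&"obj"));
        assert!(!map.remove(&"obj"));
    }
}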
1135}