sui_core/authority/
authority_store_tables.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::*;
5use crate::authority::authority_store::LockDetailsWrapperDeprecated;
6use serde::{Deserialize, Serialize};
7use std::path::Path;
8use std::sync::atomic::AtomicU64;
9use sui_types::base_types::SequenceNumber;
10use sui_types::digests::TransactionEventsDigest;
11use sui_types::effects::{TransactionEffects, TransactionEvents};
12use sui_types::global_state_hash::GlobalStateHash;
13use sui_types::storage::{FullObjectKey, MarkerValue};
14use tracing::error;
15use typed_store::metrics::SamplingInterval;
16use typed_store::rocks::{
17    DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
18    read_size_from_env,
19};
20use typed_store::traits::Map;
21
22use crate::authority::authority_store_pruner::ObjectsCompactionFilter;
23use crate::authority::authority_store_types::{
24    StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
25};
26use crate::authority::epoch_start_configuration::EpochStartConfiguration;
27use typed_store::rocksdb::compaction_filter::Decision;
28use typed_store::{DBMapUtils, DbIterator};
29
// Names of the environment variables associated with the block cache size of the
// objects, locks, transactions and effects tables respectively.
// NOTE(review): the `_MB` suffix suggests the values are expressed in megabytes; the code
// that reads these variables is not in this part of the file — confirm units and defaults there.
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
34
/// Options to apply to every column family of the `perpetual` DB.
///
/// The derived `Default` leaves write stalling disabled and installs no compaction filter.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// Whether to enable write stalling on all column families.
    pub enable_write_stall: bool,
    /// Optional compaction filter. When the RocksDB-backed tables are opened it is handed to
    /// the `objects` table configuration only.
    pub compaction_filter: Option<ObjectsCompactionFilter>,
}
42
43impl AuthorityPerpetualTablesOptions {
44    fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
45        if !self.enable_write_stall {
46            db_options = db_options.disable_write_throttling();
47        }
48        db_options
49    }
50}
51
/// AuthorityPerpetualTables contains data that must be preserved from one epoch to the next.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct AuthorityPerpetualTables {
    /// This is a map between the object (ID, version) and the latest state of the object, namely the
    /// state that is needed to process new transactions.
    /// State is represented by `StoreObject` enum, which is either a move module or a move object.
    ///
    /// Note that while this map can store all versions of an object, we will eventually
    /// prune old object versions from the db.
    ///
    /// IMPORTANT: object versions must *only* be pruned if they appear as inputs in some
    /// TransactionEffects. Simply pruning all objects but the most recent is an error!
    /// This is because there can be partially executed transactions whose effects have not yet
    /// been written out, and which must be retried. But, they cannot be retried unless their input
    /// objects are still accessible!
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// This is a map between object references of currently active objects that can be mutated.
    ///
    /// For old epochs, it may also contain the transaction that they are locked on for use by this
    /// specific validator. The transaction locks themselves are now in AuthorityPerEpochStore.
    ///
    /// The on-disk table keeps its historical name, `owned_object_transaction_locks`.
    #[rename = "owned_object_transaction_locks"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,

    /// This is a map between the transaction digest and the corresponding transaction that's known to be
    /// executable. This means that it may have been executed locally, or it may have been synced through
    /// state-sync but hasn't been executed yet.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// A map between the transaction digest of a certificate to the effects of its execution.
    /// We store effects into this table in two different cases:
    /// 1. When a transaction is synced through state_sync, we store the effects here. These effects
    ///    are known to be final in the network, but may not have been executed locally yet.
    /// 2. When the transaction is executed locally on this node, we store the effects here. This means that
    ///    it's possible to store the same effects twice (once for the synced transaction, and once for the executed).
    ///
    /// It's also possible for the effects to be reverted if the transaction didn't make it into the epoch.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Transactions that have been executed locally on this node. We need this table since the `effects` table
    /// doesn't say anything about the execution status of the transaction on this node. When we wait for transactions
    /// to be executed, we wait for them to appear in this table. When we revert transactions, we remove them from both
    /// tables.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    // Deprecated events table; `events_2` below is keyed by transaction digest instead.
    // NOTE(review): presumably kept only so that the existing column family can still be
    // opened — confirm before removing.
    #[allow(dead_code)]
    #[deprecated]
    events: DBMap<(TransactionEventsDigest, usize), Event>,

    // Events keyed by the digest of the transaction that produced them.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    // Loaded (and unchanged) runtime object references.
    pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,

    /// DEPRECATED in favor of the table of the same name in authority_per_epoch_store.
    /// Please do not add new accessors/callsites.
    /// When a transaction is executed via the checkpoint executor, we store the association here.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    // Finalized root state hash for epoch, to be included in CheckpointSummary
    // of last checkpoint of epoch. These values should only ever be written once
    // and never changed.
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Parameters of the system fixed at the epoch start.
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// A singleton table that stores the latest pruned checkpoint. Used to keep track of the
    /// objects pruner's progress.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Expected total amount of SUI in the network. This is expected to remain constant
    /// throughout the lifetime of the network. We check it at the end of each epoch if
    /// expensive checks are enabled. We cannot use 10B today because in tests we often
    /// inject extra gas objects into genesis.
    pub(crate) expected_network_sui_amount: DBMap<(), u64>,

    /// Expected imbalance between storage fund balance and the sum of storage rebate of all live objects.
    /// This could be non-zero due to bugs in earlier protocol versions.
    /// This number is the result of storage_fund_balance - sum(storage_rebate).
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Table that stores the set of received objects and deleted objects and the version at
    /// which they were received. This is used to prevent possible race conditions around receiving
    /// objects (since they are not locked by the transaction manager) and for tracking shared
    /// objects that have been deleted. This table is meant to be pruned per-epoch, and all
    /// previous epochs other than the current epoch may be pruned safely.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
    // Same value type as the table above, but keyed by `FullObjectKey` instead of `ObjectKey`.
    // NOTE(review): presumably the successor of `object_per_epoch_marker_table` — confirm
    // which of the two is still written to.
    pub(crate) object_per_epoch_marker_table_v2: DBMap<(EpochId, FullObjectKey), MarkerValue>,

    /// Tracks executed transaction digests across epochs.
    /// Used to support address balance gas payments feature.
    /// This table uses epoch-prefixed keys to support efficient pruning via range delete.
    pub(crate) executed_transaction_digests: DBMap<(EpochId, TransactionDigest), ()>,
}
150
/// Tables stored in the `pruner` database directory (see `AuthorityPrunerTables::path`).
#[derive(DBMapUtils)]
pub struct AuthorityPrunerTables {
    // Maps an object ID to a sequence number.
    // NOTE(review): presumably the version at which the object was tombstoned — confirm
    // against the pruner code that writes this table.
    pub(crate) object_tombstones: DBMap<ObjectID, SequenceNumber>,
}
155
156impl AuthorityPrunerTables {
157    pub fn path(parent_path: &Path) -> PathBuf {
158        parent_path.join("pruner")
159    }
160
161    pub fn open(parent_path: &Path) -> Self {
162        Self::open_tables_read_write(
163            Self::path(parent_path),
164            MetricConf::new("pruner")
165                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
166            None,
167            None,
168        )
169    }
170}
171
172impl AuthorityPerpetualTables {
173    pub fn path(parent_path: &Path) -> PathBuf {
174        parent_path.join("perpetual")
175    }
176
177    #[cfg(not(tidehunter))]
178    pub fn open(
179        parent_path: &Path,
180        db_options_override: Option<AuthorityPerpetualTablesOptions>,
181        _pruner_watermark: Option<Arc<AtomicU64>>,
182    ) -> Self {
183        let db_options_override = db_options_override.unwrap_or_default();
184        let db_options =
185            db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
186        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
187            (
188                "objects".to_string(),
189                objects_table_config(db_options.clone(), db_options_override.compaction_filter),
190            ),
191            (
192                "owned_object_transaction_locks".to_string(),
193                owned_object_transaction_locks_table_config(db_options.clone()),
194            ),
195            (
196                "transactions".to_string(),
197                transactions_table_config(db_options.clone()),
198            ),
199            (
200                "effects".to_string(),
201                effects_table_config(db_options.clone()),
202            ),
203        ]));
204
205        Self::open_tables_read_write(
206            Self::path(parent_path),
207            MetricConf::new("perpetual")
208                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
209            Some(db_options.options),
210            Some(table_options),
211        )
212    }
213
    /// Opens the perpetual tables read-write under `parent_path` (tidehunter build).
    ///
    /// The options argument is ignored in this build. `pruner_watermark` is shared with the
    /// relocation filters installed via `apply_relocation_filter`; when it is `None` a fresh
    /// counter starting at 0 is used instead. One `ThConfig` is supplied per table name.
    #[cfg(tidehunter)]
    pub fn open(
        parent_path: &Path,
        _: Option<AuthorityPerpetualTablesOptions>,
        pruner_watermark: Option<Arc<AtomicU64>>,
    ) -> Self {
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        tracing::warn!("AuthorityPerpetualTables using tidehunter");
        use typed_store::tidehunter_util::{
            Bytes, Decision, IndexWalPosition, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
            default_cells_per_mutex, default_mutex_count, default_value_cache_size,
        };
        let mutexes = default_mutex_count() * 2;
        let value_cache_size = default_value_cache_size();
        // effectively disables pruning if not set
        let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));

        let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
        // Compactor for the `objects` index: walks the keys from greatest to smallest and
        // keeps only the first key seen for each distinct 16-byte key prefix, i.e. the
        // greatest key per prefix; every other entry is dropped from the index.
        // NOTE(review): presumably the 16-byte prefix is the reduced object id produced by
        // `object_indexing` below and the greatest key is the newest version — confirm.
        let objects_compactor = |index: &mut BTreeMap<Bytes, IndexWalPosition>| {
            let mut retain = HashSet::new();
            let mut previous: Option<&[u8]> = None;
            const OID_SIZE: usize = 16;
            for (key, _) in index.iter().rev() {
                if let Some(prev) = previous {
                    if prev == &key[..OID_SIZE] {
                        continue;
                    }
                }
                previous = Some(&key[..OID_SIZE]);
                retain.insert(key.clone());
            }
            index.retain(|k, _| retain.contains(k));
        };
        // 8-byte prefix that is all zeroes except for a final byte of 32; passed as the
        // prefix argument of the `new_with_rm_prefix*` configs below.
        // NOTE(review): presumably the serialized length prefix of a 32-byte digest — confirm.
        let mut digest_prefix = vec![0; 8];
        digest_prefix[7] = 32;
        let uniform_key = KeyType::uniform(default_cells_per_mutex());
        let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
        // TransactionDigest is serialized with an 8-byte prefix, so we include it in the key calculation
        let epoch_tx_digest_prefix_key =
            KeyType::from_prefix_bits((8/*EpochId*/ + 8/*TransactionDigest prefix*/) * 8 + 12);
        // Object keys are 32 + 8 bytes long; indexing keeps bytes 16..40 of each key.
        let object_indexing = KeyIndexing::key_reduction(32 + 8, 16..(32 + 8));
        // todo can figure way to scramble off 8 bytes in the middle
        let obj_ref_size = 32 + 8 + 32 + 8;
        let owned_object_transaction_locks_indexing =
            KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));

        // One entry per table, keyed by the on-disk table name.
        let configs = vec![
            (
                "objects".to_string(),
                ThConfig::new_with_config_indexing(
                    object_indexing,
                    mutexes,
                    KeyType::uniform(default_cells_per_mutex() * 4),
                    KeySpaceConfig::new()
                        .with_unloaded_iterator(true)
                        .with_max_dirty_keys(4048)
                        .with_compactor(Box::new(objects_compactor)),
                ),
            ),
            (
                "owned_object_transaction_locks".to_string(),
                ThConfig::new_with_config_indexing(
                    owned_object_transaction_locks_indexing,
                    mutexes,
                    KeyType::uniform(default_cells_per_mutex() * 4),
                    bloom_config.clone().with_max_dirty_keys(4048),
                ),
            ),
            (
                "transactions".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::new()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    // The relocation filter is given the epoch in which the effects were executed.
                    apply_relocation_filter(
                        bloom_config.clone().with_value_cache_size(value_cache_size),
                        pruner_watermark.clone(),
                        |effects: TransactionEffects| effects.executed_epoch(),
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    bloom_config
                        .clone()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events".to_string(),
                ThConfig::new_with_rm_prefix(
                    32 + 8,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events_2".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "unchanged_loaded_runtime_objects".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_transactions_to_checkpoint".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    // The relocation filter is given the epoch stored in the value.
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "root_state_hash_by_epoch".to_string(),
                ThConfig::new(8, 1, KeyType::uniform(1)),
            ),
            // The four tables below are keyed by `()`, hence the key size of 0.
            (
                "epoch_start_configuration".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "pruned_checkpoint".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_network_sui_amount".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_storage_fund_imbalance".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "object_per_epoch_marker_table".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    // The relocation filter is given the epoch taken from the key tuple.
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "object_per_epoch_marker_table_v2".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, FullObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "executed_transaction_digests".to_string(),
                ThConfig::new_with_config_indexing(
                    // EpochId + (TransactionDigest)
                    KeyIndexing::fixed(8 + (32 + 8)),
                    mutexes,
                    epoch_tx_digest_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, TransactionDigest)| epoch_id,
                        true,
                    ),
                ),
            ),
        ];
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }
438
439    pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
440        Self::get_read_only_handle(
441            Self::path(parent_path),
442            None,
443            None,
444            MetricConf::new("perpetual_readonly"),
445        )
446    }
447
448    // This is used by indexer to find the correct version of dynamic field child object.
449    // We do not store the version of the child object, but because of lamport timestamp,
450    // we know the child must have version number less then or eq to the parent.
451    pub fn find_object_lt_or_eq_version(
452        &self,
453        object_id: ObjectID,
454        version: SequenceNumber,
455    ) -> SuiResult<Option<Object>> {
456        let mut iter = self.objects.reversed_safe_iter_with_bounds(
457            Some(ObjectKey::min_for_id(&object_id)),
458            Some(ObjectKey(object_id, version)),
459        )?;
460        match iter.next() {
461            Some(Ok((key, o))) => self.object(&key, o),
462            Some(Err(e)) => Err(e.into()),
463            None => Ok(None),
464        }
465    }
466
467    fn construct_object(
468        &self,
469        object_key: &ObjectKey,
470        store_object: StoreObjectValue,
471    ) -> Result<Object, SuiError> {
472        try_construct_object(object_key, store_object)
473    }
474
475    // Constructs `sui_types::object::Object` from `StoreObjectWrapper`.
476    // Returns `None` if object was deleted/wrapped
477    pub fn object(
478        &self,
479        object_key: &ObjectKey,
480        store_object: StoreObjectWrapper,
481    ) -> Result<Option<Object>, SuiError> {
482        let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
483            return Ok(None);
484        };
485        Ok(Some(self.construct_object(object_key, *store_object)?))
486    }
487
488    pub fn object_reference(
489        &self,
490        object_key: &ObjectKey,
491        store_object: StoreObjectWrapper,
492    ) -> Result<ObjectRef, SuiError> {
493        let obj_ref = match store_object.migrate().into_inner() {
494            StoreObject::Value(object) => self
495                .construct_object(object_key, *object)?
496                .compute_object_reference(),
497            StoreObject::Deleted => (
498                object_key.0,
499                object_key.1,
500                ObjectDigest::OBJECT_DIGEST_DELETED,
501            ),
502            StoreObject::Wrapped => (
503                object_key.0,
504                object_key.1,
505                ObjectDigest::OBJECT_DIGEST_WRAPPED,
506            ),
507        };
508        Ok(obj_ref)
509    }
510
511    pub fn tombstone_reference(
512        &self,
513        object_key: &ObjectKey,
514        store_object: &StoreObjectWrapper,
515    ) -> Result<Option<ObjectRef>, SuiError> {
516        let obj_ref = match store_object.inner() {
517            StoreObject::Deleted => Some((
518                object_key.0,
519                object_key.1,
520                ObjectDigest::OBJECT_DIGEST_DELETED,
521            )),
522            StoreObject::Wrapped => Some((
523                object_key.0,
524                object_key.1,
525                ObjectDigest::OBJECT_DIGEST_WRAPPED,
526            )),
527            _ => None,
528        };
529        Ok(obj_ref)
530    }
531
532    pub fn get_latest_object_ref_or_tombstone(
533        &self,
534        object_id: ObjectID,
535    ) -> Result<Option<ObjectRef>, SuiError> {
536        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
537            Some(ObjectKey::min_for_id(&object_id)),
538            Some(ObjectKey::max_for_id(&object_id)),
539        )?;
540
541        if let Some(Ok((object_key, value))) = iterator.next()
542            && object_key.0 == object_id
543        {
544            return Ok(Some(self.object_reference(&object_key, value)?));
545        }
546        Ok(None)
547    }
548
549    pub fn get_latest_object_or_tombstone(
550        &self,
551        object_id: ObjectID,
552    ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
553        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
554            Some(ObjectKey::min_for_id(&object_id)),
555            Some(ObjectKey::max_for_id(&object_id)),
556        )?;
557
558        if let Some(Ok((object_key, value))) = iterator.next()
559            && object_key.0 == object_id
560        {
561            return Ok(Some((object_key, value)));
562        }
563        Ok(None)
564    }
565
566    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
567        Ok(self
568            .epoch_start_configuration
569            .get(&())?
570            .expect("Must have current epoch.")
571            .epoch_start_state()
572            .epoch())
573    }
574
575    pub fn set_epoch_start_configuration(
576        &self,
577        epoch_start_configuration: &EpochStartConfiguration,
578    ) -> SuiResult {
579        let mut wb = self.epoch_start_configuration.batch();
580        wb.insert_batch(
581            &self.epoch_start_configuration,
582            std::iter::once(((), epoch_start_configuration)),
583        )?;
584        wb.write()?;
585        Ok(())
586    }
587
588    pub fn get_highest_pruned_checkpoint(
589        &self,
590    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
591        self.pruned_checkpoint.get(&())
592    }
593
594    pub fn set_highest_pruned_checkpoint(
595        &self,
596        wb: &mut DBBatch,
597        checkpoint_number: CheckpointSequenceNumber,
598    ) -> SuiResult {
599        wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
600        Ok(())
601    }
602
603    pub fn get_transaction(
604        &self,
605        digest: &TransactionDigest,
606    ) -> SuiResult<Option<TrustedTransaction>> {
607        let Some(transaction) = self.transactions.get(digest)? else {
608            return Ok(None);
609        };
610        Ok(Some(transaction))
611    }
612
613    /// Batch insert executed transaction digests for a given epoch.
614    /// Used by formal snapshot restore to backfill transaction digests from the previous epoch.
615    pub fn insert_executed_transaction_digests_batch(
616        &self,
617        epoch: EpochId,
618        digests: impl Iterator<Item = TransactionDigest>,
619    ) -> SuiResult {
620        let mut batch = self.executed_transaction_digests.batch();
621        batch.insert_batch(
622            &self.executed_transaction_digests,
623            digests.map(|digest| ((epoch, digest), ())),
624        )?;
625        batch.write()?;
626        Ok(())
627    }
628
629    pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
630        let Some(effect_digest) = self.executed_effects.get(digest)? else {
631            return Ok(None);
632        };
633        Ok(self.effects.get(&effect_digest)?)
634    }
635
636    pub(crate) fn was_transaction_executed_in_last_epoch(
637        &self,
638        digest: &TransactionDigest,
639        current_epoch: EpochId,
640    ) -> bool {
641        if current_epoch == 0 {
642            return false;
643        }
644        self.executed_transaction_digests
645            .contains_key(&(current_epoch - 1, *digest))
646            .expect("db error")
647    }
648
649    // DEPRECATED as the backing table has been moved to authority_per_epoch_store.
650    // Please do not add new accessors/callsites.
651    pub fn get_checkpoint_sequence_number(
652        &self,
653        digest: &TransactionDigest,
654    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
655        Ok(self.executed_transactions_to_checkpoint.get(digest)?)
656    }
657
658    pub fn get_newer_object_keys(
659        &self,
660        object: &(ObjectID, SequenceNumber),
661    ) -> SuiResult<Vec<ObjectKey>> {
662        let mut objects = vec![];
663        for result in self.objects.safe_iter_with_bounds(
664            Some(ObjectKey(object.0, object.1.next())),
665            Some(ObjectKey(object.0, VersionNumber::MAX)),
666        ) {
667            let (key, _) = result?;
668            objects.push(key);
669        }
670        Ok(objects)
671    }
672
673    pub fn set_highest_pruned_checkpoint_without_wb(
674        &self,
675        checkpoint_number: CheckpointSequenceNumber,
676    ) -> SuiResult {
677        let mut wb = self.pruned_checkpoint.batch();
678        self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
679        wb.write()?;
680        Ok(())
681    }
682
683    pub fn database_is_empty(&self) -> SuiResult<bool> {
684        Ok(self.objects.safe_iter().next().is_none())
685    }
686
687    pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
688        LiveSetIter {
689            iter: Box::new(self.objects.safe_iter()),
690            tables: self,
691            prev: None,
692            include_wrapped_object,
693        }
694    }
695
696    pub fn range_iter_live_object_set(
697        &self,
698        lower_bound: Option<ObjectID>,
699        upper_bound: Option<ObjectID>,
700        include_wrapped_object: bool,
701    ) -> LiveSetIter<'_> {
702        let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
703        let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
704
705        LiveSetIter {
706            iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
707            tables: self,
708            prev: None,
709            include_wrapped_object,
710        }
711    }
712
713    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
714        // This checkpoints the entire db and not just objects table
715        self.objects.checkpoint_db(path).map_err(Into::into)
716    }
717
718    pub fn get_root_state_hash(
719        &self,
720        epoch: EpochId,
721    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
722        Ok(self.root_state_hash_by_epoch.get(&epoch)?)
723    }
724
725    pub fn insert_root_state_hash(
726        &self,
727        epoch: EpochId,
728        last_checkpoint_of_epoch: CheckpointSequenceNumber,
729        hash: GlobalStateHash,
730    ) -> SuiResult {
731        self.root_state_hash_by_epoch
732            .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
733        Ok(())
734    }
735
736    pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
737        let object_reference = object.compute_object_reference();
738        let wrapper = get_store_object(object);
739        let mut wb = self.objects.batch();
740        wb.insert_batch(
741            &self.objects,
742            std::iter::once((ObjectKey::from(object_reference), wrapper)),
743        )?;
744        wb.write()?;
745        Ok(())
746    }
747
748    // fallible get object methods for sui-tool, which may need to attempt to read a corrupted database
749    pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
750        let obj_entry = self
751            .objects
752            .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
753            .next();
754
755        match obj_entry.transpose()? {
756            Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
757                Ok(self.object(&ObjectKey(obj_id, version), obj)?)
758            }
759            _ => Ok(None),
760        }
761    }
762
763    pub fn get_object_by_key_fallible(
764        &self,
765        object_id: &ObjectID,
766        version: VersionNumber,
767    ) -> SuiResult<Option<Object>> {
768        Ok(self
769            .objects
770            .get(&ObjectKey(*object_id, version))?
771            .and_then(|object| {
772                self.object(&ObjectKey(*object_id, version), object)
773                    .expect("object construction error")
774            }))
775    }
776}
777
778impl ObjectStore for AuthorityPerpetualTables {
779    /// Read an object and return it, or Ok(None) if the object was not found.
780    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
781        self.get_object_fallible(object_id).expect("db error")
782    }
783
784    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
785        self.get_object_by_key_fallible(object_id, version)
786            .expect("db error")
787    }
788}
789
790pub struct LiveSetIter<'a> {
791    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
792    tables: &'a AuthorityPerpetualTables,
793    prev: Option<(ObjectKey, StoreObjectWrapper)>,
794    /// Whether a wrapped object is considered as a live object.
795    include_wrapped_object: bool,
796}
797
798#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
799pub enum LiveObject {
800    Normal(Object),
801    Wrapped(ObjectKey),
802}
803
804impl LiveObject {
805    pub fn object_id(&self) -> ObjectID {
806        match self {
807            LiveObject::Normal(obj) => obj.id(),
808            LiveObject::Wrapped(key) => key.0,
809        }
810    }
811
812    pub fn version(&self) -> SequenceNumber {
813        match self {
814            LiveObject::Normal(obj) => obj.version(),
815            LiveObject::Wrapped(key) => key.1,
816        }
817    }
818
819    pub fn object_reference(&self) -> ObjectRef {
820        match self {
821            LiveObject::Normal(obj) => obj.compute_object_reference(),
822            LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
823        }
824    }
825
826    pub fn to_normal(self) -> Option<Object> {
827        match self {
828            LiveObject::Normal(object) => Some(object),
829            LiveObject::Wrapped(_) => None,
830        }
831    }
832}
833
834impl LiveSetIter<'_> {
835    fn store_object_wrapper_to_live_object(
836        &self,
837        object_key: ObjectKey,
838        store_object: StoreObjectWrapper,
839    ) -> Option<LiveObject> {
840        match store_object.migrate().into_inner() {
841            StoreObject::Value(object) => {
842                let object = self
843                    .tables
844                    .construct_object(&object_key, *object)
845                    .expect("Constructing object from store cannot fail");
846                Some(LiveObject::Normal(object))
847            }
848            StoreObject::Wrapped => {
849                if self.include_wrapped_object {
850                    Some(LiveObject::Wrapped(object_key))
851                } else {
852                    None
853                }
854            }
855            StoreObject::Deleted => None,
856        }
857    }
858}
859
860impl Iterator for LiveSetIter<'_> {
861    type Item = LiveObject;
862
863    fn next(&mut self) -> Option<Self::Item> {
864        loop {
865            if let Some(Ok((next_key, next_value))) = self.iter.next() {
866                let prev = self.prev.take();
867                self.prev = Some((next_key, next_value));
868
869                if let Some((prev_key, prev_value)) = prev
870                    && prev_key.0 != next_key.0
871                {
872                    let live_object =
873                        self.store_object_wrapper_to_live_object(prev_key, prev_value);
874                    if live_object.is_some() {
875                        return live_object;
876                    }
877                }
878                continue;
879            }
880            if let Some((key, value)) = self.prev.take() {
881                let live_object = self.store_object_wrapper_to_live_object(key, value);
882                if live_object.is_some() {
883                    return live_object;
884                }
885            }
886            return None;
887        }
888    }
889}
890
891// These functions are used to initialize the DB tables
892fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
893    DBOptions {
894        options: db_options
895            .clone()
896            .optimize_for_write_throughput()
897            .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
898            .options,
899        rw_options: db_options.rw_options.set_ignore_range_deletions(false),
900    }
901}
902
903fn objects_table_config(
904    mut db_options: DBOptions,
905    compaction_filter: Option<ObjectsCompactionFilter>,
906) -> DBOptions {
907    if let Some(mut compaction_filter) = compaction_filter {
908        db_options
909            .options
910            .set_compaction_filter("objects", move |_, key, value| {
911                match compaction_filter.filter(key, value) {
912                    Ok(decision) => decision,
913                    Err(err) => {
914                        error!("Compaction error: {:?}", err);
915                        Decision::Keep
916                    }
917                }
918            });
919    }
920    db_options
921        .optimize_for_write_throughput()
922        .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
923}
924
925fn transactions_table_config(db_options: DBOptions) -> DBOptions {
926    db_options
927        .optimize_for_write_throughput()
928        .optimize_for_point_lookup(
929            read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
930        )
931}
932
933fn effects_table_config(db_options: DBOptions) -> DBOptions {
934    db_options
935        .optimize_for_write_throughput()
936        .optimize_for_point_lookup(
937            read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
938        )
939}