sui_core/authority/
authority_store_tables.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::*;
5use crate::authority::authority_store::LockDetailsWrapperDeprecated;
6use serde::{Deserialize, Serialize};
7use std::path::Path;
8use std::sync::atomic::AtomicU64;
9use sui_types::base_types::SequenceNumber;
10use sui_types::digests::TransactionEventsDigest;
11use sui_types::effects::{TransactionEffects, TransactionEvents};
12use sui_types::global_state_hash::GlobalStateHash;
13use sui_types::storage::{FullObjectKey, MarkerValue};
14use typed_store::metrics::SamplingInterval;
15use typed_store::rocks::{
16    DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
17    read_size_from_env,
18};
19use typed_store::traits::Map;
20
21use crate::authority::authority_store_types::{
22    StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
23};
24use crate::authority::epoch_start_configuration::EpochStartConfiguration;
25use typed_store::{DBMapUtils, DbIterator};
26
/// Env var overriding the block cache size for the `objects` table
/// (value in MB per the `_MB` suffix; presumably read via `read_size_from_env`
/// in the per-table config helpers — confirm at the usage sites).
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
/// Env var overriding the block cache size for the owned-object locks table (MB).
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
/// Env var overriding the block cache size for the `transactions` table (MB).
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
/// Env var overriding the block cache size for the `effects` table (MB).
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
31
/// Options to apply to every column family of the `perpetual` DB.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// Whether to enable write stalling on all column families.
    pub enable_write_stall: bool,
    /// Whether this node runs as a validator. When true, the tidehunter `open`
    /// path installs a compactor on the `objects` key space.
    pub is_validator: bool,
}
39
40impl AuthorityPerpetualTablesOptions {
41    fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
42        if !self.enable_write_stall {
43            db_options = db_options.disable_write_throttling();
44        }
45        db_options
46    }
47}
48
/// AuthorityPerpetualTables contains data that must be preserved from one epoch to the next.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct AuthorityPerpetualTables {
    /// This is a map between the object (ID, version) and the latest state of the object, namely the
    /// state that is needed to process new transactions.
    /// State is represented by `StoreObject` enum, which is either a move module or a move object.
    ///
    /// Note that while this map can store all versions of an object, we will eventually
    /// prune old object versions from the db.
    ///
    /// IMPORTANT: object versions must *only* be pruned if they appear as inputs in some
    /// TransactionEffects. Simply pruning all objects but the most recent is an error!
    /// This is because there can be partially executed transactions whose effects have not yet
    /// been written out, and which must be retried. But, they cannot be retried unless their input
    /// objects are still accessible!
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// This is a map between object references of currently active objects that can be mutated.
    ///
    /// For old epochs, it may also contain the transaction that they are lock on for use by this
    /// specific validator. The transaction locks themselves are now in AuthorityPerEpochStore.
    #[rename = "owned_object_transaction_locks"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,

    /// This is a map between the transaction digest and the corresponding transaction that's known to be
    /// executable. This means that it may have been executed locally, or it may have been synced through
    /// state-sync but hasn't been executed yet.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// A map between the transaction digest of a certificate to the effects of its execution.
    /// We store effects into this table in two different cases:
    /// 1. When a transaction is synced through state_sync, we store the effects here. These effects
    ///    are known to be final in the network, but may not have been executed locally yet.
    /// 2. When the transaction is executed locally on this node, we store the effects here. This means that
    ///    it's possible to store the same effects twice (once for the synced transaction, and once for the executed).
    ///
    /// It's also possible for the effects to be reverted if the transaction didn't make it into the epoch.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Transactions that have been executed locally on this node. We need this table since the `effects` table
    /// doesn't say anything about the execution status of the transaction on this node. When we wait for transactions
    /// to be executed, we wait for them to appear in this table. When we revert transactions, we remove them from both
    /// tables.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    // DEPRECATED legacy events table keyed by (events digest, event index);
    // superseded by `events_2` below.
    #[allow(dead_code)]
    #[deprecated]
    events: DBMap<(TransactionEventsDigest, usize), Event>,

    // Events keyed by the digest of the transaction that produced them.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    // Loaded (and unchanged) runtime object references.
    pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,

    /// DEPRECATED in favor of the table of the same name in authority_per_epoch_store.
    /// Please do not add new accessors/callsites.
    /// When transaction is executed via checkpoint executor, we store association here
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    // Finalized root state hash for epoch, to be included in CheckpointSummary
    // of last checkpoint of epoch. These values should only ever be written once
    // and never changed
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Parameters of the system fixed at the epoch start
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// A singleton table that stores latest pruned checkpoint. Used to keep objects pruner progress
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Expected total amount of SUI in the network. This is expected to remain constant
    /// throughout the lifetime of the network. We check it at the end of each epoch if
    /// expensive checks are enabled. We cannot use 10B today because in tests we often
    /// inject extra gas objects into genesis.
    pub(crate) expected_network_sui_amount: DBMap<(), u64>,

    /// Expected imbalance between storage fund balance and the sum of storage rebate of all live objects.
    /// This could be non-zero due to bugs in earlier protocol versions.
    /// This number is the result of storage_fund_balance - sum(storage_rebate).
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Table that stores the set of received objects and deleted objects and the version at
    /// which they were received. This is used to prevent possible race conditions around receiving
    /// objects (since they are not locked by the transaction manager) and for tracking shared
    /// objects that have been deleted. This table is meant to be pruned per-epoch, and all
    /// previous epochs other than the current epoch may be pruned safely.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
    /// Same as `object_per_epoch_marker_table`, but keyed by `FullObjectKey`.
    pub(crate) object_per_epoch_marker_table_v2: DBMap<(EpochId, FullObjectKey), MarkerValue>,

    /// Tracks executed transaction digests across epochs.
    /// Used to support address balance gas payments feature.
    /// This table uses epoch-prefixed keys to support efficient pruning via range delete.
    pub(crate) executed_transaction_digests: DBMap<(EpochId, TransactionDigest), ()>,
}
147
148impl AuthorityPerpetualTables {
149    pub fn path(parent_path: &Path) -> PathBuf {
150        parent_path.join("perpetual")
151    }
152
    /// Open the perpetual tables read-write (RocksDB backend).
    ///
    /// `db_options_override` adjusts the base options (e.g. write throttling);
    /// the hottest column families get dedicated table configs below.
    /// `_pruner_watermark` is unused here — it only drives relocation filters
    /// in the tidehunter variant of `open`.
    #[cfg(not(tidehunter))]
    pub fn open(
        parent_path: &Path,
        db_options_override: Option<AuthorityPerpetualTablesOptions>,
        _pruner_watermark: Option<Arc<AtomicU64>>,
    ) -> Self {
        let db_options_override = db_options_override.unwrap_or_default();
        // Base options tuned for write throughput, then adjusted per override.
        let db_options =
            db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
            (
                "objects".to_string(),
                objects_table_config(db_options.clone()),
            ),
            (
                "owned_object_transaction_locks".to_string(),
                owned_object_transaction_locks_table_config(db_options.clone()),
            ),
            (
                "transactions".to_string(),
                transactions_table_config(db_options.clone()),
            ),
            (
                "effects".to_string(),
                effects_table_config(db_options.clone()),
            ),
        ]));

        Self::open_tables_read_write(
            Self::path(parent_path),
            // Sample DB metrics once per minute.
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            Some(db_options.options),
            Some(table_options),
        )
    }
189
    /// Open the perpetual tables (tidehunter backend).
    ///
    /// Builds a per-key-space `ThConfig` for every table declared on
    /// `AuthorityPerpetualTables`; epoch-scoped tables get relocation filters
    /// driven by `pruner_watermark` so pruned epochs can be dropped.
    #[cfg(tidehunter)]
    pub fn open(
        parent_path: &Path,
        db_options_override: Option<AuthorityPerpetualTablesOptions>,
        pruner_watermark: Option<Arc<AtomicU64>>,
    ) -> Self {
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        tracing::warn!("AuthorityPerpetualTables using tidehunter");
        use typed_store::tidehunter_util::{
            Bytes, Decision, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
            default_cells_per_mutex, default_mutex_count, default_value_cache_size,
        };
        let mutexes = default_mutex_count() * 2;
        let value_cache_size = default_value_cache_size();
        // effectively disables pruning if not set
        let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));

        // Shared config for digest-keyed point-lookup tables: bloom filter with
        // a 0.001 false-positive target.
        let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
        // Compactor for `objects`: walking the keys back-to-front, keep only the
        // first key seen per leading-16-byte prefix (i.e. the highest entry per
        // reduced object id); earlier versions are left out of `retain`.
        let objects_compactor = |iter: &mut dyn DoubleEndedIterator<Item = &Bytes>| {
            let mut retain = HashSet::new();
            let mut previous: Option<&[u8]> = None;
            const OID_SIZE: usize = 16;
            for key in iter.rev() {
                if let Some(prev) = previous {
                    if prev == &key[..OID_SIZE] {
                        continue;
                    }
                }
                previous = Some(&key[..OID_SIZE]);
                retain.insert(key.clone());
            }
            retain
        };
        // 8-byte prefix stripped from digest-keyed tables; the final byte (32)
        // matches the digest length — presumably a serialized length prefix,
        // TODO confirm against the digest serialization format.
        let mut digest_prefix = vec![0; 8];
        digest_prefix[7] = 32;
        let uniform_key = KeyType::uniform(default_cells_per_mutex());
        let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
        // TransactionDigest is serialized with an 8-byte prefix, so we include it in the key calculation
        let epoch_tx_digest_prefix_key =
            KeyType::from_prefix_bits((8/*EpochId*/ + 8/*TransactionDigest prefix*/) * 8 + 12);
        let object_indexing = KeyIndexing::key_reduction(32 + 8, 16..(32 + 8));
        // todo can figure way to scramble off 8 bytes in the middle
        let obj_ref_size = 32 + 8 + 32 + 8;
        let owned_object_transaction_locks_indexing =
            KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));

        let mut objects_config = KeySpaceConfig::new()
            .with_unloaded_iterator(true)
            .with_max_dirty_keys(4048);
        // Only validators keep the latest-version-only compactor (see
        // `AuthorityPerpetualTablesOptions::is_validator`).
        if matches!(db_options_override, Some(options) if options.is_validator) {
            objects_config = objects_config.with_compactor(Box::new(objects_compactor));
        }

        let configs = vec![
            (
                "objects".to_string(),
                ThConfig::new_with_config_indexing(
                    object_indexing,
                    mutexes,
                    KeyType::uniform(default_cells_per_mutex() * 4),
                    objects_config,
                ),
            ),
            (
                "owned_object_transaction_locks".to_string(),
                ThConfig::new_with_config_indexing(
                    owned_object_transaction_locks_indexing,
                    mutexes,
                    KeyType::uniform(default_cells_per_mutex() * 4),
                    bloom_config.clone().with_max_dirty_keys(4048),
                ),
            ),
            (
                "transactions".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::new()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    // Relocation keyed off the epoch the effects executed in,
                    // gated by the pruner watermark.
                    apply_relocation_filter(
                        bloom_config.clone().with_value_cache_size(value_cache_size),
                        pruner_watermark.clone(),
                        |effects: TransactionEffects| effects.executed_epoch(),
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    bloom_config
                        .clone()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events".to_string(),
                ThConfig::new_with_rm_prefix(
                    32 + 8,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events_2".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "unchanged_loaded_runtime_objects".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_transactions_to_checkpoint".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            // The singleton / per-epoch metadata tables below are tiny: one
            // mutex, one cell.
            (
                "root_state_hash_by_epoch".to_string(),
                ThConfig::new(8, 1, KeyType::uniform(1)),
            ),
            (
                "epoch_start_configuration".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "pruned_checkpoint".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_network_sui_amount".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_storage_fund_imbalance".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "object_per_epoch_marker_table".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "object_per_epoch_marker_table_v2".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, FullObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "executed_transaction_digests".to_string(),
                ThConfig::new_with_config_indexing(
                    // EpochId + (TransactionDigest)
                    KeyIndexing::fixed(8 + (32 + 8)),
                    mutexes,
                    epoch_tx_digest_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, TransactionDigest)| epoch_id,
                        true,
                    ),
                ),
            ),
        ];
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }
418
419    #[cfg(not(tidehunter))]
420    pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
421        Self::get_read_only_handle(
422            Self::path(parent_path),
423            None,
424            None,
425            MetricConf::new("perpetual_readonly"),
426        )
427    }
428
    /// tidehunter variant: there is no dedicated read-only handle type here
    /// (note the `Self` return, vs. `AuthorityPerpetualTablesReadOnly` on the
    /// RocksDB path), so this opens the tables normally with default options
    /// and no pruner watermark.
    #[cfg(tidehunter)]
    pub fn open_readonly(parent_path: &Path) -> Self {
        Self::open(parent_path, None, None)
    }
433
434    // This is used by indexer to find the correct version of dynamic field child object.
435    // We do not store the version of the child object, but because of lamport timestamp,
436    // we know the child must have version number less then or eq to the parent.
437    pub fn find_object_lt_or_eq_version(
438        &self,
439        object_id: ObjectID,
440        version: SequenceNumber,
441    ) -> SuiResult<Option<Object>> {
442        let mut iter = self.objects.reversed_safe_iter_with_bounds(
443            Some(ObjectKey::min_for_id(&object_id)),
444            Some(ObjectKey(object_id, version)),
445        )?;
446        match iter.next() {
447            Some(Ok((key, o))) => self.object(&key, o),
448            Some(Err(e)) => Err(e.into()),
449            None => Ok(None),
450        }
451    }
452
453    fn construct_object(
454        &self,
455        object_key: &ObjectKey,
456        store_object: StoreObjectValue,
457    ) -> Result<Object, SuiError> {
458        try_construct_object(object_key, store_object)
459    }
460
461    // Constructs `sui_types::object::Object` from `StoreObjectWrapper`.
462    // Returns `None` if object was deleted/wrapped
463    pub fn object(
464        &self,
465        object_key: &ObjectKey,
466        store_object: StoreObjectWrapper,
467    ) -> Result<Option<Object>, SuiError> {
468        let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
469            return Ok(None);
470        };
471        Ok(Some(self.construct_object(object_key, *store_object)?))
472    }
473
474    pub fn object_reference(
475        &self,
476        object_key: &ObjectKey,
477        store_object: StoreObjectWrapper,
478    ) -> Result<ObjectRef, SuiError> {
479        let obj_ref = match store_object.migrate().into_inner() {
480            StoreObject::Value(object) => self
481                .construct_object(object_key, *object)?
482                .compute_object_reference(),
483            StoreObject::Deleted => (
484                object_key.0,
485                object_key.1,
486                ObjectDigest::OBJECT_DIGEST_DELETED,
487            ),
488            StoreObject::Wrapped => (
489                object_key.0,
490                object_key.1,
491                ObjectDigest::OBJECT_DIGEST_WRAPPED,
492            ),
493        };
494        Ok(obj_ref)
495    }
496
497    pub fn tombstone_reference(
498        &self,
499        object_key: &ObjectKey,
500        store_object: &StoreObjectWrapper,
501    ) -> Result<Option<ObjectRef>, SuiError> {
502        let obj_ref = match store_object.inner() {
503            StoreObject::Deleted => Some((
504                object_key.0,
505                object_key.1,
506                ObjectDigest::OBJECT_DIGEST_DELETED,
507            )),
508            StoreObject::Wrapped => Some((
509                object_key.0,
510                object_key.1,
511                ObjectDigest::OBJECT_DIGEST_WRAPPED,
512            )),
513            _ => None,
514        };
515        Ok(obj_ref)
516    }
517
518    pub fn get_latest_object_ref_or_tombstone(
519        &self,
520        object_id: ObjectID,
521    ) -> Result<Option<ObjectRef>, SuiError> {
522        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
523            Some(ObjectKey::min_for_id(&object_id)),
524            Some(ObjectKey::max_for_id(&object_id)),
525        )?;
526
527        if let Some(Ok((object_key, value))) = iterator.next()
528            && object_key.0 == object_id
529        {
530            return Ok(Some(self.object_reference(&object_key, value)?));
531        }
532        Ok(None)
533    }
534
535    pub fn get_latest_object_or_tombstone(
536        &self,
537        object_id: ObjectID,
538    ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
539        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
540            Some(ObjectKey::min_for_id(&object_id)),
541            Some(ObjectKey::max_for_id(&object_id)),
542        )?;
543
544        if let Some(Ok((object_key, value))) = iterator.next()
545            && object_key.0 == object_id
546        {
547            return Ok(Some((object_key, value)));
548        }
549        Ok(None)
550    }
551
552    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
553        Ok(self
554            .epoch_start_configuration
555            .get(&())?
556            .expect("Must have current epoch.")
557            .epoch_start_state()
558            .epoch())
559    }
560
561    pub fn set_epoch_start_configuration(
562        &self,
563        epoch_start_configuration: &EpochStartConfiguration,
564    ) -> SuiResult {
565        let mut wb = self.epoch_start_configuration.batch();
566        wb.insert_batch(
567            &self.epoch_start_configuration,
568            std::iter::once(((), epoch_start_configuration)),
569        )?;
570        wb.write()?;
571        Ok(())
572    }
573
574    pub fn get_highest_pruned_checkpoint(
575        &self,
576    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
577        self.pruned_checkpoint.get(&())
578    }
579
580    pub fn set_highest_pruned_checkpoint(
581        &self,
582        wb: &mut DBBatch,
583        checkpoint_number: CheckpointSequenceNumber,
584    ) -> SuiResult {
585        wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
586        Ok(())
587    }
588
589    pub fn get_transaction(
590        &self,
591        digest: &TransactionDigest,
592    ) -> SuiResult<Option<TrustedTransaction>> {
593        let Some(transaction) = self.transactions.get(digest)? else {
594            return Ok(None);
595        };
596        Ok(Some(transaction))
597    }
598
599    /// Batch insert executed transaction digests for a given epoch.
600    /// Used by formal snapshot restore to backfill transaction digests from the previous epoch.
601    pub fn insert_executed_transaction_digests_batch(
602        &self,
603        epoch: EpochId,
604        digests: impl Iterator<Item = TransactionDigest>,
605    ) -> SuiResult {
606        let mut batch = self.executed_transaction_digests.batch();
607        batch.insert_batch(
608            &self.executed_transaction_digests,
609            digests.map(|digest| ((epoch, digest), ())),
610        )?;
611        batch.write()?;
612        Ok(())
613    }
614
615    pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
616        let Some(effect_digest) = self.executed_effects.get(digest)? else {
617            return Ok(None);
618        };
619        Ok(self.effects.get(&effect_digest)?)
620    }
621
622    pub(crate) fn was_transaction_executed_in_last_epoch(
623        &self,
624        digest: &TransactionDigest,
625        current_epoch: EpochId,
626    ) -> bool {
627        if current_epoch == 0 {
628            return false;
629        }
630        self.executed_transaction_digests
631            .contains_key(&(current_epoch - 1, *digest))
632            .expect("db error")
633    }
634
635    // DEPRECATED as the backing table has been moved to authority_per_epoch_store.
636    // Please do not add new accessors/callsites.
637    pub fn get_checkpoint_sequence_number(
638        &self,
639        digest: &TransactionDigest,
640    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
641        Ok(self.executed_transactions_to_checkpoint.get(digest)?)
642    }
643
644    pub fn get_newer_object_keys(
645        &self,
646        object: &(ObjectID, SequenceNumber),
647    ) -> SuiResult<Vec<ObjectKey>> {
648        let mut objects = vec![];
649        for result in self.objects.safe_iter_with_bounds(
650            Some(ObjectKey(object.0, object.1.next())),
651            Some(ObjectKey(object.0, VersionNumber::MAX)),
652        ) {
653            let (key, _) = result?;
654            objects.push(key);
655        }
656        Ok(objects)
657    }
658
659    pub fn set_highest_pruned_checkpoint_without_wb(
660        &self,
661        checkpoint_number: CheckpointSequenceNumber,
662    ) -> SuiResult {
663        let mut wb = self.pruned_checkpoint.batch();
664        self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
665        wb.write()?;
666        Ok(())
667    }
668
669    pub fn database_is_empty(&self) -> SuiResult<bool> {
670        Ok(self.objects.safe_iter().next().is_none())
671    }
672
673    pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
674        LiveSetIter {
675            iter: Box::new(self.objects.safe_iter()),
676            tables: self,
677            prev: None,
678            include_wrapped_object,
679        }
680    }
681
682    pub fn range_iter_live_object_set(
683        &self,
684        lower_bound: Option<ObjectID>,
685        upper_bound: Option<ObjectID>,
686        include_wrapped_object: bool,
687    ) -> LiveSetIter<'_> {
688        let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
689        let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
690
691        LiveSetIter {
692            iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
693            tables: self,
694            prev: None,
695            include_wrapped_object,
696        }
697    }
698
699    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
700        // This checkpoints the entire db and not just objects table
701        self.objects.checkpoint_db(path).map_err(Into::into)
702    }
703
704    pub fn get_root_state_hash(
705        &self,
706        epoch: EpochId,
707    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
708        Ok(self.root_state_hash_by_epoch.get(&epoch)?)
709    }
710
711    pub fn insert_root_state_hash(
712        &self,
713        epoch: EpochId,
714        last_checkpoint_of_epoch: CheckpointSequenceNumber,
715        hash: GlobalStateHash,
716    ) -> SuiResult {
717        self.root_state_hash_by_epoch
718            .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
719        Ok(())
720    }
721
722    pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
723        let object_reference = object.compute_object_reference();
724        let wrapper = get_store_object(object);
725        let mut wb = self.objects.batch();
726        wb.insert_batch(
727            &self.objects,
728            std::iter::once((ObjectKey::from(object_reference), wrapper)),
729        )?;
730        wb.write()?;
731        Ok(())
732    }
733
734    // fallible get object methods for sui-tool, which may need to attempt to read a corrupted database
735    pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
736        let obj_entry = self
737            .objects
738            .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
739            .next();
740
741        match obj_entry.transpose()? {
742            Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
743                Ok(self.object(&ObjectKey(obj_id, version), obj)?)
744            }
745            _ => Ok(None),
746        }
747    }
748
749    pub fn get_object_by_key_fallible(
750        &self,
751        object_id: &ObjectID,
752        version: VersionNumber,
753    ) -> SuiResult<Option<Object>> {
754        Ok(self
755            .objects
756            .get(&ObjectKey(*object_id, version))?
757            .and_then(|object| {
758                self.object(&ObjectKey(*object_id, version), object)
759                    .expect("object construction error")
760            }))
761    }
762}
763
impl ObjectStore for AuthorityPerpetualTables {
    /// Read an object and return it, or Ok(None) if the object was not found.
    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        // The trait method returns a bare Option, so a storage-layer error has
        // nowhere to go: it is treated as unrecoverable and panics.
        self.get_object_fallible(object_id).expect("db error")
    }

    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
        // Same contract as `get_object`: any DB error is a panic, not a None.
        self.get_object_by_key_fallible(object_id, version)
            .expect("db error")
    }
}
775
/// Iterator over the live object set of the perpetual tables. It yields at
/// most one `LiveObject` per object id by holding each entry back until the
/// underlying key stream moves on to a different id (see its `Iterator` impl).
pub struct LiveSetIter<'a> {
    /// Raw iterator over the `objects` table, keyed by (ObjectID, version).
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    /// Back-reference used to reconstruct full `Object`s from store entries.
    tables: &'a AuthorityPerpetualTables,
    /// Most recently seen entry, held back until we know it is the last
    /// entry for its object id.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
    /// Whether a wrapped object is considered as a live object.
    include_wrapped_object: bool,
}
783
/// A live entry of the object set: either a normal object, or a wrapped
/// object represented only by its (id, version) key.
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    Normal(Object),
    Wrapped(ObjectKey),
}
789
790impl LiveObject {
791    pub fn object_id(&self) -> ObjectID {
792        match self {
793            LiveObject::Normal(obj) => obj.id(),
794            LiveObject::Wrapped(key) => key.0,
795        }
796    }
797
798    pub fn version(&self) -> SequenceNumber {
799        match self {
800            LiveObject::Normal(obj) => obj.version(),
801            LiveObject::Wrapped(key) => key.1,
802        }
803    }
804
805    pub fn object_reference(&self) -> ObjectRef {
806        match self {
807            LiveObject::Normal(obj) => obj.compute_object_reference(),
808            LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
809        }
810    }
811
812    pub fn to_normal(self) -> Option<Object> {
813        match self {
814            LiveObject::Normal(object) => Some(object),
815            LiveObject::Wrapped(_) => None,
816        }
817    }
818}
819
820impl LiveSetIter<'_> {
821    fn store_object_wrapper_to_live_object(
822        &self,
823        object_key: ObjectKey,
824        store_object: StoreObjectWrapper,
825    ) -> Option<LiveObject> {
826        match store_object.migrate().into_inner() {
827            StoreObject::Value(object) => {
828                let object = self
829                    .tables
830                    .construct_object(&object_key, *object)
831                    .expect("Constructing object from store cannot fail");
832                Some(LiveObject::Normal(object))
833            }
834            StoreObject::Wrapped => {
835                if self.include_wrapped_object {
836                    Some(LiveObject::Wrapped(object_key))
837                } else {
838                    None
839                }
840            }
841            StoreObject::Deleted => None,
842        }
843    }
844}
845
impl Iterator for LiveSetIter<'_> {
    type Item = LiveObject;

    /// Yields one `LiveObject` per object id. Each entry is held in `prev` and
    /// only emitted once the key stream advances to a different id (or ends),
    /// so the emitted entry is the last one seen for that id in iteration
    /// order (presumably the highest version — DB key order is not visible
    /// here; confirm against `reversed`/forward iterator construction).
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(Ok((next_key, next_value))) = self.iter.next() {
                // Hold the current entry back; it can only be emitted once we
                // know the stream has moved past its object id.
                let prev = self.prev.take();
                self.prev = Some((next_key, next_value));

                if let Some((prev_key, prev_value)) = prev
                    && prev_key.0 != next_key.0
                {
                    // Id boundary crossed: emit the held-back entry if it
                    // maps to a live object under the wrapped-object policy.
                    let live_object =
                        self.store_object_wrapper_to_live_object(prev_key, prev_value);
                    if live_object.is_some() {
                        return live_object;
                    }
                }
                // Same id (or filtered out as non-live): keep scanning.
                continue;
            }
            // NOTE: a `Some(Err(_))` from the inner iterator fails the pattern
            // above and is treated exactly like end-of-stream.
            // Stream exhausted: flush the final held-back entry, if any.
            if let Some((key, value)) = self.prev.take() {
                let live_object = self.store_object_wrapper_to_live_object(key, value);
                if live_object.is_some() {
                    return live_object;
                }
            }
            return None;
        }
    }
}
876
877// These functions are used to initialize the DB tables
878fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
879    DBOptions {
880        options: db_options
881            .clone()
882            .optimize_for_write_throughput()
883            .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
884            .options,
885        rw_options: db_options.rw_options.set_ignore_range_deletions(false),
886    }
887}
888
889fn objects_table_config(db_options: DBOptions) -> DBOptions {
890    db_options
891        .optimize_for_write_throughput()
892        .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
893}
894
895fn transactions_table_config(db_options: DBOptions) -> DBOptions {
896    db_options
897        .optimize_for_write_throughput()
898        .optimize_for_point_lookup(
899            read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
900        )
901}
902
903fn effects_table_config(db_options: DBOptions) -> DBOptions {
904    db_options
905        .optimize_for_write_throughput()
906        .optimize_for_point_lookup(
907            read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
908        )
909}