sui_core/authority/
authority_store_tables.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::*;
5use crate::authority::authority_store::LockDetailsWrapperDeprecated;
6#[cfg(tidehunter)]
7use crate::authority::epoch_marker_key::EPOCH_MARKER_KEY_SIZE;
8use crate::authority::epoch_marker_key::EpochMarkerKey;
9use serde::{Deserialize, Serialize};
10use std::path::Path;
11use std::sync::atomic::AtomicU64;
12use sui_types::base_types::SequenceNumber;
13use sui_types::effects::{TransactionEffects, TransactionEvents};
14use sui_types::global_state_hash::GlobalStateHash;
15use sui_types::storage::MarkerValue;
16use typed_store::metrics::SamplingInterval;
17use typed_store::rocks::{
18    DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
19    read_size_from_env,
20};
21use typed_store::traits::Map;
22
23use crate::authority::authority_store_types::{
24    StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
25};
26use crate::authority::epoch_start_configuration::EpochStartConfiguration;
27use typed_store::{DBMapUtils, DbIterator};
28
// Names of the environment variables associated with per-table block cache
// sizes (the `_MB` suffix suggests the value is in megabytes).
// NOTE(review): presumably read through `read_size_from_env` by the
// `*_table_config` helpers, which are not visible in this part of the file — confirm.
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
// `pub(crate)`: the only one of the four that is visible outside this module.
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
33
/// Options to apply to every column family of the `perpetual` DB.
///
/// The derived `Default` leaves both flags `false`.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// Whether to enable write stalling on all column families. When `false`,
    /// `apply_to` calls `disable_write_throttling()` on the DB options.
    pub enable_write_stall: bool,
    /// On tidehunter, attach the per-keyspace objects compactor that retains
    /// only the latest version per ObjectID. Mutually exclusive with the
    /// object pruner — see `AuthorityStorePruner::new`.
    pub enable_objects_compactor: bool,
}
44
45impl AuthorityPerpetualTablesOptions {
46    fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
47        if !self.enable_write_stall {
48            db_options = db_options.disable_write_throttling();
49        }
50        db_options
51    }
52}
53
/// AuthorityPerpetualTables contains data that must be preserved from one epoch to the next.
///
/// Each field is one table of the `perpetual` DB; accessors are generated by the
/// `DBMapUtils` derive.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct AuthorityPerpetualTables {
    /// This is a map between the object (ID, version) and the latest state of the object, namely the
    /// state that is needed to process new transactions.
    /// State is represented by `StoreObject` enum, which is either a move module or a move object.
    ///
    /// Note that while this map can store all versions of an object, we will eventually
    /// prune old object versions from the db.
    ///
    /// IMPORTANT: object versions must *only* be pruned if they appear as inputs in some
    /// TransactionEffects. Simply pruning all objects but the most recent is an error!
    /// This is because there can be partially executed transactions whose effects have not yet
    /// been written out, and which must be retried. But, they cannot be retried unless their input
    /// objects are still accessible!
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// This is a map between object references of currently active objects that can be mutated.
    ///
    /// For old epochs, it may also contain the transaction that they are locked on for use by this
    /// specific validator. The transaction locks themselves are now in AuthorityPerEpochStore.
    ///
    /// The on-disk table keeps its historical name, `owned_object_transaction_locks`.
    #[rename = "owned_object_transaction_locks"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,

    /// This is a map between the transaction digest and the corresponding transaction that's known to be
    /// executable. This means that it may have been executed locally, or it may have been synced through
    /// state-sync but hasn't been executed yet.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// A map between the transaction digest of a certificate to the effects of its execution.
    /// We store effects into this table in two different cases:
    /// 1. When a transaction is synced through state_sync, we store the effects here. These effects
    ///    are known to be final in the network, but may not have been executed locally yet.
    /// 2. When the transaction is executed locally on this node, we store the effects here. This means that
    ///    it's possible to store the same effects twice (once for the synced transaction, and once for the executed).
    ///
    /// It's also possible for the effects to be reverted if the transaction didn't make it into the epoch.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Transactions that have been executed locally on this node. We need this table since the `effects` table
    /// doesn't say anything about the execution status of the transaction on this node. When we wait for transactions
    /// to be executed, we wait for them to appear in this table. When we revert transactions, we remove them from both
    /// tables.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Events keyed by the digest of the transaction that produced them.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    /// Loaded (and unchanged) runtime object references.
    pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,

    /// DEPRECATED in favor of the table of the same name in authority_per_epoch_store.
    /// Please do not add new accessors/callsites.
    /// When a transaction is executed via the checkpoint executor, we store the association here.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Finalized root state hash for epoch, to be included in CheckpointSummary
    /// of last checkpoint of epoch. These values should only ever be written once
    /// and never changed.
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Parameters of the system fixed at the epoch start
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// A singleton table that stores the latest pruned checkpoint. Used to keep the objects pruner's progress.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Expected total amount of SUI in the network. This is expected to remain constant
    /// throughout the lifetime of the network. We check it at the end of each epoch if
    /// expensive checks are enabled. We cannot use 10B today because in tests we often
    /// inject extra gas objects into genesis.
    pub(crate) expected_network_sui_amount: DBMap<(), u64>,

    /// Expected imbalance between storage fund balance and the sum of storage rebate of all live objects.
    /// This could be non-zero due to bugs in earlier protocol versions.
    /// This number is the result of storage_fund_balance - sum(storage_rebate).
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Table that stores the set of received objects and deleted objects and the version at
    /// which they were received. This is used to prevent possible race conditions around receiving
    /// objects (since they are not locked by the transaction manager) and for tracking shared
    /// objects that have been deleted. This table is meant to be pruned per-epoch, and all
    /// previous epochs other than the current epoch may be pruned safely.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
    /// Stores the same `MarkerValue` payload keyed by `EpochMarkerKey`.
    /// NOTE(review): presumably the successor of `object_per_epoch_marker_table` — confirm
    /// which of the two tables callers are expected to read and write.
    pub(crate) object_per_epoch_marker_table_v2: DBMap<EpochMarkerKey, MarkerValue>,

    /// Tracks executed transaction digests across epochs.
    /// Used to support address balance gas payments feature.
    /// This table uses epoch-prefixed keys to support efficient pruning via range delete.
    pub(crate) executed_transaction_digests: DBMap<(EpochId, TransactionDigest), ()>,

    /// A singleton table recording the highest checkpoint whose transaction
    /// outputs (objects, effects, etc.) are durably committed to this store.
    /// It is written in the *same* atomic batch as those outputs (see
    /// `AuthorityStore::build_db_batch` and the checkpoint executor), so it is
    /// always consistent with the live object set even after an unclean stop.
    ///
    /// This differs from the checkpoint store's `highest_executed` watermark,
    /// which is bumped in a separate write after the outputs are committed: an
    /// abrupt stop can leave the object writes durable while `highest_executed`
    /// still lags. Consumers that read the live object set directly (e.g. the
    /// embedded rpc-store's bulk restore) use this watermark instead, so they
    /// never observe objects beyond the checkpoint they think they are at.
    ///
    /// IMPORTANT: TideHunter keyspaces are order-sensitive once written to disk.
    /// Keep new keyspaces append-only to preserve compatibility with existing DBs.
    pub(crate) highest_committed_checkpoint: DBMap<(), CheckpointSequenceNumber>,
}
165
166impl AuthorityPerpetualTables {
167    pub fn path(parent_path: &Path) -> PathBuf {
168        parent_path.join("perpetual")
169    }
170
171    #[cfg(not(tidehunter))]
172    pub fn open(
173        parent_path: &Path,
174        db_options_override: Option<AuthorityPerpetualTablesOptions>,
175        _pruner_watermark: Option<Arc<AtomicU64>>,
176    ) -> Self {
177        let db_options_override = db_options_override.unwrap_or_default();
178        let db_options = db_options_override
179            .apply_to(default_db_options().optimize_db_for_write_throughput(4, false));
180        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
181            (
182                "objects".to_string(),
183                objects_table_config(db_options.clone()),
184            ),
185            (
186                "owned_object_transaction_locks".to_string(),
187                owned_object_transaction_locks_table_config(db_options.clone()),
188            ),
189            (
190                "transactions".to_string(),
191                transactions_table_config(db_options.clone()),
192            ),
193            (
194                "effects".to_string(),
195                effects_table_config(db_options.clone()),
196            ),
197        ]));
198
199        Self::open_tables_read_write(
200            Self::path(parent_path),
201            MetricConf::new("perpetual")
202                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
203            Some(db_options.options),
204            Some(table_options),
205        )
206    }
207
    /// Opens the perpetual tables read-write on the tidehunter backend, building
    /// one `ThConfig` per keyspace.
    ///
    /// * `parent_path` - directory that contains the `perpetual` DB directory.
    /// * `db_options_override` - only `enable_objects_compactor` is consulted
    ///   here; `enable_write_stall` is not read by this variant.
    /// * `pruner_watermark` - counter handed to the relocation filters; when
    ///   `None`, a fresh counter initialized to 0 is used instead.
    #[cfg(tidehunter)]
    pub fn open(
        parent_path: &Path,
        db_options_override: Option<AuthorityPerpetualTablesOptions>,
        pruner_watermark: Option<Arc<AtomicU64>>,
    ) -> Self {
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        tracing::warn!("AuthorityPerpetualTables using tidehunter");
        use typed_store::tidehunter_util::{
            Bytes, Decision, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
            default_cells_per_mutex, default_max_dirty_keys, default_mutex_count,
            default_value_cache_size,
        };
        let mutexes = default_mutex_count() * 2;
        let transaction_mutexes = mutexes * 4;
        let value_cache_size = default_value_cache_size();
        // effectively disables pruning if not set
        let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));

        let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
        // Walks the keys back to front and retains the first key seen for each
        // distinct leading 32 bytes (the ObjectID part of the key).
        // NOTE(review): presumably the iterator yields keys in ascending
        // (ObjectID, version) order, so the retained key is the latest version — confirm.
        let objects_compactor = |iter: &mut dyn DoubleEndedIterator<Item = &Bytes>| {
            let mut retain = HashSet::new();
            let mut previous: Option<&[u8]> = None;
            const OID_SIZE: usize = 32;
            for key in iter.rev() {
                if let Some(prev) = previous {
                    if prev == &key[..OID_SIZE] {
                        continue;
                    }
                }
                previous = Some(&key[..OID_SIZE]);
                retain.insert(key.clone());
            }
            retain
        };
        // Eight bytes, all zero except the last, which is 32. Passed as the
        // prefix argument of the `new_with_rm_prefix*` constructors below.
        let mut digest_prefix = vec![0; 8];
        digest_prefix[7] = 32;
        let uniform_key = KeyType::uniform(default_cells_per_mutex());
        let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
        // TransactionDigest is serialized with an 8-byte prefix, so we include it in the key calculation
        let epoch_tx_digest_prefix_key =
            KeyType::from_prefix_bits((8/*EpochId*/ + 8/*TransactionDigest prefix*/) * 8 + 12);
        let object_indexing = KeyIndexing::fixed(32 + 8); //  KeyIndexing::key_reduction(32 + 8, 16..(32 + 8));
        // todo can figure way to scramble off 8 bytes in the middle
        let obj_ref_size = 32 + 8 + 32 + 8;
        let owned_object_transaction_locks_indexing =
            KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));

        let mut objects_config = KeySpaceConfig::new()
            .with_max_dirty_keys(16 * default_max_dirty_keys())
            .with_value_cache_size(value_cache_size);
        // The compactor is attached only when the caller explicitly opted in.
        if matches!(db_options_override, Some(options) if options.enable_objects_compactor) {
            objects_config = objects_config.with_compactor(Box::new(objects_compactor));
        }

        // One entry per keyspace. The struct-level documentation warns that
        // TideHunter keyspaces are order-sensitive once written to disk, so
        // do not reorder these entries.
        // NOTE(review): the "events" keyspace has no matching field in the
        // struct visible in this file; presumably it is kept for compatibility
        // with existing DBs — confirm before removing.
        let configs = vec![
            (
                "objects".to_string(),
                ThConfig::new_with_config_indexing(
                    object_indexing,
                    mutexes * 4,
                    KeyType::uniform(1),
                    objects_config,
                ),
            ),
            (
                "owned_object_transaction_locks".to_string(),
                ThConfig::new_with_config_indexing(
                    owned_object_transaction_locks_indexing,
                    mutexes * 16,
                    KeyType::uniform(default_cells_per_mutex()),
                    bloom_config
                        .clone()
                        .with_max_dirty_keys(16 * default_max_dirty_keys()),
                ),
            ),
            (
                "transactions".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    transaction_mutexes,
                    uniform_key,
                    KeySpaceConfig::new()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    transaction_mutexes,
                    uniform_key,
                    apply_relocation_filter(
                        bloom_config.clone().with_value_cache_size(value_cache_size),
                        pruner_watermark.clone(),
                        |effects: TransactionEffects| effects.executed_epoch(),
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    transaction_mutexes,
                    uniform_key,
                    bloom_config
                        .clone()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events".to_string(),
                ThConfig::new_with_rm_prefix(
                    32 + 8,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events_2".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "unchanged_loaded_runtime_objects".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_transactions_to_checkpoint".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "root_state_hash_by_epoch".to_string(),
                ThConfig::new(8, 1, KeyType::uniform(1)),
            ),
            // The singleton tables below are keyed by `()`; they are
            // configured with key size 0 and a single mutex.
            (
                "epoch_start_configuration".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "pruned_checkpoint".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_network_sui_amount".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_storage_fund_imbalance".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "object_per_epoch_marker_table".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "object_per_epoch_marker_table_v2".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::fixed(EPOCH_MARKER_KEY_SIZE),
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |k: EpochMarkerKey| k.0,
                        true,
                    ),
                ),
            ),
            (
                "executed_transaction_digests".to_string(),
                ThConfig::new_with_config_indexing(
                    // EpochId + (TransactionDigest)
                    KeyIndexing::fixed(8 + (32 + 8)),
                    transaction_mutexes,
                    epoch_tx_digest_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, TransactionDigest)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "highest_committed_checkpoint".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
        ];
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }
444
445    #[cfg(not(tidehunter))]
446    pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
447        Self::get_read_only_handle(
448            Self::path(parent_path),
449            None,
450            None,
451            MetricConf::new("perpetual_readonly"),
452        )
453    }
454
455    #[cfg(tidehunter)]
456    pub fn open_readonly(parent_path: &Path) -> Self {
457        Self::open(parent_path, None, None)
458    }
459
460    #[cfg(tidehunter)]
461    pub fn force_rebuild_control_region(&self) -> anyhow::Result<()> {
462        self.objects.db.force_rebuild_control_region()
463    }
464
465    /// Wait for tidehunter background threads to finish before allowing the caller
466    /// to e.g. rename the database directory. Consumes `Arc<Self>` so all internal
467    /// `Arc<Database>` clones held by the column family `DBMap`s are released as
468    /// part of the drop, leaving only the `Arc<Database>` extracted here.
469    #[cfg(tidehunter)]
470    pub fn wait_for_tidehunter_background_threads(self: Arc<Self>) {
471        let strong = Arc::strong_count(&self);
472        if strong != 1 {
473            println!(
474                "WARNING: wait_for_tidehunter_background_threads called with Arc<AuthorityPerpetualTables> strong_count={} (expected 1); other clones will keep DBMap.db Arc<Database> alive past drop(self) and the inner Database wait will warn/timeout",
475                strong,
476            );
477        }
478        let db = self.objects.db.clone();
479        drop(self);
480        db.wait_for_tidehunter_background_threads();
481    }
482
483    // This is used by indexer to find the correct version of dynamic field child object.
484    // We do not store the version of the child object, but because of lamport timestamp,
485    // we know the child must have version number less then or eq to the parent.
486    pub fn find_object_lt_or_eq_version(
487        &self,
488        object_id: ObjectID,
489        version: SequenceNumber,
490    ) -> SuiResult<Option<Object>> {
491        let mut iter = self.objects.reversed_safe_iter_with_bounds(
492            Some(ObjectKey::min_for_id(&object_id)),
493            Some(ObjectKey(object_id, version)),
494        )?;
495        match iter.next() {
496            Some(Ok((key, o))) => self.object(&key, o),
497            Some(Err(e)) => Err(e.into()),
498            None => Ok(None),
499        }
500    }
501
    /// Builds an `Object` from a stored value by delegating to
    /// `try_construct_object`; any error from that call is returned unchanged.
    fn construct_object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectValue,
    ) -> Result<Object, SuiError> {
        try_construct_object(object_key, store_object)
    }
509
510    // Constructs `sui_types::object::Object` from `StoreObjectWrapper`.
511    // Returns `None` if object was deleted/wrapped
512    pub fn object(
513        &self,
514        object_key: &ObjectKey,
515        store_object: StoreObjectWrapper,
516    ) -> Result<Option<Object>, SuiError> {
517        let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
518            return Ok(None);
519        };
520        Ok(Some(self.construct_object(object_key, *store_object)?))
521    }
522
523    pub fn object_reference(
524        &self,
525        object_key: &ObjectKey,
526        store_object: StoreObjectWrapper,
527    ) -> Result<ObjectRef, SuiError> {
528        let obj_ref = match store_object.migrate().into_inner() {
529            StoreObject::Value(object) => self
530                .construct_object(object_key, *object)?
531                .compute_object_reference(),
532            StoreObject::Deleted => (
533                object_key.0,
534                object_key.1,
535                ObjectDigest::OBJECT_DIGEST_DELETED,
536            ),
537            StoreObject::Wrapped => (
538                object_key.0,
539                object_key.1,
540                ObjectDigest::OBJECT_DIGEST_WRAPPED,
541            ),
542        };
543        Ok(obj_ref)
544    }
545
546    pub fn tombstone_reference(
547        &self,
548        object_key: &ObjectKey,
549        store_object: &StoreObjectWrapper,
550    ) -> Result<Option<ObjectRef>, SuiError> {
551        let obj_ref = match store_object.inner() {
552            StoreObject::Deleted => Some((
553                object_key.0,
554                object_key.1,
555                ObjectDigest::OBJECT_DIGEST_DELETED,
556            )),
557            StoreObject::Wrapped => Some((
558                object_key.0,
559                object_key.1,
560                ObjectDigest::OBJECT_DIGEST_WRAPPED,
561            )),
562            _ => None,
563        };
564        Ok(obj_ref)
565    }
566
567    pub fn get_latest_object_ref_or_tombstone(
568        &self,
569        object_id: ObjectID,
570    ) -> Result<Option<ObjectRef>, SuiError> {
571        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
572            Some(ObjectKey::min_for_id(&object_id)),
573            Some(ObjectKey::max_for_id(&object_id)),
574        )?;
575
576        if let Some(Ok((object_key, value))) = iterator.next()
577            && object_key.0 == object_id
578        {
579            return Ok(Some(self.object_reference(&object_key, value)?));
580        }
581        Ok(None)
582    }
583
584    pub fn get_latest_object_or_tombstone(
585        &self,
586        object_id: ObjectID,
587    ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
588        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
589            Some(ObjectKey::min_for_id(&object_id)),
590            Some(ObjectKey::max_for_id(&object_id)),
591        )?;
592
593        if let Some(Ok((object_key, value))) = iterator.next()
594            && object_key.0 == object_id
595        {
596            return Ok(Some((object_key, value)));
597        }
598        Ok(None)
599    }
600
601    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
602        Ok(self
603            .epoch_start_configuration
604            .get(&())?
605            .expect("Must have current epoch.")
606            .epoch_start_state()
607            .epoch())
608    }
609
610    pub fn set_epoch_start_configuration(
611        &self,
612        epoch_start_configuration: &EpochStartConfiguration,
613    ) -> SuiResult {
614        let mut wb = self.epoch_start_configuration.batch();
615        wb.insert_batch(
616            &self.epoch_start_configuration,
617            std::iter::once(((), epoch_start_configuration)),
618        )?;
619        wb.write()?;
620        Ok(())
621    }
622
623    pub fn get_highest_pruned_checkpoint(
624        &self,
625    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
626        self.pruned_checkpoint.get(&())
627    }
628
629    pub fn set_highest_pruned_checkpoint(
630        &self,
631        wb: &mut DBBatch,
632        checkpoint_number: CheckpointSequenceNumber,
633    ) -> SuiResult {
634        wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
635        Ok(())
636    }
637
638    pub fn get_highest_committed_checkpoint(
639        &self,
640    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
641        self.highest_committed_checkpoint.get(&())
642    }
643
644    /// Stage the highest-committed-checkpoint watermark into `wb`. Must be
645    /// called on the same batch that commits the checkpoint's transaction
646    /// outputs so the watermark and the outputs land atomically.
647    pub fn set_highest_committed_checkpoint(
648        &self,
649        wb: &mut DBBatch,
650        checkpoint_number: CheckpointSequenceNumber,
651    ) -> SuiResult {
652        wb.insert_batch(
653            &self.highest_committed_checkpoint,
654            [((), checkpoint_number)],
655        )?;
656        Ok(())
657    }
658
659    pub fn get_transaction(
660        &self,
661        digest: &TransactionDigest,
662    ) -> SuiResult<Option<TrustedTransaction>> {
663        let Some(transaction) = self.transactions.get(digest)? else {
664            return Ok(None);
665        };
666        Ok(Some(transaction))
667    }
668
669    pub fn list_transactions_from(
670        &self,
671        start: Option<TransactionDigest>,
672        limit: usize,
673    ) -> Result<Vec<TransactionDigest>, typed_store::TypedStoreError> {
674        let iter = self.transactions.safe_iter_with_bounds(start, None);
675        let mut result = Vec::with_capacity(limit);
676        for item in iter.take(limit) {
677            let (digest, _) = item?;
678            result.push(digest);
679        }
680        Ok(result)
681    }
682
683    pub fn get_executed_effects_digest(
684        &self,
685        tx_digest: &TransactionDigest,
686    ) -> Result<Option<TransactionEffectsDigest>, typed_store::TypedStoreError> {
687        self.executed_effects.get(tx_digest)
688    }
689
690    pub fn get_effects_by_digest(
691        &self,
692        effects_digest: &TransactionEffectsDigest,
693    ) -> Result<Option<TransactionEffects>, typed_store::TypedStoreError> {
694        self.effects.get(effects_digest)
695    }
696
697    /// Batch insert executed transaction digests for a given epoch.
698    /// Used by formal snapshot restore to backfill transaction digests from the previous epoch.
699    pub fn insert_executed_transaction_digests_batch(
700        &self,
701        epoch: EpochId,
702        digests: impl Iterator<Item = TransactionDigest>,
703    ) -> SuiResult {
704        let mut batch = self.executed_transaction_digests.batch();
705        batch.insert_batch(
706            &self.executed_transaction_digests,
707            digests.map(|digest| ((epoch, digest), ())),
708        )?;
709        batch.write()?;
710        Ok(())
711    }
712
713    pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
714        let Some(effect_digest) = self.executed_effects.get(digest)? else {
715            return Ok(None);
716        };
717        Ok(self.effects.get(&effect_digest)?)
718    }
719
720    pub(crate) fn was_transaction_executed_in_last_epoch(
721        &self,
722        digest: &TransactionDigest,
723        current_epoch: EpochId,
724    ) -> bool {
725        if current_epoch == 0 {
726            return false;
727        }
728        self.executed_transaction_digests
729            .contains_key(&(current_epoch - 1, *digest))
730            .expect("db error")
731    }
732
733    // DEPRECATED as the backing table has been moved to authority_per_epoch_store.
734    // Please do not add new accessors/callsites.
735    pub fn get_checkpoint_sequence_number(
736        &self,
737        digest: &TransactionDigest,
738    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
739        Ok(self.executed_transactions_to_checkpoint.get(digest)?)
740    }
741
742    pub fn set_highest_pruned_checkpoint_without_wb(
743        &self,
744        checkpoint_number: CheckpointSequenceNumber,
745    ) -> SuiResult {
746        let mut wb = self.pruned_checkpoint.batch();
747        self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
748        wb.write()?;
749        Ok(())
750    }
751
752    pub fn database_is_empty(&self) -> SuiResult<bool> {
753        Ok(self.objects.safe_iter().next().is_none())
754    }
755
756    pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
757        LiveSetIter {
758            iter: Box::new(self.objects.safe_iter()),
759            tables: self,
760            prev: None,
761            include_wrapped_object,
762        }
763    }
764
765    pub fn range_iter_live_object_set(
766        &self,
767        lower_bound: Option<ObjectID>,
768        upper_bound: Option<ObjectID>,
769        include_wrapped_object: bool,
770    ) -> LiveSetIter<'_> {
771        let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
772        let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
773
774        LiveSetIter {
775            iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
776            tables: self,
777            prev: None,
778            include_wrapped_object,
779        }
780    }
781
782    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
783        // This checkpoints the entire db and not just objects table
784        self.objects.checkpoint_db(path).map_err(Into::into)
785    }
786
787    pub fn insert_root_state_hash(
788        &self,
789        epoch: EpochId,
790        last_checkpoint_of_epoch: CheckpointSequenceNumber,
791        hash: GlobalStateHash,
792    ) -> SuiResult {
793        self.root_state_hash_by_epoch
794            .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
795        Ok(())
796    }
797
798    pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
799        let object_reference = object.compute_object_reference();
800        let wrapper = get_store_object(object);
801        let mut wb = self.objects.batch();
802        wb.insert_batch(
803            &self.objects,
804            std::iter::once((ObjectKey::from(object_reference), wrapper)),
805        )?;
806        wb.write()?;
807        Ok(())
808    }
809
810    // fallible get object methods for sui-tool, which may need to attempt to read a corrupted database
811    pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
812        let obj_entry = self
813            .objects
814            .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
815            .next();
816
817        match obj_entry.transpose()? {
818            Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
819                Ok(self.object(&ObjectKey(obj_id, version), obj)?)
820            }
821            _ => Ok(None),
822        }
823    }
824
825    pub fn get_object_by_key_fallible(
826        &self,
827        object_id: &ObjectID,
828        version: VersionNumber,
829    ) -> SuiResult<Option<Object>> {
830        Ok(self
831            .objects
832            .get(&ObjectKey(*object_id, version))?
833            .and_then(|object| {
834                self.object(&ObjectKey(*object_id, version), object)
835                    .expect("object construction error")
836            }))
837    }
838}
839
840impl ObjectStore for AuthorityPerpetualTables {
841    /// Read an object and return it, or Ok(None) if the object was not found.
842    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
843        self.get_object_fallible(object_id).expect("db error")
844    }
845
846    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
847        self.get_object_by_key_fallible(object_id, version)
848            .expect("db error")
849    }
850}
851
/// Iterator over the live object set stored in the `objects` table.
///
/// It walks `(ObjectKey, StoreObjectWrapper)` entries and, for each run of
/// consecutive entries sharing an object ID, considers only the last entry of
/// the run.
/// NOTE(review): this yields the newest version per object only if the table
/// iterates versions of one ID in ascending order — confirm against the key
/// encoding.
pub struct LiveSetIter<'a> {
    /// Underlying table iterator over raw stored entries.
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    /// Tables handle used to construct full `Object`s from stored values.
    tables: &'a AuthorityPerpetualTables,
    /// Most recently read entry, held back until the next entry shows whether
    /// it was the last one for its object ID.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
    /// Whether a wrapped object is considered as a live object.
    include_wrapped_object: bool,
}
859
/// An item produced by [`LiveSetIter`].
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    /// An object whose stored entry holds a value; carries the constructed object.
    Normal(Object),
    /// An object whose stored entry is a wrapped marker; only its key
    /// (object ID and version) is available.
    Wrapped(ObjectKey),
}
865
866impl LiveObject {
867    pub fn object_id(&self) -> ObjectID {
868        match self {
869            LiveObject::Normal(obj) => obj.id(),
870            LiveObject::Wrapped(key) => key.0,
871        }
872    }
873
874    pub fn version(&self) -> SequenceNumber {
875        match self {
876            LiveObject::Normal(obj) => obj.version(),
877            LiveObject::Wrapped(key) => key.1,
878        }
879    }
880
881    pub fn object_reference(&self) -> ObjectRef {
882        match self {
883            LiveObject::Normal(obj) => obj.compute_object_reference(),
884            LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
885        }
886    }
887}
888
889impl LiveSetIter<'_> {
890    fn store_object_wrapper_to_live_object(
891        &self,
892        object_key: ObjectKey,
893        store_object: StoreObjectWrapper,
894    ) -> Option<LiveObject> {
895        match store_object.migrate().into_inner() {
896            StoreObject::Value(object) => {
897                let object = self
898                    .tables
899                    .construct_object(&object_key, *object)
900                    .expect("Constructing object from store cannot fail");
901                Some(LiveObject::Normal(object))
902            }
903            StoreObject::Wrapped => {
904                if self.include_wrapped_object {
905                    Some(LiveObject::Wrapped(object_key))
906                } else {
907                    None
908                }
909            }
910            StoreObject::Deleted => None,
911        }
912    }
913}
914
915impl Iterator for LiveSetIter<'_> {
916    type Item = LiveObject;
917
918    fn next(&mut self) -> Option<Self::Item> {
919        loop {
920            if let Some(Ok((next_key, next_value))) = self.iter.next() {
921                let prev = self.prev.take();
922                self.prev = Some((next_key, next_value));
923
924                if let Some((prev_key, prev_value)) = prev
925                    && prev_key.0 != next_key.0
926                {
927                    let live_object =
928                        self.store_object_wrapper_to_live_object(prev_key, prev_value);
929                    if live_object.is_some() {
930                        return live_object;
931                    }
932                }
933                continue;
934            }
935            if let Some((key, value)) = self.prev.take() {
936                let live_object = self.store_object_wrapper_to_live_object(key, value);
937                if live_object.is_some() {
938                    return live_object;
939                }
940            }
941            return None;
942        }
943    }
944}
945
946// These functions are used to initialize the DB tables
947fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
948    DBOptions {
949        options: db_options
950            .clone()
951            .optimize_for_write_throughput()
952            .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
953            .options,
954        rw_options: db_options.rw_options.set_ignore_range_deletions(false),
955    }
956}
957
958fn objects_table_config(db_options: DBOptions) -> DBOptions {
959    db_options
960        .optimize_for_write_throughput()
961        .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
962}
963
964fn transactions_table_config(db_options: DBOptions) -> DBOptions {
965    db_options
966        .optimize_for_write_throughput()
967        .optimize_for_point_lookup(
968            read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
969        )
970}
971
972fn effects_table_config(db_options: DBOptions) -> DBOptions {
973    db_options
974        .optimize_for_write_throughput()
975        .optimize_for_point_lookup(
976            read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
977        )
978}