sui_core/authority/
authority_store_tables.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::*;
5use crate::authority::authority_store::LockDetailsWrapperDeprecated;
6#[cfg(tidehunter)]
7use crate::authority::epoch_marker_key::EPOCH_MARKER_KEY_SIZE;
8use crate::authority::epoch_marker_key::EpochMarkerKey;
9use serde::{Deserialize, Serialize};
10use std::path::Path;
11use std::sync::atomic::AtomicU64;
12use sui_types::base_types::SequenceNumber;
13use sui_types::effects::{TransactionEffects, TransactionEvents};
14use sui_types::global_state_hash::GlobalStateHash;
15use sui_types::storage::MarkerValue;
16use typed_store::metrics::SamplingInterval;
17use typed_store::rocks::{
18    DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
19    read_size_from_env,
20};
21use typed_store::traits::Map;
22
23use crate::authority::authority_store_types::{
24    StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
25};
26use crate::authority::epoch_start_configuration::EpochStartConfiguration;
27use typed_store::{DBMapUtils, DbIterator};
28
29const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
30pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
31const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
32const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
33
/// Options to apply to every column family of the `perpetual` DB.
///
/// The `Default` value has every flag switched off.
#[derive(Clone, Debug, Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// Whether to enable write stalling on all column families.
    pub enable_write_stall: bool,
    /// On tidehunter, attach the per-keyspace objects compactor that retains
    /// only the latest version per ObjectID. Mutually exclusive with the
    /// object pruner — see `AuthorityStorePruner::new`.
    pub enable_objects_compactor: bool,
}
44
45impl AuthorityPerpetualTablesOptions {
46    fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
47        if !self.enable_write_stall {
48            db_options = db_options.disable_write_throttling();
49        }
50        db_options
51    }
52}
53
/// AuthorityPerpetualTables contains data that must be preserved from one epoch to the next.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct AuthorityPerpetualTables {
    /// This is a map between the object (ID, version) and the latest state of the object, namely the
    /// state that is needed to process new transactions.
    /// State is represented by `StoreObject` enum, which is either a move module or a move object.
    ///
    /// Note that while this map can store all versions of an object, we will eventually
    /// prune old object versions from the db.
    ///
    /// IMPORTANT: object versions must *only* be pruned if they appear as inputs in some
    /// TransactionEffects. Simply pruning all objects but the most recent is an error!
    /// This is because there can be partially executed transactions whose effects have not yet
    /// been written out, and which must be retried. But, they cannot be retried unless their input
    /// objects are still accessible!
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// This is a map between object references of currently active objects that can be mutated.
    ///
    /// For old epochs, it may also contain the transaction that they are locked on for use by this
    /// specific validator. The transaction locks themselves are now in AuthorityPerEpochStore.
    ///
    /// The on-disk column family keeps its historical name (see the `rename` attribute below).
    #[rename = "owned_object_transaction_locks"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,

    /// This is a map between the transaction digest and the corresponding transaction that's known to be
    /// executable. This means that it may have been executed locally, or it may have been synced through
    /// state-sync but hasn't been executed yet.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// A map between the transaction digest of a certificate to the effects of its execution.
    /// We store effects into this table in two different cases:
    /// 1. When a transaction is synced through state_sync, we store the effects here. These effects
    ///    are known to be final in the network, but may not have been executed locally yet.
    /// 2. When the transaction is executed locally on this node, we store the effects here. This means that
    ///    it's possible to store the same effects twice (once for the synced transaction, and once for the executed).
    ///
    /// It's also possible for the effects to be reverted if the transaction didn't make it into the epoch.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Transactions that have been executed locally on this node. We need this table since the `effects` table
    /// doesn't say anything about the execution status of the transaction on this node. When we wait for transactions
    /// to be executed, we wait for them to appear in this table. When we revert transactions, we remove them from both
    /// tables.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    // Events keyed by the digest of the transaction that produced them.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    // Loaded (and unchanged) runtime object references, keyed by transaction digest.
    pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,

    /// DEPRECATED in favor of the table of the same name in authority_per_epoch_store.
    /// Please do not add new accessors/callsites.
    /// When a transaction is executed via the checkpoint executor, we store the association here.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    // Finalized root state hash for epoch, to be included in CheckpointSummary
    // of last checkpoint of epoch. These values should only ever be written once
    // and never changed.
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Parameters of the system fixed at the epoch start
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// A singleton table that stores the latest pruned checkpoint. Used to keep track of the
    /// objects pruner's progress.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// A singleton table recording the highest checkpoint whose transaction
    /// outputs (objects, effects, etc.) are durably committed to this store.
    /// It is written in the *same* atomic batch as those outputs (see
    /// `AuthorityStore::build_db_batch` and the checkpoint executor), so it is
    /// always consistent with the live object set even after an unclean stop.
    ///
    /// This differs from the checkpoint store's `highest_executed` watermark,
    /// which is bumped in a separate write after the outputs are committed: an
    /// abrupt stop can leave the object writes durable while `highest_executed`
    /// still lags. Consumers that read the live object set directly (e.g. the
    /// embedded rpc-store's bulk restore) use this watermark instead, so they
    /// never observe objects beyond the checkpoint they think they are at.
    pub(crate) highest_committed_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Expected total amount of SUI in the network. This is expected to remain constant
    /// throughout the lifetime of the network. We check it at the end of each epoch if
    /// expensive checks are enabled. We cannot use 10B today because in tests we often
    /// inject extra gas objects into genesis.
    pub(crate) expected_network_sui_amount: DBMap<(), u64>,

    /// Expected imbalance between storage fund balance and the sum of storage rebate of all live objects.
    /// This could be non-zero due to bugs in earlier protocol versions.
    /// This number is the result of storage_fund_balance - sum(storage_rebate).
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Table that stores the set of received objects and deleted objects and the version at
    /// which they were received. This is used to prevent possible race conditions around receiving
    /// objects (since they are not locked by the transaction manager) and for tracking shared
    /// objects that have been deleted. This table is meant to be pruned per-epoch, and all
    /// previous epochs other than the current epoch may be pruned safely.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
    // Same value type as `object_per_epoch_marker_table`, keyed by `EpochMarkerKey` instead of
    // an `(EpochId, ObjectKey)` tuple.
    // NOTE(review): presumably the successor of the table above — confirm against the callers.
    pub(crate) object_per_epoch_marker_table_v2: DBMap<EpochMarkerKey, MarkerValue>,

    /// Tracks executed transaction digests across epochs.
    /// Used to support address balance gas payments feature.
    /// This table uses epoch-prefixed keys to support efficient pruning via range delete.
    pub(crate) executed_transaction_digests: DBMap<(EpochId, TransactionDigest), ()>,
}
162
163impl AuthorityPerpetualTables {
164    pub fn path(parent_path: &Path) -> PathBuf {
165        parent_path.join("perpetual")
166    }
167
168    #[cfg(not(tidehunter))]
169    pub fn open(
170        parent_path: &Path,
171        db_options_override: Option<AuthorityPerpetualTablesOptions>,
172        _pruner_watermark: Option<Arc<AtomicU64>>,
173    ) -> Self {
174        let db_options_override = db_options_override.unwrap_or_default();
175        let db_options = db_options_override
176            .apply_to(default_db_options().optimize_db_for_write_throughput(4, false));
177        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
178            (
179                "objects".to_string(),
180                objects_table_config(db_options.clone()),
181            ),
182            (
183                "owned_object_transaction_locks".to_string(),
184                owned_object_transaction_locks_table_config(db_options.clone()),
185            ),
186            (
187                "transactions".to_string(),
188                transactions_table_config(db_options.clone()),
189            ),
190            (
191                "effects".to_string(),
192                effects_table_config(db_options.clone()),
193            ),
194        ]));
195
196        Self::open_tables_read_write(
197            Self::path(parent_path),
198            MetricConf::new("perpetual")
199                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
200            Some(db_options.options),
201            Some(table_options),
202        )
203    }
204
    /// Opens the perpetual tables read-write under `parent_path/perpetual` using the
    /// tidehunter backend, building one keyspace configuration per column family.
    ///
    /// * `db_options_override` - only `enable_objects_compactor` is read here; when it is
    ///   set, the `objects` keyspace gets the compactor defined below. `enable_write_stall`
    ///   is not consulted by this function.
    /// * `pruner_watermark` - shared counter passed to every `apply_relocation_filter`
    ///   call. When `None`, a fresh counter initialised to 0 is used.
    #[cfg(tidehunter)]
    pub fn open(
        parent_path: &Path,
        db_options_override: Option<AuthorityPerpetualTablesOptions>,
        pruner_watermark: Option<Arc<AtomicU64>>,
    ) -> Self {
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        tracing::warn!("AuthorityPerpetualTables using tidehunter");
        use typed_store::tidehunter_util::{
            Bytes, Decision, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
            default_cells_per_mutex, default_max_dirty_keys, default_mutex_count,
            default_value_cache_size,
        };
        // Mutex counts are scaled up from the backend default; the digest-keyed
        // keyspaces below use four times as many again.
        let mutexes = default_mutex_count() * 2;
        let transaction_mutexes = mutexes * 4;
        let value_cache_size = default_value_cache_size();
        // effectively disables pruning if not set
        let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));

        let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
        // Compactor for the `objects` keyspace: walks the keys back-to-front and retains the
        // first key encountered for each distinct 32-byte prefix, skipping every following
        // key that shares the prefix of the previously seen key.
        // NOTE(review): assuming the iterator yields keys in ascending (ObjectID, version)
        // order, the retained key is the highest version per object — confirm the ordering
        // contract of the compactor callback.
        let objects_compactor = |iter: &mut dyn DoubleEndedIterator<Item = &Bytes>| {
            let mut retain = HashSet::new();
            let mut previous: Option<&[u8]> = None;
            const OID_SIZE: usize = 32;
            for key in iter.rev() {
                if let Some(prev) = previous {
                    if prev == &key[..OID_SIZE] {
                        continue;
                    }
                }
                previous = Some(&key[..OID_SIZE]);
                retain.insert(key.clone());
            }
            retain
        };
        // Eight bytes, all zero except the last one which is 32.
        // NOTE(review): presumably the 8-byte prefix that precedes serialized digests
        // (see the TransactionDigest comment below) — confirm the encoding.
        let mut digest_prefix = vec![0; 8];
        digest_prefix[7] = 32;
        let uniform_key = KeyType::uniform(default_cells_per_mutex());
        let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
        // TransactionDigest is serialized with an 8-byte prefix, so we include it in the key calculation
        let epoch_tx_digest_prefix_key =
            KeyType::from_prefix_bits((8/*EpochId*/ + 8/*TransactionDigest prefix*/) * 8 + 12);
        let object_indexing = KeyIndexing::fixed(32 + 8); //  KeyIndexing::key_reduction(32 + 8, 16..(32 + 8));
        // todo can figure way to scramble off 8 bytes in the middle
        let obj_ref_size = 32 + 8 + 32 + 8;
        let owned_object_transaction_locks_indexing =
            KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));

        let mut objects_config = KeySpaceConfig::new()
            .with_max_dirty_keys(16 * default_max_dirty_keys())
            .with_value_cache_size(value_cache_size);
        // The compactor is opt-in: attached only when the caller asked for it.
        if matches!(db_options_override, Some(options) if options.enable_objects_compactor) {
            objects_config = objects_config.with_compactor(Box::new(objects_compactor));
        }

        // One (keyspace name, configuration) pair per column family.
        // NOTE(review): `events` is configured here although the struct visible in this
        // file only declares `events_2` — presumably kept for a legacy keyspace; confirm.
        let configs = vec![
            (
                "objects".to_string(),
                ThConfig::new_with_config_indexing(
                    object_indexing,
                    mutexes * 4,
                    KeyType::uniform(1),
                    objects_config,
                ),
            ),
            (
                "owned_object_transaction_locks".to_string(),
                ThConfig::new_with_config_indexing(
                    owned_object_transaction_locks_indexing,
                    mutexes * 16,
                    KeyType::uniform(default_cells_per_mutex()),
                    bloom_config
                        .clone()
                        .with_max_dirty_keys(16 * default_max_dirty_keys()),
                ),
            ),
            (
                "transactions".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    transaction_mutexes,
                    uniform_key,
                    KeySpaceConfig::new()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    transaction_mutexes,
                    uniform_key,
                    apply_relocation_filter(
                        bloom_config.clone().with_value_cache_size(value_cache_size),
                        pruner_watermark.clone(),
                        |effects: TransactionEffects| effects.executed_epoch(),
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    transaction_mutexes,
                    uniform_key,
                    bloom_config
                        .clone()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events".to_string(),
                ThConfig::new_with_rm_prefix(
                    32 + 8,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events_2".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "unchanged_loaded_runtime_objects".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_transactions_to_checkpoint".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "root_state_hash_by_epoch".to_string(),
                ThConfig::new(8, 1, KeyType::uniform(1)),
            ),
            // The singleton tables below are keyed by `()`, hence the key size of 0.
            (
                "epoch_start_configuration".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "pruned_checkpoint".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "highest_committed_checkpoint".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_network_sui_amount".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_storage_fund_imbalance".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "object_per_epoch_marker_table".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "object_per_epoch_marker_table_v2".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::fixed(EPOCH_MARKER_KEY_SIZE),
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |k: EpochMarkerKey| k.0,
                        true,
                    ),
                ),
            ),
            (
                "executed_transaction_digests".to_string(),
                ThConfig::new_with_config_indexing(
                    // EpochId + (TransactionDigest)
                    KeyIndexing::fixed(8 + (32 + 8)),
                    transaction_mutexes,
                    epoch_tx_digest_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, TransactionDigest)| epoch_id,
                        true,
                    ),
                ),
            ),
        ];
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }
441
442    #[cfg(not(tidehunter))]
443    pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
444        Self::get_read_only_handle(
445            Self::path(parent_path),
446            None,
447            None,
448            MetricConf::new("perpetual_readonly"),
449        )
450    }
451
    /// Tidehunter variant of `open_readonly`: simply calls `open` with no option
    /// overrides and no pruner watermark, so it returns the same handle type as `open`.
    #[cfg(tidehunter)]
    pub fn open_readonly(parent_path: &Path) -> Self {
        Self::open(parent_path, None, None)
    }
456
    /// Delegates to `force_rebuild_control_region` on the database handle held by the
    /// `objects` table and returns its result unchanged.
    // NOTE(review): presumably every table shares this database handle — confirm.
    #[cfg(tidehunter)]
    pub fn force_rebuild_control_region(&self) -> anyhow::Result<()> {
        self.objects.db.force_rebuild_control_region()
    }
461
462    /// Wait for tidehunter background threads to finish before allowing the caller
463    /// to e.g. rename the database directory. Consumes `Arc<Self>` so all internal
464    /// `Arc<Database>` clones held by the column family `DBMap`s are released as
465    /// part of the drop, leaving only the `Arc<Database>` extracted here.
466    #[cfg(tidehunter)]
467    pub fn wait_for_tidehunter_background_threads(self: Arc<Self>) {
468        let strong = Arc::strong_count(&self);
469        if strong != 1 {
470            println!(
471                "WARNING: wait_for_tidehunter_background_threads called with Arc<AuthorityPerpetualTables> strong_count={} (expected 1); other clones will keep DBMap.db Arc<Database> alive past drop(self) and the inner Database wait will warn/timeout",
472                strong,
473            );
474        }
475        let db = self.objects.db.clone();
476        drop(self);
477        db.wait_for_tidehunter_background_threads();
478    }
479
480    // This is used by indexer to find the correct version of dynamic field child object.
481    // We do not store the version of the child object, but because of lamport timestamp,
482    // we know the child must have version number less then or eq to the parent.
483    pub fn find_object_lt_or_eq_version(
484        &self,
485        object_id: ObjectID,
486        version: SequenceNumber,
487    ) -> SuiResult<Option<Object>> {
488        let mut iter = self.objects.reversed_safe_iter_with_bounds(
489            Some(ObjectKey::min_for_id(&object_id)),
490            Some(ObjectKey(object_id, version)),
491        )?;
492        match iter.next() {
493            Some(Ok((key, o))) => self.object(&key, o),
494            Some(Err(e)) => Err(e.into()),
495            None => Ok(None),
496        }
497    }
498
    // Builds an `Object` from a stored value by delegating to `try_construct_object`;
    // any error from that helper is returned unchanged.
    fn construct_object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectValue,
    ) -> Result<Object, SuiError> {
        try_construct_object(object_key, store_object)
    }
506
507    // Constructs `sui_types::object::Object` from `StoreObjectWrapper`.
508    // Returns `None` if object was deleted/wrapped
509    pub fn object(
510        &self,
511        object_key: &ObjectKey,
512        store_object: StoreObjectWrapper,
513    ) -> Result<Option<Object>, SuiError> {
514        let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
515            return Ok(None);
516        };
517        Ok(Some(self.construct_object(object_key, *store_object)?))
518    }
519
520    pub fn object_reference(
521        &self,
522        object_key: &ObjectKey,
523        store_object: StoreObjectWrapper,
524    ) -> Result<ObjectRef, SuiError> {
525        let obj_ref = match store_object.migrate().into_inner() {
526            StoreObject::Value(object) => self
527                .construct_object(object_key, *object)?
528                .compute_object_reference(),
529            StoreObject::Deleted => (
530                object_key.0,
531                object_key.1,
532                ObjectDigest::OBJECT_DIGEST_DELETED,
533            ),
534            StoreObject::Wrapped => (
535                object_key.0,
536                object_key.1,
537                ObjectDigest::OBJECT_DIGEST_WRAPPED,
538            ),
539        };
540        Ok(obj_ref)
541    }
542
543    pub fn tombstone_reference(
544        &self,
545        object_key: &ObjectKey,
546        store_object: &StoreObjectWrapper,
547    ) -> Result<Option<ObjectRef>, SuiError> {
548        let obj_ref = match store_object.inner() {
549            StoreObject::Deleted => Some((
550                object_key.0,
551                object_key.1,
552                ObjectDigest::OBJECT_DIGEST_DELETED,
553            )),
554            StoreObject::Wrapped => Some((
555                object_key.0,
556                object_key.1,
557                ObjectDigest::OBJECT_DIGEST_WRAPPED,
558            )),
559            _ => None,
560        };
561        Ok(obj_ref)
562    }
563
564    pub fn get_latest_object_ref_or_tombstone(
565        &self,
566        object_id: ObjectID,
567    ) -> Result<Option<ObjectRef>, SuiError> {
568        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
569            Some(ObjectKey::min_for_id(&object_id)),
570            Some(ObjectKey::max_for_id(&object_id)),
571        )?;
572
573        if let Some(Ok((object_key, value))) = iterator.next()
574            && object_key.0 == object_id
575        {
576            return Ok(Some(self.object_reference(&object_key, value)?));
577        }
578        Ok(None)
579    }
580
581    pub fn get_latest_object_or_tombstone(
582        &self,
583        object_id: ObjectID,
584    ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
585        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
586            Some(ObjectKey::min_for_id(&object_id)),
587            Some(ObjectKey::max_for_id(&object_id)),
588        )?;
589
590        if let Some(Ok((object_key, value))) = iterator.next()
591            && object_key.0 == object_id
592        {
593            return Ok(Some((object_key, value)));
594        }
595        Ok(None)
596    }
597
598    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
599        Ok(self
600            .epoch_start_configuration
601            .get(&())?
602            .expect("Must have current epoch.")
603            .epoch_start_state()
604            .epoch())
605    }
606
607    pub fn set_epoch_start_configuration(
608        &self,
609        epoch_start_configuration: &EpochStartConfiguration,
610    ) -> SuiResult {
611        let mut wb = self.epoch_start_configuration.batch();
612        wb.insert_batch(
613            &self.epoch_start_configuration,
614            std::iter::once(((), epoch_start_configuration)),
615        )?;
616        wb.write()?;
617        Ok(())
618    }
619
    /// Reads the single value of the `pruned_checkpoint` table (keyed by `()`).
    /// The result of the table lookup is returned unchanged.
    pub fn get_highest_pruned_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.pruned_checkpoint.get(&())
    }
625
626    pub fn set_highest_pruned_checkpoint(
627        &self,
628        wb: &mut DBBatch,
629        checkpoint_number: CheckpointSequenceNumber,
630    ) -> SuiResult {
631        wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
632        Ok(())
633    }
634
    /// Reads the single value of the `highest_committed_checkpoint` table (keyed by `()`).
    /// The result of the table lookup is returned unchanged.
    pub fn get_highest_committed_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.highest_committed_checkpoint.get(&())
    }
640
641    /// Stage the highest-committed-checkpoint watermark into `wb`. Must be
642    /// called on the same batch that commits the checkpoint's transaction
643    /// outputs so the watermark and the outputs land atomically.
644    pub fn set_highest_committed_checkpoint(
645        &self,
646        wb: &mut DBBatch,
647        checkpoint_number: CheckpointSequenceNumber,
648    ) -> SuiResult {
649        wb.insert_batch(
650            &self.highest_committed_checkpoint,
651            [((), checkpoint_number)],
652        )?;
653        Ok(())
654    }
655
656    pub fn get_transaction(
657        &self,
658        digest: &TransactionDigest,
659    ) -> SuiResult<Option<TrustedTransaction>> {
660        let Some(transaction) = self.transactions.get(digest)? else {
661            return Ok(None);
662        };
663        Ok(Some(transaction))
664    }
665
666    pub fn list_transactions_from(
667        &self,
668        start: Option<TransactionDigest>,
669        limit: usize,
670    ) -> Result<Vec<TransactionDigest>, typed_store::TypedStoreError> {
671        let iter = self.transactions.safe_iter_with_bounds(start, None);
672        let mut result = Vec::with_capacity(limit);
673        for item in iter.take(limit) {
674            let (digest, _) = item?;
675            result.push(digest);
676        }
677        Ok(result)
678    }
679
    /// Looks up the effects digest recorded in `executed_effects` for `tx_digest`.
    /// The result of the table lookup is returned unchanged.
    pub fn get_executed_effects_digest(
        &self,
        tx_digest: &TransactionDigest,
    ) -> Result<Option<TransactionEffectsDigest>, typed_store::TypedStoreError> {
        self.executed_effects.get(tx_digest)
    }
686
    /// Looks up the `TransactionEffects` stored in `effects` under `effects_digest`.
    /// The result of the table lookup is returned unchanged.
    pub fn get_effects_by_digest(
        &self,
        effects_digest: &TransactionEffectsDigest,
    ) -> Result<Option<TransactionEffects>, typed_store::TypedStoreError> {
        self.effects.get(effects_digest)
    }
693
694    /// Batch insert executed transaction digests for a given epoch.
695    /// Used by formal snapshot restore to backfill transaction digests from the previous epoch.
696    pub fn insert_executed_transaction_digests_batch(
697        &self,
698        epoch: EpochId,
699        digests: impl Iterator<Item = TransactionDigest>,
700    ) -> SuiResult {
701        let mut batch = self.executed_transaction_digests.batch();
702        batch.insert_batch(
703            &self.executed_transaction_digests,
704            digests.map(|digest| ((epoch, digest), ())),
705        )?;
706        batch.write()?;
707        Ok(())
708    }
709
710    pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
711        let Some(effect_digest) = self.executed_effects.get(digest)? else {
712            return Ok(None);
713        };
714        Ok(self.effects.get(&effect_digest)?)
715    }
716
717    pub(crate) fn was_transaction_executed_in_last_epoch(
718        &self,
719        digest: &TransactionDigest,
720        current_epoch: EpochId,
721    ) -> bool {
722        if current_epoch == 0 {
723            return false;
724        }
725        self.executed_transaction_digests
726            .contains_key(&(current_epoch - 1, *digest))
727            .expect("db error")
728    }
729
730    // DEPRECATED as the backing table has been moved to authority_per_epoch_store.
731    // Please do not add new accessors/callsites.
732    pub fn get_checkpoint_sequence_number(
733        &self,
734        digest: &TransactionDigest,
735    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
736        Ok(self.executed_transactions_to_checkpoint.get(digest)?)
737    }
738
739    pub fn set_highest_pruned_checkpoint_without_wb(
740        &self,
741        checkpoint_number: CheckpointSequenceNumber,
742    ) -> SuiResult {
743        let mut wb = self.pruned_checkpoint.batch();
744        self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
745        wb.write()?;
746        Ok(())
747    }
748
749    pub fn database_is_empty(&self) -> SuiResult<bool> {
750        Ok(self.objects.safe_iter().next().is_none())
751    }
752
753    pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
754        LiveSetIter {
755            iter: Box::new(self.objects.safe_iter()),
756            tables: self,
757            prev: None,
758            include_wrapped_object,
759        }
760    }
761
762    pub fn range_iter_live_object_set(
763        &self,
764        lower_bound: Option<ObjectID>,
765        upper_bound: Option<ObjectID>,
766        include_wrapped_object: bool,
767    ) -> LiveSetIter<'_> {
768        let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
769        let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
770
771        LiveSetIter {
772            iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
773            tables: self,
774            prev: None,
775            include_wrapped_object,
776        }
777    }
778
779    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
780        // This checkpoints the entire db and not just objects table
781        self.objects.checkpoint_db(path).map_err(Into::into)
782    }
783
784    pub fn insert_root_state_hash(
785        &self,
786        epoch: EpochId,
787        last_checkpoint_of_epoch: CheckpointSequenceNumber,
788        hash: GlobalStateHash,
789    ) -> SuiResult {
790        self.root_state_hash_by_epoch
791            .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
792        Ok(())
793    }
794
795    pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
796        let object_reference = object.compute_object_reference();
797        let wrapper = get_store_object(object);
798        let mut wb = self.objects.batch();
799        wb.insert_batch(
800            &self.objects,
801            std::iter::once((ObjectKey::from(object_reference), wrapper)),
802        )?;
803        wb.write()?;
804        Ok(())
805    }
806
807    // fallible get object methods for sui-tool, which may need to attempt to read a corrupted database
808    pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
809        let obj_entry = self
810            .objects
811            .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
812            .next();
813
814        match obj_entry.transpose()? {
815            Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
816                Ok(self.object(&ObjectKey(obj_id, version), obj)?)
817            }
818            _ => Ok(None),
819        }
820    }
821
822    pub fn get_object_by_key_fallible(
823        &self,
824        object_id: &ObjectID,
825        version: VersionNumber,
826    ) -> SuiResult<Option<Object>> {
827        Ok(self
828            .objects
829            .get(&ObjectKey(*object_id, version))?
830            .and_then(|object| {
831                self.object(&ObjectKey(*object_id, version), object)
832                    .expect("object construction error")
833            }))
834    }
835}
836
837impl ObjectStore for AuthorityPerpetualTables {
838    /// Read an object and return it, or Ok(None) if the object was not found.
839    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
840        self.get_object_fallible(object_id).expect("db error")
841    }
842
843    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
844        self.get_object_by_key_fallible(object_id, version)
845            .expect("db error")
846    }
847}
848
849pub struct LiveSetIter<'a> {
850    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
851    tables: &'a AuthorityPerpetualTables,
852    prev: Option<(ObjectKey, StoreObjectWrapper)>,
853    /// Whether a wrapped object is considered as a live object.
854    include_wrapped_object: bool,
855}
856
857#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
858pub enum LiveObject {
859    Normal(Object),
860    Wrapped(ObjectKey),
861}
862
863impl LiveObject {
864    pub fn object_id(&self) -> ObjectID {
865        match self {
866            LiveObject::Normal(obj) => obj.id(),
867            LiveObject::Wrapped(key) => key.0,
868        }
869    }
870
871    pub fn version(&self) -> SequenceNumber {
872        match self {
873            LiveObject::Normal(obj) => obj.version(),
874            LiveObject::Wrapped(key) => key.1,
875        }
876    }
877
878    pub fn object_reference(&self) -> ObjectRef {
879        match self {
880            LiveObject::Normal(obj) => obj.compute_object_reference(),
881            LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
882        }
883    }
884}
885
886impl LiveSetIter<'_> {
887    fn store_object_wrapper_to_live_object(
888        &self,
889        object_key: ObjectKey,
890        store_object: StoreObjectWrapper,
891    ) -> Option<LiveObject> {
892        match store_object.migrate().into_inner() {
893            StoreObject::Value(object) => {
894                let object = self
895                    .tables
896                    .construct_object(&object_key, *object)
897                    .expect("Constructing object from store cannot fail");
898                Some(LiveObject::Normal(object))
899            }
900            StoreObject::Wrapped => {
901                if self.include_wrapped_object {
902                    Some(LiveObject::Wrapped(object_key))
903                } else {
904                    None
905                }
906            }
907            StoreObject::Deleted => None,
908        }
909    }
910}
911
912impl Iterator for LiveSetIter<'_> {
913    type Item = LiveObject;
914
915    fn next(&mut self) -> Option<Self::Item> {
916        loop {
917            if let Some(Ok((next_key, next_value))) = self.iter.next() {
918                let prev = self.prev.take();
919                self.prev = Some((next_key, next_value));
920
921                if let Some((prev_key, prev_value)) = prev
922                    && prev_key.0 != next_key.0
923                {
924                    let live_object =
925                        self.store_object_wrapper_to_live_object(prev_key, prev_value);
926                    if live_object.is_some() {
927                        return live_object;
928                    }
929                }
930                continue;
931            }
932            if let Some((key, value)) = self.prev.take() {
933                let live_object = self.store_object_wrapper_to_live_object(key, value);
934                if live_object.is_some() {
935                    return live_object;
936                }
937            }
938            return None;
939        }
940    }
941}
942
943// These functions are used to initialize the DB tables
944fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
945    DBOptions {
946        options: db_options
947            .clone()
948            .optimize_for_write_throughput()
949            .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
950            .options,
951        rw_options: db_options.rw_options.set_ignore_range_deletions(false),
952    }
953}
954
955fn objects_table_config(db_options: DBOptions) -> DBOptions {
956    db_options
957        .optimize_for_write_throughput()
958        .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
959}
960
961fn transactions_table_config(db_options: DBOptions) -> DBOptions {
962    db_options
963        .optimize_for_write_throughput()
964        .optimize_for_point_lookup(
965            read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
966        )
967}
968
969fn effects_table_config(db_options: DBOptions) -> DBOptions {
970    db_options
971        .optimize_for_write_throughput()
972        .optimize_for_point_lookup(
973            read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
974        )
975}