sui_core/authority/
authority_store_tables.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::*;
5use crate::authority::authority_store::LockDetailsWrapperDeprecated;
6#[cfg(tidehunter)]
7use crate::authority::epoch_marker_key::EPOCH_MARKER_KEY_SIZE;
8use crate::authority::epoch_marker_key::EpochMarkerKey;
9use serde::{Deserialize, Serialize};
10use std::path::Path;
11use std::sync::atomic::AtomicU64;
12use sui_types::base_types::SequenceNumber;
13use sui_types::effects::{TransactionEffects, TransactionEvents};
14use sui_types::global_state_hash::GlobalStateHash;
15use sui_types::storage::MarkerValue;
16use typed_store::metrics::SamplingInterval;
17use typed_store::rocks::{
18    DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
19    read_size_from_env,
20};
21use typed_store::traits::Map;
22
23use crate::authority::authority_store_types::{
24    StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
25};
26use crate::authority::epoch_start_configuration::EpochStartConfiguration;
27use typed_store::{DBMapUtils, DbIterator};
28
// Names of the environment variables that carry per-table block cache sizes.
// The `_MB` suffix in each name indicates the value is expressed in megabytes.
// NOTE(review): the readers of these variables are not in this part of the file;
// presumably the per-table `*_table_config` helpers via `read_size_from_env` — confirm.
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
33
/// Options to apply to every column family of the `perpetual` DB.
///
/// The `Default` value has both flags set to `false`.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// Whether to enable write stalling on all column families.
    /// When `false`, `apply_to` disables write throttling on the supplied options.
    pub enable_write_stall: bool,
    /// Whether the database is opened for a validator.
    /// In the tidehunter build of `AuthorityPerpetualTables::open` this decides
    /// whether a compactor is attached to the `objects` table.
    pub is_validator: bool,
}
41
42impl AuthorityPerpetualTablesOptions {
43    fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
44        if !self.enable_write_stall {
45            db_options = db_options.disable_write_throttling();
46        }
47        db_options
48    }
49}
50
/// AuthorityPerpetualTables contains data that must be preserved from one epoch to the next.
///
/// Each field is one table of the `perpetual` database; the table name is the field
/// name unless overridden with `#[rename = "..."]`.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct AuthorityPerpetualTables {
    /// This is a map between the object (ID, version) and the latest state of the object, namely the
    /// state that is needed to process new transactions.
    /// State is represented by `StoreObject` enum, which is either a move module or a move object.
    ///
    /// Note that while this map can store all versions of an object, we will eventually
    /// prune old object versions from the db.
    ///
    /// IMPORTANT: object versions must *only* be pruned if they appear as inputs in some
    /// TransactionEffects. Simply pruning all objects but the most recent is an error!
    /// This is because there can be partially executed transactions whose effects have not yet
    /// been written out, and which must be retried. But, they cannot be retried unless their input
    /// objects are still accessible!
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// This is a map between object references of currently active objects that can be mutated.
    ///
    /// For old epochs, it may also contain the transaction that they are locked on for use by this
    /// specific validator. The transaction locks themselves are now in AuthorityPerEpochStore.
    ///
    /// Stored under the legacy table name `owned_object_transaction_locks`.
    #[rename = "owned_object_transaction_locks"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,

    /// This is a map between the transaction digest and the corresponding transaction that's known to be
    /// executable. This means that it may have been executed locally, or it may have been synced through
    /// state-sync but hasn't been executed yet.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// A map between the transaction digest of a certificate to the effects of its execution.
    /// We store effects into this table in two different cases:
    /// 1. When a transaction is synced through state_sync, we store the effects here. These effects
    ///    are known to be final in the network, but may not have been executed locally yet.
    /// 2. When the transaction is executed locally on this node, we store the effects here. This means that
    ///    it's possible to store the same effects twice (once for the synced transaction, and once for the executed).
    ///
    /// It's also possible for the effects to be reverted if the transaction didn't make it into the epoch.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Transactions that have been executed locally on this node. We need this table since the `effects` table
    /// doesn't say anything about the execution status of the transaction on this node. When we wait for transactions
    /// to be executed, we wait for them to appear in this table. When we revert transactions, we remove them from both
    /// tables.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Events keyed by the digest of the transaction that produced them.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    /// Loaded (and unchanged) runtime object references, keyed by transaction digest.
    pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,

    /// DEPRECATED in favor of the table of the same name in authority_per_epoch_store.
    /// Please do not add new accessors/callsites.
    /// When a transaction is executed via the checkpoint executor, we store the association here.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Finalized root state hash for each epoch, to be included in the CheckpointSummary
    /// of the last checkpoint of the epoch. These values should only ever be written once
    /// and never changed.
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Parameters of the system fixed at the epoch start.
    /// Singleton table: the only key is `()`.
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// A singleton table that stores the latest pruned checkpoint. Used to track the
    /// objects pruner's progress.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Expected total amount of SUI in the network. This is expected to remain constant
    /// throughout the lifetime of the network. We check it at the end of each epoch if
    /// expensive checks are enabled. We cannot use 10B today because in tests we often
    /// inject extra gas objects into genesis.
    pub(crate) expected_network_sui_amount: DBMap<(), u64>,

    /// Expected imbalance between storage fund balance and the sum of storage rebate of all live objects.
    /// This could be non-zero due to bugs in earlier protocol versions.
    /// This number is the result of storage_fund_balance - sum(storage_rebate).
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Table that stores the set of received objects and deleted objects and the version at
    /// which they were received. This is used to prevent possible race conditions around receiving
    /// objects (since they are not locked by the transaction manager) and for tracking shared
    /// objects that have been deleted. This table is meant to be pruned per-epoch, and all
    /// previous epochs other than the current epoch may be pruned safely.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
    /// Same value type as `object_per_epoch_marker_table`, but keyed by `EpochMarkerKey`
    /// instead of the `(EpochId, ObjectKey)` tuple.
    /// NOTE(review): presumably the successor of the table above — confirm which one is
    /// authoritative for reads and writes.
    pub(crate) object_per_epoch_marker_table_v2: DBMap<EpochMarkerKey, MarkerValue>,

    /// Tracks executed transaction digests across epochs.
    /// Used to support address balance gas payments feature.
    /// This table uses epoch-prefixed keys to support efficient pruning via range delete.
    pub(crate) executed_transaction_digests: DBMap<(EpochId, TransactionDigest), ()>,
}
145
146impl AuthorityPerpetualTables {
147    pub fn path(parent_path: &Path) -> PathBuf {
148        parent_path.join("perpetual")
149    }
150
151    #[cfg(not(tidehunter))]
152    pub fn open(
153        parent_path: &Path,
154        db_options_override: Option<AuthorityPerpetualTablesOptions>,
155        _pruner_watermark: Option<Arc<AtomicU64>>,
156    ) -> Self {
157        let db_options_override = db_options_override.unwrap_or_default();
158        let db_options = db_options_override
159            .apply_to(default_db_options().optimize_db_for_write_throughput(4, false));
160        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
161            (
162                "objects".to_string(),
163                objects_table_config(db_options.clone()),
164            ),
165            (
166                "owned_object_transaction_locks".to_string(),
167                owned_object_transaction_locks_table_config(db_options.clone()),
168            ),
169            (
170                "transactions".to_string(),
171                transactions_table_config(db_options.clone()),
172            ),
173            (
174                "effects".to_string(),
175                effects_table_config(db_options.clone()),
176            ),
177        ]));
178
179        Self::open_tables_read_write(
180            Self::path(parent_path),
181            MetricConf::new("perpetual")
182                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
183            Some(db_options.options),
184            Some(table_options),
185        )
186    }
187
    /// Opens the `perpetual` database under `parent_path` using the tidehunter backend.
    ///
    /// Builds one `ThConfig` per key space (table) and hands the list to
    /// `open_tables_read_write`.
    ///
    /// * `db_options_override` - only `is_validator` is consulted here: when it is set,
    ///   a compactor is attached to the `objects` key space.
    /// * `pruner_watermark` - shared counter passed to `apply_relocation_filter` for the
    ///   key spaces that use it; when `None`, a fresh counter starting at 0 is used.
    #[cfg(tidehunter)]
    pub fn open(
        parent_path: &Path,
        db_options_override: Option<AuthorityPerpetualTablesOptions>,
        pruner_watermark: Option<Arc<AtomicU64>>,
    ) -> Self {
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        tracing::warn!("AuthorityPerpetualTables using tidehunter");
        use typed_store::tidehunter_util::{
            Bytes, Decision, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
            default_cells_per_mutex, default_max_dirty_keys, default_mutex_count,
            default_value_cache_size,
        };
        // Mutex counts are scaled from the backend default; the digest-keyed
        // tables below use four times the base count.
        let mutexes = default_mutex_count() * 2;
        let transaction_mutexes = mutexes * 4;
        let value_cache_size = default_value_cache_size();
        // effectively disables pruning if not set
        let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));

        let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
        // Compactor for the `objects` key space: walks the keys back to front and
        // retains the first key seen for each run of equal 32-byte prefixes.
        // NOTE(review): assumes keys arrive sorted so that equal object IDs are
        // adjacent and the last key of a run is the highest version — confirm.
        let objects_compactor = |iter: &mut dyn DoubleEndedIterator<Item = &Bytes>| {
            let mut retain = HashSet::new();
            let mut previous: Option<&[u8]> = None;
            const OID_SIZE: usize = 32;
            for key in iter.rev() {
                if let Some(prev) = previous {
                    if prev == &key[..OID_SIZE] {
                        continue;
                    }
                }
                previous = Some(&key[..OID_SIZE]);
                retain.insert(key.clone());
            }
            retain
        };
        // 8-byte prefix whose last byte is 32, passed to the `new_with_rm_prefix*`
        // constructors of the digest-keyed tables below.
        // NOTE(review): presumably the serialized length prefix of a 32-byte digest — confirm.
        let mut digest_prefix = vec![0; 8];
        digest_prefix[7] = 32;
        let uniform_key = KeyType::uniform(default_cells_per_mutex());
        // NOTE(review): 9 bytes plus 4 bits of prefix; the rationale for this width is
        // not visible here — confirm against the epoch-prefixed key encodings.
        let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
        // TransactionDigest is serialized with an 8-byte prefix, so we include it in the key calculation
        let epoch_tx_digest_prefix_key =
            KeyType::from_prefix_bits((8/*EpochId*/ + 8/*TransactionDigest prefix*/) * 8 + 12);
        let object_indexing = KeyIndexing::fixed(32 + 8); //  KeyIndexing::key_reduction(32 + 8, 16..(32 + 8));
        // todo can figure way to scramble off 8 bytes in the middle
        let obj_ref_size = 32 + 8 + 32 + 8;
        let owned_object_transaction_locks_indexing =
            KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));

        let mut objects_config = KeySpaceConfig::new()
            .with_max_dirty_keys(4 * default_max_dirty_keys())
            .with_value_cache_size(value_cache_size);
        // The compactor is attached only when the caller passed options with
        // `is_validator` set; `None` or `is_validator == false` leaves it off.
        if matches!(db_options_override, Some(options) if options.is_validator) {
            objects_config = objects_config.with_compactor(Box::new(objects_compactor));
        }

        // One entry per key space, keyed by table name.
        let configs = vec![
            (
                "objects".to_string(),
                ThConfig::new_with_config_indexing(
                    object_indexing,
                    mutexes * 4,
                    KeyType::uniform(1),
                    objects_config,
                ),
            ),
            (
                "owned_object_transaction_locks".to_string(),
                ThConfig::new_with_config_indexing(
                    owned_object_transaction_locks_indexing,
                    mutexes * 16,
                    KeyType::uniform(default_cells_per_mutex()),
                    bloom_config
                        .clone()
                        .with_max_dirty_keys(16 * default_max_dirty_keys()),
                ),
            ),
            (
                "transactions".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    transaction_mutexes,
                    uniform_key,
                    // Relocation filter that always answers `Decision::Remove`.
                    // NOTE(review): relocation semantics are defined by tidehunter — confirm intent.
                    KeySpaceConfig::new()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    transaction_mutexes,
                    uniform_key,
                    // Filter driven by the pruner watermark and the epoch in which the
                    // effects were executed (extracted from the stored value).
                    apply_relocation_filter(
                        bloom_config.clone().with_value_cache_size(value_cache_size),
                        pruner_watermark.clone(),
                        |effects: TransactionEffects| effects.executed_epoch(),
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    transaction_mutexes,
                    uniform_key,
                    bloom_config
                        .clone()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            // NOTE(review): the struct fields in view only include `events_2`; this
            // `events` entry presumably covers a key space that predates it — confirm.
            (
                "events".to_string(),
                ThConfig::new_with_rm_prefix(
                    32 + 8,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events_2".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "unchanged_loaded_runtime_objects".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_transactions_to_checkpoint".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    // Epoch is taken from the stored `(EpochId, CheckpointSequenceNumber)` value.
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "root_state_hash_by_epoch".to_string(),
                ThConfig::new(8, 1, KeyType::uniform(1)),
            ),
            // The next four tables are singletons keyed by `()`, hence key size 0.
            (
                "epoch_start_configuration".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "pruned_checkpoint".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_network_sui_amount".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_storage_fund_imbalance".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            // For the remaining tables the epoch fed to the filter is extracted from
            // the decoded tuple/`EpochMarkerKey` passed to the closure, and the final
            // flag is `true` (it was `false` for the value-based filters above).
            (
                "object_per_epoch_marker_table".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "object_per_epoch_marker_table_v2".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::fixed(EPOCH_MARKER_KEY_SIZE),
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |k: EpochMarkerKey| k.0,
                        true,
                    ),
                ),
            ),
            (
                "executed_transaction_digests".to_string(),
                ThConfig::new_with_config_indexing(
                    // EpochId + (TransactionDigest)
                    KeyIndexing::fixed(8 + (32 + 8)),
                    transaction_mutexes,
                    epoch_tx_digest_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, TransactionDigest)| epoch_id,
                        true,
                    ),
                ),
            ),
        ];
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }
420
421    #[cfg(not(tidehunter))]
422    pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
423        Self::get_read_only_handle(
424            Self::path(parent_path),
425            None,
426            None,
427            MetricConf::new("perpetual_readonly"),
428        )
429    }
430
431    #[cfg(tidehunter)]
432    pub fn open_readonly(parent_path: &Path) -> Self {
433        Self::open(parent_path, None, None)
434    }
435
436    #[cfg(tidehunter)]
437    pub fn force_rebuild_control_region(&self) -> anyhow::Result<()> {
438        self.objects.db.force_rebuild_control_region()
439    }
440
441    /// Wait for tidehunter background threads to finish before allowing the caller
442    /// to e.g. rename the database directory. Consumes `Arc<Self>` so all internal
443    /// `Arc<Database>` clones held by the column family `DBMap`s are released as
444    /// part of the drop, leaving only the `Arc<Database>` extracted here.
445    #[cfg(tidehunter)]
446    pub fn wait_for_tidehunter_background_threads(self: Arc<Self>) {
447        let strong = Arc::strong_count(&self);
448        if strong != 1 {
449            println!(
450                "WARNING: wait_for_tidehunter_background_threads called with Arc<AuthorityPerpetualTables> strong_count={} (expected 1); other clones will keep DBMap.db Arc<Database> alive past drop(self) and the inner Database wait will warn/timeout",
451                strong,
452            );
453        }
454        let db = self.objects.db.clone();
455        drop(self);
456        db.wait_for_tidehunter_background_threads();
457    }
458
459    // This is used by indexer to find the correct version of dynamic field child object.
460    // We do not store the version of the child object, but because of lamport timestamp,
461    // we know the child must have version number less then or eq to the parent.
462    pub fn find_object_lt_or_eq_version(
463        &self,
464        object_id: ObjectID,
465        version: SequenceNumber,
466    ) -> SuiResult<Option<Object>> {
467        let mut iter = self.objects.reversed_safe_iter_with_bounds(
468            Some(ObjectKey::min_for_id(&object_id)),
469            Some(ObjectKey(object_id, version)),
470        )?;
471        match iter.next() {
472            Some(Ok((key, o))) => self.object(&key, o),
473            Some(Err(e)) => Err(e.into()),
474            None => Ok(None),
475        }
476    }
477
    // Builds an `Object` from a stored value by delegating to `try_construct_object`.
    // `self` is not read; any error from the helper is returned unchanged.
    fn construct_object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectValue,
    ) -> Result<Object, SuiError> {
        try_construct_object(object_key, store_object)
    }
485
486    // Constructs `sui_types::object::Object` from `StoreObjectWrapper`.
487    // Returns `None` if object was deleted/wrapped
488    pub fn object(
489        &self,
490        object_key: &ObjectKey,
491        store_object: StoreObjectWrapper,
492    ) -> Result<Option<Object>, SuiError> {
493        let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
494            return Ok(None);
495        };
496        Ok(Some(self.construct_object(object_key, *store_object)?))
497    }
498
499    pub fn object_reference(
500        &self,
501        object_key: &ObjectKey,
502        store_object: StoreObjectWrapper,
503    ) -> Result<ObjectRef, SuiError> {
504        let obj_ref = match store_object.migrate().into_inner() {
505            StoreObject::Value(object) => self
506                .construct_object(object_key, *object)?
507                .compute_object_reference(),
508            StoreObject::Deleted => (
509                object_key.0,
510                object_key.1,
511                ObjectDigest::OBJECT_DIGEST_DELETED,
512            ),
513            StoreObject::Wrapped => (
514                object_key.0,
515                object_key.1,
516                ObjectDigest::OBJECT_DIGEST_WRAPPED,
517            ),
518        };
519        Ok(obj_ref)
520    }
521
522    pub fn tombstone_reference(
523        &self,
524        object_key: &ObjectKey,
525        store_object: &StoreObjectWrapper,
526    ) -> Result<Option<ObjectRef>, SuiError> {
527        let obj_ref = match store_object.inner() {
528            StoreObject::Deleted => Some((
529                object_key.0,
530                object_key.1,
531                ObjectDigest::OBJECT_DIGEST_DELETED,
532            )),
533            StoreObject::Wrapped => Some((
534                object_key.0,
535                object_key.1,
536                ObjectDigest::OBJECT_DIGEST_WRAPPED,
537            )),
538            _ => None,
539        };
540        Ok(obj_ref)
541    }
542
543    pub fn get_latest_object_ref_or_tombstone(
544        &self,
545        object_id: ObjectID,
546    ) -> Result<Option<ObjectRef>, SuiError> {
547        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
548            Some(ObjectKey::min_for_id(&object_id)),
549            Some(ObjectKey::max_for_id(&object_id)),
550        )?;
551
552        if let Some(Ok((object_key, value))) = iterator.next()
553            && object_key.0 == object_id
554        {
555            return Ok(Some(self.object_reference(&object_key, value)?));
556        }
557        Ok(None)
558    }
559
560    pub fn get_latest_object_or_tombstone(
561        &self,
562        object_id: ObjectID,
563    ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
564        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
565            Some(ObjectKey::min_for_id(&object_id)),
566            Some(ObjectKey::max_for_id(&object_id)),
567        )?;
568
569        if let Some(Ok((object_key, value))) = iterator.next()
570            && object_key.0 == object_id
571        {
572            return Ok(Some((object_key, value)));
573        }
574        Ok(None)
575    }
576
577    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
578        Ok(self
579            .epoch_start_configuration
580            .get(&())?
581            .expect("Must have current epoch.")
582            .epoch_start_state()
583            .epoch())
584    }
585
586    pub fn set_epoch_start_configuration(
587        &self,
588        epoch_start_configuration: &EpochStartConfiguration,
589    ) -> SuiResult {
590        let mut wb = self.epoch_start_configuration.batch();
591        wb.insert_batch(
592            &self.epoch_start_configuration,
593            std::iter::once(((), epoch_start_configuration)),
594        )?;
595        wb.write()?;
596        Ok(())
597    }
598
599    pub fn get_highest_pruned_checkpoint(
600        &self,
601    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
602        self.pruned_checkpoint.get(&())
603    }
604
605    pub fn set_highest_pruned_checkpoint(
606        &self,
607        wb: &mut DBBatch,
608        checkpoint_number: CheckpointSequenceNumber,
609    ) -> SuiResult {
610        wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
611        Ok(())
612    }
613
614    pub fn get_transaction(
615        &self,
616        digest: &TransactionDigest,
617    ) -> SuiResult<Option<TrustedTransaction>> {
618        let Some(transaction) = self.transactions.get(digest)? else {
619            return Ok(None);
620        };
621        Ok(Some(transaction))
622    }
623
624    /// Batch insert executed transaction digests for a given epoch.
625    /// Used by formal snapshot restore to backfill transaction digests from the previous epoch.
626    pub fn insert_executed_transaction_digests_batch(
627        &self,
628        epoch: EpochId,
629        digests: impl Iterator<Item = TransactionDigest>,
630    ) -> SuiResult {
631        let mut batch = self.executed_transaction_digests.batch();
632        batch.insert_batch(
633            &self.executed_transaction_digests,
634            digests.map(|digest| ((epoch, digest), ())),
635        )?;
636        batch.write()?;
637        Ok(())
638    }
639
640    pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
641        let Some(effect_digest) = self.executed_effects.get(digest)? else {
642            return Ok(None);
643        };
644        Ok(self.effects.get(&effect_digest)?)
645    }
646
647    pub(crate) fn was_transaction_executed_in_last_epoch(
648        &self,
649        digest: &TransactionDigest,
650        current_epoch: EpochId,
651    ) -> bool {
652        if current_epoch == 0 {
653            return false;
654        }
655        self.executed_transaction_digests
656            .contains_key(&(current_epoch - 1, *digest))
657            .expect("db error")
658    }
659
660    // DEPRECATED as the backing table has been moved to authority_per_epoch_store.
661    // Please do not add new accessors/callsites.
662    pub fn get_checkpoint_sequence_number(
663        &self,
664        digest: &TransactionDigest,
665    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
666        Ok(self.executed_transactions_to_checkpoint.get(digest)?)
667    }
668
669    pub fn get_newer_object_keys(
670        &self,
671        object: &(ObjectID, SequenceNumber),
672    ) -> SuiResult<Vec<ObjectKey>> {
673        let mut objects = vec![];
674        for result in self.objects.safe_iter_with_bounds(
675            Some(ObjectKey(object.0, object.1.next())),
676            Some(ObjectKey(object.0, VersionNumber::MAX)),
677        ) {
678            let (key, _) = result?;
679            objects.push(key);
680        }
681        Ok(objects)
682    }
683
684    pub fn set_highest_pruned_checkpoint_without_wb(
685        &self,
686        checkpoint_number: CheckpointSequenceNumber,
687    ) -> SuiResult {
688        let mut wb = self.pruned_checkpoint.batch();
689        self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
690        wb.write()?;
691        Ok(())
692    }
693
694    pub fn database_is_empty(&self) -> SuiResult<bool> {
695        Ok(self.objects.safe_iter().next().is_none())
696    }
697
698    pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
699        LiveSetIter {
700            iter: Box::new(self.objects.safe_iter()),
701            tables: self,
702            prev: None,
703            include_wrapped_object,
704        }
705    }
706
707    pub fn range_iter_live_object_set(
708        &self,
709        lower_bound: Option<ObjectID>,
710        upper_bound: Option<ObjectID>,
711        include_wrapped_object: bool,
712    ) -> LiveSetIter<'_> {
713        let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
714        let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
715
716        LiveSetIter {
717            iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
718            tables: self,
719            prev: None,
720            include_wrapped_object,
721        }
722    }
723
724    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
725        // This checkpoints the entire db and not just objects table
726        self.objects.checkpoint_db(path).map_err(Into::into)
727    }
728
729    pub fn get_root_state_hash(
730        &self,
731        epoch: EpochId,
732    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
733        Ok(self.root_state_hash_by_epoch.get(&epoch)?)
734    }
735
736    pub fn insert_root_state_hash(
737        &self,
738        epoch: EpochId,
739        last_checkpoint_of_epoch: CheckpointSequenceNumber,
740        hash: GlobalStateHash,
741    ) -> SuiResult {
742        self.root_state_hash_by_epoch
743            .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
744        Ok(())
745    }
746
747    pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
748        let object_reference = object.compute_object_reference();
749        let wrapper = get_store_object(object);
750        let mut wb = self.objects.batch();
751        wb.insert_batch(
752            &self.objects,
753            std::iter::once((ObjectKey::from(object_reference), wrapper)),
754        )?;
755        wb.write()?;
756        Ok(())
757    }
758
759    // fallible get object methods for sui-tool, which may need to attempt to read a corrupted database
760    pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
761        let obj_entry = self
762            .objects
763            .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
764            .next();
765
766        match obj_entry.transpose()? {
767            Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
768                Ok(self.object(&ObjectKey(obj_id, version), obj)?)
769            }
770            _ => Ok(None),
771        }
772    }
773
774    pub fn get_object_by_key_fallible(
775        &self,
776        object_id: &ObjectID,
777        version: VersionNumber,
778    ) -> SuiResult<Option<Object>> {
779        Ok(self
780            .objects
781            .get(&ObjectKey(*object_id, version))?
782            .and_then(|object| {
783                self.object(&ObjectKey(*object_id, version), object)
784                    .expect("object construction error")
785            }))
786    }
787}
788
789impl ObjectStore for AuthorityPerpetualTables {
790    /// Read an object and return it, or Ok(None) if the object was not found.
791    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
792        self.get_object_fallible(object_id).expect("db error")
793    }
794
795    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
796        self.get_object_by_key_fallible(object_id, version)
797            .expect("db error")
798    }
799}
800
/// Iterator over the live objects of an `AuthorityPerpetualTables`, as created
/// by `iter_live_object_set` and `range_iter_live_object_set`.
pub struct LiveSetIter<'a> {
    /// Underlying iterator over `(ObjectKey, StoreObjectWrapper)` entries of
    /// the objects table.
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    /// Tables used to construct `Object`s from stored values.
    tables: &'a AuthorityPerpetualTables,
    /// Most recently read entry that has not yet been emitted or discarded.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
    /// Whether a wrapped object is considered as a live object.
    include_wrapped_object: bool,
}
808
/// An item produced by `LiveSetIter`.
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    /// An object whose full contents were constructed from the store.
    Normal(Object),
    /// A wrapped entry, identified only by its key (object ID and version).
    Wrapped(ObjectKey),
}
814
815impl LiveObject {
816    pub fn object_id(&self) -> ObjectID {
817        match self {
818            LiveObject::Normal(obj) => obj.id(),
819            LiveObject::Wrapped(key) => key.0,
820        }
821    }
822
823    pub fn version(&self) -> SequenceNumber {
824        match self {
825            LiveObject::Normal(obj) => obj.version(),
826            LiveObject::Wrapped(key) => key.1,
827        }
828    }
829
830    pub fn object_reference(&self) -> ObjectRef {
831        match self {
832            LiveObject::Normal(obj) => obj.compute_object_reference(),
833            LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
834        }
835    }
836
837    pub fn to_normal(self) -> Option<Object> {
838        match self {
839            LiveObject::Normal(object) => Some(object),
840            LiveObject::Wrapped(_) => None,
841        }
842    }
843}
844
845impl LiveSetIter<'_> {
846    fn store_object_wrapper_to_live_object(
847        &self,
848        object_key: ObjectKey,
849        store_object: StoreObjectWrapper,
850    ) -> Option<LiveObject> {
851        match store_object.migrate().into_inner() {
852            StoreObject::Value(object) => {
853                let object = self
854                    .tables
855                    .construct_object(&object_key, *object)
856                    .expect("Constructing object from store cannot fail");
857                Some(LiveObject::Normal(object))
858            }
859            StoreObject::Wrapped => {
860                if self.include_wrapped_object {
861                    Some(LiveObject::Wrapped(object_key))
862                } else {
863                    None
864                }
865            }
866            StoreObject::Deleted => None,
867        }
868    }
869}
870
871impl Iterator for LiveSetIter<'_> {
872    type Item = LiveObject;
873
874    fn next(&mut self) -> Option<Self::Item> {
875        loop {
876            if let Some(Ok((next_key, next_value))) = self.iter.next() {
877                let prev = self.prev.take();
878                self.prev = Some((next_key, next_value));
879
880                if let Some((prev_key, prev_value)) = prev
881                    && prev_key.0 != next_key.0
882                {
883                    let live_object =
884                        self.store_object_wrapper_to_live_object(prev_key, prev_value);
885                    if live_object.is_some() {
886                        return live_object;
887                    }
888                }
889                continue;
890            }
891            if let Some((key, value)) = self.prev.take() {
892                let live_object = self.store_object_wrapper_to_live_object(key, value);
893                if live_object.is_some() {
894                    return live_object;
895                }
896            }
897            return None;
898        }
899    }
900}
901
902// These functions are used to initialize the DB tables
903fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
904    DBOptions {
905        options: db_options
906            .clone()
907            .optimize_for_write_throughput()
908            .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
909            .options,
910        rw_options: db_options.rw_options.set_ignore_range_deletions(false),
911    }
912}
913
914fn objects_table_config(db_options: DBOptions) -> DBOptions {
915    db_options
916        .optimize_for_write_throughput()
917        .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
918}
919
920fn transactions_table_config(db_options: DBOptions) -> DBOptions {
921    db_options
922        .optimize_for_write_throughput()
923        .optimize_for_point_lookup(
924            read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
925        )
926}
927
928fn effects_table_config(db_options: DBOptions) -> DBOptions {
929    db_options
930        .optimize_for_write_throughput()
931        .optimize_for_point_lookup(
932            read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
933        )
934}