sui_core/authority/authority_store_tables.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use super::*;
use crate::authority::authority_store::LockDetailsWrapperDeprecated;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::atomic::AtomicU64;
use sui_types::base_types::SequenceNumber;
use sui_types::digests::TransactionEventsDigest;
use sui_types::effects::{TransactionEffects, TransactionEvents};
use sui_types::global_state_hash::GlobalStateHash;
use sui_types::storage::{FullObjectKey, MarkerValue};
use tracing::error;
use typed_store::metrics::SamplingInterval;
use typed_store::rocks::{
    DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
    read_size_from_env,
};
use typed_store::traits::Map;

use crate::authority::authority_store_pruner::ObjectsCompactionFilter;
use crate::authority::authority_store_types::{
    StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
};
use crate::authority::epoch_start_configuration::EpochStartConfiguration;
use typed_store::rocksdb::compaction_filter::Decision;
use typed_store::{DBMapUtils, DbIterator};

const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";

/// Options to apply to every column family of the `perpetual` DB.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// Whether to enable write stalling on all column families.
    pub enable_write_stall: bool,
    pub compaction_filter: Option<ObjectsCompactionFilter>,
    pub is_validator: bool,
}

impl AuthorityPerpetualTablesOptions {
    fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
        if !self.enable_write_stall {
            db_options = db_options.disable_write_throttling();
        }
        db_options
    }
}
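
// Illustrative sketch (not part of the original source): how these options are
// typically threaded into `AuthorityPerpetualTables::open` below; `path` is a
// hypothetical value.
//
//     let options = AuthorityPerpetualTablesOptions {
//         enable_write_stall: false, // `apply_to` then disables RocksDB write throttling
//         compaction_filter: None,
//         is_validator: true,
//     };
//     let tables = AuthorityPerpetualTables::open(&path, Some(options), None);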

/// AuthorityPerpetualTables contains data that must be preserved from one epoch to the next.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct AuthorityPerpetualTables {
    /// This is a map between the object (ID, version) and the latest state of the object, namely the
    /// state that is needed to process new transactions.
    /// State is represented by the `StoreObject` enum, which is either a move module or a move object.
    ///
    /// Note that while this map can store all versions of an object, we will eventually
    /// prune old object versions from the db.
    ///
    /// IMPORTANT: object versions must *only* be pruned if they appear as inputs in some
    /// TransactionEffects. Simply pruning all objects but the most recent is an error!
    /// This is because there can be partially executed transactions whose effects have not yet
    /// been written out, and which must be retried. But, they cannot be retried unless their input
    /// objects are still accessible!
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// This is a map keyed by the object references of currently active objects that can be mutated.
    ///
    /// For old epochs, it may also contain the transaction that they are locked on for use by this
    /// specific validator. The transaction locks themselves are now in AuthorityPerEpochStore.
    #[rename = "owned_object_transaction_locks"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,

    /// This is a map from the transaction digest to the corresponding transaction that's known to be
    /// executable. This means that it may have been executed locally, or it may have been synced through
    /// state-sync but hasn't been executed yet.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// A map from the transaction digest of a certificate to the effects of its execution.
    /// We store effects into this table in two different cases:
    /// 1. When a transaction is synced through state_sync, we store the effects here. These effects
    ///    are known to be final in the network, but may not have been executed locally yet.
    /// 2. When the transaction is executed locally on this node, we store the effects here. This means that
    ///    it's possible to store the same effects twice (once for the synced transaction, and once for the executed).
    ///
    /// It's also possible for the effects to be reverted if the transaction didn't make it into the epoch.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Transactions that have been executed locally on this node. We need this table since the `effects` table
    /// doesn't say anything about the execution status of the transaction on this node. When we wait for transactions
    /// to be executed, we wait for them to appear in this table. When we revert transactions, we remove them from both
    /// tables.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    #[allow(dead_code)]
    #[deprecated]
    events: DBMap<(TransactionEventsDigest, usize), Event>,

    // Events keyed by the digest of the transaction that produced them.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    // Loaded (and unchanged) runtime object references.
    pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,

    /// DEPRECATED in favor of the table of the same name in authority_per_epoch_store.
    /// Please do not add new accessors/callsites.
    /// When a transaction is executed via the checkpoint executor, we store the association here.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    // Finalized root state hash for an epoch, to be included in the CheckpointSummary
    // of the last checkpoint of the epoch. These values should only ever be written once
    // and never changed.
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Parameters of the system fixed at the epoch start.
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// A singleton table that stores the latest pruned checkpoint. Used to track the objects pruner's progress.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Expected total amount of SUI in the network. This is expected to remain constant
    /// throughout the lifetime of the network. We check it at the end of each epoch if
    /// expensive checks are enabled. We cannot use 10B today because in tests we often
    /// inject extra gas objects into genesis.
    pub(crate) expected_network_sui_amount: DBMap<(), u64>,

    /// Expected imbalance between the storage fund balance and the sum of storage rebates of all live objects.
    /// This could be non-zero due to bugs in earlier protocol versions.
    /// This number is the result of storage_fund_balance - sum(storage_rebate).
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Table that stores the set of received objects and deleted objects and the version at
    /// which they were received. This is used to prevent possible race conditions around receiving
    /// objects (since they are not locked by the transaction manager) and for tracking shared
    /// objects that have been deleted. This table is meant to be pruned per-epoch, and all
    /// previous epochs other than the current epoch may be pruned safely.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
    pub(crate) object_per_epoch_marker_table_v2: DBMap<(EpochId, FullObjectKey), MarkerValue>,

    /// Tracks executed transaction digests across epochs.
    /// Used to support the address balance gas payments feature.
    /// This table uses epoch-prefixed keys to support efficient pruning via range delete.
    pub(crate) executed_transaction_digests: DBMap<(EpochId, TransactionDigest), ()>,
}
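
// Illustrative sketch (not part of the original source): how the `executed_effects` and
// `effects` tables compose on the read path, mirroring `get_effects` further below;
// `tables` and `tx_digest` are hypothetical values.
//
//     // `executed_effects` answers "has this node executed the transaction?", while
//     // `effects` stores the effects body keyed by the effects digest.
//     if let Some(effects_digest) = tables.executed_effects.get(&tx_digest)? {
//         let effects: Option<TransactionEffects> = tables.effects.get(&effects_digest)?;
//     }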

#[derive(DBMapUtils)]
pub struct AuthorityPrunerTables {
    pub(crate) object_tombstones: DBMap<ObjectID, SequenceNumber>,
}

impl AuthorityPrunerTables {
    pub fn path(parent_path: &Path) -> PathBuf {
        parent_path.join("pruner")
    }

    pub fn open(parent_path: &Path) -> Self {
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("pruner")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            None,
            None,
        )
    }
}

impl AuthorityPerpetualTables {
    pub fn path(parent_path: &Path) -> PathBuf {
        parent_path.join("perpetual")
    }

    #[cfg(not(tidehunter))]
    pub fn open(
        parent_path: &Path,
        db_options_override: Option<AuthorityPerpetualTablesOptions>,
        _pruner_watermark: Option<Arc<AtomicU64>>,
    ) -> Self {
        let db_options_override = db_options_override.unwrap_or_default();
        let db_options =
            db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
            (
                "objects".to_string(),
                objects_table_config(db_options.clone(), db_options_override.compaction_filter),
            ),
            (
                "owned_object_transaction_locks".to_string(),
                owned_object_transaction_locks_table_config(db_options.clone()),
            ),
            (
                "transactions".to_string(),
                transactions_table_config(db_options.clone()),
            ),
            (
                "effects".to_string(),
                effects_table_config(db_options.clone()),
            ),
        ]));

        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            Some(db_options.options),
            Some(table_options),
        )
    }

    #[cfg(tidehunter)]
    pub fn open(
        parent_path: &Path,
        db_options_override: Option<AuthorityPerpetualTablesOptions>,
        pruner_watermark: Option<Arc<AtomicU64>>,
    ) -> Self {
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        tracing::warn!("AuthorityPerpetualTables using tidehunter");
        use typed_store::tidehunter_util::{
            Bytes, Decision, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
            default_cells_per_mutex, default_mutex_count, default_value_cache_size,
        };
        let mutexes = default_mutex_count() * 2;
        let value_cache_size = default_value_cache_size();
        // effectively disables pruning if not set
        let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));

        let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
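        // Added note: the compactor below walks the keys of the `objects` key space in
        // reverse order and retains only the first key it sees for each 16-byte ObjectID
        // prefix, i.e. the highest version of each object; older versions become eligible
        // for removal during compaction.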
        let objects_compactor = |iter: &mut dyn DoubleEndedIterator<Item = &Bytes>| {
            let mut retain = HashSet::new();
            let mut previous: Option<&[u8]> = None;
            const OID_SIZE: usize = 16;
            for key in iter.rev() {
                if let Some(prev) = previous {
                    if prev == &key[..OID_SIZE] {
                        continue;
                    }
                }
                previous = Some(&key[..OID_SIZE]);
                retain.insert(key.clone());
            }
            retain
        };
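        // Added note: `digest_prefix` below is the fixed 8-byte prefix carried by
        // serialized digests (see the comment on `epoch_tx_digest_prefix_key`); it is
        // passed to the `new_with_rm_prefix*` constructors so it can be stripped from
        // stored keys.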
        let mut digest_prefix = vec![0; 8];
        digest_prefix[7] = 32;
        let uniform_key = KeyType::uniform(default_cells_per_mutex());
        let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
        // TransactionDigest is serialized with an 8-byte prefix, so we include it in the key calculation
        let epoch_tx_digest_prefix_key =
            KeyType::from_prefix_bits((8/*EpochId*/ + 8/*TransactionDigest prefix*/) * 8 + 12);
        let object_indexing = KeyIndexing::key_reduction(32 + 8, 16..(32 + 8));
        // TODO: could figure out a way to scramble off 8 bytes in the middle
        let obj_ref_size = 32 + 8 + 32 + 8;
        let owned_object_transaction_locks_indexing =
            KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));

        let mut objects_config = KeySpaceConfig::new()
            .with_unloaded_iterator(true)
            .with_max_dirty_keys(4048);
        if matches!(db_options_override, Some(options) if options.is_validator) {
            objects_config = objects_config.with_compactor(Box::new(objects_compactor));
        }

        let configs = vec![
            (
                "objects".to_string(),
                ThConfig::new_with_config_indexing(
                    object_indexing,
                    mutexes,
                    KeyType::uniform(default_cells_per_mutex() * 4),
                    objects_config,
                ),
            ),
            (
                "owned_object_transaction_locks".to_string(),
                ThConfig::new_with_config_indexing(
                    owned_object_transaction_locks_indexing,
                    mutexes,
                    KeyType::uniform(default_cells_per_mutex() * 4),
                    bloom_config.clone().with_max_dirty_keys(4048),
                ),
            ),
            (
                "transactions".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::new()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    apply_relocation_filter(
                        bloom_config.clone().with_value_cache_size(value_cache_size),
                        pruner_watermark.clone(),
                        |effects: TransactionEffects| effects.executed_epoch(),
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    bloom_config
                        .clone()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events".to_string(),
                ThConfig::new_with_rm_prefix(
                    32 + 8,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events_2".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "unchanged_loaded_runtime_objects".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_transactions_to_checkpoint".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "root_state_hash_by_epoch".to_string(),
                ThConfig::new(8, 1, KeyType::uniform(1)),
            ),
            (
                "epoch_start_configuration".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "pruned_checkpoint".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_network_sui_amount".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_storage_fund_imbalance".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "object_per_epoch_marker_table".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "object_per_epoch_marker_table_v2".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, FullObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "executed_transaction_digests".to_string(),
                ThConfig::new_with_config_indexing(
                    // EpochId + (TransactionDigest)
                    KeyIndexing::fixed(8 + (32 + 8)),
                    mutexes,
                    epoch_tx_digest_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, TransactionDigest)| epoch_id,
                        true,
                    ),
                ),
            ),
        ];
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }

    pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
        Self::get_read_only_handle(
            Self::path(parent_path),
            None,
            None,
            MetricConf::new("perpetual_readonly"),
        )
    }

    // This is used by the indexer to find the correct version of a dynamic field child object.
    // We do not store the version of the child object, but because of Lamport timestamps,
    // we know the child must have a version number less than or equal to the parent's.
    pub fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<Option<Object>> {
        let mut iter = self.objects.reversed_safe_iter_with_bounds(
            Some(ObjectKey::min_for_id(&object_id)),
            Some(ObjectKey(object_id, version)),
        )?;
        match iter.next() {
            Some(Ok((key, o))) => self.object(&key, o),
            Some(Err(e)) => Err(e.into()),
            None => Ok(None),
        }
    }
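
    // Illustrative sketch (not part of the original source): resolving a dynamic field
    // child at or below its parent's version, as described above; `tables`, `child_id`
    // and `parent_version` are hypothetical values.
    //
    //     if let Some(child) = tables.find_object_lt_or_eq_version(child_id, parent_version)? {
    //         // `child` is the newest version of `child_id` with version <= `parent_version`.
    //     }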

    fn construct_object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectValue,
    ) -> Result<Object, SuiError> {
        try_construct_object(object_key, store_object)
    }

    // Constructs `sui_types::object::Object` from `StoreObjectWrapper`.
    // Returns `None` if the object was deleted or wrapped.
    pub fn object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Result<Option<Object>, SuiError> {
        let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
            return Ok(None);
        };
        Ok(Some(self.construct_object(object_key, *store_object)?))
    }

    pub fn object_reference(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Result<ObjectRef, SuiError> {
        let obj_ref = match store_object.migrate().into_inner() {
            StoreObject::Value(object) => self
                .construct_object(object_key, *object)?
                .compute_object_reference(),
            StoreObject::Deleted => (
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_DELETED,
            ),
            StoreObject::Wrapped => (
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_WRAPPED,
            ),
        };
        Ok(obj_ref)
    }

    pub fn tombstone_reference(
        &self,
        object_key: &ObjectKey,
        store_object: &StoreObjectWrapper,
    ) -> Result<Option<ObjectRef>, SuiError> {
        let obj_ref = match store_object.inner() {
            StoreObject::Deleted => Some((
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_DELETED,
            )),
            StoreObject::Wrapped => Some((
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_WRAPPED,
            )),
            _ => None,
        };
        Ok(obj_ref)
    }

    pub fn get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<ObjectRef>, SuiError> {
        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
            Some(ObjectKey::min_for_id(&object_id)),
            Some(ObjectKey::max_for_id(&object_id)),
        )?;

        if let Some(Ok((object_key, value))) = iterator.next()
            && object_key.0 == object_id
        {
            return Ok(Some(self.object_reference(&object_key, value)?));
        }
        Ok(None)
    }

    pub fn get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
            Some(ObjectKey::min_for_id(&object_id)),
            Some(ObjectKey::max_for_id(&object_id)),
        )?;

        if let Some(Ok((object_key, value))) = iterator.next()
            && object_key.0 == object_id
        {
            return Ok(Some((object_key, value)));
        }
        Ok(None)
    }

    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
        Ok(self
            .epoch_start_configuration
            .get(&())?
            .expect("Must have current epoch.")
            .epoch_start_state()
            .epoch())
    }

    pub fn set_epoch_start_configuration(
        &self,
        epoch_start_configuration: &EpochStartConfiguration,
    ) -> SuiResult {
        let mut wb = self.epoch_start_configuration.batch();
        wb.insert_batch(
            &self.epoch_start_configuration,
            std::iter::once(((), epoch_start_configuration)),
        )?;
        wb.write()?;
        Ok(())
    }

    pub fn get_highest_pruned_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.pruned_checkpoint.get(&())
    }

    pub fn set_highest_pruned_checkpoint(
        &self,
        wb: &mut DBBatch,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> SuiResult {
        wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
        Ok(())
    }

    pub fn get_transaction(
        &self,
        digest: &TransactionDigest,
    ) -> SuiResult<Option<TrustedTransaction>> {
        let Some(transaction) = self.transactions.get(digest)? else {
            return Ok(None);
        };
        Ok(Some(transaction))
    }

    /// Batch insert executed transaction digests for a given epoch.
    /// Used by formal snapshot restore to backfill transaction digests from the previous epoch.
    pub fn insert_executed_transaction_digests_batch(
        &self,
        epoch: EpochId,
        digests: impl Iterator<Item = TransactionDigest>,
    ) -> SuiResult {
        let mut batch = self.executed_transaction_digests.batch();
        batch.insert_batch(
            &self.executed_transaction_digests,
            digests.map(|digest| ((epoch, digest), ())),
        )?;
        batch.write()?;
        Ok(())
    }

    pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
        let Some(effect_digest) = self.executed_effects.get(digest)? else {
            return Ok(None);
        };
        Ok(self.effects.get(&effect_digest)?)
    }

    pub(crate) fn was_transaction_executed_in_last_epoch(
        &self,
        digest: &TransactionDigest,
        current_epoch: EpochId,
    ) -> bool {
        if current_epoch == 0 {
            return false;
        }
        self.executed_transaction_digests
            .contains_key(&(current_epoch - 1, *digest))
            .expect("db error")
    }

    // DEPRECATED as the backing table has been moved to authority_per_epoch_store.
    // Please do not add new accessors/callsites.
    pub fn get_checkpoint_sequence_number(
        &self,
        digest: &TransactionDigest,
    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
        Ok(self.executed_transactions_to_checkpoint.get(digest)?)
    }

    pub fn get_newer_object_keys(
        &self,
        object: &(ObjectID, SequenceNumber),
    ) -> SuiResult<Vec<ObjectKey>> {
        let mut objects = vec![];
        for result in self.objects.safe_iter_with_bounds(
            Some(ObjectKey(object.0, object.1.next())),
            Some(ObjectKey(object.0, VersionNumber::MAX)),
        ) {
            let (key, _) = result?;
            objects.push(key);
        }
        Ok(objects)
    }

    pub fn set_highest_pruned_checkpoint_without_wb(
        &self,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> SuiResult {
        let mut wb = self.pruned_checkpoint.batch();
        self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        wb.write()?;
        Ok(())
    }

    pub fn database_is_empty(&self) -> SuiResult<bool> {
        Ok(self.objects.safe_iter().next().is_none())
    }

    pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
        LiveSetIter {
            iter: Box::new(self.objects.safe_iter()),
            tables: self,
            prev: None,
            include_wrapped_object,
        }
    }

    pub fn range_iter_live_object_set(
        &self,
        lower_bound: Option<ObjectID>,
        upper_bound: Option<ObjectID>,
        include_wrapped_object: bool,
    ) -> LiveSetIter<'_> {
        let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
        let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);

        LiveSetIter {
            iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
            tables: self,
            prev: None,
            include_wrapped_object,
        }
    }

    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
        // This checkpoints the entire db, not just the objects table.
        self.objects.checkpoint_db(path).map_err(Into::into)
    }

    pub fn get_root_state_hash(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
        Ok(self.root_state_hash_by_epoch.get(&epoch)?)
    }

    pub fn insert_root_state_hash(
        &self,
        epoch: EpochId,
        last_checkpoint_of_epoch: CheckpointSequenceNumber,
        hash: GlobalStateHash,
    ) -> SuiResult {
        self.root_state_hash_by_epoch
            .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
        Ok(())
    }

    pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
        let object_reference = object.compute_object_reference();
        let wrapper = get_store_object(object);
        let mut wb = self.objects.batch();
        wb.insert_batch(
            &self.objects,
            std::iter::once((ObjectKey::from(object_reference), wrapper)),
        )?;
        wb.write()?;
        Ok(())
    }

    // Fallible get-object methods for sui-tool, which may need to attempt to read a corrupted database.
    pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
        let obj_entry = self
            .objects
            .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
            .next();

        match obj_entry.transpose()? {
            Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
                Ok(self.object(&ObjectKey(obj_id, version), obj)?)
            }
            _ => Ok(None),
        }
    }

    pub fn get_object_by_key_fallible(
        &self,
        object_id: &ObjectID,
        version: VersionNumber,
    ) -> SuiResult<Option<Object>> {
        Ok(self
            .objects
            .get(&ObjectKey(*object_id, version))?
            .and_then(|object| {
                self.object(&ObjectKey(*object_id, version), object)
                    .expect("object construction error")
            }))
    }
}

impl ObjectStore for AuthorityPerpetualTables {
    /// Read an object and return it, or `None` if the object was not found.
    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        self.get_object_fallible(object_id).expect("db error")
    }

    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
        self.get_object_by_key_fallible(object_id, version)
            .expect("db error")
    }
}

pub struct LiveSetIter<'a> {
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    tables: &'a AuthorityPerpetualTables,
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
    /// Whether a wrapped object is considered a live object.
    include_wrapped_object: bool,
}

#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    Normal(Object),
    Wrapped(ObjectKey),
}

impl LiveObject {
    pub fn object_id(&self) -> ObjectID {
        match self {
            LiveObject::Normal(obj) => obj.id(),
            LiveObject::Wrapped(key) => key.0,
        }
    }

    pub fn version(&self) -> SequenceNumber {
        match self {
            LiveObject::Normal(obj) => obj.version(),
            LiveObject::Wrapped(key) => key.1,
        }
    }

    pub fn object_reference(&self) -> ObjectRef {
        match self {
            LiveObject::Normal(obj) => obj.compute_object_reference(),
            LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
        }
    }

    pub fn to_normal(self) -> Option<Object> {
        match self {
            LiveObject::Normal(object) => Some(object),
            LiveObject::Wrapped(_) => None,
        }
    }
}

impl LiveSetIter<'_> {
    fn store_object_wrapper_to_live_object(
        &self,
        object_key: ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Option<LiveObject> {
        match store_object.migrate().into_inner() {
            StoreObject::Value(object) => {
                let object = self
                    .tables
                    .construct_object(&object_key, *object)
                    .expect("Constructing object from store cannot fail");
                Some(LiveObject::Normal(object))
            }
            StoreObject::Wrapped => {
                if self.include_wrapped_object {
                    Some(LiveObject::Wrapped(object_key))
                } else {
                    None
                }
            }
            StoreObject::Deleted => None,
        }
    }
}

impl Iterator for LiveSetIter<'_> {
    type Item = LiveObject;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(Ok((next_key, next_value))) = self.iter.next() {
                let prev = self.prev.take();
                self.prev = Some((next_key, next_value));

                if let Some((prev_key, prev_value)) = prev
                    && prev_key.0 != next_key.0
                {
                    let live_object =
                        self.store_object_wrapper_to_live_object(prev_key, prev_value);
                    if live_object.is_some() {
                        return live_object;
                    }
                }
                continue;
            }
            if let Some((key, value)) = self.prev.take() {
                let live_object = self.store_object_wrapper_to_live_object(key, value);
                if live_object.is_some() {
                    return live_object;
                }
            }
            return None;
        }
    }
}
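
// Illustrative sketch (not part of the original source): consuming `LiveSetIter`, which
// yields at most one entry per ObjectID (the last key of each ID in the ascending scan,
// i.e. the highest version, skipping deleted tombstones); `tables` is a hypothetical value.
//
//     for live in tables.iter_live_object_set(/* include_wrapped_object */ false) {
//         match live {
//             LiveObject::Normal(object) => { /* latest live version of the object */ }
//             LiveObject::Wrapped(key) => { /* only yielded when wrapped objects are included */ }
//         }
//     }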

// These functions are used to initialize the DB tables
fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
    DBOptions {
        options: db_options
            .clone()
            .optimize_for_write_throughput()
            .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
            .options,
        rw_options: db_options.rw_options.set_ignore_range_deletions(false),
    }
}

fn objects_table_config(
    mut db_options: DBOptions,
    compaction_filter: Option<ObjectsCompactionFilter>,
) -> DBOptions {
    if let Some(mut compaction_filter) = compaction_filter {
        db_options
            .options
            .set_compaction_filter("objects", move |_, key, value| {
                match compaction_filter.filter(key, value) {
                    Ok(decision) => decision,
                    Err(err) => {
                        error!("Compaction error: {:?}", err);
                        Decision::Keep
                    }
                }
            });
    }
    db_options
        .optimize_for_write_throughput()
        .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
}

fn transactions_table_config(db_options: DBOptions) -> DBOptions {
    db_options
        .optimize_for_write_throughput()
        .optimize_for_point_lookup(
            read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
        )
}

fn effects_table_config(db_options: DBOptions) -> DBOptions {
    db_options
        .optimize_for_write_throughput()
        .optimize_for_point_lookup(
            read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
        )
}
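
// A minimal, hedged usage sketch added for illustration (not part of the original source).
// It opens the perpetual tables in a temporary directory and scans the (empty) live object
// set; the module and test names are hypothetical, and the test is #[ignore]d. Block cache
// sizes for the RocksDB tables can be tuned via the *_BLOCK_CACHE_MB environment variables
// read by the config functions above.
#[cfg(test)]
mod authority_store_tables_usage_sketch {
    use super::*;

    #[test]
    #[ignore = "illustrative sketch only"]
    fn open_and_scan_live_object_set() {
        let dir = std::env::temp_dir().join("authority_perpetual_tables_sketch");
        std::fs::create_dir_all(&dir).unwrap();
        // Open with default options and no pruner watermark.
        let tables = AuthorityPerpetualTables::open(&dir, None, None);
        assert!(tables.database_is_empty().unwrap());
        // The live set iterator yields nothing for an empty database.
        assert!(tables.iter_live_object_set(false).next().is_none());
    }
}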