sui_core/authority/authority_store_tables.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::*;
5use crate::authority::authority_store::LockDetailsWrapperDeprecated;
6use serde::{Deserialize, Serialize};
7use std::path::Path;
8use std::sync::atomic::AtomicU64;
9use sui_types::base_types::SequenceNumber;
10use sui_types::digests::TransactionEventsDigest;
11use sui_types::effects::{TransactionEffects, TransactionEvents};
12use sui_types::global_state_hash::GlobalStateHash;
13use sui_types::storage::{FullObjectKey, MarkerValue};
14use tracing::error;
15use typed_store::metrics::SamplingInterval;
16use typed_store::rocks::{
17    DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
18    read_size_from_env,
19};
20use typed_store::traits::Map;
21
22use crate::authority::authority_store_pruner::ObjectsCompactionFilter;
23use crate::authority::authority_store_types::{
24    StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
25};
26use crate::authority::epoch_start_configuration::EpochStartConfiguration;
27use typed_store::rocksdb::compaction_filter::Decision;
28use typed_store::{DBMapUtils, DbIterator};
29
// Names of environment variables for per-table block cache sizes. The `_MB` suffix in the
// variable names suggests the values are megabytes.
// NOTE(review): presumably read via `read_size_from_env` by the per-table config helpers
// (e.g. `objects_table_config`) — confirm.

/// Environment variable for the block cache size of the `objects` table.
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
/// Environment variable for the block cache size of the owned-object locks table
/// (crate-visible, so it may also be read outside this module).
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
/// Environment variable for the block cache size of the `transactions` table.
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
/// Environment variable for the block cache size of the `effects` table.
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
34
35/// Options to apply to every column family of the `perpetual` DB.
36#[derive(Default)]
37pub struct AuthorityPerpetualTablesOptions {
38    /// Whether to enable write stalling on all column families.
39    pub enable_write_stall: bool,
40    pub compaction_filter: Option<ObjectsCompactionFilter>,
41}
42
43impl AuthorityPerpetualTablesOptions {
44    fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
45        if !self.enable_write_stall {
46            db_options = db_options.disable_write_throttling();
47        }
48        db_options
49    }
50}
51
/// AuthorityPerpetualTables contains data that must be preserved from one epoch to the next.
///
/// Each field is one table of the `perpetual` DB (see `AuthorityPerpetualTables::path`).
/// NOTE(review): the table-opening helpers used by the `impl` (`open_tables_read_write`,
/// `get_read_only_handle`) are presumably generated by the `DBMapUtils` derive — confirm.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct AuthorityPerpetualTables {
    /// This is a map between the object (ID, version) and the latest state of the object, namely the
    /// state that is needed to process new transactions.
    /// State is represented by `StoreObject` enum, which is either a move module or a move object.
    ///
    /// Note that while this map can store all versions of an object, we will eventually
    /// prune old object versions from the db.
    ///
    /// IMPORTANT: object versions must *only* be pruned if they appear as inputs in some
    /// TransactionEffects. Simply pruning all objects but the most recent is an error!
    /// This is because there can be partially executed transactions whose effects have not yet
    /// been written out, and which must be retried. But, they cannot be retried unless their input
    /// objects are still accessible!
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// This is a map between object references of currently active objects that can be mutated.
    ///
    /// For old epochs, it may also contain the transaction that they are locked on for use by this
    /// specific validator. The transaction locks themselves are now in AuthorityPerEpochStore.
    ///
    /// The table keeps its historical on-disk name `owned_object_transaction_locks` via the
    /// `#[rename]` attribute below.
    #[rename = "owned_object_transaction_locks"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,

    /// This is a map between the transaction digest and the corresponding transaction that's known to be
    /// executable. This means that it may have been executed locally, or it may have been synced through
    /// state-sync but hasn't been executed yet.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// A map between the transaction digest of a certificate to the effects of its execution.
    /// We store effects into this table in two different cases:
    /// 1. When a transaction is synced through state_sync, we store the effects here. These effects
    ///    are known to be final in the network, but may not have been executed locally yet.
    /// 2. When the transaction is executed locally on this node, we store the effects here. This means that
    ///    it's possible to store the same effects twice (once for the synced transaction, and once for the executed).
    ///
    /// It's also possible for the effects to be reverted if the transaction didn't make it into the epoch.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Transactions that have been executed locally on this node. We need this table since the `effects` table
    /// doesn't say anything about the execution status of the transaction on this node. When we wait for transactions
    /// to be executed, we wait for them to appear in this table. When we revert transactions, we remove them from both
    /// tables.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Deprecated events table keyed by (events digest, index).
    /// NOTE(review): presumably superseded by `events_2`; kept so the existing table still opens.
    #[allow(dead_code)]
    #[deprecated]
    events: DBMap<(TransactionEventsDigest, usize), Event>,

    /// Events keyed by the digest of the transaction that produced them.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    /// Loaded (and unchanged) runtime object references, keyed by transaction digest.
    pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,

    /// DEPRECATED in favor of the table of the same name in authority_per_epoch_store.
    /// Please do not add new accessors/callsites.
    /// When transaction is executed via checkpoint executor, we store association here
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Finalized root state hash for epoch, to be included in CheckpointSummary
    /// of last checkpoint of epoch. These values should only ever be written once
    /// and never changed
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Parameters of the system fixed at the epoch start
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// A singleton table that stores latest pruned checkpoint. Used to keep objects pruner progress
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Expected total amount of SUI in the network. This is expected to remain constant
    /// throughout the lifetime of the network. We check it at the end of each epoch if
    /// expensive checks are enabled. We cannot use 10B today because in tests we often
    /// inject extra gas objects into genesis.
    pub(crate) expected_network_sui_amount: DBMap<(), u64>,

    /// Expected imbalance between storage fund balance and the sum of storage rebate of all live objects.
    /// This could be non-zero due to bugs in earlier protocol versions.
    /// This number is the result of storage_fund_balance - sum(storage_rebate).
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Table that stores the set of received objects and deleted objects and the version at
    /// which they were received. This is used to prevent possible race conditions around receiving
    /// objects (since they are not locked by the transaction manager) and for tracking shared
    /// objects that have been deleted. This table is meant to be pruned per-epoch, and all
    /// previous epochs other than the current epoch may be pruned safely.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
    /// Variant of `object_per_epoch_marker_table` keyed by `(EpochId, FullObjectKey)` instead of
    /// `(EpochId, ObjectKey)`.
    pub(crate) object_per_epoch_marker_table_v2: DBMap<(EpochId, FullObjectKey), MarkerValue>,
}
145
/// Tables of the separate `pruner` DB, stored under `<parent>/pruner`
/// (see `AuthorityPrunerTables::path`).
#[derive(DBMapUtils)]
pub struct AuthorityPrunerTables {
    /// Object ID -> sequence number.
    /// NOTE(review): presumably the version at which the object was tombstoned (deleted or
    /// wrapped) — confirm against the pruner.
    pub(crate) object_tombstones: DBMap<ObjectID, SequenceNumber>,
}
150
151impl AuthorityPrunerTables {
152    pub fn path(parent_path: &Path) -> PathBuf {
153        parent_path.join("pruner")
154    }
155
156    pub fn open(parent_path: &Path) -> Self {
157        Self::open_tables_read_write(
158            Self::path(parent_path),
159            MetricConf::new("pruner")
160                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
161            None,
162            None,
163        )
164    }
165}
166
167impl AuthorityPerpetualTables {
168    pub fn path(parent_path: &Path) -> PathBuf {
169        parent_path.join("perpetual")
170    }
171
172    #[cfg(not(tidehunter))]
173    pub fn open(
174        parent_path: &Path,
175        db_options_override: Option<AuthorityPerpetualTablesOptions>,
176        _pruner_watermark: Option<Arc<AtomicU64>>,
177    ) -> Self {
178        let db_options_override = db_options_override.unwrap_or_default();
179        let db_options =
180            db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
181        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
182            (
183                "objects".to_string(),
184                objects_table_config(db_options.clone(), db_options_override.compaction_filter),
185            ),
186            (
187                "owned_object_transaction_locks".to_string(),
188                owned_object_transaction_locks_table_config(db_options.clone()),
189            ),
190            (
191                "transactions".to_string(),
192                transactions_table_config(db_options.clone()),
193            ),
194            (
195                "effects".to_string(),
196                effects_table_config(db_options.clone()),
197            ),
198        ]));
199
200        Self::open_tables_read_write(
201            Self::path(parent_path),
202            MetricConf::new("perpetual")
203                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
204            Some(db_options.options),
205            Some(table_options),
206        )
207    }
208
209    #[cfg(tidehunter)]
210    pub fn open(
211        parent_path: &Path,
212        _: Option<AuthorityPerpetualTablesOptions>,
213        pruner_watermark: Option<Arc<AtomicU64>>,
214    ) -> Self {
215        use crate::authority::authority_store_pruner::apply_relocation_filter;
216        tracing::warn!("AuthorityPerpetualTables using tidehunter");
217        use typed_store::tidehunter_util::{
218            Bytes, Decision, IndexWalPosition, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
219            default_cells_per_mutex, default_mutex_count, default_value_cache_size,
220        };
221        let mutexes = default_mutex_count() * 2;
222        let value_cache_size = default_value_cache_size();
223        // effectively disables pruning if not set
224        let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));
225
226        let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
227        let objects_compactor = |index: &mut BTreeMap<Bytes, IndexWalPosition>| {
228            let mut retain = HashSet::new();
229            let mut previous: Option<&[u8]> = None;
230            const OID_SIZE: usize = 16;
231            for (key, _) in index.iter().rev() {
232                if let Some(prev) = previous {
233                    if prev == &key[..OID_SIZE] {
234                        continue;
235                    }
236                }
237                previous = Some(&key[..OID_SIZE]);
238                retain.insert(key.clone());
239            }
240            index.retain(|k, _| retain.contains(k));
241        };
242        let mut digest_prefix = vec![0; 8];
243        digest_prefix[7] = 32;
244        let uniform_key = KeyType::uniform(default_cells_per_mutex());
245        let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
246        let object_indexing = KeyIndexing::key_reduction(32 + 8, 16..(32 + 8));
247        // todo can figure way to scramble off 8 bytes in the middle
248        let obj_ref_size = 32 + 8 + 32 + 8;
249        let owned_object_transaction_locks_indexing =
250            KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));
251
252        let configs = vec![
253            (
254                "objects".to_string(),
255                ThConfig::new_with_config_indexing(
256                    object_indexing,
257                    mutexes,
258                    KeyType::uniform(default_cells_per_mutex() * 4),
259                    KeySpaceConfig::new()
260                        .with_unloaded_iterator(true)
261                        .with_max_dirty_keys(4048)
262                        .with_compactor(Box::new(objects_compactor)),
263                ),
264            ),
265            (
266                "owned_object_transaction_locks".to_string(),
267                ThConfig::new_with_config_indexing(
268                    owned_object_transaction_locks_indexing,
269                    mutexes,
270                    KeyType::uniform(default_cells_per_mutex() * 4),
271                    bloom_config.clone().with_max_dirty_keys(4048),
272                ),
273            ),
274            (
275                "transactions".to_string(),
276                ThConfig::new_with_rm_prefix_indexing(
277                    KeyIndexing::key_reduction(32, 0..16),
278                    mutexes,
279                    uniform_key,
280                    KeySpaceConfig::new()
281                        .with_value_cache_size(value_cache_size)
282                        .with_relocation_filter(|_, _| Decision::Remove),
283                    digest_prefix.clone(),
284                ),
285            ),
286            (
287                "effects".to_string(),
288                ThConfig::new_with_rm_prefix_indexing(
289                    KeyIndexing::key_reduction(32, 0..16),
290                    mutexes,
291                    uniform_key,
292                    apply_relocation_filter(
293                        bloom_config.clone().with_value_cache_size(value_cache_size),
294                        pruner_watermark.clone(),
295                        |effects: TransactionEffects| effects.executed_epoch(),
296                        false,
297                    ),
298                    digest_prefix.clone(),
299                ),
300            ),
301            (
302                "executed_effects".to_string(),
303                ThConfig::new_with_rm_prefix_indexing(
304                    KeyIndexing::key_reduction(32, 0..16),
305                    mutexes,
306                    uniform_key,
307                    bloom_config
308                        .clone()
309                        .with_value_cache_size(value_cache_size)
310                        .with_relocation_filter(|_, _| Decision::Remove),
311                    digest_prefix.clone(),
312                ),
313            ),
314            (
315                "events".to_string(),
316                ThConfig::new_with_rm_prefix(
317                    32 + 8,
318                    mutexes,
319                    uniform_key,
320                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
321                    digest_prefix.clone(),
322                ),
323            ),
324            (
325                "events_2".to_string(),
326                ThConfig::new_with_rm_prefix(
327                    32,
328                    mutexes,
329                    uniform_key,
330                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
331                    digest_prefix.clone(),
332                ),
333            ),
334            (
335                "unchanged_loaded_runtime_objects".to_string(),
336                ThConfig::new_with_rm_prefix(
337                    32,
338                    mutexes,
339                    uniform_key,
340                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
341                    digest_prefix.clone(),
342                ),
343            ),
344            (
345                "executed_transactions_to_checkpoint".to_string(),
346                ThConfig::new_with_rm_prefix(
347                    32,
348                    mutexes,
349                    uniform_key,
350                    apply_relocation_filter(
351                        KeySpaceConfig::default(),
352                        pruner_watermark.clone(),
353                        |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
354                        false,
355                    ),
356                    digest_prefix.clone(),
357                ),
358            ),
359            (
360                "root_state_hash_by_epoch".to_string(),
361                ThConfig::new(8, 1, KeyType::uniform(1)),
362            ),
363            (
364                "epoch_start_configuration".to_string(),
365                ThConfig::new(0, 1, KeyType::uniform(1)),
366            ),
367            (
368                "pruned_checkpoint".to_string(),
369                ThConfig::new(0, 1, KeyType::uniform(1)),
370            ),
371            (
372                "expected_network_sui_amount".to_string(),
373                ThConfig::new(0, 1, KeyType::uniform(1)),
374            ),
375            (
376                "expected_storage_fund_imbalance".to_string(),
377                ThConfig::new(0, 1, KeyType::uniform(1)),
378            ),
379            (
380                "object_per_epoch_marker_table".to_string(),
381                ThConfig::new_with_config_indexing(
382                    KeyIndexing::VariableLength,
383                    mutexes,
384                    epoch_prefix_key,
385                    apply_relocation_filter(
386                        KeySpaceConfig::default(),
387                        pruner_watermark.clone(),
388                        |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
389                        true,
390                    ),
391                ),
392            ),
393            (
394                "object_per_epoch_marker_table_v2".to_string(),
395                ThConfig::new_with_config_indexing(
396                    KeyIndexing::VariableLength,
397                    mutexes,
398                    epoch_prefix_key,
399                    apply_relocation_filter(
400                        bloom_config.clone(),
401                        pruner_watermark.clone(),
402                        |(epoch_id, _): (EpochId, FullObjectKey)| epoch_id,
403                        true,
404                    ),
405                ),
406            ),
407        ];
408        Self::open_tables_read_write(
409            Self::path(parent_path),
410            MetricConf::new("perpetual")
411                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
412            configs.into_iter().collect(),
413        )
414    }
415
416    pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
417        Self::get_read_only_handle(
418            Self::path(parent_path),
419            None,
420            None,
421            MetricConf::new("perpetual_readonly"),
422        )
423    }
424
425    // This is used by indexer to find the correct version of dynamic field child object.
426    // We do not store the version of the child object, but because of lamport timestamp,
427    // we know the child must have version number less then or eq to the parent.
428    pub fn find_object_lt_or_eq_version(
429        &self,
430        object_id: ObjectID,
431        version: SequenceNumber,
432    ) -> SuiResult<Option<Object>> {
433        let mut iter = self.objects.reversed_safe_iter_with_bounds(
434            Some(ObjectKey::min_for_id(&object_id)),
435            Some(ObjectKey(object_id, version)),
436        )?;
437        match iter.next() {
438            Some(Ok((key, o))) => self.object(&key, o),
439            Some(Err(e)) => Err(e.into()),
440            None => Ok(None),
441        }
442    }
443
444    fn construct_object(
445        &self,
446        object_key: &ObjectKey,
447        store_object: StoreObjectValue,
448    ) -> Result<Object, SuiError> {
449        try_construct_object(object_key, store_object)
450    }
451
452    // Constructs `sui_types::object::Object` from `StoreObjectWrapper`.
453    // Returns `None` if object was deleted/wrapped
454    pub fn object(
455        &self,
456        object_key: &ObjectKey,
457        store_object: StoreObjectWrapper,
458    ) -> Result<Option<Object>, SuiError> {
459        let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
460            return Ok(None);
461        };
462        Ok(Some(self.construct_object(object_key, *store_object)?))
463    }
464
465    pub fn object_reference(
466        &self,
467        object_key: &ObjectKey,
468        store_object: StoreObjectWrapper,
469    ) -> Result<ObjectRef, SuiError> {
470        let obj_ref = match store_object.migrate().into_inner() {
471            StoreObject::Value(object) => self
472                .construct_object(object_key, *object)?
473                .compute_object_reference(),
474            StoreObject::Deleted => (
475                object_key.0,
476                object_key.1,
477                ObjectDigest::OBJECT_DIGEST_DELETED,
478            ),
479            StoreObject::Wrapped => (
480                object_key.0,
481                object_key.1,
482                ObjectDigest::OBJECT_DIGEST_WRAPPED,
483            ),
484        };
485        Ok(obj_ref)
486    }
487
488    pub fn tombstone_reference(
489        &self,
490        object_key: &ObjectKey,
491        store_object: &StoreObjectWrapper,
492    ) -> Result<Option<ObjectRef>, SuiError> {
493        let obj_ref = match store_object.inner() {
494            StoreObject::Deleted => Some((
495                object_key.0,
496                object_key.1,
497                ObjectDigest::OBJECT_DIGEST_DELETED,
498            )),
499            StoreObject::Wrapped => Some((
500                object_key.0,
501                object_key.1,
502                ObjectDigest::OBJECT_DIGEST_WRAPPED,
503            )),
504            _ => None,
505        };
506        Ok(obj_ref)
507    }
508
509    pub fn get_latest_object_ref_or_tombstone(
510        &self,
511        object_id: ObjectID,
512    ) -> Result<Option<ObjectRef>, SuiError> {
513        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
514            Some(ObjectKey::min_for_id(&object_id)),
515            Some(ObjectKey::max_for_id(&object_id)),
516        )?;
517
518        if let Some(Ok((object_key, value))) = iterator.next()
519            && object_key.0 == object_id
520        {
521            return Ok(Some(self.object_reference(&object_key, value)?));
522        }
523        Ok(None)
524    }
525
526    pub fn get_latest_object_or_tombstone(
527        &self,
528        object_id: ObjectID,
529    ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
530        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
531            Some(ObjectKey::min_for_id(&object_id)),
532            Some(ObjectKey::max_for_id(&object_id)),
533        )?;
534
535        if let Some(Ok((object_key, value))) = iterator.next()
536            && object_key.0 == object_id
537        {
538            return Ok(Some((object_key, value)));
539        }
540        Ok(None)
541    }
542
543    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
544        Ok(self
545            .epoch_start_configuration
546            .get(&())?
547            .expect("Must have current epoch.")
548            .epoch_start_state()
549            .epoch())
550    }
551
552    pub fn set_epoch_start_configuration(
553        &self,
554        epoch_start_configuration: &EpochStartConfiguration,
555    ) -> SuiResult {
556        let mut wb = self.epoch_start_configuration.batch();
557        wb.insert_batch(
558            &self.epoch_start_configuration,
559            std::iter::once(((), epoch_start_configuration)),
560        )?;
561        wb.write()?;
562        Ok(())
563    }
564
565    pub fn get_highest_pruned_checkpoint(
566        &self,
567    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
568        self.pruned_checkpoint.get(&())
569    }
570
571    pub fn set_highest_pruned_checkpoint(
572        &self,
573        wb: &mut DBBatch,
574        checkpoint_number: CheckpointSequenceNumber,
575    ) -> SuiResult {
576        wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
577        Ok(())
578    }
579
580    pub fn get_transaction(
581        &self,
582        digest: &TransactionDigest,
583    ) -> SuiResult<Option<TrustedTransaction>> {
584        let Some(transaction) = self.transactions.get(digest)? else {
585            return Ok(None);
586        };
587        Ok(Some(transaction))
588    }
589
590    pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
591        let Some(effect_digest) = self.executed_effects.get(digest)? else {
592            return Ok(None);
593        };
594        Ok(self.effects.get(&effect_digest)?)
595    }
596
597    // DEPRECATED as the backing table has been moved to authority_per_epoch_store.
598    // Please do not add new accessors/callsites.
599    pub fn get_checkpoint_sequence_number(
600        &self,
601        digest: &TransactionDigest,
602    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
603        Ok(self.executed_transactions_to_checkpoint.get(digest)?)
604    }
605
606    pub fn get_newer_object_keys(
607        &self,
608        object: &(ObjectID, SequenceNumber),
609    ) -> SuiResult<Vec<ObjectKey>> {
610        let mut objects = vec![];
611        for result in self.objects.safe_iter_with_bounds(
612            Some(ObjectKey(object.0, object.1.next())),
613            Some(ObjectKey(object.0, VersionNumber::MAX)),
614        ) {
615            let (key, _) = result?;
616            objects.push(key);
617        }
618        Ok(objects)
619    }
620
621    pub fn set_highest_pruned_checkpoint_without_wb(
622        &self,
623        checkpoint_number: CheckpointSequenceNumber,
624    ) -> SuiResult {
625        let mut wb = self.pruned_checkpoint.batch();
626        self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
627        wb.write()?;
628        Ok(())
629    }
630
631    pub fn database_is_empty(&self) -> SuiResult<bool> {
632        Ok(self.objects.safe_iter().next().is_none())
633    }
634
635    pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
636        LiveSetIter {
637            iter: Box::new(self.objects.safe_iter()),
638            tables: self,
639            prev: None,
640            include_wrapped_object,
641        }
642    }
643
644    pub fn range_iter_live_object_set(
645        &self,
646        lower_bound: Option<ObjectID>,
647        upper_bound: Option<ObjectID>,
648        include_wrapped_object: bool,
649    ) -> LiveSetIter<'_> {
650        let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
651        let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
652
653        LiveSetIter {
654            iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
655            tables: self,
656            prev: None,
657            include_wrapped_object,
658        }
659    }
660
661    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
662        // This checkpoints the entire db and not just objects table
663        self.objects.checkpoint_db(path).map_err(Into::into)
664    }
665
666    pub fn get_root_state_hash(
667        &self,
668        epoch: EpochId,
669    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
670        Ok(self.root_state_hash_by_epoch.get(&epoch)?)
671    }
672
673    pub fn insert_root_state_hash(
674        &self,
675        epoch: EpochId,
676        last_checkpoint_of_epoch: CheckpointSequenceNumber,
677        hash: GlobalStateHash,
678    ) -> SuiResult {
679        self.root_state_hash_by_epoch
680            .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
681        Ok(())
682    }
683
684    pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
685        let object_reference = object.compute_object_reference();
686        let wrapper = get_store_object(object);
687        let mut wb = self.objects.batch();
688        wb.insert_batch(
689            &self.objects,
690            std::iter::once((ObjectKey::from(object_reference), wrapper)),
691        )?;
692        wb.write()?;
693        Ok(())
694    }
695
696    // fallible get object methods for sui-tool, which may need to attempt to read a corrupted database
697    pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
698        let obj_entry = self
699            .objects
700            .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
701            .next();
702
703        match obj_entry.transpose()? {
704            Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
705                Ok(self.object(&ObjectKey(obj_id, version), obj)?)
706            }
707            _ => Ok(None),
708        }
709    }
710
711    pub fn get_object_by_key_fallible(
712        &self,
713        object_id: &ObjectID,
714        version: VersionNumber,
715    ) -> SuiResult<Option<Object>> {
716        Ok(self
717            .objects
718            .get(&ObjectKey(*object_id, version))?
719            .and_then(|object| {
720                self.object(&ObjectKey(*object_id, version), object)
721                    .expect("object construction error")
722            }))
723    }
724}
725
726impl ObjectStore for AuthorityPerpetualTables {
727    /// Read an object and return it, or Ok(None) if the object was not found.
728    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
729        self.get_object_fallible(object_id).expect("db error")
730    }
731
732    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
733        self.get_object_by_key_fallible(object_id, version)
734            .expect("db error")
735    }
736}
737
/// Iterator over the live object set of [`AuthorityPerpetualTables`].
///
/// For each object ID it yields at most one [`LiveObject`], built from the last entry for that
/// ID in iteration order (presumably its highest version). Deleted entries are skipped.
pub struct LiveSetIter<'a> {
    /// Underlying iterator over entries of the `objects` table.
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    /// Tables used to construct `Object`s from raw store values.
    tables: &'a AuthorityPerpetualTables,
    /// The most recently read entry. It is held back until the next entry shows whether it was
    /// the last one for its object ID.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
    /// Whether a wrapped object is considered as a live object.
    include_wrapped_object: bool,
}
745
746#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
747pub enum LiveObject {
748    Normal(Object),
749    Wrapped(ObjectKey),
750}
751
752impl LiveObject {
753    pub fn object_id(&self) -> ObjectID {
754        match self {
755            LiveObject::Normal(obj) => obj.id(),
756            LiveObject::Wrapped(key) => key.0,
757        }
758    }
759
760    pub fn version(&self) -> SequenceNumber {
761        match self {
762            LiveObject::Normal(obj) => obj.version(),
763            LiveObject::Wrapped(key) => key.1,
764        }
765    }
766
767    pub fn object_reference(&self) -> ObjectRef {
768        match self {
769            LiveObject::Normal(obj) => obj.compute_object_reference(),
770            LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
771        }
772    }
773
774    pub fn to_normal(self) -> Option<Object> {
775        match self {
776            LiveObject::Normal(object) => Some(object),
777            LiveObject::Wrapped(_) => None,
778        }
779    }
780}
781
782impl LiveSetIter<'_> {
783    fn store_object_wrapper_to_live_object(
784        &self,
785        object_key: ObjectKey,
786        store_object: StoreObjectWrapper,
787    ) -> Option<LiveObject> {
788        match store_object.migrate().into_inner() {
789            StoreObject::Value(object) => {
790                let object = self
791                    .tables
792                    .construct_object(&object_key, *object)
793                    .expect("Constructing object from store cannot fail");
794                Some(LiveObject::Normal(object))
795            }
796            StoreObject::Wrapped => {
797                if self.include_wrapped_object {
798                    Some(LiveObject::Wrapped(object_key))
799                } else {
800                    None
801                }
802            }
803            StoreObject::Deleted => None,
804        }
805    }
806}
807
808impl Iterator for LiveSetIter<'_> {
809    type Item = LiveObject;
810
811    fn next(&mut self) -> Option<Self::Item> {
812        loop {
813            if let Some(Ok((next_key, next_value))) = self.iter.next() {
814                let prev = self.prev.take();
815                self.prev = Some((next_key, next_value));
816
817                if let Some((prev_key, prev_value)) = prev
818                    && prev_key.0 != next_key.0
819                {
820                    let live_object =
821                        self.store_object_wrapper_to_live_object(prev_key, prev_value);
822                    if live_object.is_some() {
823                        return live_object;
824                    }
825                }
826                continue;
827            }
828            if let Some((key, value)) = self.prev.take() {
829                let live_object = self.store_object_wrapper_to_live_object(key, value);
830                if live_object.is_some() {
831                    return live_object;
832                }
833            }
834            return None;
835        }
836    }
837}
838
839// These functions are used to initialize the DB tables
840fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
841    DBOptions {
842        options: db_options
843            .clone()
844            .optimize_for_write_throughput()
845            .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
846            .options,
847        rw_options: db_options.rw_options.set_ignore_range_deletions(false),
848    }
849}
850
851fn objects_table_config(
852    mut db_options: DBOptions,
853    compaction_filter: Option<ObjectsCompactionFilter>,
854) -> DBOptions {
855    if let Some(mut compaction_filter) = compaction_filter {
856        db_options
857            .options
858            .set_compaction_filter("objects", move |_, key, value| {
859                match compaction_filter.filter(key, value) {
860                    Ok(decision) => decision,
861                    Err(err) => {
862                        error!("Compaction error: {:?}", err);
863                        Decision::Keep
864                    }
865                }
866            });
867    }
868    db_options
869        .optimize_for_write_throughput()
870        .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
871}
872
873fn transactions_table_config(db_options: DBOptions) -> DBOptions {
874    db_options
875        .optimize_for_write_throughput()
876        .optimize_for_point_lookup(
877            read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
878        )
879}
880
881fn effects_table_config(db_options: DBOptions) -> DBOptions {
882    db_options
883        .optimize_for_write_throughput()
884        .optimize_for_point_lookup(
885            read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
886        )
887}