sui_core/authority/
authority_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::{iter, mem, thread};
6
7use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
8use crate::authority::authority_store_pruner::{
9    AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
10};
11use crate::authority::authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object};
12use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
13use crate::global_state_hasher::GlobalStateHashStore;
14use crate::rpc_index::RpcIndexStore;
15use crate::transaction_outputs::TransactionOutputs;
16use either::Either;
17use fastcrypto::hash::{HashFunction, MultisetHash, Sha3_256};
18use futures::stream::FuturesUnordered;
19use itertools::izip;
20use move_core_types::resolver::ModuleResolver;
21use serde::{Deserialize, Serialize};
22use sui_config::node::AuthorityStorePruningConfig;
23use sui_macros::fail_point_arg;
24use sui_storage::mutex_table::{MutexGuard, MutexTable};
25use sui_types::error::{SuiErrorKind, UserInputError};
26use sui_types::execution::TypeLayoutStore;
27use sui_types::global_state_hash::GlobalStateHash;
28use sui_types::message_envelope::Message;
29use sui_types::storage::{
30    BackingPackageStore, FullObjectKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore,
31    get_module,
32};
33use sui_types::sui_system_state::get_sui_system_state;
34use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure};
35use tokio::time::Instant;
36use tracing::{debug, info, trace};
37use typed_store::traits::Map;
38use typed_store::{
39    TypedStoreError,
40    rocks::{DBBatch, DBMap},
41};
42
43use super::authority_store_tables::LiveObject;
44use super::{authority_store_tables::AuthorityPerpetualTables, *};
45use mysten_common::sync::notify_read::NotifyRead;
46use sui_types::effects::{TransactionEffects, TransactionEvents};
47use sui_types::gas_coin::TOTAL_SUPPLY_MIST;
48
/// Shard count handed to `MutexTable::new` when the store's lock table is constructed.
const NUM_SHARDS: usize = 4096;
50
/// Metric gauges owned by `AuthorityStore`.
///
/// All but `epoch_flags` carry results of the SUI conservation check.
struct AuthorityStoreMetrics {
    /// Seconds taken to scan all live objects in the store for the SUI conservation check.
    sui_conservation_check_latency: IntGauge,
    /// Number of live objects in the store.
    sui_conservation_live_object_count: IntGauge,
    /// Size in bytes of the live objects in the store.
    sui_conservation_live_object_size: IntGauge,
    /// Difference between the total SUI found in the network and the expected total supply.
    sui_conservation_imbalance: IntGauge,
    /// Storage fund pool balance (only the part that represents object storage).
    sui_conservation_storage_fund: IntGauge,
    /// Storage fund balance minus the total object storage rebates.
    sui_conservation_storage_fund_imbalance: IntGauge,
    /// Local flags of the currently running epoch, labelled by flag name.
    epoch_flags: IntGaugeVec,
}
60
61impl AuthorityStoreMetrics {
62    pub fn new(registry: &Registry) -> Self {
63        Self {
64            sui_conservation_check_latency: register_int_gauge_with_registry!(
65                "sui_conservation_check_latency",
66                "Number of seconds took to scan all live objects in the store for SUI conservation check",
67                registry,
68            ).unwrap(),
69            sui_conservation_live_object_count: register_int_gauge_with_registry!(
70                "sui_conservation_live_object_count",
71                "Number of live objects in the store",
72                registry,
73            ).unwrap(),
74            sui_conservation_live_object_size: register_int_gauge_with_registry!(
75                "sui_conservation_live_object_size",
76                "Size in bytes of live objects in the store",
77                registry,
78            ).unwrap(),
79            sui_conservation_imbalance: register_int_gauge_with_registry!(
80                "sui_conservation_imbalance",
81                "Total amount of SUI in the network - 10B * 10^9. This delta shows the amount of imbalance",
82                registry,
83            ).unwrap(),
84            sui_conservation_storage_fund: register_int_gauge_with_registry!(
85                "sui_conservation_storage_fund",
86                "Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
87                registry,
88            ).unwrap(),
89            sui_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
90                "sui_conservation_storage_fund_imbalance",
91                "Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
92                registry,
93            ).unwrap(),
94            epoch_flags: register_int_gauge_vec_with_registry!(
95                "epoch_flags",
96                "Local flags of the currently running epoch",
97                &["flag"],
98                registry,
99            ).unwrap(),
100        }
101    }
102}
103
/// Store handle layered over the shared `AuthorityPerpetualTables`.
///
/// NOTE(review): this comment previously described an `ALL_OBJ_VER` switch and a signature-state
/// type parameter `S`; the struct as declared here has no generic parameters, so that text was
/// stale and has been removed.
pub struct AuthorityStore {
    /// Internal vector of locks to manage concurrent writes to the database
    mutex_table: MutexTable<ObjectDigest>,

    /// Shared handle to the perpetual (cross-epoch) tables that all reads and writes go through.
    pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,

    /// Per-epoch waiters for the root state hash; `notify_read_root_state_hash` registers here
    /// before consulting the database.
    pub(crate) root_state_notify_read:
        NotifyRead<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Whether to enable expensive SUI conservation check at epoch boundaries.
    enable_epoch_sui_conservation_check: bool,

    /// Gauges published by this store.
    metrics: AuthorityStoreMetrics,
}
124
/// Read guard of a `tokio::sync::RwLock` whose protected value is an `EpochId`.
pub type ExecutionLockReadGuard<'a> = tokio::sync::RwLockReadGuard<'a, EpochId>;
/// Write guard of a `tokio::sync::RwLock` whose protected value is an `EpochId`.
pub type ExecutionLockWriteGuard<'a> = tokio::sync::RwLockWriteGuard<'a, EpochId>;
127
128impl AuthorityStore {
129    /// Open an authority store by directory path.
130    /// If the store is empty, initialize it using genesis.
131    pub async fn open(
132        perpetual_tables: Arc<AuthorityPerpetualTables>,
133        genesis: &Genesis,
134        config: &NodeConfig,
135        registry: &Registry,
136    ) -> SuiResult<Arc<Self>> {
137        let enable_epoch_sui_conservation_check = config
138            .expensive_safety_check_config
139            .enable_epoch_sui_conservation_check();
140
141        let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
142            info!("Creating new epoch start config from genesis");
143
144            #[allow(unused_mut)]
145            let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
146            fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
147                info!("Setting initial epoch flags to {:?}", flags);
148                initial_epoch_flags = flags;
149            });
150
151            let epoch_start_configuration = EpochStartConfiguration::new(
152                genesis.sui_system_object().into_epoch_start_state(),
153                *genesis.checkpoint().digest(),
154                &genesis.objects(),
155                initial_epoch_flags,
156            )?;
157            perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
158            epoch_start_configuration
159        } else {
160            info!("Loading epoch start config from DB");
161            perpetual_tables
162                .epoch_start_configuration
163                .get(&())?
164                .expect("Epoch start configuration must be set in non-empty DB")
165        };
166        let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
167        info!("Epoch start config: {:?}", epoch_start_configuration);
168        info!("Cur epoch: {:?}", cur_epoch);
169        let this = Self::open_inner(
170            genesis,
171            perpetual_tables,
172            enable_epoch_sui_conservation_check,
173            registry,
174        )
175        .await?;
176        this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
177        Ok(this)
178    }
179
180    pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
181        for flag in old {
182            self.metrics
183                .epoch_flags
184                .with_label_values(&[&flag.to_string()])
185                .set(0);
186        }
187        for flag in new {
188            self.metrics
189                .epoch_flags
190                .with_label_values(&[&flag.to_string()])
191                .set(1);
192        }
193    }
194
195    // NB: This must only be called at time of reconfiguration. We take the execution lock write
196    // guard as an argument to ensure that this is the case.
197    pub fn clear_object_per_epoch_marker_table(
198        &self,
199        _execution_guard: &ExecutionLockWriteGuard<'_>,
200    ) -> SuiResult<()> {
201        // We can safely delete all entries in the per epoch marker table since this is only called
202        // at epoch boundaries (during reconfiguration). Therefore any entries that currently
203        // exist can be removed. Because of this we can use the `schedule_delete_all` method.
204        self.perpetual_tables
205            .object_per_epoch_marker_table
206            .schedule_delete_all()?;
207        Ok(self
208            .perpetual_tables
209            .object_per_epoch_marker_table_v2
210            .schedule_delete_all()?)
211    }
212
213    pub async fn open_with_committee_for_testing(
214        perpetual_tables: Arc<AuthorityPerpetualTables>,
215        committee: &Committee,
216        genesis: &Genesis,
217    ) -> SuiResult<Arc<Self>> {
218        // TODO: Since we always start at genesis, the committee should be technically the same
219        // as the genesis committee.
220        assert_eq!(committee.epoch, 0);
221        Self::open_inner(genesis, perpetual_tables, true, &Registry::new()).await
222    }
223
224    async fn open_inner(
225        genesis: &Genesis,
226        perpetual_tables: Arc<AuthorityPerpetualTables>,
227        enable_epoch_sui_conservation_check: bool,
228        registry: &Registry,
229    ) -> SuiResult<Arc<Self>> {
230        let store = Arc::new(Self {
231            mutex_table: MutexTable::new(NUM_SHARDS),
232            perpetual_tables,
233            root_state_notify_read: NotifyRead::<
234                EpochId,
235                (CheckpointSequenceNumber, GlobalStateHash),
236            >::new(),
237            enable_epoch_sui_conservation_check,
238            metrics: AuthorityStoreMetrics::new(registry),
239        });
240        // Only initialize an empty database.
241        if store
242            .database_is_empty()
243            .expect("Database read should not fail at init.")
244        {
245            store
246                .bulk_insert_genesis_objects(genesis.objects())
247                .expect("Cannot bulk insert genesis objects");
248
249            // insert txn and effects of genesis
250            let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());
251
252            store
253                .perpetual_tables
254                .transactions
255                .insert(transaction.digest(), transaction.serializable_ref())
256                .unwrap();
257
258            store
259                .perpetual_tables
260                .effects
261                .insert(&genesis.effects().digest(), genesis.effects())
262                .unwrap();
263            // We don't insert the effects to executed_effects yet because the genesis tx hasn't but will be executed.
264            // This is important for fullnodes to be able to generate indexing data right now.
265
266            if genesis.effects().events_digest().is_some() {
267                store
268                    .perpetual_tables
269                    .events_2
270                    .insert(transaction.digest(), genesis.events())
271                    .unwrap();
272            }
273        }
274
275        Ok(store)
276    }
277
278    /// Open authority store without any operations that require
279    /// genesis, such as constructing EpochStartConfiguration
280    /// or inserting genesis objects.
281    pub fn open_no_genesis(
282        perpetual_tables: Arc<AuthorityPerpetualTables>,
283        enable_epoch_sui_conservation_check: bool,
284        registry: &Registry,
285    ) -> SuiResult<Arc<Self>> {
286        let store = Arc::new(Self {
287            mutex_table: MutexTable::new(NUM_SHARDS),
288            perpetual_tables,
289            root_state_notify_read: NotifyRead::<
290                EpochId,
291                (CheckpointSequenceNumber, GlobalStateHash),
292            >::new(),
293            enable_epoch_sui_conservation_check,
294            metrics: AuthorityStoreMetrics::new(registry),
295        });
296        Ok(store)
297    }
298
299    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
300        self.perpetual_tables.get_recovery_epoch_at_restart()
301    }
302
303    pub fn get_effects(
304        &self,
305        effects_digest: &TransactionEffectsDigest,
306    ) -> SuiResult<Option<TransactionEffects>> {
307        Ok(self.perpetual_tables.effects.get(effects_digest)?)
308    }
309
310    /// Returns true if we have an effects structure for this transaction digest
311    pub fn effects_exists(&self, effects_digest: &TransactionEffectsDigest) -> SuiResult<bool> {
312        self.perpetual_tables
313            .effects
314            .contains_key(effects_digest)
315            .map_err(|e| e.into())
316    }
317
318    pub fn get_events(
319        &self,
320        digest: &TransactionDigest,
321    ) -> Result<Option<TransactionEvents>, TypedStoreError> {
322        self.perpetual_tables.events_2.get(digest)
323    }
324
325    pub fn multi_get_events(
326        &self,
327        event_digests: &[TransactionDigest],
328    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
329        Ok(event_digests
330            .iter()
331            .map(|digest| self.get_events(digest))
332            .collect::<Result<Vec<_>, _>>()?)
333    }
334
335    pub fn get_unchanged_loaded_runtime_objects(
336        &self,
337        digest: &TransactionDigest,
338    ) -> Result<Option<Vec<ObjectKey>>, TypedStoreError> {
339        self.perpetual_tables
340            .unchanged_loaded_runtime_objects
341            .get(digest)
342    }
343
344    pub fn multi_get_effects<'a>(
345        &self,
346        effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
347    ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
348        self.perpetual_tables.effects.multi_get(effects_digests)
349    }
350
351    pub fn get_executed_effects(
352        &self,
353        tx_digest: &TransactionDigest,
354    ) -> Result<Option<TransactionEffects>, TypedStoreError> {
355        let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
356        match effects_digest {
357            Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
358            None => Ok(None),
359        }
360    }
361
362    /// Given a list of transaction digests, returns a list of the corresponding effects only if they have been
363    /// executed. For transactions that have not been executed, None is returned.
364    pub fn multi_get_executed_effects_digests(
365        &self,
366        digests: &[TransactionDigest],
367    ) -> Result<Vec<Option<TransactionEffectsDigest>>, TypedStoreError> {
368        self.perpetual_tables.executed_effects.multi_get(digests)
369    }
370
371    /// Given a list of transaction digests, returns a list of the corresponding effects only if they have been
372    /// executed. For transactions that have not been executed, None is returned.
373    pub fn multi_get_executed_effects(
374        &self,
375        digests: &[TransactionDigest],
376    ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
377        let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
378        let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
379        let mut tx_to_effects_map = effects
380            .into_iter()
381            .flatten()
382            .map(|effects| (*effects.transaction_digest(), effects))
383            .collect::<HashMap<_, _>>();
384        Ok(digests
385            .iter()
386            .map(|digest| tx_to_effects_map.remove(digest))
387            .collect())
388    }
389
390    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> SuiResult<bool> {
391        Ok(self
392            .perpetual_tables
393            .executed_effects
394            .contains_key(digest)?)
395    }
396
397    pub fn get_marker_value(
398        &self,
399        object_key: FullObjectKey,
400        epoch_id: EpochId,
401    ) -> SuiResult<Option<MarkerValue>> {
402        Ok(self
403            .perpetual_tables
404            .object_per_epoch_marker_table_v2
405            .get(&(epoch_id, object_key))?)
406    }
407
408    pub fn get_latest_marker(
409        &self,
410        object_id: FullObjectID,
411        epoch_id: EpochId,
412    ) -> SuiResult<Option<(SequenceNumber, MarkerValue)>> {
413        let min_key = (epoch_id, FullObjectKey::min_for_id(&object_id));
414        let max_key = (epoch_id, FullObjectKey::max_for_id(&object_id));
415
416        let marker_entry = self
417            .perpetual_tables
418            .object_per_epoch_marker_table_v2
419            .reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
420            .next();
421        match marker_entry {
422            Some(Ok(((epoch, key), marker))) => {
423                // because of the iterator bounds these cannot fail
424                assert_eq!(epoch, epoch_id);
425                assert_eq!(key.id(), object_id);
426                Ok(Some((key.version(), marker)))
427            }
428            Some(Err(e)) => Err(e.into()),
429            None => Ok(None),
430        }
431    }
432
433    /// Returns future containing the state hash for the given epoch
434    /// once available
435    pub async fn notify_read_root_state_hash(
436        &self,
437        epoch: EpochId,
438    ) -> SuiResult<(CheckpointSequenceNumber, GlobalStateHash)> {
439        // We need to register waiters _before_ reading from the database to avoid race conditions
440        let registration = self.root_state_notify_read.register_one(&epoch);
441        let hash = self.perpetual_tables.root_state_hash_by_epoch.get(&epoch)?;
442
443        let result = match hash {
444            // Note that Some() clause also drops registration that is already fulfilled
445            Some(ready) => Either::Left(futures::future::ready(ready)),
446            None => Either::Right(registration),
447        }
448        .await;
449
450        Ok(result)
451    }
452
453    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
454    pub fn deprecated_insert_finalized_transactions(
455        &self,
456        digests: &[TransactionDigest],
457        epoch: EpochId,
458        sequence: CheckpointSequenceNumber,
459    ) -> SuiResult {
460        let mut batch = self
461            .perpetual_tables
462            .executed_transactions_to_checkpoint
463            .batch();
464        batch.insert_batch(
465            &self.perpetual_tables.executed_transactions_to_checkpoint,
466            digests.iter().map(|d| (*d, (epoch, sequence))),
467        )?;
468        batch.write()?;
469        trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
470        Ok(())
471    }
472
473    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
474    pub fn deprecated_get_transaction_checkpoint(
475        &self,
476        digest: &TransactionDigest,
477    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
478        Ok(self
479            .perpetual_tables
480            .executed_transactions_to_checkpoint
481            .get(digest)?)
482    }
483
484    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
485    pub fn deprecated_multi_get_transaction_checkpoint(
486        &self,
487        digests: &[TransactionDigest],
488    ) -> SuiResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
489        Ok(self
490            .perpetual_tables
491            .executed_transactions_to_checkpoint
492            .multi_get(digests)?
493            .into_iter()
494            .collect())
495    }
496
497    /// Returns true if there are no objects in the database
498    pub fn database_is_empty(&self) -> SuiResult<bool> {
499        self.perpetual_tables.database_is_empty()
500    }
501
502    /// A function that acquires all locks associated with the objects (in order to avoid deadlocks).
503    fn acquire_locks(&self, input_objects: &[ObjectRef]) -> Vec<MutexGuard> {
504        self.mutex_table
505            .acquire_locks(input_objects.iter().map(|(_, _, digest)| *digest))
506    }
507
508    pub fn object_exists_by_key(
509        &self,
510        object_id: &ObjectID,
511        version: VersionNumber,
512    ) -> SuiResult<bool> {
513        Ok(self
514            .perpetual_tables
515            .objects
516            .contains_key(&ObjectKey(*object_id, version))?)
517    }
518
519    pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<bool>> {
520        Ok(self
521            .perpetual_tables
522            .objects
523            .multi_contains_keys(object_keys.to_vec())?
524            .into_iter()
525            .collect())
526    }
527
528    fn get_object_ref_prior_to_key(
529        &self,
530        object_id: &ObjectID,
531        version: VersionNumber,
532    ) -> Result<Option<ObjectRef>, SuiError> {
533        let Some(prior_version) = version.one_before() else {
534            return Ok(None);
535        };
536        let mut iterator = self
537            .perpetual_tables
538            .objects
539            .reversed_safe_iter_with_bounds(
540                Some(ObjectKey::min_for_id(object_id)),
541                Some(ObjectKey(*object_id, prior_version)),
542            )?;
543
544        if let Some((object_key, value)) = iterator.next().transpose()?
545            && object_key.0 == *object_id
546        {
547            return Ok(Some(
548                self.perpetual_tables.object_reference(&object_key, value)?,
549            ));
550        }
551        Ok(None)
552    }
553
554    pub fn multi_get_objects_by_key(
555        &self,
556        object_keys: &[ObjectKey],
557    ) -> Result<Vec<Option<Object>>, SuiError> {
558        let wrappers = self
559            .perpetual_tables
560            .objects
561            .multi_get(object_keys.to_vec())?;
562        let mut ret = vec![];
563
564        for (idx, w) in wrappers.into_iter().enumerate() {
565            ret.push(
566                w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
567                    .transpose()?
568                    .flatten(),
569            );
570        }
571        Ok(ret)
572    }
573
574    /// Get many objects
575    pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, SuiError> {
576        let mut result = Vec::new();
577        for id in objects {
578            result.push(self.get_object(id));
579        }
580        Ok(result)
581    }
582
583    // Methods to mutate the store
584
585    /// Insert a genesis object.
586    /// TODO: delete this method entirely (still used by authority_tests.rs)
587    pub(crate) fn insert_genesis_object(&self, object: Object) -> SuiResult {
588        // We only side load objects with a genesis parent transaction.
589        debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
590        let object_ref = object.compute_object_reference();
591        self.insert_object_direct(object_ref, &object)
592    }
593
594    /// Insert an object directly into the store, and also update relevant tables
595    /// NOTE: does not handle transaction lock.
596    /// This is used to insert genesis objects
597    fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> SuiResult {
598        let mut write_batch = self.perpetual_tables.objects.batch();
599
600        // Insert object
601        let store_object = get_store_object(object.clone());
602        write_batch.insert_batch(
603            &self.perpetual_tables.objects,
604            std::iter::once((ObjectKey::from(object_ref), store_object)),
605        )?;
606
607        // Update the index
608        if object.get_single_owner().is_some() {
609            // Only initialize lock for address owned objects.
610            if !object.is_child_object() {
611                self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref], false)?;
612            }
613        }
614
615        write_batch.write()?;
616
617        Ok(())
618    }
619
620    /// This function should only be used for initializing genesis and should remain private.
621    #[instrument(level = "debug", skip_all)]
622    pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> SuiResult<()> {
623        let mut batch = self.perpetual_tables.objects.batch();
624        let ref_and_objects: Vec<_> = objects
625            .iter()
626            .map(|o| (o.compute_object_reference(), o))
627            .collect();
628
629        batch.insert_batch(
630            &self.perpetual_tables.objects,
631            ref_and_objects
632                .iter()
633                .map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))),
634        )?;
635
636        let non_child_object_refs: Vec<_> = ref_and_objects
637            .iter()
638            .filter(|(_, object)| !object.is_child_object())
639            .map(|(oref, _)| *oref)
640            .collect();
641
642        self.initialize_live_object_markers_impl(
643            &mut batch,
644            &non_child_object_refs,
645            false, // is_force_reset
646        )?;
647
648        batch.write()?;
649
650        Ok(())
651    }
652
653    pub fn bulk_insert_live_objects(
654        perpetual_db: &AuthorityPerpetualTables,
655        live_objects: impl Iterator<Item = LiveObject>,
656        expected_sha3_digest: &[u8; 32],
657    ) -> SuiResult<()> {
658        let mut hasher = Sha3_256::default();
659        let mut batch = perpetual_db.objects.batch();
660        let mut written = 0usize;
661        const MAX_BATCH_SIZE: usize = 100_000;
662        for object in live_objects {
663            hasher.update(object.object_reference().2.inner());
664            match object {
665                LiveObject::Normal(object) => {
666                    let store_object_wrapper = get_store_object(object.clone());
667                    batch.insert_batch(
668                        &perpetual_db.objects,
669                        std::iter::once((
670                            ObjectKey::from(object.compute_object_reference()),
671                            store_object_wrapper,
672                        )),
673                    )?;
674                    if !object.is_child_object() {
675                        Self::initialize_live_object_markers(
676                            &perpetual_db.live_owned_object_markers,
677                            &mut batch,
678                            &[object.compute_object_reference()],
679                            false, // is_force_reset
680                        )?;
681                    }
682                }
683                LiveObject::Wrapped(object_key) => {
684                    batch.insert_batch(
685                        &perpetual_db.objects,
686                        std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
687                            object_key,
688                            StoreObject::Wrapped.into(),
689                        )),
690                    )?;
691                }
692            }
693            written += 1;
694            if written > MAX_BATCH_SIZE {
695                batch.write()?;
696                batch = perpetual_db.objects.batch();
697                written = 0;
698            }
699        }
700        let sha3_digest = hasher.finalize().digest;
701        if *expected_sha3_digest != sha3_digest {
702            error!(
703                "Sha does not match! expected: {:?}, actual: {:?}",
704                expected_sha3_digest, sha3_digest
705            );
706            return Err(SuiError::from("Sha does not match"));
707        }
708        batch.write()?;
709        Ok(())
710    }
711
712    pub fn set_epoch_start_configuration(
713        &self,
714        epoch_start_configuration: &EpochStartConfiguration,
715    ) -> SuiResult {
716        self.perpetual_tables
717            .set_epoch_start_configuration(epoch_start_configuration)?;
718        Ok(())
719    }
720
721    pub fn get_epoch_start_configuration(&self) -> SuiResult<Option<EpochStartConfiguration>> {
722        Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
723    }
724
725    /// Updates the state resulting from the execution of a certificate.
726    ///
727    /// Internally it checks that all locks for active inputs are at the correct
728    /// version, and then writes objects, certificates, parents and clean up locks atomically.
729    #[instrument(level = "debug", skip_all)]
730    pub fn build_db_batch(
731        &self,
732        epoch_id: EpochId,
733        tx_outputs: &[Arc<TransactionOutputs>],
734    ) -> SuiResult<DBBatch> {
735        let mut written = Vec::with_capacity(tx_outputs.len());
736        for outputs in tx_outputs {
737            written.extend(outputs.written.values().cloned());
738        }
739
740        let mut write_batch = self.perpetual_tables.transactions.batch();
741        for outputs in tx_outputs {
742            self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
743        }
744        // test crashing before writing the batch
745        fail_point!("crash");
746
747        trace!(
748            "built batch for committed transactions: {:?}",
749            tx_outputs
750                .iter()
751                .map(|tx| tx.transaction.digest())
752                .collect::<Vec<_>>()
753        );
754
755        // test crashing before notifying
756        fail_point!("crash");
757
758        Ok(write_batch)
759    }
760
761    fn write_one_transaction_outputs(
762        &self,
763        write_batch: &mut DBBatch,
764        epoch_id: EpochId,
765        tx_outputs: &TransactionOutputs,
766    ) -> SuiResult {
767        let TransactionOutputs {
768            transaction,
769            effects,
770            markers,
771            wrapped,
772            deleted,
773            written,
774            events,
775            unchanged_loaded_runtime_objects,
776            locks_to_delete,
777            new_locks_to_init,
778            ..
779        } = tx_outputs;
780
781        let effects_digest = effects.digest();
782        let transaction_digest = transaction.digest();
783        // effects must be inserted before the corresponding dependent entries
784        // because they carry epoch information necessary for correct pruning via relocation filters
785        write_batch
786            .insert_batch(
787                &self.perpetual_tables.effects,
788                [(effects_digest, effects.clone())],
789            )?
790            .insert_batch(
791                &self.perpetual_tables.executed_effects,
792                [(transaction_digest, effects_digest)],
793            )?;
794
795        // Store the certificate indexed by transaction digest
796        write_batch.insert_batch(
797            &self.perpetual_tables.transactions,
798            iter::once((transaction_digest, transaction.serializable_ref())),
799        )?;
800
801        write_batch.insert_batch(
802            &self.perpetual_tables.executed_transaction_digests,
803            [((epoch_id, *transaction_digest), ())],
804        )?;
805
806        // Add batched writes for objects and locks.
807        write_batch.insert_batch(
808            &self.perpetual_tables.object_per_epoch_marker_table_v2,
809            markers
810                .iter()
811                .map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
812        )?;
813        write_batch.insert_batch(
814            &self.perpetual_tables.objects,
815            deleted
816                .iter()
817                .map(|key| (key, StoreObject::Deleted))
818                .chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
819                .map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
820        )?;
821
822        // Insert each output object into the stores
823        let new_objects = written.iter().map(|(id, new_object)| {
824            let version = new_object.version();
825            trace!(?id, ?version, "writing object");
826            let store_object = get_store_object(new_object.clone());
827            (ObjectKey(*id, version), store_object)
828        });
829
830        write_batch.insert_batch(&self.perpetual_tables.objects, new_objects)?;
831
832        // Write events into the new table keyed off of transaction_digest
833        if effects.events_digest().is_some() {
834            write_batch.insert_batch(
835                &self.perpetual_tables.events_2,
836                [(transaction_digest, events)],
837            )?;
838        }
839
840        // Write unchanged_loaded_runtime_objects
841        if !unchanged_loaded_runtime_objects.is_empty() {
842            write_batch.insert_batch(
843                &self.perpetual_tables.unchanged_loaded_runtime_objects,
844                [(transaction_digest, unchanged_loaded_runtime_objects)],
845            )?;
846        }
847
848        self.initialize_live_object_markers_impl(write_batch, new_locks_to_init, false)?;
849
850        // Note: deletes locks for received objects as well (but not for objects that were in
851        // `Receiving` arguments which were not received)
852        self.delete_live_object_markers(write_batch, locks_to_delete)?;
853
854        debug!(effects_digest = ?effects.digest(), "commit_certificate finished");
855
856        Ok(())
857    }
858
859    /// Commits transactions only (not effects or other transaction outputs) to the db.
860    /// See ExecutionCache::persist_transaction for more info
861    pub(crate) fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> SuiResult {
862        let mut batch = self.perpetual_tables.transactions.batch();
863        batch.insert_batch(
864            &self.perpetual_tables.transactions,
865            [(tx.digest(), tx.clone().into_unsigned().serializable_ref())],
866        )?;
867        batch.write()?;
868        Ok(())
869    }
870
871    pub fn acquire_transaction_locks(
872        &self,
873        epoch_store: &AuthorityPerEpochStore,
874        owned_input_objects: &[ObjectRef],
875        tx_digest: TransactionDigest,
876        signed_transaction: Option<VerifiedSignedTransaction>,
877    ) -> SuiResult {
878        let epoch = epoch_store.epoch();
879        // Other writers may be attempting to acquire locks on the same objects, so a mutex is
880        // required.
881        // TODO: replace with optimistic db_transactions (i.e. set lock to tx if none)
882        let _mutexes = self.acquire_locks(owned_input_objects);
883
884        trace!(?owned_input_objects, "acquire_locks");
885        let mut locks_to_write = Vec::new();
886
887        let live_object_markers = self
888            .perpetual_tables
889            .live_owned_object_markers
890            .multi_get(owned_input_objects)?;
891
892        let epoch_tables = epoch_store.tables()?;
893
894        let locks = epoch_tables.multi_get_locked_transactions(owned_input_objects)?;
895
896        assert_eq!(locks.len(), live_object_markers.len());
897
898        for (live_marker, lock, obj_ref) in izip!(
899            live_object_markers.into_iter(),
900            locks.into_iter(),
901            owned_input_objects
902        ) {
903            let Some(live_marker) = live_marker else {
904                let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
905                fp_bail!(
906                    UserInputError::ObjectVersionUnavailableForConsumption {
907                        provided_obj_ref: *obj_ref,
908                        current_version: latest_lock.1
909                    }
910                    .into()
911                );
912            };
913
914            let live_marker = live_marker.map(|l| l.migrate().into_inner());
915
916            if let Some(LockDetailsDeprecated {
917                epoch: previous_epoch,
918                ..
919            }) = &live_marker
920            {
921                // this must be from a prior epoch, because we no longer write LockDetails to
922                // owned_object_transaction_locks
923                assert!(
924                    previous_epoch < &epoch,
925                    "lock for {:?} should be from a prior epoch",
926                    obj_ref
927                );
928            }
929
930            if let Some(previous_tx_digest) = &lock {
931                if previous_tx_digest == &tx_digest {
932                    // no need to re-write lock
933                    continue;
934                } else {
935                    // TODO: add metrics here
936                    info!(prev_tx_digest = ?previous_tx_digest,
937                          cur_tx_digest = ?tx_digest,
938                          "Cannot acquire lock: conflicting transaction!");
939                    return Err(SuiErrorKind::ObjectLockConflict {
940                        obj_ref: *obj_ref,
941                        pending_transaction: *previous_tx_digest,
942                    }
943                    .into());
944                }
945            }
946
947            locks_to_write.push((*obj_ref, tx_digest));
948        }
949
950        if !locks_to_write.is_empty() {
951            trace!(?locks_to_write, "Writing locks");
952            epoch_tables.write_transaction_locks(signed_transaction, locks_to_write.into_iter())?;
953        }
954
955        Ok(())
956    }
957
958    /// Gets ObjectLockInfo that represents state of lock on an object.
959    /// Returns UserInputError::ObjectNotFound if cannot find lock record for this object
960    pub(crate) fn get_lock(
961        &self,
962        obj_ref: ObjectRef,
963        epoch_store: &AuthorityPerEpochStore,
964    ) -> SuiLockResult {
965        if self
966            .perpetual_tables
967            .live_owned_object_markers
968            .get(&obj_ref)?
969            .is_none()
970        {
971            return Ok(ObjectLockStatus::LockedAtDifferentVersion {
972                locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
973            });
974        }
975
976        let tables = epoch_store.tables()?;
977        let epoch_id = epoch_store.epoch();
978
979        if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
980            Ok(ObjectLockStatus::LockedToTx {
981                locked_by_tx: LockDetailsDeprecated {
982                    epoch: epoch_id,
983                    tx_digest,
984                },
985            })
986        } else {
987            Ok(ObjectLockStatus::Initialized)
988        }
989    }
990
991    /// Returns UserInputError::ObjectNotFound if no lock records found for this object.
992    pub(crate) fn get_latest_live_version_for_object_id(
993        &self,
994        object_id: ObjectID,
995    ) -> SuiResult<ObjectRef> {
996        let mut iterator = self
997            .perpetual_tables
998            .live_owned_object_markers
999            .reversed_safe_iter_with_bounds(
1000                None,
1001                Some((object_id, SequenceNumber::MAX, ObjectDigest::MAX)),
1002            )?;
1003        Ok(iterator
1004            .next()
1005            .transpose()?
1006            .and_then(|value| {
1007                if value.0.0 == object_id {
1008                    Some(value)
1009                } else {
1010                    None
1011                }
1012            })
1013            .ok_or_else(|| {
1014                SuiError::from(UserInputError::ObjectNotFound {
1015                    object_id,
1016                    version: None,
1017                })
1018            })?
1019            .0)
1020    }
1021
1022    /// Checks multiple object locks exist.
1023    /// Returns UserInputError::ObjectNotFound if cannot find lock record for at least one of the objects.
1024    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if at least one object lock is not initialized
1025    ///     at the given version.
1026    pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> SuiResult {
1027        let locks = self
1028            .perpetual_tables
1029            .live_owned_object_markers
1030            .multi_get(objects)?;
1031        for (lock, obj_ref) in locks.into_iter().zip(objects) {
1032            if lock.is_none() {
1033                let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
1034                fp_bail!(
1035                    UserInputError::ObjectVersionUnavailableForConsumption {
1036                        provided_obj_ref: *obj_ref,
1037                        current_version: latest_lock.1
1038                    }
1039                    .into()
1040                );
1041            }
1042        }
1043        Ok(())
1044    }
1045
1046    /// Initialize a lock to None (but exists) for a given list of ObjectRefs.
1047    /// Returns SuiErrorKind::ObjectLockAlreadyInitialized if the lock already exists and is locked to a transaction
1048    fn initialize_live_object_markers_impl(
1049        &self,
1050        write_batch: &mut DBBatch,
1051        objects: &[ObjectRef],
1052        is_force_reset: bool,
1053    ) -> SuiResult {
1054        AuthorityStore::initialize_live_object_markers(
1055            &self.perpetual_tables.live_owned_object_markers,
1056            write_batch,
1057            objects,
1058            is_force_reset,
1059        )
1060    }
1061
1062    pub fn initialize_live_object_markers(
1063        live_object_marker_table: &DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,
1064        write_batch: &mut DBBatch,
1065        objects: &[ObjectRef],
1066        is_force_reset: bool,
1067    ) -> SuiResult {
1068        trace!(?objects, "initialize_locks");
1069
1070        let live_object_markers = live_object_marker_table.multi_get(objects)?;
1071
1072        if !is_force_reset {
1073            // If any live_object_markers exist and are not None, return errors for them
1074            // Note we don't check if there is a pre-existing lock. this is because initializing the live
1075            // object marker will not overwrite the lock and cause the validator to equivocate.
1076            let existing_live_object_markers: Vec<ObjectRef> = live_object_markers
1077                .iter()
1078                .zip(objects)
1079                .filter_map(|(lock_opt, objref)| {
1080                    lock_opt.clone().flatten().map(|_tx_digest| *objref)
1081                })
1082                .collect();
1083            if !existing_live_object_markers.is_empty() {
1084                info!(
1085                    ?existing_live_object_markers,
1086                    "Cannot initialize live_object_markers because some exist already"
1087                );
1088                return Err(SuiErrorKind::ObjectLockAlreadyInitialized {
1089                    refs: existing_live_object_markers,
1090                }
1091                .into());
1092            }
1093        }
1094
1095        write_batch.insert_batch(
1096            live_object_marker_table,
1097            objects.iter().map(|obj_ref| (obj_ref, None)),
1098        )?;
1099        Ok(())
1100    }
1101
1102    /// Removes locks for a given list of ObjectRefs.
1103    fn delete_live_object_markers(
1104        &self,
1105        write_batch: &mut DBBatch,
1106        objects: &[ObjectRef],
1107    ) -> SuiResult {
1108        trace!(?objects, "delete_locks");
1109        write_batch.delete_batch(
1110            &self.perpetual_tables.live_owned_object_markers,
1111            objects.iter(),
1112        )?;
1113        Ok(())
1114    }
1115
/// Test helper: clears the signed transactions and object locks held in `epoch_store`, then
/// deletes and re-creates the live object markers of `objects`.
///
/// The marker deletion is committed in its own batch before the markers are initialized
/// again, so the non-forced initialization does not see the old entries.
///
/// # Panics
/// Panics if any of the database operations fails.
#[cfg(test)]
pub(crate) fn reset_locks_for_test(
    &self,
    transactions: &[TransactionDigest],
    objects: &[ObjectRef],
    epoch_store: &AuthorityPerEpochStore,
) {
    // NOTE(review): the object locks are cleared once per transaction, so nothing is cleared
    // in the epoch store when `transactions` is empty — confirm this is intended.
    transactions.iter().for_each(|tx| {
        epoch_store.delete_signed_transaction_for_test(tx);
        epoch_store.delete_object_locks_for_test(objects);
    });

    let marker_table = &self.perpetual_tables.live_owned_object_markers;

    let mut removal = marker_table.batch();
    removal.delete_batch(marker_table, objects.iter()).unwrap();
    removal.write().unwrap();

    let mut reinit = marker_table.batch();
    self.initialize_live_object_markers_impl(&mut reinit, objects, false)
        .unwrap();
    reinit.write().unwrap();
}
1142
1143    /// Return the object with version less then or eq to the provided seq number.
1144    /// This is used by indexer to find the correct version of dynamic field child object.
1145    /// We do not store the version of the child object, but because of lamport timestamp,
1146    /// we know the child must have version number less then or eq to the parent.
1147    pub fn find_object_lt_or_eq_version(
1148        &self,
1149        object_id: ObjectID,
1150        version: SequenceNumber,
1151    ) -> SuiResult<Option<Object>> {
1152        self.perpetual_tables
1153            .find_object_lt_or_eq_version(object_id, version)
1154    }
1155
1156    /// Returns the latest object reference we have for this object_id in the objects table.
1157    ///
1158    /// The method may also return the reference to a deleted object with a digest of
1159    /// ObjectDigest::deleted() or ObjectDigest::wrapped() and lamport version
1160    /// of a transaction that deleted the object.
1161    /// Note that a deleted object may re-appear if the deletion was the result of the object
1162    /// being wrapped in another object.
1163    ///
1164    /// If no entry for the object_id is found, return None.
1165    pub fn get_latest_object_ref_or_tombstone(
1166        &self,
1167        object_id: ObjectID,
1168    ) -> Result<Option<ObjectRef>, SuiError> {
1169        self.perpetual_tables
1170            .get_latest_object_ref_or_tombstone(object_id)
1171    }
1172
1173    /// Returns the latest object reference if and only if the object is still live (i.e. it does
1174    /// not return tombstones)
1175    pub fn get_latest_object_ref_if_alive(
1176        &self,
1177        object_id: ObjectID,
1178    ) -> Result<Option<ObjectRef>, SuiError> {
1179        match self.get_latest_object_ref_or_tombstone(object_id)? {
1180            Some(objref) if objref.2.is_alive() => Ok(Some(objref)),
1181            _ => Ok(None),
1182        }
1183    }
1184
1185    /// Returns the latest object we have for this object_id in the objects table.
1186    ///
1187    /// If no entry for the object_id is found, return None.
1188    pub fn get_latest_object_or_tombstone(
1189        &self,
1190        object_id: ObjectID,
1191    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, SuiError> {
1192        let Some((object_key, store_object)) = self
1193            .perpetual_tables
1194            .get_latest_object_or_tombstone(object_id)?
1195        else {
1196            return Ok(None);
1197        };
1198
1199        if let Some(object_ref) = self
1200            .perpetual_tables
1201            .tombstone_reference(&object_key, &store_object)?
1202        {
1203            return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
1204        }
1205
1206        let object = self
1207            .perpetual_tables
1208            .object(&object_key, store_object)?
1209            .expect("Non tombstone store object could not be converted to object");
1210
1211        Ok(Some((object_key, ObjectOrTombstone::Object(object))))
1212    }
1213
1214    pub fn insert_transaction_and_effects(
1215        &self,
1216        transaction: &VerifiedTransaction,
1217        transaction_effects: &TransactionEffects,
1218    ) -> Result<(), TypedStoreError> {
1219        let mut write_batch = self.perpetual_tables.transactions.batch();
1220        // effects must be inserted before the corresponding transaction entry
1221        // because they carry epoch information necessary for correct pruning via relocation filters
1222        write_batch
1223            .insert_batch(
1224                &self.perpetual_tables.effects,
1225                [(transaction_effects.digest(), transaction_effects)],
1226            )?
1227            .insert_batch(
1228                &self.perpetual_tables.transactions,
1229                [(transaction.digest(), transaction.serializable_ref())],
1230            )?;
1231
1232        write_batch.write()?;
1233        Ok(())
1234    }
1235
1236    pub fn multi_insert_transaction_and_effects<'a>(
1237        &self,
1238        transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
1239    ) -> Result<(), TypedStoreError> {
1240        let mut write_batch = self.perpetual_tables.transactions.batch();
1241        for tx in transactions {
1242            write_batch
1243                .insert_batch(
1244                    &self.perpetual_tables.effects,
1245                    [(tx.effects.digest(), &tx.effects)],
1246                )?
1247                .insert_batch(
1248                    &self.perpetual_tables.transactions,
1249                    [(tx.transaction.digest(), tx.transaction.serializable_ref())],
1250                )?;
1251        }
1252
1253        write_batch.write()?;
1254        Ok(())
1255    }
1256
1257    pub fn multi_get_transaction_blocks(
1258        &self,
1259        tx_digests: &[TransactionDigest],
1260    ) -> Result<Vec<Option<VerifiedTransaction>>, TypedStoreError> {
1261        self.perpetual_tables
1262            .transactions
1263            .multi_get(tx_digests)
1264            .map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())
1265    }
1266
1267    pub fn get_transaction_block(
1268        &self,
1269        tx_digest: &TransactionDigest,
1270    ) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
1271        self.perpetual_tables
1272            .transactions
1273            .get(tx_digest)
1274            .map(|v| v.map(|v| v.into()))
1275    }
1276
1277    /// This function reads the DB directly to get the system state object.
1278    /// If reconfiguration is happening at the same time, there is no guarantee whether we would be getting
1279    /// the old or the new system state object.
1280    /// Hence this function should only be called during RPC reads where data race is not a major concern.
1281    /// In general we should avoid this as much as possible.
1282    /// If the intent is for testing, you can use AuthorityState:: get_sui_system_state_object_for_testing.
1283    pub fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
1284        get_sui_system_state(self.perpetual_tables.as_ref())
1285    }
1286
1287    pub fn expensive_check_sui_conservation<T>(
1288        self: &Arc<Self>,
1289        type_layout_store: T,
1290        old_epoch_store: &AuthorityPerEpochStore,
1291    ) -> SuiResult
1292    where
1293        T: TypeLayoutStore + Send + Copy,
1294    {
1295        if !self.enable_epoch_sui_conservation_check {
1296            return Ok(());
1297        }
1298
1299        let executor = old_epoch_store.executor();
1300        info!("Starting SUI conservation check. This may take a while..");
1301        let cur_time = Instant::now();
1302        let mut pending_objects = vec![];
1303        let mut count = 0;
1304        let mut size = 0;
1305        let (mut total_sui, mut total_storage_rebate) = thread::scope(|s| {
1306            let pending_tasks = FuturesUnordered::new();
1307            for o in self.iter_live_object_set(false) {
1308                match o {
1309                    LiveObject::Normal(object) => {
1310                        size += object.object_size_for_gas_metering();
1311                        count += 1;
1312                        pending_objects.push(object);
1313                        if count % 1_000_000 == 0 {
1314                            let mut task_objects = vec![];
1315                            mem::swap(&mut pending_objects, &mut task_objects);
1316                            pending_tasks.push(s.spawn(move || {
1317                                let mut layout_resolver =
1318                                    executor.type_layout_resolver(Box::new(type_layout_store));
1319                                let mut total_storage_rebate = 0;
1320                                let mut total_sui = 0;
1321                                for object in task_objects {
1322                                    total_storage_rebate += object.storage_rebate;
1323                                    // get_total_sui includes storage rebate, however all storage rebate is
1324                                    // also stored in the storage fund, so we need to subtract it here.
1325                                    total_sui +=
1326                                        object.get_total_sui(layout_resolver.as_mut()).unwrap()
1327                                            - object.storage_rebate;
1328                                }
1329                                if count % 50_000_000 == 0 {
1330                                    info!("Processed {} objects", count);
1331                                }
1332                                (total_sui, total_storage_rebate)
1333                            }));
1334                        }
1335                    }
1336                    LiveObject::Wrapped(_) => {
1337                        unreachable!("Explicitly asked to not include wrapped tombstones")
1338                    }
1339                }
1340            }
1341            pending_tasks.into_iter().fold((0, 0), |init, result| {
1342                let result = result.join().unwrap();
1343                (init.0 + result.0, init.1 + result.1)
1344            })
1345        });
1346        let mut layout_resolver = executor.type_layout_resolver(Box::new(type_layout_store));
1347        for object in pending_objects {
1348            total_storage_rebate += object.storage_rebate;
1349            total_sui +=
1350                object.get_total_sui(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
1351        }
1352        info!(
1353            "Scanned {} live objects, took {:?}",
1354            count,
1355            cur_time.elapsed()
1356        );
1357        self.metrics
1358            .sui_conservation_live_object_count
1359            .set(count as i64);
1360        self.metrics
1361            .sui_conservation_live_object_size
1362            .set(size as i64);
1363        self.metrics
1364            .sui_conservation_check_latency
1365            .set(cur_time.elapsed().as_secs() as i64);
1366
1367        // It is safe to call this function because we are in the middle of reconfiguration.
1368        let system_state = self
1369            .get_sui_system_state_object_unsafe()
1370            .expect("Reading sui system state object cannot fail")
1371            .into_sui_system_state_summary();
1372        let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
1373        info!(
1374            "Total SUI amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
1375            total_sui, storage_fund_balance, total_storage_rebate, system_state.epoch
1376        );
1377
1378        let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
1379        self.metrics
1380            .sui_conservation_storage_fund
1381            .set(storage_fund_balance as i64);
1382        self.metrics
1383            .sui_conservation_storage_fund_imbalance
1384            .set(imbalance);
1385        self.metrics
1386            .sui_conservation_imbalance
1387            .set((total_sui as i128 - TOTAL_SUPPLY_MIST as i128) as i64);
1388
1389        if let Some(expected_imbalance) = self
1390            .perpetual_tables
1391            .expected_storage_fund_imbalance
1392            .get(&())
1393            .expect("DB read cannot fail")
1394        {
1395            fp_ensure!(
1396                imbalance == expected_imbalance,
1397                SuiError::from(
1398                    format!(
1399                        "Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
1400                        system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
1401                    ).as_str()
1402                )
1403            );
1404        } else {
1405            self.perpetual_tables
1406                .expected_storage_fund_imbalance
1407                .insert(&(), &imbalance)
1408                .expect("DB write cannot fail");
1409        }
1410
1411        if let Some(expected_sui) = self
1412            .perpetual_tables
1413            .expected_network_sui_amount
1414            .get(&())
1415            .expect("DB read cannot fail")
1416        {
1417            fp_ensure!(
1418                total_sui == expected_sui,
1419                SuiError::from(
1420                    format!(
1421                        "Inconsistent state detected at epoch {}: total sui: {}, expecting {}",
1422                        system_state.epoch, total_sui, expected_sui
1423                    )
1424                    .as_str()
1425                )
1426            );
1427        } else {
1428            self.perpetual_tables
1429                .expected_network_sui_amount
1430                .insert(&(), &total_sui)
1431                .expect("DB write cannot fail");
1432        }
1433
1434        Ok(())
1435    }
1436
1437    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
1438    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
1439    #[instrument(level = "error", skip_all)]
1440    pub fn maybe_reaccumulate_state_hash(
1441        &self,
1442        cur_epoch_store: &AuthorityPerEpochStore,
1443        new_protocol_version: ProtocolVersion,
1444    ) {
1445        let old_simplified_unwrap_then_delete = cur_epoch_store
1446            .protocol_config()
1447            .simplified_unwrap_then_delete();
1448        let new_simplified_unwrap_then_delete =
1449            ProtocolConfig::get_for_version(new_protocol_version, cur_epoch_store.get_chain())
1450                .simplified_unwrap_then_delete();
1451        // If in the new epoch the simplified_unwrap_then_delete is enabled for the first time,
1452        // we re-accumulate state root.
1453        let should_reaccumulate =
1454            !old_simplified_unwrap_then_delete && new_simplified_unwrap_then_delete;
1455        if !should_reaccumulate {
1456            return;
1457        }
1458        info!(
1459            "[Re-accumulate] simplified_unwrap_then_delete is enabled in the new protocol version, re-accumulating state hash"
1460        );
1461        let cur_time = Instant::now();
1462        std::thread::scope(|s| {
1463            let pending_tasks = FuturesUnordered::new();
1464            // Shard the object IDs into different ranges so that we can process them in parallel.
1465            // We divide the range into 2^BITS number of ranges. To do so we use the highest BITS bits
1466            // to mark the starting/ending point of the range. For example, when BITS = 5, we
1467            // divide the range into 32 ranges, and the first few ranges are:
1468            // 00000000_.... to 00000111_....
1469            // 00001000_.... to 00001111_....
1470            // 00010000_.... to 00010111_....
1471            // and etc.
1472            const BITS: u8 = 5;
1473            for index in 0u8..(1 << BITS) {
1474                pending_tasks.push(s.spawn(move || {
1475                    let mut id_bytes = [0; ObjectID::LENGTH];
1476                    id_bytes[0] = index << (8 - BITS);
1477                    let start_id = ObjectID::new(id_bytes);
1478
1479                    id_bytes[0] |= (1 << (8 - BITS)) - 1;
1480                    for element in id_bytes.iter_mut().skip(1) {
1481                        *element = u8::MAX;
1482                    }
1483                    let end_id = ObjectID::new(id_bytes);
1484
1485                    info!(
1486                        "[Re-accumulate] Scanning object ID range {:?}..{:?}",
1487                        start_id, end_id
1488                    );
1489                    let mut prev = (
1490                        ObjectKey::min_for_id(&ObjectID::ZERO),
1491                        StoreObjectWrapper::V1(StoreObject::Deleted),
1492                    );
1493                    let mut object_scanned: u64 = 0;
1494                    let mut wrapped_objects_to_remove = vec![];
1495                    for db_result in self.perpetual_tables.objects.safe_range_iter(
1496                        ObjectKey::min_for_id(&start_id)..=ObjectKey::max_for_id(&end_id),
1497                    ) {
1498                        match db_result {
1499                            Ok((object_key, object)) => {
1500                                object_scanned += 1;
1501                                if object_scanned.is_multiple_of(100000) {
1502                                    info!(
1503                                        "[Re-accumulate] Task {}: object scanned: {}",
1504                                        index, object_scanned,
1505                                    );
1506                                }
1507                                if matches!(prev.1.inner(), StoreObject::Wrapped)
1508                                    && object_key.0 != prev.0.0
1509                                {
1510                                    wrapped_objects_to_remove
1511                                        .push(WrappedObject::new(prev.0.0, prev.0.1));
1512                                }
1513
1514                                prev = (object_key, object);
1515                            }
1516                            Err(err) => {
1517                                warn!("Object iterator encounter RocksDB error {:?}", err);
1518                                return Err(err);
1519                            }
1520                        }
1521                    }
1522                    if matches!(prev.1.inner(), StoreObject::Wrapped) {
1523                        wrapped_objects_to_remove.push(WrappedObject::new(prev.0.0, prev.0.1));
1524                    }
1525                    info!(
1526                        "[Re-accumulate] Task {}: object scanned: {}, wrapped objects: {}",
1527                        index,
1528                        object_scanned,
1529                        wrapped_objects_to_remove.len(),
1530                    );
1531                    Ok((wrapped_objects_to_remove, object_scanned))
1532                }));
1533            }
1534            let (last_checkpoint_of_epoch, cur_accumulator) = self
1535                .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
1536                .expect("read cannot fail")
1537                .expect("accumulator must exist");
1538            let (accumulator, total_objects_scanned, total_wrapped_objects) =
1539                pending_tasks.into_iter().fold(
1540                    (cur_accumulator, 0u64, 0usize),
1541                    |(mut accumulator, total_objects_scanned, total_wrapped_objects), task| {
1542                        let (wrapped_objects_to_remove, object_scanned) =
1543                            task.join().unwrap().unwrap();
1544                        accumulator.remove_all(
1545                            wrapped_objects_to_remove
1546                                .iter()
1547                                .map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
1548                                .collect::<Vec<Vec<u8>>>(),
1549                        );
1550                        (
1551                            accumulator,
1552                            total_objects_scanned + object_scanned,
1553                            total_wrapped_objects + wrapped_objects_to_remove.len(),
1554                        )
1555                    },
1556                );
1557            info!(
1558                "[Re-accumulate] Total objects scanned: {}, total wrapped objects: {}",
1559                total_objects_scanned, total_wrapped_objects,
1560            );
1561            info!(
1562                "[Re-accumulate] New accumulator value: {:?}",
1563                accumulator.digest()
1564            );
1565            self.insert_state_hash_for_epoch(
1566                cur_epoch_store.epoch(),
1567                &last_checkpoint_of_epoch,
1568                &accumulator,
1569            )
1570            .unwrap();
1571        });
1572        info!(
1573            "[Re-accumulate] Re-accumulating took {}seconds",
1574            cur_time.elapsed().as_secs()
1575        );
1576    }
1577
1578    pub async fn prune_objects_and_compact_for_testing(
1579        &self,
1580        checkpoint_store: &Arc<CheckpointStore>,
1581        rpc_index: Option<&RpcIndexStore>,
1582    ) {
1583        let pruning_config = AuthorityStorePruningConfig {
1584            num_epochs_to_retain: 0,
1585            ..Default::default()
1586        };
1587        let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
1588            &self.perpetual_tables,
1589            checkpoint_store,
1590            rpc_index,
1591            None,
1592            pruning_config,
1593            AuthorityStorePruningMetrics::new_for_test(),
1594            EPOCH_DURATION_MS_FOR_TESTING,
1595        )
1596        .await;
1597        let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
1598    }
1599
/// Test-only helper that deletes, in a single write batch, every object version
/// reported by `modified_at_versions()` of the given effects.
///
/// Each `(object id, version)` pair is logged before it is queued for deletion.
///
/// # Errors
/// Propagates any error from staging the deletions or from writing the batch.
#[cfg(test)]
pub async fn prune_objects_immediately_for_testing(
    &self,
    transaction_effects: Vec<TransactionEffects>,
) -> anyhow::Result<()> {
    let objects_table = &self.perpetual_tables.objects;
    let mut wb = objects_table.batch();

    // Gather all keys up front so logging completes before the batch is touched.
    let stale_keys: Vec<ObjectKey> = transaction_effects
        .iter()
        .flat_map(|effects| effects.modified_at_versions())
        .map(|(object_id, seq_number)| {
            info!("Pruning object {:?} version {:?}", object_id, seq_number);
            ObjectKey(object_id, seq_number)
        })
        .collect();

    wb.delete_batch(objects_table, stale_keys.into_iter())?;
    wb.write()?;
    Ok(())
}
1622
// Counts how many versions of `object_id` exist in the object store. Tombstones are included.
// Panics if the underlying iterator yields an error.
#[cfg(msim)]
pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
    let first_version = ObjectKey(object_id, VersionNumber::MIN);
    let last_version = ObjectKey(object_id, VersionNumber::MAX);

    self.perpetual_tables
        .objects
        .safe_iter_with_bounds(Some(first_version), Some(last_version))
        // Stop at the first failed entry, otherwise bump the running count.
        .try_fold(0usize, |seen, entry| entry.map(|_| seen + 1))
        .unwrap()
}
1636}
1637
1638impl GlobalStateHashStore for AuthorityStore {
1639    fn get_object_ref_prior_to_key_deprecated(
1640        &self,
1641        object_id: &ObjectID,
1642        version: VersionNumber,
1643    ) -> SuiResult<Option<ObjectRef>> {
1644        self.get_object_ref_prior_to_key(object_id, version)
1645    }
1646
1647    fn get_root_state_hash_for_epoch(
1648        &self,
1649        epoch: EpochId,
1650    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
1651        self.perpetual_tables
1652            .root_state_hash_by_epoch
1653            .get(&epoch)
1654            .map_err(Into::into)
1655    }
1656
1657    fn get_root_state_hash_for_highest_epoch(
1658        &self,
1659    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
1660        Ok(self
1661            .perpetual_tables
1662            .root_state_hash_by_epoch
1663            .reversed_safe_iter_with_bounds(None, None)?
1664            .next()
1665            .transpose()?)
1666    }
1667
1668    fn insert_state_hash_for_epoch(
1669        &self,
1670        epoch: EpochId,
1671        last_checkpoint_of_epoch: &CheckpointSequenceNumber,
1672        acc: &GlobalStateHash,
1673    ) -> SuiResult {
1674        self.perpetual_tables
1675            .root_state_hash_by_epoch
1676            .insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
1677        self.root_state_notify_read
1678            .notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
1679
1680        Ok(())
1681    }
1682
1683    fn iter_live_object_set(
1684        &self,
1685        include_wrapped_object: bool,
1686    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1687        Box::new(
1688            self.perpetual_tables
1689                .iter_live_object_set(include_wrapped_object),
1690        )
1691    }
1692}
1693
/// `ObjectStore` implementation for `AuthorityStore`; both lookups are forwarded to the
/// perpetual tables.
1694impl ObjectStore for AuthorityStore {
1695    /// Looks up `object_id` in the perpetual tables and returns the object, or `None` if
    /// the lookup produces nothing.
1696    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        // NOTE(review): `as_ref()` is used only here, not in `get_object_by_key`; presumably it
        // selects a specific `get_object` during method resolution. Confirm before changing.
1697        self.perpetual_tables.as_ref().get_object(object_id)
1698    }
1699
    /// Looks up `object_id` at exactly `version` in the perpetual tables and returns the
    /// object, or `None` if the lookup produces nothing.
1700    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
1701        self.perpetual_tables.get_object_by_key(object_id, version)
1702    }
1703}
1704
1705/// A local wrapper type that pairs a package store with resolver metrics. It exists so
/// that a foreign trait (`ModuleResolver`) can be implemented for it without violating
/// the orphan rule.
1706pub struct ResolverWrapper {
    /// Shared package store handed to `get_module` when a module is looked up.
1707    pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
    /// Shared metrics handle; this wrapper updates its `module_cache_size` gauge.
1708    pub metrics: Arc<ResolverMetrics>,
1709}
1710
1711impl ResolverWrapper {
1712    pub fn new(
1713        resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1714        metrics: Arc<ResolverMetrics>,
1715    ) -> Self {
1716        metrics.module_cache_size.set(0);
1717        ResolverWrapper { resolver, metrics }
1718    }
1719
1720    fn inc_cache_size_gauge(&self) {
1721        // reset the gauge after a restart of the cache
1722        let current = self.metrics.module_cache_size.get();
1723        self.metrics.module_cache_size.set(current + 1);
1724    }
1725}
1726
1727impl ModuleResolver for ResolverWrapper {
1728    type Error = SuiError;
1729    fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
1730        self.inc_cache_size_gauge();
1731        get_module(&*self.resolver, module_id)
1732    }
1733}
1734
/// Tags the origin of an update.
/// NOTE(review): the variant descriptions below are inferred from the names and payload
/// types only; confirm against the code that constructs and matches on this enum.
1735pub enum UpdateType {
    /// Update associated with a transaction; carries a `TransactionEffectsDigest`.
1736    Transaction(TransactionEffectsDigest),
    /// Update associated with genesis; carries no payload.
1737    Genesis,
1738}
1739
/// Shorthand for a `SuiResult` whose success value is an `ObjectLockStatus`.
1740pub type SuiLockResult = SuiResult<ObjectLockStatus>;
1741
/// Lock state reported for an object (the success payload of `SuiLockResult`).
/// NOTE(review): the variant meanings below are inferred from names and payloads; confirm
/// against the lock-lookup code that produces them.
1742#[derive(Debug, PartialEq, Eq)]
1743pub enum ObjectLockStatus {
    /// Presumably: a lock entry exists but is not bound to any transaction. TODO confirm.
1744    Initialized,
    /// Carries the lock details (epoch and transaction digest) of the locking transaction.
1745    LockedToTx { locked_by_tx: LockDetailsDeprecated },
    /// Carries the `ObjectRef` at which the object is locked, presumably a different
    /// version from the one that was queried. TODO confirm.
1746    LockedAtDifferentVersion { locked_ref: ObjectRef },
1747}
1748
/// Versioned, serializable envelope around lock details. `V1` is the only version so far.
1749#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1750pub enum LockDetailsWrapperDeprecated {
    /// First (and currently latest) version of the lock details.
1751    V1(LockDetailsV1Deprecated),
1752}
1753
/// Migration and accessor helpers for the versioned lock-details wrapper.
1754impl LockDetailsWrapperDeprecated {
    /// Upgrades the wrapper to the latest version. With `V1` as the only variant this
    /// currently returns `self` unchanged.
1755    pub fn migrate(self) -> Self {
1756        // TODO: when there are multiple versions, we must iteratively migrate from version N to
1757        // N+1 until we arrive at the latest version
1758        self
1759    }
1760
    /// Borrows the lock details held by the wrapper.
    ///
    /// # Panics
    /// The fallback arm panics for any variant other than `V1`; no such variant exists today.
1761    // Always returns the most recent version. Older versions are migrated to the latest version at
1762    // read time, so there is never a need to access older versions.
1763    pub fn inner(&self) -> &LockDetailsDeprecated {
1764        match self {
1765            Self::V1(v1) => v1,
1766
1767            // can remove #[allow] when there are multiple versions
1768            #[allow(unreachable_patterns)]
1769            _ => panic!("lock details should have been migrated to latest version at read time"),
1770        }
1771    }
    /// Consumes the wrapper and returns the lock details it holds.
    ///
    /// # Panics
    /// The fallback arm panics for any variant other than `V1`; no such variant exists today.
1772    pub fn into_inner(self) -> LockDetailsDeprecated {
1773        match self {
1774            Self::V1(v1) => v1,
1775
1776            // can remove #[allow] when there are multiple versions
1777            #[allow(unreachable_patterns)]
1778            _ => panic!("lock details should have been migrated to latest version at read time"),
1779        }
1780    }
1781}
1782
/// Version 1 of the lock details: an epoch together with a transaction digest.
1783#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1784pub struct LockDetailsV1Deprecated {
    /// Epoch recorded with the lock, presumably the epoch in which it was taken. TODO confirm.
1785    pub epoch: EpochId,
    /// Digest of the transaction recorded with the lock.
1786    pub tx_digest: TransactionDigest,
1787}
1788
/// Alias for the latest lock-details version, which is currently `LockDetailsV1Deprecated`.
1789pub type LockDetailsDeprecated = LockDetailsV1Deprecated;
1790
1791impl From<LockDetailsDeprecated> for LockDetailsWrapperDeprecated {
1792    fn from(details: LockDetailsDeprecated) -> Self {
1793        // always use latest version.
1794        LockDetailsWrapperDeprecated::V1(details)
1795    }
1796}