sui_core/authority/
authority_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::{iter, mem, thread};
6
7use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
8use crate::authority::authority_store_pruner::{
9    AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
10};
11use crate::authority::authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object};
12use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
13use crate::global_state_hasher::GlobalStateHashStore;
14use crate::rpc_index::RpcIndexStore;
15use crate::transaction_outputs::TransactionOutputs;
16use either::Either;
17use fastcrypto::hash::{HashFunction, MultisetHash, Sha3_256};
18use futures::stream::FuturesUnordered;
19use itertools::izip;
20use move_core_types::resolver::ModuleResolver;
21use serde::{Deserialize, Serialize};
22use sui_config::node::AuthorityStorePruningConfig;
23use sui_macros::fail_point_arg;
24use sui_storage::mutex_table::{MutexGuard, MutexTable};
25use sui_types::error::{SuiErrorKind, UserInputError};
26use sui_types::execution::TypeLayoutStore;
27use sui_types::global_state_hash::GlobalStateHash;
28use sui_types::message_envelope::Message;
29use sui_types::storage::{
30    BackingPackageStore, FullObjectKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore,
31    get_module,
32};
33use sui_types::sui_system_state::get_sui_system_state;
34use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure};
35use tokio::time::Instant;
36use tracing::{debug, info, trace};
37use typed_store::traits::Map;
38use typed_store::{
39    TypedStoreError,
40    rocks::{DBBatch, DBMap},
41};
42
43use super::authority_store_tables::LiveObject;
44use super::{authority_store_tables::AuthorityPerpetualTables, *};
45use mysten_common::sync::notify_read::NotifyRead;
46use sui_types::effects::{TransactionEffects, TransactionEvents};
47use sui_types::gas_coin::TOTAL_SUPPLY_MIST;
48
49const NUM_SHARDS: usize = 4096;
50
/// Prometheus gauges published by `AuthorityStore`.
///
/// The per-field descriptions mirror the help strings used when the gauges are registered in
/// `AuthorityStoreMetrics::new`.
struct AuthorityStoreMetrics {
    /// Seconds taken to scan all live objects in the store for the SUI conservation check.
    sui_conservation_check_latency: IntGauge,
    /// Number of live objects in the store.
    sui_conservation_live_object_count: IntGauge,
    /// Size in bytes of live objects in the store.
    sui_conservation_live_object_size: IntGauge,
    /// Difference between the total amount of SUI in the network and the expected total supply.
    sui_conservation_imbalance: IntGauge,
    /// Storage fund pool balance (only the part that represents object storage).
    sui_conservation_storage_fund: IntGauge,
    /// Storage fund balance minus the total object storage rebates.
    sui_conservation_storage_fund_imbalance: IntGauge,
    /// One gauge per epoch flag (label `flag`); set to 1 or 0 by
    /// `AuthorityStore::update_epoch_flags_metrics`.
    epoch_flags: IntGaugeVec,
}
60
61impl AuthorityStoreMetrics {
62    pub fn new(registry: &Registry) -> Self {
63        Self {
64            sui_conservation_check_latency: register_int_gauge_with_registry!(
65                "sui_conservation_check_latency",
66                "Number of seconds took to scan all live objects in the store for SUI conservation check",
67                registry,
68            ).unwrap(),
69            sui_conservation_live_object_count: register_int_gauge_with_registry!(
70                "sui_conservation_live_object_count",
71                "Number of live objects in the store",
72                registry,
73            ).unwrap(),
74            sui_conservation_live_object_size: register_int_gauge_with_registry!(
75                "sui_conservation_live_object_size",
76                "Size in bytes of live objects in the store",
77                registry,
78            ).unwrap(),
79            sui_conservation_imbalance: register_int_gauge_with_registry!(
80                "sui_conservation_imbalance",
81                "Total amount of SUI in the network - 10B * 10^9. This delta shows the amount of imbalance",
82                registry,
83            ).unwrap(),
84            sui_conservation_storage_fund: register_int_gauge_with_registry!(
85                "sui_conservation_storage_fund",
86                "Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
87                registry,
88            ).unwrap(),
89            sui_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
90                "sui_conservation_storage_fund_imbalance",
91                "Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
92                registry,
93            ).unwrap(),
94            epoch_flags: register_int_gauge_vec_with_registry!(
95                "epoch_flags",
96                "Local flags of the currently running epoch",
97                &["flag"],
98                registry,
99            ).unwrap(),
100        }
101    }
102}
103
/// Store backed by [`AuthorityPerpetualTables`], holding objects, transactions, effects, events
/// and per-epoch markers.
///
/// NOTE(review): the previous version of this comment described an `ALL_OBJ_VER` flag, a
/// signature-state type parameter `S` and the name `SuiDataStore`. None of these exist on this
/// struct (it has no generic parameters), so that description appears to be stale.
pub struct AuthorityStore {
    /// Internal vector of locks to manage concurrent writes to the database.
    /// Locks are keyed by object digest and sharded into `NUM_SHARDS` buckets.
    mutex_table: MutexTable<ObjectDigest>,

    /// Shared handle to the underlying database tables.
    pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,

    /// Lets `notify_read_root_state_hash` wait until the root state hash of an epoch
    /// becomes available.
    pub(crate) root_state_notify_read:
        NotifyRead<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Whether to enable expensive SUI conservation check at epoch boundaries.
    enable_epoch_sui_conservation_check: bool,

    /// Gauges for the conservation check and the epoch flags.
    metrics: AuthorityStoreMetrics,
}
124
/// Read guard of the execution lock: a tokio `RwLock` read guard over the current [`EpochId`].
pub type ExecutionLockReadGuard<'a> = tokio::sync::RwLockReadGuard<'a, EpochId>;
/// Write guard of the execution lock: a tokio `RwLock` write guard over the current [`EpochId`].
/// `clear_object_per_epoch_marker_table` takes one as proof that it runs during reconfiguration.
pub type ExecutionLockWriteGuard<'a> = tokio::sync::RwLockWriteGuard<'a, EpochId>;
127
128impl AuthorityStore {
    /// Opens an authority store on top of already opened perpetual tables.
    ///
    /// If the database is empty, an epoch start configuration is built from `genesis`,
    /// persisted, and the genesis data is inserted by `open_inner`. Otherwise the epoch start
    /// configuration is loaded from the database.
    ///
    /// The flags of the resulting configuration are published through the `epoch_flags` metric.
    ///
    /// # Errors
    ///
    /// Propagates database errors and errors from building the epoch start configuration.
    ///
    /// # Panics
    ///
    /// Panics if the database is non-empty but has no epoch start configuration stored.
    pub async fn open(
        perpetual_tables: Arc<AuthorityPerpetualTables>,
        genesis: &Genesis,
        config: &NodeConfig,
        registry: &Registry,
    ) -> SuiResult<Arc<Self>> {
        let enable_epoch_sui_conservation_check = config
            .expensive_safety_check_config
            .enable_epoch_sui_conservation_check();

        let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
            info!("Creating new epoch start config from genesis");

            // `initial_epoch_flags` is only reassigned inside the fail point below; the `allow`
            // presumably silences the warning in builds where fail points compile to nothing.
            #[allow(unused_mut)]
            let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
            fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
                info!("Setting initial epoch flags to {:?}", flags);
                initial_epoch_flags = flags;
            });

            let epoch_start_configuration = EpochStartConfiguration::new(
                genesis.sui_system_object().into_epoch_start_state(),
                *genesis.checkpoint().digest(),
                &genesis.objects(),
                initial_epoch_flags,
            )?;
            // Persist the configuration before the genesis objects are inserted by `open_inner`.
            perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
            epoch_start_configuration
        } else {
            info!("Loading epoch start config from DB");
            perpetual_tables
                .epoch_start_configuration
                .get(&())?
                .expect("Epoch start configuration must be set in non-empty DB")
        };
        // The recovery epoch is read for logging only.
        let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
        info!("Epoch start config: {:?}", epoch_start_configuration);
        info!("Cur epoch: {:?}", cur_epoch);
        let this = Self::open_inner(
            genesis,
            perpetual_tables,
            enable_epoch_sui_conservation_check,
            registry,
        )
        .await?;
        // There are no previous flags at startup, so only the new flags are set to 1.
        this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
        Ok(this)
    }
179
180    pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
181        for flag in old {
182            self.metrics
183                .epoch_flags
184                .with_label_values(&[&flag.to_string()])
185                .set(0);
186        }
187        for flag in new {
188            self.metrics
189                .epoch_flags
190                .with_label_values(&[&flag.to_string()])
191                .set(1);
192        }
193    }
194
195    // NB: This must only be called at time of reconfiguration. We take the execution lock write
196    // guard as an argument to ensure that this is the case.
197    pub fn clear_object_per_epoch_marker_table(
198        &self,
199        _execution_guard: &ExecutionLockWriteGuard<'_>,
200    ) -> SuiResult<()> {
201        // We can safely delete all entries in the per epoch marker table since this is only called
202        // at epoch boundaries (during reconfiguration). Therefore any entries that currently
203        // exist can be removed. Because of this we can use the `schedule_delete_all` method.
204        self.perpetual_tables
205            .object_per_epoch_marker_table
206            .schedule_delete_all()?;
207        Ok(self
208            .perpetual_tables
209            .object_per_epoch_marker_table_v2
210            .schedule_delete_all()?)
211    }
212
213    pub async fn open_with_committee_for_testing(
214        perpetual_tables: Arc<AuthorityPerpetualTables>,
215        committee: &Committee,
216        genesis: &Genesis,
217    ) -> SuiResult<Arc<Self>> {
218        // TODO: Since we always start at genesis, the committee should be technically the same
219        // as the genesis committee.
220        assert_eq!(committee.epoch, 0);
221        Self::open_inner(genesis, perpetual_tables, true, &Registry::new()).await
222    }
223
224    async fn open_inner(
225        genesis: &Genesis,
226        perpetual_tables: Arc<AuthorityPerpetualTables>,
227        enable_epoch_sui_conservation_check: bool,
228        registry: &Registry,
229    ) -> SuiResult<Arc<Self>> {
230        let store = Arc::new(Self {
231            mutex_table: MutexTable::new(NUM_SHARDS),
232            perpetual_tables,
233            root_state_notify_read: NotifyRead::<
234                EpochId,
235                (CheckpointSequenceNumber, GlobalStateHash),
236            >::new(),
237            enable_epoch_sui_conservation_check,
238            metrics: AuthorityStoreMetrics::new(registry),
239        });
240        // Only initialize an empty database.
241        if store
242            .database_is_empty()
243            .expect("Database read should not fail at init.")
244        {
245            store
246                .bulk_insert_genesis_objects(genesis.objects())
247                .expect("Cannot bulk insert genesis objects");
248
249            // insert txn and effects of genesis
250            let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());
251
252            store
253                .perpetual_tables
254                .transactions
255                .insert(transaction.digest(), transaction.serializable_ref())
256                .unwrap();
257
258            store
259                .perpetual_tables
260                .effects
261                .insert(&genesis.effects().digest(), genesis.effects())
262                .unwrap();
263            // We don't insert the effects to executed_effects yet because the genesis tx hasn't but will be executed.
264            // This is important for fullnodes to be able to generate indexing data right now.
265
266            if genesis.effects().events_digest().is_some() {
267                store
268                    .perpetual_tables
269                    .events_2
270                    .insert(transaction.digest(), genesis.events())
271                    .unwrap();
272            }
273        }
274
275        Ok(store)
276    }
277
278    /// Open authority store without any operations that require
279    /// genesis, such as constructing EpochStartConfiguration
280    /// or inserting genesis objects.
281    pub fn open_no_genesis(
282        perpetual_tables: Arc<AuthorityPerpetualTables>,
283        enable_epoch_sui_conservation_check: bool,
284        registry: &Registry,
285    ) -> SuiResult<Arc<Self>> {
286        let store = Arc::new(Self {
287            mutex_table: MutexTable::new(NUM_SHARDS),
288            perpetual_tables,
289            root_state_notify_read: NotifyRead::<
290                EpochId,
291                (CheckpointSequenceNumber, GlobalStateHash),
292            >::new(),
293            enable_epoch_sui_conservation_check,
294            metrics: AuthorityStoreMetrics::new(registry),
295        });
296        Ok(store)
297    }
298
    /// Returns the recovery epoch reported by the perpetual tables (plain delegation).
    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
        self.perpetual_tables.get_recovery_epoch_at_restart()
    }
302
303    pub fn get_effects(
304        &self,
305        effects_digest: &TransactionEffectsDigest,
306    ) -> SuiResult<Option<TransactionEffects>> {
307        Ok(self.perpetual_tables.effects.get(effects_digest)?)
308    }
309
310    /// Returns true if we have an effects structure for this transaction digest
311    pub fn effects_exists(&self, effects_digest: &TransactionEffectsDigest) -> SuiResult<bool> {
312        self.perpetual_tables
313            .effects
314            .contains_key(effects_digest)
315            .map_err(|e| e.into())
316    }
317
    /// Looks up the events stored for the transaction `digest` in the `events_2` table.
    /// Returns `None` when no events are stored under that digest.
    pub fn get_events(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<TransactionEvents>, TypedStoreError> {
        self.perpetual_tables.events_2.get(digest)
    }
324
325    pub fn multi_get_events(
326        &self,
327        event_digests: &[TransactionDigest],
328    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
329        Ok(event_digests
330            .iter()
331            .map(|digest| self.get_events(digest))
332            .collect::<Result<Vec<_>, _>>()?)
333    }
334
    /// Looks up the object keys recorded in the `unchanged_loaded_runtime_objects` table for
    /// the transaction `digest`; `None` if nothing was recorded.
    pub fn get_unchanged_loaded_runtime_objects(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<Vec<ObjectKey>>, TypedStoreError> {
        self.perpetual_tables
            .unchanged_loaded_runtime_objects
            .get(digest)
    }
343
    /// Batched lookup in the effects table: one entry per effects digest yielded by
    /// `effects_digests`, `None` where no effects are stored.
    pub fn multi_get_effects<'a>(
        &self,
        effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
    ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
        self.perpetual_tables.effects.multi_get(effects_digests)
    }
350
351    pub fn get_executed_effects(
352        &self,
353        tx_digest: &TransactionDigest,
354    ) -> Result<Option<TransactionEffects>, TypedStoreError> {
355        let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
356        match effects_digest {
357            Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
358            None => Ok(None),
359        }
360    }
361
    /// Given a list of transaction digests, returns the corresponding effects digests, one entry
    /// per input digest. For transactions that have not been executed, None is returned.
    pub fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> Result<Vec<Option<TransactionEffectsDigest>>, TypedStoreError> {
        self.perpetual_tables.executed_effects.multi_get(digests)
    }
370
371    /// Given a list of transaction digests, returns a list of the corresponding effects only if they have been
372    /// executed. For transactions that have not been executed, None is returned.
373    pub fn multi_get_executed_effects(
374        &self,
375        digests: &[TransactionDigest],
376    ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
377        let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
378        let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
379        let mut tx_to_effects_map = effects
380            .into_iter()
381            .flatten()
382            .map(|effects| (*effects.transaction_digest(), effects))
383            .collect::<HashMap<_, _>>();
384        Ok(digests
385            .iter()
386            .map(|digest| tx_to_effects_map.remove(digest))
387            .collect())
388    }
389
390    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> SuiResult<bool> {
391        Ok(self
392            .perpetual_tables
393            .executed_effects
394            .contains_key(digest)?)
395    }
396
397    pub fn get_marker_value(
398        &self,
399        object_key: FullObjectKey,
400        epoch_id: EpochId,
401    ) -> SuiResult<Option<MarkerValue>> {
402        Ok(self
403            .perpetual_tables
404            .object_per_epoch_marker_table_v2
405            .get(&(epoch_id, object_key))?)
406    }
407
408    pub fn get_latest_marker(
409        &self,
410        object_id: FullObjectID,
411        epoch_id: EpochId,
412    ) -> SuiResult<Option<(SequenceNumber, MarkerValue)>> {
413        let min_key = (epoch_id, FullObjectKey::min_for_id(&object_id));
414        let max_key = (epoch_id, FullObjectKey::max_for_id(&object_id));
415
416        let marker_entry = self
417            .perpetual_tables
418            .object_per_epoch_marker_table_v2
419            .reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
420            .next();
421        match marker_entry {
422            Some(Ok(((epoch, key), marker))) => {
423                // because of the iterator bounds these cannot fail
424                assert_eq!(epoch, epoch_id);
425                assert_eq!(key.id(), object_id);
426                Ok(Some((key.version(), marker)))
427            }
428            Some(Err(e)) => Err(e.into()),
429            None => Ok(None),
430        }
431    }
432
433    /// Returns future containing the state hash for the given epoch
434    /// once available
435    pub async fn notify_read_root_state_hash(
436        &self,
437        epoch: EpochId,
438    ) -> SuiResult<(CheckpointSequenceNumber, GlobalStateHash)> {
439        // We need to register waiters _before_ reading from the database to avoid race conditions
440        let registration = self.root_state_notify_read.register_one(&epoch);
441        let hash = self.perpetual_tables.root_state_hash_by_epoch.get(&epoch)?;
442
443        let result = match hash {
444            // Note that Some() clause also drops registration that is already fulfilled
445            Some(ready) => Either::Left(futures::future::ready(ready)),
446            None => Either::Right(registration),
447        }
448        .await;
449
450        Ok(result)
451    }
452
453    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
454    pub fn deprecated_insert_finalized_transactions(
455        &self,
456        digests: &[TransactionDigest],
457        epoch: EpochId,
458        sequence: CheckpointSequenceNumber,
459    ) -> SuiResult {
460        let mut batch = self
461            .perpetual_tables
462            .executed_transactions_to_checkpoint
463            .batch();
464        batch.insert_batch(
465            &self.perpetual_tables.executed_transactions_to_checkpoint,
466            digests.iter().map(|d| (*d, (epoch, sequence))),
467        )?;
468        batch.write()?;
469        trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
470        Ok(())
471    }
472
473    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
474    pub fn deprecated_get_transaction_checkpoint(
475        &self,
476        digest: &TransactionDigest,
477    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
478        Ok(self
479            .perpetual_tables
480            .executed_transactions_to_checkpoint
481            .get(digest)?)
482    }
483
484    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
485    pub fn deprecated_multi_get_transaction_checkpoint(
486        &self,
487        digests: &[TransactionDigest],
488    ) -> SuiResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
489        Ok(self
490            .perpetual_tables
491            .executed_transactions_to_checkpoint
492            .multi_get(digests)?
493            .into_iter()
494            .collect())
495    }
496
    /// Returns true if there are no objects in the database.
    /// Delegates to `AuthorityPerpetualTables::database_is_empty`.
    pub fn database_is_empty(&self) -> SuiResult<bool> {
        self.perpetual_tables.database_is_empty()
    }
501
502    /// A function that acquires all locks associated with the objects (in order to avoid deadlocks).
503    fn acquire_locks(&self, input_objects: &[ObjectRef]) -> Vec<MutexGuard> {
504        self.mutex_table
505            .acquire_locks(input_objects.iter().map(|(_, _, digest)| *digest))
506    }
507
508    pub fn object_exists_by_key(
509        &self,
510        object_id: &ObjectID,
511        version: VersionNumber,
512    ) -> SuiResult<bool> {
513        Ok(self
514            .perpetual_tables
515            .objects
516            .contains_key(&ObjectKey(*object_id, version))?)
517    }
518
519    pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<bool>> {
520        Ok(self
521            .perpetual_tables
522            .objects
523            .multi_contains_keys(object_keys.to_vec())?
524            .into_iter()
525            .collect())
526    }
527
528    fn get_object_ref_prior_to_key(
529        &self,
530        object_id: &ObjectID,
531        version: VersionNumber,
532    ) -> Result<Option<ObjectRef>, SuiError> {
533        let Some(prior_version) = version.one_before() else {
534            return Ok(None);
535        };
536        let mut iterator = self
537            .perpetual_tables
538            .objects
539            .reversed_safe_iter_with_bounds(
540                Some(ObjectKey::min_for_id(object_id)),
541                Some(ObjectKey(*object_id, prior_version)),
542            )?;
543
544        if let Some((object_key, value)) = iterator.next().transpose()?
545            && object_key.0 == *object_id
546        {
547            return Ok(Some(
548                self.perpetual_tables.object_reference(&object_key, value)?,
549            ));
550        }
551        Ok(None)
552    }
553
554    pub fn multi_get_objects_by_key(
555        &self,
556        object_keys: &[ObjectKey],
557    ) -> Result<Vec<Option<Object>>, SuiError> {
558        let wrappers = self
559            .perpetual_tables
560            .objects
561            .multi_get(object_keys.to_vec())?;
562        let mut ret = vec![];
563
564        for (idx, w) in wrappers.into_iter().enumerate() {
565            ret.push(
566                w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
567                    .transpose()?
568                    .flatten(),
569            );
570        }
571        Ok(ret)
572    }
573
574    /// Get many objects
575    pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, SuiError> {
576        let mut result = Vec::new();
577        for id in objects {
578            result.push(self.get_object(id));
579        }
580        Ok(result)
581    }
582
583    // Methods to mutate the store
584
585    /// Insert a genesis object.
586    /// TODO: delete this method entirely (still used by authority_tests.rs)
587    pub(crate) fn insert_genesis_object(&self, object: Object) -> SuiResult {
588        // We only side load objects with a genesis parent transaction.
589        debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
590        let object_ref = object.compute_object_reference();
591        self.insert_object_direct(object_ref, &object)
592    }
593
594    /// Insert an object directly into the store, and also update relevant tables
595    /// NOTE: does not handle transaction lock.
596    /// This is used to insert genesis objects
597    fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> SuiResult {
598        let mut write_batch = self.perpetual_tables.objects.batch();
599
600        // Insert object
601        let store_object = get_store_object(object.clone());
602        write_batch.insert_batch(
603            &self.perpetual_tables.objects,
604            std::iter::once((ObjectKey::from(object_ref), store_object)),
605        )?;
606
607        // Update the index
608        if object.get_single_owner().is_some() {
609            // Only initialize lock for address owned objects.
610            if !object.is_child_object() {
611                self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref], false)?;
612            }
613        }
614
615        write_batch.write()?;
616
617        Ok(())
618    }
619
620    /// This function should only be used for initializing genesis and should remain private.
621    #[instrument(level = "debug", skip_all)]
622    pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> SuiResult<()> {
623        let mut batch = self.perpetual_tables.objects.batch();
624        let ref_and_objects: Vec<_> = objects
625            .iter()
626            .map(|o| (o.compute_object_reference(), o))
627            .collect();
628
629        batch.insert_batch(
630            &self.perpetual_tables.objects,
631            ref_and_objects
632                .iter()
633                .map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))),
634        )?;
635
636        let non_child_object_refs: Vec<_> = ref_and_objects
637            .iter()
638            .filter(|(_, object)| !object.is_child_object())
639            .map(|(oref, _)| *oref)
640            .collect();
641
642        self.initialize_live_object_markers_impl(
643            &mut batch,
644            &non_child_object_refs,
645            false, // is_force_reset
646        )?;
647
648        batch.write()?;
649
650        Ok(())
651    }
652
653    pub fn bulk_insert_live_objects(
654        perpetual_db: &AuthorityPerpetualTables,
655        live_objects: impl Iterator<Item = LiveObject>,
656        expected_sha3_digest: &[u8; 32],
657    ) -> SuiResult<()> {
658        let mut hasher = Sha3_256::default();
659        let mut batch = perpetual_db.objects.batch();
660        let mut written = 0usize;
661        const MAX_BATCH_SIZE: usize = 100_000;
662        for object in live_objects {
663            hasher.update(object.object_reference().2.inner());
664            match object {
665                LiveObject::Normal(object) => {
666                    let store_object_wrapper = get_store_object(object.clone());
667                    batch.insert_batch(
668                        &perpetual_db.objects,
669                        std::iter::once((
670                            ObjectKey::from(object.compute_object_reference()),
671                            store_object_wrapper,
672                        )),
673                    )?;
674                    if !object.is_child_object() {
675                        Self::initialize_live_object_markers(
676                            &perpetual_db.live_owned_object_markers,
677                            &mut batch,
678                            &[object.compute_object_reference()],
679                            false, // is_force_reset
680                        )?;
681                    }
682                }
683                LiveObject::Wrapped(object_key) => {
684                    batch.insert_batch(
685                        &perpetual_db.objects,
686                        std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
687                            object_key,
688                            StoreObject::Wrapped.into(),
689                        )),
690                    )?;
691                }
692            }
693            written += 1;
694            if written > MAX_BATCH_SIZE {
695                batch.write()?;
696                batch = perpetual_db.objects.batch();
697                written = 0;
698            }
699        }
700        let sha3_digest = hasher.finalize().digest;
701        if *expected_sha3_digest != sha3_digest {
702            error!(
703                "Sha does not match! expected: {:?}, actual: {:?}",
704                expected_sha3_digest, sha3_digest
705            );
706            return Err(SuiError::from("Sha does not match"));
707        }
708        batch.write()?;
709        Ok(())
710    }
711
    /// Persists `epoch_start_configuration` through the perpetual tables.
    pub fn set_epoch_start_configuration(
        &self,
        epoch_start_configuration: &EpochStartConfiguration,
    ) -> SuiResult {
        self.perpetual_tables
            .set_epoch_start_configuration(epoch_start_configuration)?;
        Ok(())
    }
720
721    pub fn get_epoch_start_configuration(&self) -> SuiResult<Option<EpochStartConfiguration>> {
722        Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
723    }
724
725    /// Updates the state resulting from the execution of a certificate.
726    ///
727    /// Internally it checks that all locks for active inputs are at the correct
728    /// version, and then writes objects, certificates, parents and clean up locks atomically.
729    #[instrument(level = "debug", skip_all)]
730    pub fn build_db_batch(
731        &self,
732        epoch_id: EpochId,
733        tx_outputs: &[Arc<TransactionOutputs>],
734    ) -> SuiResult<DBBatch> {
735        let mut written = Vec::with_capacity(tx_outputs.len());
736        for outputs in tx_outputs {
737            written.extend(outputs.written.values().cloned());
738        }
739
740        let mut write_batch = self.perpetual_tables.transactions.batch();
741        for outputs in tx_outputs {
742            self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
743        }
744        // test crashing before writing the batch
745        fail_point!("crash");
746
747        trace!(
748            "built batch for committed transactions: {:?}",
749            tx_outputs
750                .iter()
751                .map(|tx| tx.transaction.digest())
752                .collect::<Vec<_>>()
753        );
754
755        // test crashing before notifying
756        fail_point!("crash");
757
758        Ok(write_batch)
759    }
760
761    fn write_one_transaction_outputs(
762        &self,
763        write_batch: &mut DBBatch,
764        epoch_id: EpochId,
765        tx_outputs: &TransactionOutputs,
766    ) -> SuiResult {
767        let TransactionOutputs {
768            transaction,
769            effects,
770            markers,
771            wrapped,
772            deleted,
773            written,
774            events,
775            unchanged_loaded_runtime_objects,
776            locks_to_delete,
777            new_locks_to_init,
778            ..
779        } = tx_outputs;
780
781        let effects_digest = effects.digest();
782        let transaction_digest = transaction.digest();
783        // effects must be inserted before the corresponding dependent entries
784        // because they carry epoch information necessary for correct pruning via relocation filters
785        write_batch
786            .insert_batch(
787                &self.perpetual_tables.effects,
788                [(effects_digest, effects.clone())],
789            )?
790            .insert_batch(
791                &self.perpetual_tables.executed_effects,
792                [(transaction_digest, effects_digest)],
793            )?;
794
795        // Store the certificate indexed by transaction digest
796        write_batch.insert_batch(
797            &self.perpetual_tables.transactions,
798            iter::once((transaction_digest, transaction.serializable_ref())),
799        )?;
800
801        // Add batched writes for objects and locks.
802        write_batch.insert_batch(
803            &self.perpetual_tables.object_per_epoch_marker_table_v2,
804            markers
805                .iter()
806                .map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
807        )?;
808        write_batch.insert_batch(
809            &self.perpetual_tables.objects,
810            deleted
811                .iter()
812                .map(|key| (key, StoreObject::Deleted))
813                .chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
814                .map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
815        )?;
816
817        // Insert each output object into the stores
818        let new_objects = written.iter().map(|(id, new_object)| {
819            let version = new_object.version();
820            trace!(?id, ?version, "writing object");
821            let store_object = get_store_object(new_object.clone());
822            (ObjectKey(*id, version), store_object)
823        });
824
825        write_batch.insert_batch(&self.perpetual_tables.objects, new_objects)?;
826
827        // Write events into the new table keyed off of transaction_digest
828        if effects.events_digest().is_some() {
829            write_batch.insert_batch(
830                &self.perpetual_tables.events_2,
831                [(transaction_digest, events)],
832            )?;
833        }
834
835        // Write unchanged_loaded_runtime_objects
836        if !unchanged_loaded_runtime_objects.is_empty() {
837            write_batch.insert_batch(
838                &self.perpetual_tables.unchanged_loaded_runtime_objects,
839                [(transaction_digest, unchanged_loaded_runtime_objects)],
840            )?;
841        }
842
843        self.initialize_live_object_markers_impl(write_batch, new_locks_to_init, false)?;
844
845        // Note: deletes locks for received objects as well (but not for objects that were in
846        // `Receiving` arguments which were not received)
847        self.delete_live_object_markers(write_batch, locks_to_delete)?;
848
849        debug!(effects_digest = ?effects.digest(), "commit_certificate finished");
850
851        Ok(())
852    }
853
854    /// Commits transactions only (not effects or other transaction outputs) to the db.
855    /// See ExecutionCache::persist_transaction for more info
856    pub(crate) fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> SuiResult {
857        let mut batch = self.perpetual_tables.transactions.batch();
858        batch.insert_batch(
859            &self.perpetual_tables.transactions,
860            [(tx.digest(), tx.clone().into_unsigned().serializable_ref())],
861        )?;
862        batch.write()?;
863        Ok(())
864    }
865
866    pub fn acquire_transaction_locks(
867        &self,
868        epoch_store: &AuthorityPerEpochStore,
869        owned_input_objects: &[ObjectRef],
870        tx_digest: TransactionDigest,
871        signed_transaction: Option<VerifiedSignedTransaction>,
872    ) -> SuiResult {
873        let epoch = epoch_store.epoch();
874        // Other writers may be attempting to acquire locks on the same objects, so a mutex is
875        // required.
876        // TODO: replace with optimistic db_transactions (i.e. set lock to tx if none)
877        let _mutexes = self.acquire_locks(owned_input_objects);
878
879        trace!(?owned_input_objects, "acquire_locks");
880        let mut locks_to_write = Vec::new();
881
882        let live_object_markers = self
883            .perpetual_tables
884            .live_owned_object_markers
885            .multi_get(owned_input_objects)?;
886
887        let epoch_tables = epoch_store.tables()?;
888
889        let locks = epoch_tables.multi_get_locked_transactions(owned_input_objects)?;
890
891        assert_eq!(locks.len(), live_object_markers.len());
892
893        for (live_marker, lock, obj_ref) in izip!(
894            live_object_markers.into_iter(),
895            locks.into_iter(),
896            owned_input_objects
897        ) {
898            let Some(live_marker) = live_marker else {
899                let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
900                fp_bail!(
901                    UserInputError::ObjectVersionUnavailableForConsumption {
902                        provided_obj_ref: *obj_ref,
903                        current_version: latest_lock.1
904                    }
905                    .into()
906                );
907            };
908
909            let live_marker = live_marker.map(|l| l.migrate().into_inner());
910
911            if let Some(LockDetailsDeprecated {
912                epoch: previous_epoch,
913                ..
914            }) = &live_marker
915            {
916                // this must be from a prior epoch, because we no longer write LockDetails to
917                // owned_object_transaction_locks
918                assert!(
919                    previous_epoch < &epoch,
920                    "lock for {:?} should be from a prior epoch",
921                    obj_ref
922                );
923            }
924
925            if let Some(previous_tx_digest) = &lock {
926                if previous_tx_digest == &tx_digest {
927                    // no need to re-write lock
928                    continue;
929                } else {
930                    // TODO: add metrics here
931                    info!(prev_tx_digest = ?previous_tx_digest,
932                          cur_tx_digest = ?tx_digest,
933                          "Cannot acquire lock: conflicting transaction!");
934                    return Err(SuiErrorKind::ObjectLockConflict {
935                        obj_ref: *obj_ref,
936                        pending_transaction: *previous_tx_digest,
937                    }
938                    .into());
939                }
940            }
941
942            locks_to_write.push((*obj_ref, tx_digest));
943        }
944
945        if !locks_to_write.is_empty() {
946            trace!(?locks_to_write, "Writing locks");
947            epoch_tables.write_transaction_locks(signed_transaction, locks_to_write.into_iter())?;
948        }
949
950        Ok(())
951    }
952
953    /// Gets ObjectLockInfo that represents state of lock on an object.
954    /// Returns UserInputError::ObjectNotFound if cannot find lock record for this object
955    pub(crate) fn get_lock(
956        &self,
957        obj_ref: ObjectRef,
958        epoch_store: &AuthorityPerEpochStore,
959    ) -> SuiLockResult {
960        if self
961            .perpetual_tables
962            .live_owned_object_markers
963            .get(&obj_ref)?
964            .is_none()
965        {
966            return Ok(ObjectLockStatus::LockedAtDifferentVersion {
967                locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
968            });
969        }
970
971        let tables = epoch_store.tables()?;
972        let epoch_id = epoch_store.epoch();
973
974        if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
975            Ok(ObjectLockStatus::LockedToTx {
976                locked_by_tx: LockDetailsDeprecated {
977                    epoch: epoch_id,
978                    tx_digest,
979                },
980            })
981        } else {
982            Ok(ObjectLockStatus::Initialized)
983        }
984    }
985
986    /// Returns UserInputError::ObjectNotFound if no lock records found for this object.
987    pub(crate) fn get_latest_live_version_for_object_id(
988        &self,
989        object_id: ObjectID,
990    ) -> SuiResult<ObjectRef> {
991        let mut iterator = self
992            .perpetual_tables
993            .live_owned_object_markers
994            .reversed_safe_iter_with_bounds(
995                None,
996                Some((object_id, SequenceNumber::MAX, ObjectDigest::MAX)),
997            )?;
998        Ok(iterator
999            .next()
1000            .transpose()?
1001            .and_then(|value| {
1002                if value.0.0 == object_id {
1003                    Some(value)
1004                } else {
1005                    None
1006                }
1007            })
1008            .ok_or_else(|| {
1009                SuiError::from(UserInputError::ObjectNotFound {
1010                    object_id,
1011                    version: None,
1012                })
1013            })?
1014            .0)
1015    }
1016
1017    /// Checks multiple object locks exist.
1018    /// Returns UserInputError::ObjectNotFound if cannot find lock record for at least one of the objects.
1019    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if at least one object lock is not initialized
1020    ///     at the given version.
1021    pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> SuiResult {
1022        let locks = self
1023            .perpetual_tables
1024            .live_owned_object_markers
1025            .multi_get(objects)?;
1026        for (lock, obj_ref) in locks.into_iter().zip(objects) {
1027            if lock.is_none() {
1028                let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
1029                fp_bail!(
1030                    UserInputError::ObjectVersionUnavailableForConsumption {
1031                        provided_obj_ref: *obj_ref,
1032                        current_version: latest_lock.1
1033                    }
1034                    .into()
1035                );
1036            }
1037        }
1038        Ok(())
1039    }
1040
1041    /// Initialize a lock to None (but exists) for a given list of ObjectRefs.
1042    /// Returns SuiErrorKind::ObjectLockAlreadyInitialized if the lock already exists and is locked to a transaction
1043    fn initialize_live_object_markers_impl(
1044        &self,
1045        write_batch: &mut DBBatch,
1046        objects: &[ObjectRef],
1047        is_force_reset: bool,
1048    ) -> SuiResult {
1049        AuthorityStore::initialize_live_object_markers(
1050            &self.perpetual_tables.live_owned_object_markers,
1051            write_batch,
1052            objects,
1053            is_force_reset,
1054        )
1055    }
1056
1057    pub fn initialize_live_object_markers(
1058        live_object_marker_table: &DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,
1059        write_batch: &mut DBBatch,
1060        objects: &[ObjectRef],
1061        is_force_reset: bool,
1062    ) -> SuiResult {
1063        trace!(?objects, "initialize_locks");
1064
1065        let live_object_markers = live_object_marker_table.multi_get(objects)?;
1066
1067        if !is_force_reset {
1068            // If any live_object_markers exist and are not None, return errors for them
1069            // Note we don't check if there is a pre-existing lock. this is because initializing the live
1070            // object marker will not overwrite the lock and cause the validator to equivocate.
1071            let existing_live_object_markers: Vec<ObjectRef> = live_object_markers
1072                .iter()
1073                .zip(objects)
1074                .filter_map(|(lock_opt, objref)| {
1075                    lock_opt.clone().flatten().map(|_tx_digest| *objref)
1076                })
1077                .collect();
1078            if !existing_live_object_markers.is_empty() {
1079                info!(
1080                    ?existing_live_object_markers,
1081                    "Cannot initialize live_object_markers because some exist already"
1082                );
1083                return Err(SuiErrorKind::ObjectLockAlreadyInitialized {
1084                    refs: existing_live_object_markers,
1085                }
1086                .into());
1087            }
1088        }
1089
1090        write_batch.insert_batch(
1091            live_object_marker_table,
1092            objects.iter().map(|obj_ref| (obj_ref, None)),
1093        )?;
1094        Ok(())
1095    }
1096
1097    /// Removes locks for a given list of ObjectRefs.
1098    fn delete_live_object_markers(
1099        &self,
1100        write_batch: &mut DBBatch,
1101        objects: &[ObjectRef],
1102    ) -> SuiResult {
1103        trace!(?objects, "delete_locks");
1104        write_batch.delete_batch(
1105            &self.perpetual_tables.live_owned_object_markers,
1106            objects.iter(),
1107        )?;
1108        Ok(())
1109    }
1110
1111    #[cfg(test)]
1112    pub(crate) fn reset_locks_for_test(
1113        &self,
1114        transactions: &[TransactionDigest],
1115        objects: &[ObjectRef],
1116        epoch_store: &AuthorityPerEpochStore,
1117    ) {
1118        for tx in transactions {
1119            epoch_store.delete_signed_transaction_for_test(tx);
1120            epoch_store.delete_object_locks_for_test(objects);
1121        }
1122
1123        let mut batch = self.perpetual_tables.live_owned_object_markers.batch();
1124        batch
1125            .delete_batch(
1126                &self.perpetual_tables.live_owned_object_markers,
1127                objects.iter(),
1128            )
1129            .unwrap();
1130        batch.write().unwrap();
1131
1132        let mut batch = self.perpetual_tables.live_owned_object_markers.batch();
1133        self.initialize_live_object_markers_impl(&mut batch, objects, false)
1134            .unwrap();
1135        batch.write().unwrap();
1136    }
1137
1138    /// Return the object with version less then or eq to the provided seq number.
1139    /// This is used by indexer to find the correct version of dynamic field child object.
1140    /// We do not store the version of the child object, but because of lamport timestamp,
1141    /// we know the child must have version number less then or eq to the parent.
1142    pub fn find_object_lt_or_eq_version(
1143        &self,
1144        object_id: ObjectID,
1145        version: SequenceNumber,
1146    ) -> SuiResult<Option<Object>> {
1147        self.perpetual_tables
1148            .find_object_lt_or_eq_version(object_id, version)
1149    }
1150
1151    /// Returns the latest object reference we have for this object_id in the objects table.
1152    ///
1153    /// The method may also return the reference to a deleted object with a digest of
1154    /// ObjectDigest::deleted() or ObjectDigest::wrapped() and lamport version
1155    /// of a transaction that deleted the object.
1156    /// Note that a deleted object may re-appear if the deletion was the result of the object
1157    /// being wrapped in another object.
1158    ///
1159    /// If no entry for the object_id is found, return None.
1160    pub fn get_latest_object_ref_or_tombstone(
1161        &self,
1162        object_id: ObjectID,
1163    ) -> Result<Option<ObjectRef>, SuiError> {
1164        self.perpetual_tables
1165            .get_latest_object_ref_or_tombstone(object_id)
1166    }
1167
1168    /// Returns the latest object reference if and only if the object is still live (i.e. it does
1169    /// not return tombstones)
1170    pub fn get_latest_object_ref_if_alive(
1171        &self,
1172        object_id: ObjectID,
1173    ) -> Result<Option<ObjectRef>, SuiError> {
1174        match self.get_latest_object_ref_or_tombstone(object_id)? {
1175            Some(objref) if objref.2.is_alive() => Ok(Some(objref)),
1176            _ => Ok(None),
1177        }
1178    }
1179
1180    /// Returns the latest object we have for this object_id in the objects table.
1181    ///
1182    /// If no entry for the object_id is found, return None.
1183    pub fn get_latest_object_or_tombstone(
1184        &self,
1185        object_id: ObjectID,
1186    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, SuiError> {
1187        let Some((object_key, store_object)) = self
1188            .perpetual_tables
1189            .get_latest_object_or_tombstone(object_id)?
1190        else {
1191            return Ok(None);
1192        };
1193
1194        if let Some(object_ref) = self
1195            .perpetual_tables
1196            .tombstone_reference(&object_key, &store_object)?
1197        {
1198            return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
1199        }
1200
1201        let object = self
1202            .perpetual_tables
1203            .object(&object_key, store_object)?
1204            .expect("Non tombstone store object could not be converted to object");
1205
1206        Ok(Some((object_key, ObjectOrTombstone::Object(object))))
1207    }
1208
1209    pub fn insert_transaction_and_effects(
1210        &self,
1211        transaction: &VerifiedTransaction,
1212        transaction_effects: &TransactionEffects,
1213    ) -> Result<(), TypedStoreError> {
1214        let mut write_batch = self.perpetual_tables.transactions.batch();
1215        // effects must be inserted before the corresponding transaction entry
1216        // because they carry epoch information necessary for correct pruning via relocation filters
1217        write_batch
1218            .insert_batch(
1219                &self.perpetual_tables.effects,
1220                [(transaction_effects.digest(), transaction_effects)],
1221            )?
1222            .insert_batch(
1223                &self.perpetual_tables.transactions,
1224                [(transaction.digest(), transaction.serializable_ref())],
1225            )?;
1226
1227        write_batch.write()?;
1228        Ok(())
1229    }
1230
1231    pub fn multi_insert_transaction_and_effects<'a>(
1232        &self,
1233        transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
1234    ) -> Result<(), TypedStoreError> {
1235        let mut write_batch = self.perpetual_tables.transactions.batch();
1236        for tx in transactions {
1237            write_batch
1238                .insert_batch(
1239                    &self.perpetual_tables.effects,
1240                    [(tx.effects.digest(), &tx.effects)],
1241                )?
1242                .insert_batch(
1243                    &self.perpetual_tables.transactions,
1244                    [(tx.transaction.digest(), tx.transaction.serializable_ref())],
1245                )?;
1246        }
1247
1248        write_batch.write()?;
1249        Ok(())
1250    }
1251
1252    pub fn multi_get_transaction_blocks(
1253        &self,
1254        tx_digests: &[TransactionDigest],
1255    ) -> Result<Vec<Option<VerifiedTransaction>>, TypedStoreError> {
1256        self.perpetual_tables
1257            .transactions
1258            .multi_get(tx_digests)
1259            .map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())
1260    }
1261
1262    pub fn get_transaction_block(
1263        &self,
1264        tx_digest: &TransactionDigest,
1265    ) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
1266        self.perpetual_tables
1267            .transactions
1268            .get(tx_digest)
1269            .map(|v| v.map(|v| v.into()))
1270    }
1271
1272    /// This function reads the DB directly to get the system state object.
1273    /// If reconfiguration is happening at the same time, there is no guarantee whether we would be getting
1274    /// the old or the new system state object.
1275    /// Hence this function should only be called during RPC reads where data race is not a major concern.
1276    /// In general we should avoid this as much as possible.
1277    /// If the intent is for testing, you can use AuthorityState:: get_sui_system_state_object_for_testing.
1278    pub fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
1279        get_sui_system_state(self.perpetual_tables.as_ref())
1280    }
1281
1282    pub fn expensive_check_sui_conservation<T>(
1283        self: &Arc<Self>,
1284        type_layout_store: T,
1285        old_epoch_store: &AuthorityPerEpochStore,
1286    ) -> SuiResult
1287    where
1288        T: TypeLayoutStore + Send + Copy,
1289    {
1290        if !self.enable_epoch_sui_conservation_check {
1291            return Ok(());
1292        }
1293
1294        let executor = old_epoch_store.executor();
1295        info!("Starting SUI conservation check. This may take a while..");
1296        let cur_time = Instant::now();
1297        let mut pending_objects = vec![];
1298        let mut count = 0;
1299        let mut size = 0;
1300        let (mut total_sui, mut total_storage_rebate) = thread::scope(|s| {
1301            let pending_tasks = FuturesUnordered::new();
1302            for o in self.iter_live_object_set(false) {
1303                match o {
1304                    LiveObject::Normal(object) => {
1305                        size += object.object_size_for_gas_metering();
1306                        count += 1;
1307                        pending_objects.push(object);
1308                        if count % 1_000_000 == 0 {
1309                            let mut task_objects = vec![];
1310                            mem::swap(&mut pending_objects, &mut task_objects);
1311                            pending_tasks.push(s.spawn(move || {
1312                                let mut layout_resolver =
1313                                    executor.type_layout_resolver(Box::new(type_layout_store));
1314                                let mut total_storage_rebate = 0;
1315                                let mut total_sui = 0;
1316                                for object in task_objects {
1317                                    total_storage_rebate += object.storage_rebate;
1318                                    // get_total_sui includes storage rebate, however all storage rebate is
1319                                    // also stored in the storage fund, so we need to subtract it here.
1320                                    total_sui +=
1321                                        object.get_total_sui(layout_resolver.as_mut()).unwrap()
1322                                            - object.storage_rebate;
1323                                }
1324                                if count % 50_000_000 == 0 {
1325                                    info!("Processed {} objects", count);
1326                                }
1327                                (total_sui, total_storage_rebate)
1328                            }));
1329                        }
1330                    }
1331                    LiveObject::Wrapped(_) => {
1332                        unreachable!("Explicitly asked to not include wrapped tombstones")
1333                    }
1334                }
1335            }
1336            pending_tasks.into_iter().fold((0, 0), |init, result| {
1337                let result = result.join().unwrap();
1338                (init.0 + result.0, init.1 + result.1)
1339            })
1340        });
1341        let mut layout_resolver = executor.type_layout_resolver(Box::new(type_layout_store));
1342        for object in pending_objects {
1343            total_storage_rebate += object.storage_rebate;
1344            total_sui +=
1345                object.get_total_sui(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
1346        }
1347        info!(
1348            "Scanned {} live objects, took {:?}",
1349            count,
1350            cur_time.elapsed()
1351        );
1352        self.metrics
1353            .sui_conservation_live_object_count
1354            .set(count as i64);
1355        self.metrics
1356            .sui_conservation_live_object_size
1357            .set(size as i64);
1358        self.metrics
1359            .sui_conservation_check_latency
1360            .set(cur_time.elapsed().as_secs() as i64);
1361
1362        // It is safe to call this function because we are in the middle of reconfiguration.
1363        let system_state = self
1364            .get_sui_system_state_object_unsafe()
1365            .expect("Reading sui system state object cannot fail")
1366            .into_sui_system_state_summary();
1367        let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
1368        info!(
1369            "Total SUI amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
1370            total_sui, storage_fund_balance, total_storage_rebate, system_state.epoch
1371        );
1372
1373        let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
1374        self.metrics
1375            .sui_conservation_storage_fund
1376            .set(storage_fund_balance as i64);
1377        self.metrics
1378            .sui_conservation_storage_fund_imbalance
1379            .set(imbalance);
1380        self.metrics
1381            .sui_conservation_imbalance
1382            .set((total_sui as i128 - TOTAL_SUPPLY_MIST as i128) as i64);
1383
1384        if let Some(expected_imbalance) = self
1385            .perpetual_tables
1386            .expected_storage_fund_imbalance
1387            .get(&())
1388            .expect("DB read cannot fail")
1389        {
1390            fp_ensure!(
1391                imbalance == expected_imbalance,
1392                SuiError::from(
1393                    format!(
1394                        "Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
1395                        system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
1396                    ).as_str()
1397                )
1398            );
1399        } else {
1400            self.perpetual_tables
1401                .expected_storage_fund_imbalance
1402                .insert(&(), &imbalance)
1403                .expect("DB write cannot fail");
1404        }
1405
1406        if let Some(expected_sui) = self
1407            .perpetual_tables
1408            .expected_network_sui_amount
1409            .get(&())
1410            .expect("DB read cannot fail")
1411        {
1412            fp_ensure!(
1413                total_sui == expected_sui,
1414                SuiError::from(
1415                    format!(
1416                        "Inconsistent state detected at epoch {}: total sui: {}, expecting {}",
1417                        system_state.epoch, total_sui, expected_sui
1418                    )
1419                    .as_str()
1420                )
1421            );
1422        } else {
1423            self.perpetual_tables
1424                .expected_network_sui_amount
1425                .insert(&(), &total_sui)
1426                .expect("DB write cannot fail");
1427        }
1428
1429        Ok(())
1430    }
1431
1432    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
1433    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
1434    #[instrument(level = "error", skip_all)]
1435    pub fn maybe_reaccumulate_state_hash(
1436        &self,
1437        cur_epoch_store: &AuthorityPerEpochStore,
1438        new_protocol_version: ProtocolVersion,
1439    ) {
1440        let old_simplified_unwrap_then_delete = cur_epoch_store
1441            .protocol_config()
1442            .simplified_unwrap_then_delete();
1443        let new_simplified_unwrap_then_delete =
1444            ProtocolConfig::get_for_version(new_protocol_version, cur_epoch_store.get_chain())
1445                .simplified_unwrap_then_delete();
1446        // If in the new epoch the simplified_unwrap_then_delete is enabled for the first time,
1447        // we re-accumulate state root.
1448        let should_reaccumulate =
1449            !old_simplified_unwrap_then_delete && new_simplified_unwrap_then_delete;
1450        if !should_reaccumulate {
1451            return;
1452        }
1453        info!(
1454            "[Re-accumulate] simplified_unwrap_then_delete is enabled in the new protocol version, re-accumulating state hash"
1455        );
1456        let cur_time = Instant::now();
1457        std::thread::scope(|s| {
1458            let pending_tasks = FuturesUnordered::new();
1459            // Shard the object IDs into different ranges so that we can process them in parallel.
1460            // We divide the range into 2^BITS number of ranges. To do so we use the highest BITS bits
1461            // to mark the starting/ending point of the range. For example, when BITS = 5, we
1462            // divide the range into 32 ranges, and the first few ranges are:
1463            // 00000000_.... to 00000111_....
1464            // 00001000_.... to 00001111_....
1465            // 00010000_.... to 00010111_....
1466            // and etc.
1467            const BITS: u8 = 5;
1468            for index in 0u8..(1 << BITS) {
1469                pending_tasks.push(s.spawn(move || {
1470                    let mut id_bytes = [0; ObjectID::LENGTH];
1471                    id_bytes[0] = index << (8 - BITS);
1472                    let start_id = ObjectID::new(id_bytes);
1473
1474                    id_bytes[0] |= (1 << (8 - BITS)) - 1;
1475                    for element in id_bytes.iter_mut().skip(1) {
1476                        *element = u8::MAX;
1477                    }
1478                    let end_id = ObjectID::new(id_bytes);
1479
1480                    info!(
1481                        "[Re-accumulate] Scanning object ID range {:?}..{:?}",
1482                        start_id, end_id
1483                    );
1484                    let mut prev = (
1485                        ObjectKey::min_for_id(&ObjectID::ZERO),
1486                        StoreObjectWrapper::V1(StoreObject::Deleted),
1487                    );
1488                    let mut object_scanned: u64 = 0;
1489                    let mut wrapped_objects_to_remove = vec![];
1490                    for db_result in self.perpetual_tables.objects.safe_range_iter(
1491                        ObjectKey::min_for_id(&start_id)..=ObjectKey::max_for_id(&end_id),
1492                    ) {
1493                        match db_result {
1494                            Ok((object_key, object)) => {
1495                                object_scanned += 1;
1496                                if object_scanned.is_multiple_of(100000) {
1497                                    info!(
1498                                        "[Re-accumulate] Task {}: object scanned: {}",
1499                                        index, object_scanned,
1500                                    );
1501                                }
1502                                if matches!(prev.1.inner(), StoreObject::Wrapped)
1503                                    && object_key.0 != prev.0.0
1504                                {
1505                                    wrapped_objects_to_remove
1506                                        .push(WrappedObject::new(prev.0.0, prev.0.1));
1507                                }
1508
1509                                prev = (object_key, object);
1510                            }
1511                            Err(err) => {
1512                                warn!("Object iterator encounter RocksDB error {:?}", err);
1513                                return Err(err);
1514                            }
1515                        }
1516                    }
1517                    if matches!(prev.1.inner(), StoreObject::Wrapped) {
1518                        wrapped_objects_to_remove.push(WrappedObject::new(prev.0.0, prev.0.1));
1519                    }
1520                    info!(
1521                        "[Re-accumulate] Task {}: object scanned: {}, wrapped objects: {}",
1522                        index,
1523                        object_scanned,
1524                        wrapped_objects_to_remove.len(),
1525                    );
1526                    Ok((wrapped_objects_to_remove, object_scanned))
1527                }));
1528            }
1529            let (last_checkpoint_of_epoch, cur_accumulator) = self
1530                .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
1531                .expect("read cannot fail")
1532                .expect("accumulator must exist");
1533            let (accumulator, total_objects_scanned, total_wrapped_objects) =
1534                pending_tasks.into_iter().fold(
1535                    (cur_accumulator, 0u64, 0usize),
1536                    |(mut accumulator, total_objects_scanned, total_wrapped_objects), task| {
1537                        let (wrapped_objects_to_remove, object_scanned) =
1538                            task.join().unwrap().unwrap();
1539                        accumulator.remove_all(
1540                            wrapped_objects_to_remove
1541                                .iter()
1542                                .map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
1543                                .collect::<Vec<Vec<u8>>>(),
1544                        );
1545                        (
1546                            accumulator,
1547                            total_objects_scanned + object_scanned,
1548                            total_wrapped_objects + wrapped_objects_to_remove.len(),
1549                        )
1550                    },
1551                );
1552            info!(
1553                "[Re-accumulate] Total objects scanned: {}, total wrapped objects: {}",
1554                total_objects_scanned, total_wrapped_objects,
1555            );
1556            info!(
1557                "[Re-accumulate] New accumulator value: {:?}",
1558                accumulator.digest()
1559            );
1560            self.insert_state_hash_for_epoch(
1561                cur_epoch_store.epoch(),
1562                &last_checkpoint_of_epoch,
1563                &accumulator,
1564            )
1565            .unwrap();
1566        });
1567        info!(
1568            "[Re-accumulate] Re-accumulating took {}seconds",
1569            cur_time.elapsed().as_secs()
1570        );
1571    }
1572
1573    pub async fn prune_objects_and_compact_for_testing(
1574        &self,
1575        checkpoint_store: &Arc<CheckpointStore>,
1576        rpc_index: Option<&RpcIndexStore>,
1577    ) {
1578        let pruning_config = AuthorityStorePruningConfig {
1579            num_epochs_to_retain: 0,
1580            ..Default::default()
1581        };
1582        let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
1583            &self.perpetual_tables,
1584            checkpoint_store,
1585            rpc_index,
1586            None,
1587            pruning_config,
1588            AuthorityStorePruningMetrics::new_for_test(),
1589            EPOCH_DURATION_MS_FOR_TESTING,
1590        )
1591        .await;
1592        let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
1593    }
1594
1595    #[cfg(test)]
1596    pub async fn prune_objects_immediately_for_testing(
1597        &self,
1598        transaction_effects: Vec<TransactionEffects>,
1599    ) -> anyhow::Result<()> {
1600        let mut wb = self.perpetual_tables.objects.batch();
1601
1602        let mut object_keys_to_prune = vec![];
1603        for effects in &transaction_effects {
1604            for (object_id, seq_number) in effects.modified_at_versions() {
1605                info!("Pruning object {:?} version {:?}", object_id, seq_number);
1606                object_keys_to_prune.push(ObjectKey(object_id, seq_number));
1607            }
1608        }
1609
1610        wb.delete_batch(
1611            &self.perpetual_tables.objects,
1612            object_keys_to_prune.into_iter(),
1613        )?;
1614        wb.write()?;
1615        Ok(())
1616    }
1617
1618    // Counts the number of versions exist in object store for `object_id`. This includes tombstone.
1619    #[cfg(msim)]
1620    pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
1621        self.perpetual_tables
1622            .objects
1623            .safe_iter_with_bounds(
1624                Some(ObjectKey(object_id, VersionNumber::MIN)),
1625                Some(ObjectKey(object_id, VersionNumber::MAX)),
1626            )
1627            .collect::<Result<Vec<_>, _>>()
1628            .unwrap()
1629            .len()
1630    }
1631}
1632
1633impl GlobalStateHashStore for AuthorityStore {
1634    fn get_object_ref_prior_to_key_deprecated(
1635        &self,
1636        object_id: &ObjectID,
1637        version: VersionNumber,
1638    ) -> SuiResult<Option<ObjectRef>> {
1639        self.get_object_ref_prior_to_key(object_id, version)
1640    }
1641
1642    fn get_root_state_hash_for_epoch(
1643        &self,
1644        epoch: EpochId,
1645    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
1646        self.perpetual_tables
1647            .root_state_hash_by_epoch
1648            .get(&epoch)
1649            .map_err(Into::into)
1650    }
1651
1652    fn get_root_state_hash_for_highest_epoch(
1653        &self,
1654    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
1655        Ok(self
1656            .perpetual_tables
1657            .root_state_hash_by_epoch
1658            .reversed_safe_iter_with_bounds(None, None)?
1659            .next()
1660            .transpose()?)
1661    }
1662
1663    fn insert_state_hash_for_epoch(
1664        &self,
1665        epoch: EpochId,
1666        last_checkpoint_of_epoch: &CheckpointSequenceNumber,
1667        acc: &GlobalStateHash,
1668    ) -> SuiResult {
1669        self.perpetual_tables
1670            .root_state_hash_by_epoch
1671            .insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
1672        self.root_state_notify_read
1673            .notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
1674
1675        Ok(())
1676    }
1677
1678    fn iter_live_object_set(
1679        &self,
1680        include_wrapped_object: bool,
1681    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1682        Box::new(
1683            self.perpetual_tables
1684                .iter_live_object_set(include_wrapped_object),
1685        )
1686    }
1687}
1688
1689impl ObjectStore for AuthorityStore {
1690    /// Read an object and return it, or Ok(None) if the object was not found.
1691    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
1692        self.perpetual_tables.as_ref().get_object(object_id)
1693    }
1694
1695    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
1696        self.perpetual_tables.get_object_by_key(object_id, version)
1697    }
1698}
1699
1700/// A wrapper to make Orphan Rule happy
1701pub struct ResolverWrapper {
1702    pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1703    pub metrics: Arc<ResolverMetrics>,
1704}
1705
1706impl ResolverWrapper {
1707    pub fn new(
1708        resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1709        metrics: Arc<ResolverMetrics>,
1710    ) -> Self {
1711        metrics.module_cache_size.set(0);
1712        ResolverWrapper { resolver, metrics }
1713    }
1714
1715    fn inc_cache_size_gauge(&self) {
1716        // reset the gauge after a restart of the cache
1717        let current = self.metrics.module_cache_size.get();
1718        self.metrics.module_cache_size.set(current + 1);
1719    }
1720}
1721
1722impl ModuleResolver for ResolverWrapper {
1723    type Error = SuiError;
1724    fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
1725        self.inc_cache_size_gauge();
1726        get_module(&*self.resolver, module_id)
1727    }
1728}
1729
1730pub enum UpdateType {
1731    Transaction(TransactionEffectsDigest),
1732    Genesis,
1733}
1734
1735pub type SuiLockResult = SuiResult<ObjectLockStatus>;
1736
1737#[derive(Debug, PartialEq, Eq)]
1738pub enum ObjectLockStatus {
1739    Initialized,
1740    LockedToTx { locked_by_tx: LockDetailsDeprecated },
1741    LockedAtDifferentVersion { locked_ref: ObjectRef },
1742}
1743
1744#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1745pub enum LockDetailsWrapperDeprecated {
1746    V1(LockDetailsV1Deprecated),
1747}
1748
1749impl LockDetailsWrapperDeprecated {
1750    pub fn migrate(self) -> Self {
1751        // TODO: when there are multiple versions, we must iteratively migrate from version N to
1752        // N+1 until we arrive at the latest version
1753        self
1754    }
1755
1756    // Always returns the most recent version. Older versions are migrated to the latest version at
1757    // read time, so there is never a need to access older versions.
1758    pub fn inner(&self) -> &LockDetailsDeprecated {
1759        match self {
1760            Self::V1(v1) => v1,
1761
1762            // can remove #[allow] when there are multiple versions
1763            #[allow(unreachable_patterns)]
1764            _ => panic!("lock details should have been migrated to latest version at read time"),
1765        }
1766    }
1767    pub fn into_inner(self) -> LockDetailsDeprecated {
1768        match self {
1769            Self::V1(v1) => v1,
1770
1771            // can remove #[allow] when there are multiple versions
1772            #[allow(unreachable_patterns)]
1773            _ => panic!("lock details should have been migrated to latest version at read time"),
1774        }
1775    }
1776}
1777
1778#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1779pub struct LockDetailsV1Deprecated {
1780    pub epoch: EpochId,
1781    pub tx_digest: TransactionDigest,
1782}
1783
1784pub type LockDetailsDeprecated = LockDetailsV1Deprecated;
1785
1786impl From<LockDetailsDeprecated> for LockDetailsWrapperDeprecated {
1787    fn from(details: LockDetailsDeprecated) -> Self {
1788        // always use latest version.
1789        LockDetailsWrapperDeprecated::V1(details)
1790    }
1791}