sui_core/authority/
authority_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::{iter, mem, thread};
6
7use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
8use crate::authority::authority_store_pruner::{
9    AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
10};
11use crate::authority::authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object};
12use crate::authority::epoch_marker_key::EpochMarkerKey;
13use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
14use crate::global_state_hasher::GlobalStateHashStore;
15use crate::rpc_index::RpcIndexStore;
16use crate::transaction_outputs::TransactionOutputs;
17use either::Either;
18use fastcrypto::hash::{HashFunction, MultisetHash, Sha3_256};
19use futures::stream::FuturesUnordered;
20use move_core_types::account_address::AccountAddress;
21use move_core_types::resolver::{ModuleResolver, SerializedPackage};
22use serde::{Deserialize, Serialize};
23use sui_config::node::AuthorityStorePruningConfig;
24use sui_macros::fail_point_arg;
25use sui_types::error::{SuiErrorKind, UserInputError};
26use sui_types::execution::TypeLayoutStore;
27use sui_types::global_state_hash::GlobalStateHash;
28use sui_types::message_envelope::Message;
29use sui_types::storage::{
30    BackingPackageStore, FullObjectKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore,
31    get_module, get_package,
32};
33use sui_types::sui_system_state::get_sui_system_state;
34use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure};
35use tokio::time::Instant;
36use tracing::{debug, info, trace};
37use typed_store::traits::Map;
38use typed_store::{
39    TypedStoreError,
40    rocks::{DBBatch, DBMap},
41};
42
43use super::authority_store_tables::LiveObject;
44use super::{authority_store_tables::AuthorityPerpetualTables, *};
45use mysten_common::sync::notify_read::NotifyRead;
46use sui_types::effects::{TransactionEffects, TransactionEvents};
47use sui_types::gas_coin::TOTAL_SUPPLY_MIST;
48
/// Prometheus gauges owned by [`AuthorityStore`], covering the SUI conservation
/// check and the flags of the currently running epoch. All gauges are
/// registered against the node's metrics `Registry` in [`AuthorityStoreMetrics::new`].
struct AuthorityStoreMetrics {
    // Seconds spent scanning all live objects for the conservation check.
    sui_conservation_check_latency: IntGauge,
    // Number of live objects observed by the conservation scan.
    sui_conservation_live_object_count: IntGauge,
    // Total size in bytes of live objects observed by the conservation scan.
    sui_conservation_live_object_size: IntGauge,
    // Delta between total SUI in the network and the expected total supply.
    sui_conservation_imbalance: IntGauge,
    // Storage fund pool balance (object-storage portion only).
    sui_conservation_storage_fund: IntGauge,
    // storage_fund_balance - total_object_storage_rebates.
    sui_conservation_storage_fund_imbalance: IntGauge,
    // One labeled gauge per epoch flag; 1 = set for current epoch, 0 = cleared.
    epoch_flags: IntGaugeVec,
}
58
impl AuthorityStoreMetrics {
    /// Registers every conservation-check and epoch-flag gauge against `registry`.
    ///
    /// The `unwrap()`s are intentional: registration only fails on a duplicate
    /// metric name, which would be a programming error rather than a runtime
    /// condition worth propagating.
    pub fn new(registry: &Registry) -> Self {
        Self {
            sui_conservation_check_latency: register_int_gauge_with_registry!(
                "sui_conservation_check_latency",
                "Number of seconds took to scan all live objects in the store for SUI conservation check",
                registry,
            ).unwrap(),
            sui_conservation_live_object_count: register_int_gauge_with_registry!(
                "sui_conservation_live_object_count",
                "Number of live objects in the store",
                registry,
            ).unwrap(),
            sui_conservation_live_object_size: register_int_gauge_with_registry!(
                "sui_conservation_live_object_size",
                "Size in bytes of live objects in the store",
                registry,
            ).unwrap(),
            sui_conservation_imbalance: register_int_gauge_with_registry!(
                "sui_conservation_imbalance",
                "Total amount of SUI in the network - 10B * 10^9. This delta shows the amount of imbalance",
                registry,
            ).unwrap(),
            sui_conservation_storage_fund: register_int_gauge_with_registry!(
                "sui_conservation_storage_fund",
                "Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
                registry,
            ).unwrap(),
            sui_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
                "sui_conservation_storage_fund_imbalance",
                "Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
                registry,
            ).unwrap(),
            epoch_flags: register_int_gauge_vec_with_registry!(
                "epoch_flags",
                "Local flags of the currently running epoch",
                &["flag"],
                registry,
            ).unwrap(),
        }
    }
}
101
/// Persistent, epoch-independent storage for an authority, backed by
/// `AuthorityPerpetualTables`: objects, transactions, effects, events and
/// the per-epoch marker tables.
///
/// NOTE(review): the previous doc comment described an `ALL_OBJ_VER` switch and
/// an `S` signature type parameter on a `SuiDataStore` type; neither exists on
/// this struct any more, so that text was removed as stale.
pub struct AuthorityStore {
    pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,

    // Waiters registered here are fulfilled when the root state hash for an
    // epoch becomes available (see `notify_read_root_state_hash`).
    pub(crate) root_state_notify_read:
        NotifyRead<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Whether to enable expensive SUI conservation check at epoch boundaries.
    enable_epoch_sui_conservation_check: bool,

    metrics: AuthorityStoreMetrics,
}
119
// Guards over the current epoch id. The read guard is shared by ordinary
// execution paths; the write guard is taken exclusively during reconfiguration
// (see `clear_object_per_epoch_marker_table`, which requires it as proof).
pub type ExecutionLockReadGuard<'a> = tokio::sync::RwLockReadGuard<'a, EpochId>;
pub type ExecutionLockWriteGuard<'a> = tokio::sync::RwLockWriteGuard<'a, EpochId>;
122
123impl AuthorityStore {
    /// Open an authority store by directory path.
    /// If the store is empty, initialize it using genesis.
    pub async fn open(
        perpetual_tables: Arc<AuthorityPerpetualTables>,
        genesis: &Genesis,
        config: &NodeConfig,
        registry: &Registry,
    ) -> SuiResult<Arc<Self>> {
        let enable_epoch_sui_conservation_check = config
            .expensive_safety_check_config
            .enable_epoch_sui_conservation_check();

        // First boot: derive the epoch start configuration from genesis and
        // persist it. On subsequent boots it must already be present in the DB.
        let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
            info!("Creating new epoch start config from genesis");

            #[allow(unused_mut)]
            let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
            // Test-only hook: simulations may override the initial flag set.
            fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
                info!("Setting initial epoch flags to {:?}", flags);
                initial_epoch_flags = flags;
            });

            let epoch_start_configuration = EpochStartConfiguration::new(
                genesis.sui_system_object().into_epoch_start_state(),
                *genesis.checkpoint().digest(),
                &genesis.objects(),
                initial_epoch_flags,
            )?;
            perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
            epoch_start_configuration
        } else {
            info!("Loading epoch start config from DB");
            perpetual_tables
                .epoch_start_configuration
                .get(&())?
                .expect("Epoch start configuration must be set in non-empty DB")
        };
        let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
        info!("Epoch start config: {:?}", epoch_start_configuration);
        info!("Cur epoch: {:?}", cur_epoch);
        let this = Self::open_inner(
            genesis,
            perpetual_tables,
            enable_epoch_sui_conservation_check,
            registry,
        )
        .await?;
        // Export the loaded flags to metrics; there are no prior flags to clear.
        this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
        Ok(this)
    }
174
175    pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
176        for flag in old {
177            self.metrics
178                .epoch_flags
179                .with_label_values(&[&flag.to_string()])
180                .set(0);
181        }
182        for flag in new {
183            self.metrics
184                .epoch_flags
185                .with_label_values(&[&flag.to_string()])
186                .set(1);
187        }
188    }
189
    // NB: This must only be called at time of reconfiguration. We take the execution lock write
    // guard as an argument to ensure that this is the case.
    pub fn clear_object_per_epoch_marker_table(
        &self,
        _execution_guard: &ExecutionLockWriteGuard<'_>,
    ) -> SuiResult<()> {
        // We can safely delete all entries in the per epoch marker table since this is only called
        // at epoch boundaries (during reconfiguration). Therefore any entries that currently
        // exist can be removed. Because of this we can use the `schedule_delete_all` method.
        self.perpetual_tables
            .object_per_epoch_marker_table
            .schedule_delete_all()?;
        // Default (rocksdb) backend: the v2 table also supports schedule_delete_all.
        #[cfg(not(tidehunter))]
        {
            self.perpetual_tables
                .object_per_epoch_marker_table_v2
                .schedule_delete_all()?;
        }
        // Tidehunter backend: clear the v2 table by dropping the entire raw key range.
        #[cfg(tidehunter)]
        {
            self.perpetual_tables
                .object_per_epoch_marker_table_v2
                .drop_cells_in_range_raw(&EpochMarkerKey::MIN_KEY, &EpochMarkerKey::MAX_KEY)?;
        }
        Ok(())
    }
216
    /// Test-only constructor that opens the store with an explicit committee.
    /// The committee must be the genesis (epoch 0) committee.
    pub async fn open_with_committee_for_testing(
        perpetual_tables: Arc<AuthorityPerpetualTables>,
        committee: &Committee,
        genesis: &Genesis,
    ) -> SuiResult<Arc<Self>> {
        // TODO: Since we always start at genesis, the committee should be technically the same
        // as the genesis committee.
        assert_eq!(committee.epoch, 0);
        // Conservation checks are always enabled in tests.
        Self::open_inner(genesis, perpetual_tables, true, &Registry::new()).await
    }
227
    /// Construct the store and, if the database is empty, seed it with the
    /// genesis objects, transaction, effects and (when present) events.
    async fn open_inner(
        genesis: &Genesis,
        perpetual_tables: Arc<AuthorityPerpetualTables>,
        enable_epoch_sui_conservation_check: bool,
        registry: &Registry,
    ) -> SuiResult<Arc<Self>> {
        let store = Arc::new(Self {
            perpetual_tables,
            root_state_notify_read: NotifyRead::<
                EpochId,
                (CheckpointSequenceNumber, GlobalStateHash),
            >::new(),
            enable_epoch_sui_conservation_check,
            metrics: AuthorityStoreMetrics::new(registry),
        });
        // Only initialize an empty database.
        if store
            .database_is_empty()
            .expect("Database read should not fail at init.")
        {
            store
                .bulk_insert_genesis_objects(genesis.objects())
                .expect("Cannot bulk insert genesis objects");

            // insert txn and effects of genesis
            let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());

            store
                .perpetual_tables
                .transactions
                .insert(transaction.digest(), transaction.serializable_ref())
                .unwrap();

            store
                .perpetual_tables
                .effects
                .insert(&genesis.effects().digest(), genesis.effects())
                .unwrap();
            // We don't insert the effects to executed_effects yet because the genesis tx hasn't but will be executed.
            // This is important for fullnodes to be able to generate indexing data right now.

            // Events are only written when the genesis effects reference an events digest.
            if genesis.effects().events_digest().is_some() {
                store
                    .perpetual_tables
                    .events_2
                    .insert(transaction.digest(), genesis.events())
                    .unwrap();
            }
        }

        Ok(store)
    }
280
281    /// Open authority store without any operations that require
282    /// genesis, such as constructing EpochStartConfiguration
283    /// or inserting genesis objects.
284    pub fn open_no_genesis(
285        perpetual_tables: Arc<AuthorityPerpetualTables>,
286        enable_epoch_sui_conservation_check: bool,
287        registry: &Registry,
288    ) -> SuiResult<Arc<Self>> {
289        let store = Arc::new(Self {
290            perpetual_tables,
291            root_state_notify_read: NotifyRead::<
292                EpochId,
293                (CheckpointSequenceNumber, GlobalStateHash),
294            >::new(),
295            enable_epoch_sui_conservation_check,
296            metrics: AuthorityStoreMetrics::new(registry),
297        });
298        Ok(store)
299    }
300
    /// Returns the epoch to resume in after a restart; delegates to the
    /// perpetual tables.
    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
        self.perpetual_tables.get_recovery_epoch_at_restart()
    }
304
305    pub fn get_effects(
306        &self,
307        effects_digest: &TransactionEffectsDigest,
308    ) -> SuiResult<Option<TransactionEffects>> {
309        Ok(self.perpetual_tables.effects.get(effects_digest)?)
310    }
311
312    /// Returns true if we have an effects structure for this transaction digest
313    pub fn effects_exists(&self, effects_digest: &TransactionEffectsDigest) -> SuiResult<bool> {
314        self.perpetual_tables
315            .effects
316            .contains_key(effects_digest)
317            .map_err(|e| e.into())
318    }
319
    /// Fetch the events recorded for the transaction with `digest`, if any.
    pub fn get_events(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<TransactionEvents>, TypedStoreError> {
        self.perpetual_tables.events_2.get(digest)
    }
326
327    pub fn multi_get_events(
328        &self,
329        event_digests: &[TransactionDigest],
330    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
331        Ok(event_digests
332            .iter()
333            .map(|digest| self.get_events(digest))
334            .collect::<Result<Vec<_>, _>>()?)
335    }
336
    /// Fetch the keys of objects loaded at runtime but left unchanged by the
    /// transaction with `digest`, if recorded.
    pub fn get_unchanged_loaded_runtime_objects(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<Vec<ObjectKey>>, TypedStoreError> {
        self.perpetual_tables
            .unchanged_loaded_runtime_objects
            .get(digest)
    }
345
    /// Batched lookup of effects by effects digest; result order matches the
    /// input iterator, with `None` for missing entries.
    pub fn multi_get_effects<'a>(
        &self,
        effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
    ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
        self.perpetual_tables.effects.multi_get(effects_digests)
    }
352
353    pub fn get_executed_effects(
354        &self,
355        tx_digest: &TransactionDigest,
356    ) -> Result<Option<TransactionEffects>, TypedStoreError> {
357        let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
358        match effects_digest {
359            Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
360            None => Ok(None),
361        }
362    }
363
    /// Given a list of transaction digests, returns the corresponding executed-effects
    /// *digests* for transactions that have been executed. For transactions that have
    /// not been executed, None is returned.
    pub fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> Result<Vec<Option<TransactionEffectsDigest>>, TypedStoreError> {
        self.perpetual_tables.executed_effects.multi_get(digests)
    }
372
373    /// Given a list of transaction digests, returns a list of the corresponding effects only if they have been
374    /// executed. For transactions that have not been executed, None is returned.
375    pub fn multi_get_executed_effects(
376        &self,
377        digests: &[TransactionDigest],
378    ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
379        let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
380        let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
381        let mut tx_to_effects_map = effects
382            .into_iter()
383            .flatten()
384            .map(|effects| (*effects.transaction_digest(), effects))
385            .collect::<HashMap<_, _>>();
386        Ok(digests
387            .iter()
388            .map(|digest| tx_to_effects_map.remove(digest))
389            .collect())
390    }
391
392    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> SuiResult<bool> {
393        Ok(self
394            .perpetual_tables
395            .executed_effects
396            .contains_key(digest)?)
397    }
398
    /// Fetch the marker recorded for exactly this object key in `epoch_id`,
    /// if one exists.
    pub fn get_marker_value(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> SuiResult<Option<MarkerValue>> {
        Ok(self
            .perpetual_tables
            .object_per_epoch_marker_table_v2
            .get(&EpochMarkerKey(epoch_id, object_key))?)
    }
409
    /// Returns the highest-versioned (version, marker) entry recorded for
    /// `object_id` in `epoch_id`, scanning the marker table in reverse,
    /// or `None` if no marker exists.
    pub fn get_latest_marker(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> SuiResult<Option<(SequenceNumber, MarkerValue)>> {
        // Bound the reverse scan to exactly this (epoch, object id) key range.
        let min_key = EpochMarkerKey(epoch_id, FullObjectKey::min_for_id(&object_id));
        let max_key = EpochMarkerKey(epoch_id, FullObjectKey::max_for_id(&object_id));

        let marker_entry = self
            .perpetual_tables
            .object_per_epoch_marker_table_v2
            .reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
            .next();
        match marker_entry {
            Some(Ok((EpochMarkerKey(epoch, key), marker))) => {
                // because of the iterator bounds these cannot fail
                assert_eq!(epoch, epoch_id);
                assert_eq!(key.id(), object_id);
                Ok(Some((key.version(), marker)))
            }
            Some(Err(e)) => Err(e.into()),
            None => Ok(None),
        }
    }
434
    /// Returns future containing the state hash for the given epoch
    /// once available
    pub async fn notify_read_root_state_hash(
        &self,
        epoch: EpochId,
    ) -> SuiResult<(CheckpointSequenceNumber, GlobalStateHash)> {
        // We need to register waiters _before_ reading from the database to avoid race conditions
        let registration = self.root_state_notify_read.register_one(&epoch);
        let hash = self.perpetual_tables.root_state_hash_by_epoch.get(&epoch)?;

        let result = match hash {
            // Note that Some() clause also drops registration that is already fulfilled
            Some(ready) => Either::Left(futures::future::ready(ready)),
            None => Either::Right(registration),
        }
        .await;

        Ok(result)
    }
454
455    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
456    pub fn deprecated_insert_finalized_transactions(
457        &self,
458        digests: &[TransactionDigest],
459        epoch: EpochId,
460        sequence: CheckpointSequenceNumber,
461    ) -> SuiResult {
462        let mut batch = self
463            .perpetual_tables
464            .executed_transactions_to_checkpoint
465            .batch();
466        batch.insert_batch(
467            &self.perpetual_tables.executed_transactions_to_checkpoint,
468            digests.iter().map(|d| (*d, (epoch, sequence))),
469        )?;
470        batch.write()?;
471        trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
472        Ok(())
473    }
474
    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
    //
    // Looks up the (epoch, checkpoint) a transaction was finalized in, if recorded.
    pub fn deprecated_get_transaction_checkpoint(
        &self,
        digest: &TransactionDigest,
    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
        Ok(self
            .perpetual_tables
            .executed_transactions_to_checkpoint
            .get(digest)?)
    }
485
486    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
487    pub fn deprecated_multi_get_transaction_checkpoint(
488        &self,
489        digests: &[TransactionDigest],
490    ) -> SuiResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
491        Ok(self
492            .perpetual_tables
493            .executed_transactions_to_checkpoint
494            .multi_get(digests)?
495            .into_iter()
496            .collect())
497    }
498
    /// Returns true if there are no objects in the database
    pub fn database_is_empty(&self) -> SuiResult<bool> {
        self.perpetual_tables.database_is_empty()
    }
503
504    pub fn object_exists_by_key(
505        &self,
506        object_id: &ObjectID,
507        version: VersionNumber,
508    ) -> SuiResult<bool> {
509        Ok(self
510            .perpetual_tables
511            .objects
512            .contains_key(&ObjectKey(*object_id, version))?)
513    }
514
515    pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<bool>> {
516        Ok(self
517            .perpetual_tables
518            .objects
519            .multi_contains_keys(object_keys.to_vec())?
520            .into_iter()
521            .collect())
522    }
523
    /// Returns the object reference of the newest version of `object_id` that
    /// is strictly earlier than `version`, or `None` if no earlier version exists.
    fn get_object_ref_prior_to_key(
        &self,
        object_id: &ObjectID,
        version: VersionNumber,
    ) -> Result<Option<ObjectRef>, SuiError> {
        // Version 0 has no predecessor.
        let Some(prior_version) = version.one_before() else {
            return Ok(None);
        };
        // Scan backwards from (object_id, prior_version) down to the minimum
        // key for this id; the first hit is the closest earlier version.
        let mut iterator = self
            .perpetual_tables
            .objects
            .reversed_safe_iter_with_bounds(
                Some(ObjectKey::min_for_id(object_id)),
                Some(ObjectKey(*object_id, prior_version)),
            )?;

        // The id check guards against the iterator yielding a key for a
        // different object when this id has no entries in range.
        if let Some((object_key, value)) = iterator.next().transpose()?
            && object_key.0 == *object_id
        {
            return Ok(Some(
                self.perpetual_tables.object_reference(&object_key, value)?,
            ));
        }
        Ok(None)
    }
549
550    pub fn multi_get_objects_by_key(
551        &self,
552        object_keys: &[ObjectKey],
553    ) -> Result<Vec<Option<Object>>, SuiError> {
554        let wrappers = self
555            .perpetual_tables
556            .objects
557            .multi_get(object_keys.to_vec())?;
558        let mut ret = vec![];
559
560        for (idx, w) in wrappers.into_iter().enumerate() {
561            ret.push(
562                w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
563                    .transpose()?
564                    .flatten(),
565            );
566        }
567        Ok(ret)
568    }
569
570    /// Get many objects
571    pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, SuiError> {
572        let mut result = Vec::new();
573        for id in objects {
574            result.push(self.get_object(id));
575        }
576        Ok(result)
577    }
578
579    // Methods to mutate the store
580
    /// Insert a genesis object.
    /// TODO: delete this method entirely (still used by authority_tests.rs)
    pub(crate) fn insert_genesis_object(&self, object: Object) -> SuiResult {
        // We only side load objects with a genesis parent transaction.
        debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
        let object_ref = object.compute_object_reference();
        self.insert_object_direct(object_ref, &object)
    }
589
590    /// Insert an object directly into the store, and also update relevant tables
591    /// NOTE: does not handle transaction lock.
592    /// This is used to insert genesis objects
593    fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> SuiResult {
594        let mut write_batch = self.perpetual_tables.objects.batch();
595
596        // Insert object
597        let store_object = get_store_object(object.clone());
598        write_batch.insert_batch(
599            &self.perpetual_tables.objects,
600            std::iter::once((ObjectKey::from(object_ref), store_object)),
601        )?;
602
603        // Update the index
604        if object.get_single_owner().is_some() {
605            // Only initialize lock for address owned objects.
606            if !object.is_child_object() {
607                self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref], false)?;
608            }
609        }
610
611        write_batch.write()?;
612
613        Ok(())
614    }
615
    /// This function should only be used for initializing genesis and should remain private.
    #[instrument(level = "debug", skip_all)]
    pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> SuiResult<()> {
        let mut batch = self.perpetual_tables.objects.batch();
        // Pair each object with its computed reference so it can be both keyed
        // and filtered below without recomputing the reference.
        let ref_and_objects: Vec<_> = objects
            .iter()
            .map(|o| (o.compute_object_reference(), o))
            .collect();

        batch.insert_batch(
            &self.perpetual_tables.objects,
            ref_and_objects
                .iter()
                .map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))),
        )?;

        // Live-object markers are only initialized for non-child objects.
        let non_child_object_refs: Vec<_> = ref_and_objects
            .iter()
            .filter(|(_, object)| !object.is_child_object())
            .map(|(oref, _)| *oref)
            .collect();

        self.initialize_live_object_markers_impl(
            &mut batch,
            &non_child_object_refs,
            false, // is_force_reset
        )?;

        batch.write()?;

        Ok(())
    }
648
    /// Bulk-insert a set of live objects, first verifying a SHA3 digest over
    /// the objects' digests, then writing in up to `num_parallel_chunks`
    /// blocking tasks.
    pub async fn bulk_insert_live_objects(
        perpetual_db: Arc<AuthorityPerpetualTables>,
        objects: Vec<LiveObject>,
        expected_sha3_digest: &[u8; 32],
        num_parallel_chunks: usize,
    ) -> SuiResult<()> {
        // Verify SHA3 over the full object set before inserting.
        let mut hasher = Sha3_256::default();
        for object in &objects {
            // Hash each object's digest (third component of the object reference).
            hasher.update(object.object_reference().2.inner());
        }
        let sha3_digest = hasher.finalize().digest;
        if *expected_sha3_digest != sha3_digest {
            error!(
                "Sha does not match! expected: {:?}, actual: {:?}",
                expected_sha3_digest, sha3_digest
            );
            return Err(SuiError::from("Sha does not match"));
        }

        // max(1) guards against a zero chunk size when `objects` is empty.
        let chunk_size = objects.len().div_ceil(num_parallel_chunks).max(1);
        let mut remaining = objects;
        let mut handles = Vec::new();
        while !remaining.is_empty() {
            let take = chunk_size.min(remaining.len());
            let chunk: Vec<LiveObject> = remaining.drain(..take).collect();
            let db = perpetual_db.clone();
            // Database writes run on the blocking pool so the async runtime
            // is not stalled.
            handles.push(tokio::task::spawn_blocking(move || {
                Self::insert_objects_chunk(db, chunk)
            }));
        }
        for handle in handles {
            handle.await.expect("insert task panicked")?;
        }
        Ok(())
    }
685
    /// Write one chunk of live objects into the objects table, flushing the
    /// write batch periodically to bound its memory footprint.
    fn insert_objects_chunk(
        perpetual_db: Arc<AuthorityPerpetualTables>,
        objects: Vec<LiveObject>,
    ) -> SuiResult<()> {
        let mut batch = perpetual_db.objects.batch();
        let mut written = 0usize;
        // Flush threshold for the accumulated batch (checked after each insert).
        const MAX_BATCH_SIZE: usize = 100_000;
        for object in objects {
            match object {
                LiveObject::Normal(object) => {
                    let store_object_wrapper = get_store_object(object.clone());
                    batch.insert_batch(
                        &perpetual_db.objects,
                        std::iter::once((
                            ObjectKey::from(object.compute_object_reference()),
                            store_object_wrapper,
                        )),
                    )?;
                    // Owned-object markers are only maintained for non-child objects.
                    if !object.is_child_object() {
                        Self::initialize_live_object_markers(
                            &perpetual_db.live_owned_object_markers,
                            &mut batch,
                            &[object.compute_object_reference()],
                            true, // is_force_reset
                        )?;
                    }
                }
                LiveObject::Wrapped(object_key) => {
                    // Wrapped objects are stored as a body-less `Wrapped` entry.
                    batch.insert_batch(
                        &perpetual_db.objects,
                        std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
                            object_key,
                            StoreObject::Wrapped.into(),
                        )),
                    )?;
                }
            }
            written += 1;
            if written > MAX_BATCH_SIZE {
                batch.write()?;
                batch = perpetual_db.objects.batch();
                written = 0;
            }
        }
        // Flush whatever remains below the threshold.
        batch.write()?;
        Ok(())
    }
733
    /// Persist the epoch start configuration; delegates to the perpetual tables.
    pub fn set_epoch_start_configuration(
        &self,
        epoch_start_configuration: &EpochStartConfiguration,
    ) -> SuiResult {
        self.perpetual_tables
            .set_epoch_start_configuration(epoch_start_configuration)?;
        Ok(())
    }
742
    /// Load the persisted epoch start configuration, if any.
    pub fn get_epoch_start_configuration(&self) -> SuiResult<Option<EpochStartConfiguration>> {
        Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
    }
746
747    /// Updates the state resulting from the execution of a certificate.
748    ///
749    /// Internally it checks that all locks for active inputs are at the correct
750    /// version, and then writes objects, certificates, parents and clean up locks atomically.
751    #[instrument(level = "debug", skip_all)]
752    pub fn build_db_batch(
753        &self,
754        epoch_id: EpochId,
755        tx_outputs: &[Arc<TransactionOutputs>],
756    ) -> SuiResult<DBBatch> {
757        let mut written = Vec::with_capacity(tx_outputs.len());
758        for outputs in tx_outputs {
759            written.extend(outputs.written.values().cloned());
760        }
761
762        let mut write_batch = self.perpetual_tables.transactions.batch();
763        for outputs in tx_outputs {
764            self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
765        }
766        // test crashing before writing the batch
767        fail_point!("crash");
768
769        trace!(
770            "built batch for committed transactions: {:?}",
771            tx_outputs
772                .iter()
773                .map(|tx| tx.transaction.digest())
774                .collect::<Vec<_>>()
775        );
776
777        // test crashing before notifying
778        fail_point!("crash");
779
780        Ok(write_batch)
781    }
782
    /// Stages all db writes produced by one executed transaction into `write_batch`:
    /// effects, the transaction itself, the executed-transaction digest index,
    /// per-epoch object markers, tombstones for deleted/wrapped objects, newly
    /// written objects, events, unchanged loaded runtime objects, and owned-object
    /// live marker (lock) initialization/removal.
    ///
    /// Nothing is committed here; the caller owns `write_batch` and decides when to
    /// write it. The relative insertion order below is intentional — see the
    /// effects-first comment.
    fn write_one_transaction_outputs(
        &self,
        write_batch: &mut DBBatch,
        epoch_id: EpochId,
        tx_outputs: &TransactionOutputs,
    ) -> SuiResult {
        // Destructure so every field this function consumes is explicit;
        // remaining fields are deliberately ignored via `..`.
        let TransactionOutputs {
            transaction,
            effects,
            markers,
            wrapped,
            deleted,
            written,
            events,
            unchanged_loaded_runtime_objects,
            locks_to_delete,
            new_locks_to_init,
            ..
        } = tx_outputs;

        let effects_digest = effects.digest();
        let transaction_digest = transaction.digest();
        // effects must be inserted before the corresponding dependent entries
        // because they carry epoch information necessary for correct pruning via relocation filters
        write_batch
            .insert_batch(
                &self.perpetual_tables.effects,
                [(effects_digest, effects.clone())],
            )?
            .insert_batch(
                &self.perpetual_tables.executed_effects,
                [(transaction_digest, effects_digest)],
            )?;

        // Store the certificate indexed by transaction digest
        write_batch.insert_batch(
            &self.perpetual_tables.transactions,
            iter::once((transaction_digest, transaction.serializable_ref())),
        )?;

        // Record that this transaction executed in `epoch_id`.
        write_batch.insert_batch(
            &self.perpetual_tables.executed_transaction_digests,
            [((epoch_id, *transaction_digest), ())],
        )?;

        // Add batched writes for objects and locks.
        // Markers are keyed by (epoch, object key), so they are scoped to this epoch.
        write_batch.insert_batch(
            &self.perpetual_tables.object_per_epoch_marker_table_v2,
            markers
                .iter()
                .map(|(key, marker_value)| (EpochMarkerKey(epoch_id, *key), *marker_value)),
        )?;
        // Deleted and wrapped objects leave tombstone entries in the objects table.
        write_batch.insert_batch(
            &self.perpetual_tables.objects,
            deleted
                .iter()
                .map(|key| (key, StoreObject::Deleted))
                .chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
                .map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
        )?;

        // Insert each output object into the stores
        let new_objects = written.iter().map(|(id, new_object)| {
            let version = new_object.version();
            trace!(?id, ?version, "writing object");
            let store_object = get_store_object(new_object.clone());
            (ObjectKey(*id, version), store_object)
        });

        write_batch.insert_batch(&self.perpetual_tables.objects, new_objects)?;

        // Write events into the new table keyed off of transaction_digest,
        // but only when the effects actually reference an events digest.
        if effects.events_digest().is_some() {
            write_batch.insert_batch(
                &self.perpetual_tables.events_2,
                [(transaction_digest, events)],
            )?;
        }

        // Write unchanged_loaded_runtime_objects (skipped entirely when empty to
        // avoid writing an empty row).
        if !unchanged_loaded_runtime_objects.is_empty() {
            write_batch.insert_batch(
                &self.perpetual_tables.unchanged_loaded_runtime_objects,
                [(transaction_digest, unchanged_loaded_runtime_objects)],
            )?;
        }

        // Initialize live markers (owned-object locks) for this transaction's outputs.
        self.initialize_live_object_markers_impl(write_batch, new_locks_to_init, false)?;

        // Note: deletes locks for received objects as well (but not for objects that were in
        // `Receiving` arguments which were not received)
        self.delete_live_object_markers(write_batch, locks_to_delete)?;

        debug!(effects_digest = ?effects.digest(), "commit_certificate finished");

        Ok(())
    }
880
881    /// Commits transactions only (not effects or other transaction outputs) to the db.
882    /// See ExecutionCache::persist_transaction for more info
883    pub(crate) fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> SuiResult {
884        let mut batch = self.perpetual_tables.transactions.batch();
885        batch.insert_batch(
886            &self.perpetual_tables.transactions,
887            [(tx.digest(), tx.clone().into_unsigned().serializable_ref())],
888        )?;
889        batch.write()?;
890        Ok(())
891    }
892
893    /// Gets ObjectLockInfo that represents state of lock on an object.
894    /// Returns UserInputError::ObjectNotFound if cannot find lock record for this object
895    pub(crate) fn get_lock(
896        &self,
897        obj_ref: ObjectRef,
898        epoch_store: &AuthorityPerEpochStore,
899    ) -> SuiLockResult {
900        if self
901            .perpetual_tables
902            .live_owned_object_markers
903            .get(&obj_ref)?
904            .is_none()
905        {
906            return Ok(ObjectLockStatus::LockedAtDifferentVersion {
907                locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
908            });
909        }
910
911        let tables = epoch_store.tables()?;
912        let epoch_id = epoch_store.epoch();
913
914        if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
915            Ok(ObjectLockStatus::LockedToTx {
916                locked_by_tx: LockDetailsDeprecated {
917                    epoch: epoch_id,
918                    tx_digest,
919                },
920            })
921        } else {
922            Ok(ObjectLockStatus::Initialized)
923        }
924    }
925
926    /// Returns UserInputError::ObjectNotFound if no lock records found for this object.
927    pub(crate) fn get_latest_live_version_for_object_id(
928        &self,
929        object_id: ObjectID,
930    ) -> SuiResult<ObjectRef> {
931        let mut iterator = self
932            .perpetual_tables
933            .live_owned_object_markers
934            .reversed_safe_iter_with_bounds(
935                None,
936                Some((object_id, SequenceNumber::MAX, ObjectDigest::MAX)),
937            )?;
938        Ok(iterator
939            .next()
940            .transpose()?
941            .and_then(|value| {
942                if value.0.0 == object_id {
943                    Some(value)
944                } else {
945                    None
946                }
947            })
948            .ok_or_else(|| {
949                SuiError::from(UserInputError::ObjectNotFound {
950                    object_id,
951                    version: None,
952                })
953            })?
954            .0)
955    }
956
957    /// Checks multiple object locks exist.
958    /// Returns UserInputError::ObjectNotFound if cannot find lock record for at least one of the objects.
959    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if at least one object lock is not initialized
960    ///     at the given version.
961    pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> SuiResult {
962        let locks = self
963            .perpetual_tables
964            .live_owned_object_markers
965            .multi_get(objects)?;
966        for (lock, obj_ref) in locks.into_iter().zip(objects) {
967            if lock.is_none() {
968                let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
969                fp_bail!(
970                    UserInputError::ObjectVersionUnavailableForConsumption {
971                        provided_obj_ref: *obj_ref,
972                        current_version: latest_lock.1
973                    }
974                    .into()
975                );
976            }
977        }
978        Ok(())
979    }
980
981    /// Initialize a lock to None (but exists) for a given list of ObjectRefs.
982    /// Returns SuiErrorKind::ObjectLockAlreadyInitialized if the lock already exists and is locked to a transaction
983    fn initialize_live_object_markers_impl(
984        &self,
985        write_batch: &mut DBBatch,
986        objects: &[ObjectRef],
987        is_force_reset: bool,
988    ) -> SuiResult {
989        AuthorityStore::initialize_live_object_markers(
990            &self.perpetual_tables.live_owned_object_markers,
991            write_batch,
992            objects,
993            is_force_reset,
994        )
995    }
996
997    pub fn initialize_live_object_markers(
998        live_object_marker_table: &DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,
999        write_batch: &mut DBBatch,
1000        objects: &[ObjectRef],
1001        is_force_reset: bool,
1002    ) -> SuiResult {
1003        trace!(?objects, "initialize_locks");
1004
1005        if !is_force_reset {
1006            let live_object_markers = live_object_marker_table.multi_get(objects)?;
1007            // If any live_object_markers exist and are not None, return errors for them
1008            // Note we don't check if there is a pre-existing lock. this is because initializing the live
1009            // object marker will not overwrite the lock and cause the validator to equivocate.
1010            let existing_live_object_markers: Vec<ObjectRef> = live_object_markers
1011                .iter()
1012                .zip(objects)
1013                .filter_map(|(lock_opt, objref)| {
1014                    lock_opt.clone().flatten().map(|_tx_digest| *objref)
1015                })
1016                .collect();
1017            if !existing_live_object_markers.is_empty() {
1018                info!(
1019                    ?existing_live_object_markers,
1020                    "Cannot initialize live_object_markers because some exist already"
1021                );
1022                return Err(SuiErrorKind::ObjectLockAlreadyInitialized {
1023                    refs: existing_live_object_markers,
1024                }
1025                .into());
1026            }
1027        }
1028
1029        write_batch.insert_batch(
1030            live_object_marker_table,
1031            objects.iter().map(|obj_ref| (obj_ref, None)),
1032        )?;
1033        Ok(())
1034    }
1035
1036    /// Removes locks for a given list of ObjectRefs.
1037    fn delete_live_object_markers(
1038        &self,
1039        write_batch: &mut DBBatch,
1040        objects: &[ObjectRef],
1041    ) -> SuiResult {
1042        trace!(?objects, "delete_locks");
1043        write_batch.delete_batch(
1044            &self.perpetual_tables.live_owned_object_markers,
1045            objects.iter(),
1046        )?;
1047        Ok(())
1048    }
1049
    /// Return the object with version less than or equal to the provided seq number.
    /// This is used by indexer to find the correct version of dynamic field child object.
    /// We do not store the version of the child object, but because of lamport timestamp,
    /// we know the child must have version number less than or equal to the parent.
    pub fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<Option<Object>> {
        // Thin delegation: the version-range scan lives in the perpetual tables.
        self.perpetual_tables
            .find_object_lt_or_eq_version(object_id, version)
    }
1062
1063    /// Returns the latest object reference we have for this object_id in the objects table.
1064    ///
1065    /// The method may also return the reference to a deleted object with a digest of
1066    /// ObjectDigest::deleted() or ObjectDigest::wrapped() and lamport version
1067    /// of a transaction that deleted the object.
1068    /// Note that a deleted object may re-appear if the deletion was the result of the object
1069    /// being wrapped in another object.
1070    ///
1071    /// If no entry for the object_id is found, return None.
1072    pub fn get_latest_object_ref_or_tombstone(
1073        &self,
1074        object_id: ObjectID,
1075    ) -> Result<Option<ObjectRef>, SuiError> {
1076        self.perpetual_tables
1077            .get_latest_object_ref_or_tombstone(object_id)
1078    }
1079
1080    /// Returns the latest object reference if and only if the object is still live (i.e. it does
1081    /// not return tombstones)
1082    pub fn get_latest_object_ref_if_alive(
1083        &self,
1084        object_id: ObjectID,
1085    ) -> Result<Option<ObjectRef>, SuiError> {
1086        match self.get_latest_object_ref_or_tombstone(object_id)? {
1087            Some(objref) if objref.2.is_alive() => Ok(Some(objref)),
1088            _ => Ok(None),
1089        }
1090    }
1091
1092    /// Returns the latest object we have for this object_id in the objects table.
1093    ///
1094    /// If no entry for the object_id is found, return None.
1095    pub fn get_latest_object_or_tombstone(
1096        &self,
1097        object_id: ObjectID,
1098    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, SuiError> {
1099        let Some((object_key, store_object)) = self
1100            .perpetual_tables
1101            .get_latest_object_or_tombstone(object_id)?
1102        else {
1103            return Ok(None);
1104        };
1105
1106        if let Some(object_ref) = self
1107            .perpetual_tables
1108            .tombstone_reference(&object_key, &store_object)?
1109        {
1110            return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
1111        }
1112
1113        let object = self
1114            .perpetual_tables
1115            .object(&object_key, store_object)?
1116            .expect("Non tombstone store object could not be converted to object");
1117
1118        Ok(Some((object_key, ObjectOrTombstone::Object(object))))
1119    }
1120
1121    pub fn insert_transaction_and_effects(
1122        &self,
1123        transaction: &VerifiedTransaction,
1124        transaction_effects: &TransactionEffects,
1125    ) -> Result<(), TypedStoreError> {
1126        let mut write_batch = self.perpetual_tables.transactions.batch();
1127        // effects must be inserted before the corresponding transaction entry
1128        // because they carry epoch information necessary for correct pruning via relocation filters
1129        write_batch
1130            .insert_batch(
1131                &self.perpetual_tables.effects,
1132                [(transaction_effects.digest(), transaction_effects)],
1133            )?
1134            .insert_batch(
1135                &self.perpetual_tables.transactions,
1136                [(transaction.digest(), transaction.serializable_ref())],
1137            )?;
1138
1139        write_batch.write()?;
1140        Ok(())
1141    }
1142
1143    pub fn multi_insert_transaction_and_effects<'a>(
1144        &self,
1145        transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
1146    ) -> Result<(), TypedStoreError> {
1147        let mut write_batch = self.perpetual_tables.transactions.batch();
1148        for tx in transactions {
1149            write_batch
1150                .insert_batch(
1151                    &self.perpetual_tables.effects,
1152                    [(tx.effects.digest(), &tx.effects)],
1153                )?
1154                .insert_batch(
1155                    &self.perpetual_tables.transactions,
1156                    [(tx.transaction.digest(), tx.transaction.serializable_ref())],
1157                )?;
1158        }
1159
1160        write_batch.write()?;
1161        Ok(())
1162    }
1163
1164    pub fn multi_get_transaction_blocks(
1165        &self,
1166        tx_digests: &[TransactionDigest],
1167    ) -> Result<Vec<Option<VerifiedTransaction>>, TypedStoreError> {
1168        self.perpetual_tables
1169            .transactions
1170            .multi_get(tx_digests)
1171            .map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())
1172    }
1173
1174    pub fn get_transaction_block(
1175        &self,
1176        tx_digest: &TransactionDigest,
1177    ) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
1178        self.perpetual_tables
1179            .transactions
1180            .get(tx_digest)
1181            .map(|v| v.map(|v| v.into()))
1182    }
1183
    /// This function reads the DB directly to get the system state object.
    /// If reconfiguration is happening at the same time, there is no guarantee whether we would be getting
    /// the old or the new system state object.
    /// Hence this function should only be called during RPC reads where data race is not a major concern.
    /// In general we should avoid this as much as possible.
    /// If the intent is for testing, you can use AuthorityState::get_sui_system_state_object_for_testing.
    pub fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
        get_sui_system_state(self.perpetual_tables.as_ref())
    }
1193
    /// Verifies SUI conservation across the whole live object set at an epoch boundary:
    /// the total SUI held in live objects and the total storage rebates are computed
    /// (in parallel, in batches of 1M objects via scoped threads), compared against the
    /// storage fund balance of the system state object, recorded in metrics, and checked
    /// against previously persisted expectations.
    ///
    /// No-op unless `enable_epoch_sui_conservation_check` is set. Returns an error when
    /// a previously recorded expected imbalance or expected total SUI is violated.
    pub fn expensive_check_sui_conservation<T>(
        self: &Arc<Self>,
        type_layout_store: T,
        old_epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult
    where
        T: TypeLayoutStore + Send + Copy,
    {
        if !self.enable_epoch_sui_conservation_check {
            return Ok(());
        }

        let executor = old_epoch_store.executor();
        info!("Starting SUI conservation check. This may take a while..");
        let cur_time = Instant::now();
        let mut pending_objects = vec![];
        let mut count = 0;
        let mut size = 0;
        // Scoped threads let the tasks borrow `executor`/`old_epoch_store` without 'static.
        let (mut total_sui, mut total_storage_rebate) = thread::scope(|s| {
            let pending_tasks = FuturesUnordered::new();
            for o in self.iter_live_object_set(false) {
                match o {
                    LiveObject::Normal(object) => {
                        size += object.object_size_for_gas_metering();
                        count += 1;
                        pending_objects.push(object);
                        // Hand off every 1M objects to a worker thread; `mem::swap`
                        // moves the accumulated batch out without reallocating.
                        if count % 1_000_000 == 0 {
                            let mut task_objects = vec![];
                            mem::swap(&mut pending_objects, &mut task_objects);
                            pending_tasks.push(s.spawn(move || {
                                let mut layout_resolver =
                                    executor.type_layout_resolver(Box::new(type_layout_store));
                                let mut total_storage_rebate = 0;
                                let mut total_sui = 0;
                                for object in task_objects {
                                    total_storage_rebate += object.storage_rebate;
                                    // get_total_sui includes storage rebate, however all storage rebate is
                                    // also stored in the storage fund, so we need to subtract it here.
                                    let object_contained_sui = match object
                                        .get_total_sui(layout_resolver.as_mut())
                                    {
                                        Ok(sui) => sui,
                                        // On Testnet a per-object failure is logged and
                                        // counted as 0; on any other chain it is fatal.
                                        Err(e)
                                            if old_epoch_store.get_chain()
                                                == sui_protocol_config::Chain::Testnet =>
                                        {
                                            error!(
                                                "Error calculating total SUI for object {:?}: {:?}",
                                                object.compute_object_reference(),
                                                e
                                            );
                                            0
                                        }
                                        Err(e) => panic!(
                                            "Error calculating total SUI for object {:?}: {:?}",
                                            object.compute_object_reference(),
                                            e
                                        ),
                                    };
                                    total_sui += object_contained_sui - object.storage_rebate;
                                }
                                // Progress logging every 50M objects scanned.
                                if count % 50_000_000 == 0 {
                                    info!("Processed {} objects", count);
                                }
                                (total_sui, total_storage_rebate)
                            }));
                        }
                    }
                    LiveObject::Wrapped(_) => {
                        unreachable!("Explicitly asked to not include wrapped tombstones")
                    }
                }
            }
            // Sum the per-task (total_sui, total_storage_rebate) pairs.
            pending_tasks.into_iter().fold((0, 0), |init, result| {
                let result = result.join().unwrap();
                (init.0 + result.0, init.1 + result.1)
            })
        });
        // Process the final partial batch (fewer than 1M objects) on this thread.
        let mut layout_resolver = executor.type_layout_resolver(Box::new(type_layout_store));
        for object in pending_objects {
            total_storage_rebate += object.storage_rebate;
            total_sui +=
                object.get_total_sui(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
        }
        info!(
            "Scanned {} live objects, took {:?}",
            count,
            cur_time.elapsed()
        );
        self.metrics
            .sui_conservation_live_object_count
            .set(count as i64);
        self.metrics
            .sui_conservation_live_object_size
            .set(size as i64);
        self.metrics
            .sui_conservation_check_latency
            .set(cur_time.elapsed().as_secs() as i64);

        // It is safe to call this function because we are in the middle of reconfiguration.
        let system_state = self
            .get_sui_system_state_object_unsafe()
            .expect("Reading sui system state object cannot fail")
            .into_sui_system_state_summary();
        let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
        info!(
            "Total SUI amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
            total_sui, storage_fund_balance, total_storage_rebate, system_state.epoch
        );

        // The storage fund should exactly cover all outstanding rebates; any difference
        // is tracked as an imbalance that must stay constant across epochs.
        let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
        self.metrics
            .sui_conservation_storage_fund
            .set(storage_fund_balance as i64);
        self.metrics
            .sui_conservation_storage_fund_imbalance
            .set(imbalance);
        self.metrics
            .sui_conservation_imbalance
            .set((total_sui as i128 - TOTAL_SUPPLY_MIST as i128) as i64);

        // First run records the imbalance; subsequent runs require it to be unchanged.
        if let Some(expected_imbalance) = self
            .perpetual_tables
            .expected_storage_fund_imbalance
            .get(&())
            .expect("DB read cannot fail")
        {
            fp_ensure!(
                imbalance == expected_imbalance,
                SuiError::from(
                    format!(
                        "Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
                        system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
                    ).as_str()
                )
            );
        } else {
            self.perpetual_tables
                .expected_storage_fund_imbalance
                .insert(&(), &imbalance)
                .expect("DB write cannot fail");
        }

        // Same record-then-verify protocol for the total network SUI amount.
        if let Some(expected_sui) = self
            .perpetual_tables
            .expected_network_sui_amount
            .get(&())
            .expect("DB read cannot fail")
        {
            fp_ensure!(
                total_sui == expected_sui,
                SuiError::from(
                    format!(
                        "Inconsistent state detected at epoch {}: total sui: {}, expecting {}",
                        system_state.epoch, total_sui, expected_sui
                    )
                    .as_str()
                )
            );
        } else {
            self.perpetual_tables
                .expected_network_sui_amount
                .insert(&(), &total_sui)
                .expect("DB write cannot fail");
        }

        Ok(())
    }
1362
    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
    ///
    /// The objects table is sharded by the high bits of the ObjectID and scanned in
    /// parallel; every wrapped-object tombstone found is removed from the current epoch's
    /// state hash accumulator, and the updated accumulator is written back.
    #[instrument(level = "error", skip_all)]
    pub fn maybe_reaccumulate_state_hash(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        new_protocol_version: ProtocolVersion,
    ) {
        let old_simplified_unwrap_then_delete = cur_epoch_store
            .protocol_config()
            .simplified_unwrap_then_delete();
        let new_simplified_unwrap_then_delete =
            ProtocolConfig::get_for_version(new_protocol_version, cur_epoch_store.get_chain())
                .simplified_unwrap_then_delete();
        // If in the new epoch the simplified_unwrap_then_delete is enabled for the first time,
        // we re-accumulate state root.
        let should_reaccumulate =
            !old_simplified_unwrap_then_delete && new_simplified_unwrap_then_delete;
        if !should_reaccumulate {
            return;
        }
        info!(
            "[Re-accumulate] simplified_unwrap_then_delete is enabled in the new protocol version, re-accumulating state hash"
        );
        let cur_time = Instant::now();
        std::thread::scope(|s| {
            let pending_tasks = FuturesUnordered::new();
            // Shard the object IDs into different ranges so that we can process them in parallel.
            // We divide the range into 2^BITS number of ranges. To do so we use the highest BITS bits
            // to mark the starting/ending point of the range. For example, when BITS = 5, we
            // divide the range into 32 ranges, and the first few ranges are:
            // 00000000_.... to 00000111_....
            // 00001000_.... to 00001111_....
            // 00010000_.... to 00010111_....
            // and etc.
            const BITS: u8 = 5;
            for index in 0u8..(1 << BITS) {
                pending_tasks.push(s.spawn(move || {
                    // Lowest ObjectID in this shard: high BITS bits set to `index`, rest zero.
                    let mut id_bytes = [0; ObjectID::LENGTH];
                    id_bytes[0] = index << (8 - BITS);
                    let start_id = ObjectID::new(id_bytes);

                    // Highest ObjectID in this shard: fill the remaining bits with ones.
                    id_bytes[0] |= (1 << (8 - BITS)) - 1;
                    for element in id_bytes.iter_mut().skip(1) {
                        *element = u8::MAX;
                    }
                    let end_id = ObjectID::new(id_bytes);

                    info!(
                        "[Re-accumulate] Scanning object ID range {:?}..{:?}",
                        start_id, end_id
                    );
                    // Sentinel "previous entry" so the first comparison below is well-defined.
                    let mut prev = (
                        ObjectKey::min_for_id(&ObjectID::ZERO),
                        StoreObjectWrapper::V1(StoreObject::Deleted),
                    );
                    let mut object_scanned: u64 = 0;
                    let mut wrapped_objects_to_remove = vec![];
                    for db_result in self.perpetual_tables.objects.safe_range_iter(
                        ObjectKey::min_for_id(&start_id)..=ObjectKey::max_for_id(&end_id),
                    ) {
                        match db_result {
                            Ok((object_key, object)) => {
                                object_scanned += 1;
                                if object_scanned.is_multiple_of(100000) {
                                    info!(
                                        "[Re-accumulate] Task {}: object scanned: {}",
                                        index, object_scanned,
                                    );
                                }
                                // A Wrapped entry is the latest version of its object only
                                // if the next key belongs to a different object id.
                                if matches!(prev.1.inner(), StoreObject::Wrapped)
                                    && object_key.0 != prev.0.0
                                {
                                    wrapped_objects_to_remove
                                        .push(WrappedObject::new(prev.0.0, prev.0.1));
                                }

                                prev = (object_key, object);
                            }
                            Err(err) => {
                                warn!("Object iterator encounter RocksDB error {:?}", err);
                                return Err(err);
                            }
                        }
                    }
                    // Handle the final entry of the shard, which has no successor.
                    if matches!(prev.1.inner(), StoreObject::Wrapped) {
                        wrapped_objects_to_remove.push(WrappedObject::new(prev.0.0, prev.0.1));
                    }
                    info!(
                        "[Re-accumulate] Task {}: object scanned: {}, wrapped objects: {}",
                        index,
                        object_scanned,
                        wrapped_objects_to_remove.len(),
                    );
                    Ok((wrapped_objects_to_remove, object_scanned))
                }));
            }
            let (last_checkpoint_of_epoch, cur_accumulator) = self
                .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
                .expect("read cannot fail")
                .expect("accumulator must exist");
            // Fold all shard results into the accumulator, removing every wrapped
            // tombstone and tallying totals for logging.
            let (accumulator, total_objects_scanned, total_wrapped_objects) =
                pending_tasks.into_iter().fold(
                    (cur_accumulator, 0u64, 0usize),
                    |(mut accumulator, total_objects_scanned, total_wrapped_objects), task| {
                        let (wrapped_objects_to_remove, object_scanned) =
                            task.join().unwrap().unwrap();
                        accumulator.remove_all(
                            wrapped_objects_to_remove
                                .iter()
                                .map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
                                .collect::<Vec<Vec<u8>>>(),
                        );
                        (
                            accumulator,
                            total_objects_scanned + object_scanned,
                            total_wrapped_objects + wrapped_objects_to_remove.len(),
                        )
                    },
                );
            info!(
                "[Re-accumulate] Total objects scanned: {}, total wrapped objects: {}",
                total_objects_scanned, total_wrapped_objects,
            );
            info!(
                "[Re-accumulate] New accumulator value: {:?}",
                accumulator.digest()
            );
            self.insert_state_hash_for_epoch(
                cur_epoch_store.epoch(),
                &last_checkpoint_of_epoch,
                &accumulator,
            )
            .unwrap();
        });
        info!(
            "[Re-accumulate] Re-accumulating took {}seconds",
            cur_time.elapsed().as_secs()
        );
    }
1503
1504    pub async fn prune_objects_and_compact_for_testing(
1505        &self,
1506        checkpoint_store: &Arc<CheckpointStore>,
1507        rpc_index: Option<&RpcIndexStore>,
1508    ) {
1509        let pruning_config = AuthorityStorePruningConfig {
1510            num_epochs_to_retain: 0,
1511            ..Default::default()
1512        };
1513        let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
1514            &self.perpetual_tables,
1515            checkpoint_store,
1516            rpc_index,
1517            pruning_config,
1518            AuthorityStorePruningMetrics::new_for_test(),
1519            EPOCH_DURATION_MS_FOR_TESTING,
1520        )
1521        .await;
1522        let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
1523    }
1524
1525    pub fn remove_executed_effects_for_testing(
1526        &self,
1527        tx_digest: &TransactionDigest,
1528    ) -> anyhow::Result<()> {
1529        let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
1530        if let Some(effects_digest) = effects_digest {
1531            self.perpetual_tables.executed_effects.remove(tx_digest)?;
1532            self.perpetual_tables.effects.remove(&effects_digest)?;
1533        }
1534        Ok(())
1535    }
1536
1537    #[cfg(test)]
1538    pub async fn prune_objects_immediately_for_testing(
1539        &self,
1540        transaction_effects: Vec<TransactionEffects>,
1541    ) -> anyhow::Result<()> {
1542        let mut wb = self.perpetual_tables.objects.batch();
1543
1544        let mut object_keys_to_prune = vec![];
1545        for effects in &transaction_effects {
1546            for (object_id, seq_number) in effects.modified_at_versions() {
1547                info!("Pruning object {:?} version {:?}", object_id, seq_number);
1548                object_keys_to_prune.push(ObjectKey(object_id, seq_number));
1549            }
1550        }
1551
1552        wb.delete_batch(
1553            &self.perpetual_tables.objects,
1554            object_keys_to_prune.into_iter(),
1555        )?;
1556        wb.write()?;
1557        Ok(())
1558    }
1559
1560    // Counts the number of versions exist in object store for `object_id`. This includes tombstone.
1561    #[cfg(msim)]
1562    pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
1563        self.perpetual_tables
1564            .objects
1565            .safe_iter_with_bounds(
1566                Some(ObjectKey(object_id, VersionNumber::MIN)),
1567                Some(ObjectKey(object_id, VersionNumber::MAX)),
1568            )
1569            .collect::<Result<Vec<_>, _>>()
1570            .unwrap()
1571            .len()
1572    }
1573}
1574
1575impl GlobalStateHashStore for AuthorityStore {
1576    fn get_object_ref_prior_to_key_deprecated(
1577        &self,
1578        object_id: &ObjectID,
1579        version: VersionNumber,
1580    ) -> SuiResult<Option<ObjectRef>> {
1581        self.get_object_ref_prior_to_key(object_id, version)
1582    }
1583
1584    fn get_root_state_hash_for_epoch(
1585        &self,
1586        epoch: EpochId,
1587    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
1588        self.perpetual_tables
1589            .root_state_hash_by_epoch
1590            .get(&epoch)
1591            .map_err(Into::into)
1592    }
1593
1594    fn get_root_state_hash_for_highest_epoch(
1595        &self,
1596    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
1597        Ok(self
1598            .perpetual_tables
1599            .root_state_hash_by_epoch
1600            .reversed_safe_iter_with_bounds(None, None)?
1601            .next()
1602            .transpose()?)
1603    }
1604
1605    fn insert_state_hash_for_epoch(
1606        &self,
1607        epoch: EpochId,
1608        last_checkpoint_of_epoch: &CheckpointSequenceNumber,
1609        acc: &GlobalStateHash,
1610    ) -> SuiResult {
1611        self.perpetual_tables
1612            .root_state_hash_by_epoch
1613            .insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
1614        self.root_state_notify_read
1615            .notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
1616
1617        Ok(())
1618    }
1619
1620    fn iter_live_object_set(
1621        &self,
1622        include_wrapped_object: bool,
1623    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1624        Box::new(
1625            self.perpetual_tables
1626                .iter_live_object_set(include_wrapped_object),
1627        )
1628    }
1629}
1630
impl ObjectStore for AuthorityStore {
    /// Read the latest version of an object and return it, or `None` if the
    /// object was not found.
    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        // `.as_ref()` targets the inner `AuthorityPerpetualTables` explicitly
        // — presumably to disambiguate from a `get_object` reachable through
        // the `Arc` itself; verify before simplifying.
        self.perpetual_tables.as_ref().get_object(object_id)
    }

    /// Read the object at exactly (`object_id`, `version`), or `None` if that
    /// specific version is not present in the perpetual tables.
    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
        self.perpetual_tables.get_object_by_key(object_id, version)
    }
}
1641
/// A wrapper to make Orphan Rule happy: lets this crate implement foreign
/// traits (e.g. `ModuleResolver`) over a trait-object backing store.
pub struct ResolverWrapper {
    /// Backing store used to resolve modules and packages by address.
    pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
    /// Metrics sink; this wrapper updates the `module_cache_size` gauge.
    pub metrics: Arc<ResolverMetrics>,
}
1647
1648impl ResolverWrapper {
1649    pub fn new(
1650        resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1651        metrics: Arc<ResolverMetrics>,
1652    ) -> Self {
1653        metrics.module_cache_size.set(0);
1654        ResolverWrapper { resolver, metrics }
1655    }
1656
1657    fn inc_cache_size_gauge(&self) {
1658        // reset the gauge after a restart of the cache
1659        let current = self.metrics.module_cache_size.get();
1660        self.metrics.module_cache_size.set(current + 1);
1661    }
1662}
1663
1664impl ModuleResolver for ResolverWrapper {
1665    type Error = SuiError;
1666    fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
1667        self.inc_cache_size_gauge();
1668        get_module(&*self.resolver, module_id)
1669    }
1670
1671    fn get_packages_static<const N: usize>(
1672        &self,
1673        ids: [AccountAddress; N],
1674    ) -> Result<[Option<SerializedPackage>; N], Self::Error> {
1675        let mut packages = [const { None }; N];
1676        for (i, id) in ids.iter().enumerate() {
1677            packages[i] = get_package(&*self.resolver, &ObjectID::from(*id))?;
1678        }
1679        Ok(packages)
1680    }
1681
1682    fn get_packages<'a>(
1683        &self,
1684        ids: impl ExactSizeIterator<Item = &'a AccountAddress>,
1685    ) -> Result<Vec<Option<SerializedPackage>>, Self::Error> {
1686        ids.map(|id| get_package(&*self.resolver, &ObjectID::from(*id)))
1687            .collect()
1688    }
1689}
1690
/// Origin of a store update: a regular transaction (identified by its effects
/// digest) or the genesis write.
pub enum UpdateType {
    /// Update produced by executing a transaction with these effects.
    Transaction(TransactionEffectsDigest),
    /// Update produced during genesis initialization.
    Genesis,
}
1695
/// Result alias for object-lock queries.
pub type SuiLockResult = SuiResult<ObjectLockStatus>;
1697
/// Status of the lock on an owned-object reference.
#[derive(Debug, PartialEq, Eq)]
pub enum ObjectLockStatus {
    /// A lock record exists but no transaction currently holds it.
    Initialized,
    /// The queried object version is locked by `locked_by_tx`.
    LockedToTx { locked_by_tx: LockDetailsDeprecated },
    /// The object is locked, but at `locked_ref` — a different version than
    /// the one queried.
    LockedAtDifferentVersion { locked_ref: ObjectRef },
}
1704
/// Versioned wrapper around lock details — presumably retained so legacy
/// on-disk data can still be deserialized (hence "Deprecated"); verify
/// against the tables that store it.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LockDetailsWrapperDeprecated {
    /// Version 1 — currently the only (and therefore latest) version.
    V1(LockDetailsV1Deprecated),
}
1709
1710impl LockDetailsWrapperDeprecated {
1711    pub fn migrate(self) -> Self {
1712        // TODO: when there are multiple versions, we must iteratively migrate from version N to
1713        // N+1 until we arrive at the latest version
1714        self
1715    }
1716
1717    // Always returns the most recent version. Older versions are migrated to the latest version at
1718    // read time, so there is never a need to access older versions.
1719    pub fn inner(&self) -> &LockDetailsDeprecated {
1720        match self {
1721            Self::V1(v1) => v1,
1722
1723            // can remove #[allow] when there are multiple versions
1724            #[allow(unreachable_patterns)]
1725            _ => panic!("lock details should have been migrated to latest version at read time"),
1726        }
1727    }
1728    pub fn into_inner(self) -> LockDetailsDeprecated {
1729        match self {
1730            Self::V1(v1) => v1,
1731
1732            // can remove #[allow] when there are multiple versions
1733            #[allow(unreachable_patterns)]
1734            _ => panic!("lock details should have been migrated to latest version at read time"),
1735        }
1736    }
1737}
1738
/// Lock payload, version 1.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LockDetailsV1Deprecated {
    /// Epoch in which the lock was taken.
    pub epoch: EpochId,
    /// Digest of the transaction holding the lock.
    pub tx_digest: TransactionDigest,
}
1744
/// Alias for the current (latest) lock-details version.
pub type LockDetailsDeprecated = LockDetailsV1Deprecated;
1746
1747impl From<LockDetailsDeprecated> for LockDetailsWrapperDeprecated {
1748    fn from(details: LockDetailsDeprecated) -> Self {
1749        // always use latest version.
1750        LockDetailsWrapperDeprecated::V1(details)
1751    }
1752}