sui_core/authority/
authority_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::{iter, mem, thread};
6
7use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
8use crate::authority::authority_store_pruner::{
9    AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
10};
11use crate::authority::authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object};
12use crate::authority::epoch_marker_key::EpochMarkerKey;
13use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
14use crate::global_state_hasher::GlobalStateHashStore;
15use crate::rpc_index::RpcIndexStore;
16use crate::transaction_outputs::TransactionOutputs;
17use fastcrypto::hash::{HashFunction, MultisetHash, Sha3_256};
18use futures::stream::FuturesUnordered;
19use move_core_types::account_address::AccountAddress;
20use move_core_types::resolver::{ModuleResolver, SerializedPackage};
21use serde::{Deserialize, Serialize};
22use sui_config::node::AuthorityStorePruningConfig;
23use sui_macros::fail_point_arg;
24use sui_types::error::{SuiErrorKind, UserInputError};
25use sui_types::execution::TypeLayoutStore;
26use sui_types::global_state_hash::GlobalStateHash;
27use sui_types::message_envelope::Message;
28use sui_types::storage::{
29    BackingPackageStore, FullObjectKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore,
30    get_module, get_package,
31};
32use sui_types::sui_system_state::get_sui_system_state;
33use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure};
34use tokio::time::Instant;
35use tracing::{debug, info, trace};
36use typed_store::traits::Map;
37use typed_store::{
38    TypedStoreError,
39    rocks::{DBBatch, DBMap},
40};
41
42use super::authority_store_tables::LiveObject;
43use super::{authority_store_tables::AuthorityPerpetualTables, *};
44use mysten_common::ZipDebugEqIteratorExt;
45use mysten_common::sync::notify_read::NotifyRead;
46use sui_types::effects::{TransactionEffects, TransactionEvents};
47use sui_types::gas_coin::TOTAL_SUPPLY_MIST;
48
49struct AuthorityStoreMetrics {
50    sui_conservation_check_latency: IntGauge,
51    sui_conservation_live_object_count: IntGauge,
52    sui_conservation_live_object_size: IntGauge,
53    sui_conservation_imbalance: IntGauge,
54    sui_conservation_storage_fund: IntGauge,
55    sui_conservation_storage_fund_imbalance: IntGauge,
56    epoch_flags: IntGaugeVec,
57}
58
59impl AuthorityStoreMetrics {
60    pub fn new(registry: &Registry) -> Self {
61        Self {
62            sui_conservation_check_latency: register_int_gauge_with_registry!(
63                "sui_conservation_check_latency",
64                "Number of seconds took to scan all live objects in the store for SUI conservation check",
65                registry,
66            ).unwrap(),
67            sui_conservation_live_object_count: register_int_gauge_with_registry!(
68                "sui_conservation_live_object_count",
69                "Number of live objects in the store",
70                registry,
71            ).unwrap(),
72            sui_conservation_live_object_size: register_int_gauge_with_registry!(
73                "sui_conservation_live_object_size",
74                "Size in bytes of live objects in the store",
75                registry,
76            ).unwrap(),
77            sui_conservation_imbalance: register_int_gauge_with_registry!(
78                "sui_conservation_imbalance",
79                "Total amount of SUI in the network - 10B * 10^9. This delta shows the amount of imbalance",
80                registry,
81            ).unwrap(),
82            sui_conservation_storage_fund: register_int_gauge_with_registry!(
83                "sui_conservation_storage_fund",
84                "Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
85                registry,
86            ).unwrap(),
87            sui_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
88                "sui_conservation_storage_fund_imbalance",
89                "Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
90                registry,
91            ).unwrap(),
92            epoch_flags: register_int_gauge_vec_with_registry!(
93                "epoch_flags",
94                "Local flags of the currently running epoch",
95                &["flag"],
96                registry,
97            ).unwrap(),
98        }
99    }
100}
101
102/// ALL_OBJ_VER determines whether we want to store all past
103/// versions of every object in the store. Authority doesn't store
104/// them, but other entities such as replicas will.
105/// S is a template on Authority signature state. This allows SuiDataStore to be used on either
106/// authorities or non-authorities. Specifically, when storing transactions and effects,
107/// S allows SuiDataStore to either store the authority signed version or unsigned version.
108pub struct AuthorityStore {
109    pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,
110
111    pub(crate) root_state_notify_read:
112        NotifyRead<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,
113
114    /// Whether to enable expensive SUI conservation check at epoch boundaries.
115    enable_epoch_sui_conservation_check: bool,
116
117    metrics: AuthorityStoreMetrics,
118}
119
120pub type ExecutionLockReadGuard<'a> = tokio::sync::RwLockReadGuard<'a, EpochId>;
121pub type ExecutionLockWriteGuard<'a> = tokio::sync::RwLockWriteGuard<'a, EpochId>;
122
123impl AuthorityStore {
124    /// Open an authority store by directory path.
125    /// If the store is empty, initialize it using genesis.
126    pub async fn open(
127        perpetual_tables: Arc<AuthorityPerpetualTables>,
128        genesis: &Genesis,
129        config: &NodeConfig,
130        registry: &Registry,
131    ) -> SuiResult<Arc<Self>> {
132        let enable_epoch_sui_conservation_check = config
133            .expensive_safety_check_config
134            .enable_epoch_sui_conservation_check();
135
136        let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
137            info!("Creating new epoch start config from genesis");
138
139            #[allow(unused_mut)]
140            let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
141            fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
142                info!("Setting initial epoch flags to {:?}", flags);
143                initial_epoch_flags = flags;
144            });
145
146            let epoch_start_configuration = EpochStartConfiguration::new(
147                genesis.sui_system_object().into_epoch_start_state(),
148                *genesis.checkpoint().digest(),
149                &genesis.objects(),
150                initial_epoch_flags,
151            )?;
152            perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
153            epoch_start_configuration
154        } else {
155            info!("Loading epoch start config from DB");
156            perpetual_tables
157                .epoch_start_configuration
158                .get(&())?
159                .expect("Epoch start configuration must be set in non-empty DB")
160        };
161        let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
162        info!("Epoch start config: {:?}", epoch_start_configuration);
163        info!("Cur epoch: {:?}", cur_epoch);
164        let this = Self::open_inner(
165            genesis,
166            perpetual_tables,
167            enable_epoch_sui_conservation_check,
168            registry,
169        )
170        .await?;
171        this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
172        Ok(this)
173    }
174
175    pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
176        for flag in old {
177            self.metrics
178                .epoch_flags
179                .with_label_values(&[&flag.to_string()])
180                .set(0);
181        }
182        for flag in new {
183            self.metrics
184                .epoch_flags
185                .with_label_values(&[&flag.to_string()])
186                .set(1);
187        }
188    }
189
190    // NB: This must only be called at time of reconfiguration. We take the execution lock write
191    // guard as an argument to ensure that this is the case.
192    pub fn clear_object_per_epoch_marker_table(
193        &self,
194        _execution_guard: &ExecutionLockWriteGuard<'_>,
195    ) -> SuiResult<()> {
196        // We can safely delete all entries in the per epoch marker table since this is only called
197        // at epoch boundaries (during reconfiguration). Therefore any entries that currently
198        // exist can be removed. Because of this we can use the `schedule_delete_all` method.
199        self.perpetual_tables
200            .object_per_epoch_marker_table
201            .schedule_delete_all()?;
202        #[cfg(not(tidehunter))]
203        {
204            self.perpetual_tables
205                .object_per_epoch_marker_table_v2
206                .schedule_delete_all()?;
207        }
208        #[cfg(tidehunter)]
209        {
210            self.perpetual_tables
211                .object_per_epoch_marker_table_v2
212                .drop_cells_in_range_raw(&EpochMarkerKey::MIN_KEY, &EpochMarkerKey::MAX_KEY)?;
213        }
214        Ok(())
215    }
216
217    pub async fn open_with_committee_for_testing(
218        perpetual_tables: Arc<AuthorityPerpetualTables>,
219        committee: &Committee,
220        genesis: &Genesis,
221    ) -> SuiResult<Arc<Self>> {
222        // TODO: Since we always start at genesis, the committee should be technically the same
223        // as the genesis committee.
224        assert_eq!(committee.epoch, 0);
225        Self::open_inner(genesis, perpetual_tables, true, &Registry::new()).await
226    }
227
228    async fn open_inner(
229        genesis: &Genesis,
230        perpetual_tables: Arc<AuthorityPerpetualTables>,
231        enable_epoch_sui_conservation_check: bool,
232        registry: &Registry,
233    ) -> SuiResult<Arc<Self>> {
234        let store = Arc::new(Self {
235            perpetual_tables,
236            root_state_notify_read: NotifyRead::<
237                EpochId,
238                (CheckpointSequenceNumber, GlobalStateHash),
239            >::new(),
240            enable_epoch_sui_conservation_check,
241            metrics: AuthorityStoreMetrics::new(registry),
242        });
243        // Only initialize an empty database.
244        if store
245            .database_is_empty()
246            .expect("Database read should not fail at init.")
247        {
248            store
249                .bulk_insert_genesis_objects(genesis.objects())
250                .expect("Cannot bulk insert genesis objects");
251
252            // insert txn and effects of genesis
253            let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());
254
255            store
256                .perpetual_tables
257                .transactions
258                .insert(transaction.digest(), transaction.serializable_ref())
259                .unwrap();
260
261            store
262                .perpetual_tables
263                .effects
264                .insert(&genesis.effects().digest(), genesis.effects())
265                .unwrap();
266            // We don't insert the effects to executed_effects yet because the genesis tx hasn't but will be executed.
267            // This is important for fullnodes to be able to generate indexing data right now.
268
269            if genesis.effects().events_digest().is_some() {
270                store
271                    .perpetual_tables
272                    .events_2
273                    .insert(transaction.digest(), genesis.events())
274                    .unwrap();
275            }
276        }
277
278        Ok(store)
279    }
280
281    /// Open authority store without any operations that require
282    /// genesis, such as constructing EpochStartConfiguration
283    /// or inserting genesis objects.
284    pub fn open_no_genesis(
285        perpetual_tables: Arc<AuthorityPerpetualTables>,
286        enable_epoch_sui_conservation_check: bool,
287        registry: &Registry,
288    ) -> SuiResult<Arc<Self>> {
289        let store = Arc::new(Self {
290            perpetual_tables,
291            root_state_notify_read: NotifyRead::<
292                EpochId,
293                (CheckpointSequenceNumber, GlobalStateHash),
294            >::new(),
295            enable_epoch_sui_conservation_check,
296            metrics: AuthorityStoreMetrics::new(registry),
297        });
298        Ok(store)
299    }
300
301    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
302        self.perpetual_tables.get_recovery_epoch_at_restart()
303    }
304
305    pub fn get_effects(
306        &self,
307        effects_digest: &TransactionEffectsDigest,
308    ) -> SuiResult<Option<TransactionEffects>> {
309        Ok(self.perpetual_tables.effects.get(effects_digest)?)
310    }
311
312    pub fn get_events(
313        &self,
314        digest: &TransactionDigest,
315    ) -> Result<Option<TransactionEvents>, TypedStoreError> {
316        self.perpetual_tables.events_2.get(digest)
317    }
318
319    pub fn multi_get_events(
320        &self,
321        event_digests: &[TransactionDigest],
322    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
323        Ok(event_digests
324            .iter()
325            .map(|digest| self.get_events(digest))
326            .collect::<Result<Vec<_>, _>>()?)
327    }
328
329    pub fn get_unchanged_loaded_runtime_objects(
330        &self,
331        digest: &TransactionDigest,
332    ) -> Result<Option<Vec<ObjectKey>>, TypedStoreError> {
333        self.perpetual_tables
334            .unchanged_loaded_runtime_objects
335            .get(digest)
336    }
337
338    pub fn multi_get_effects<'a>(
339        &self,
340        effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
341    ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
342        self.perpetual_tables.effects.multi_get(effects_digests)
343    }
344
345    pub fn get_executed_effects(
346        &self,
347        tx_digest: &TransactionDigest,
348    ) -> Result<Option<TransactionEffects>, TypedStoreError> {
349        let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
350        match effects_digest {
351            Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
352            None => Ok(None),
353        }
354    }
355
356    /// Given a list of transaction digests, returns a list of the corresponding effects only if they have been
357    /// executed. For transactions that have not been executed, None is returned.
358    pub fn multi_get_executed_effects_digests(
359        &self,
360        digests: &[TransactionDigest],
361    ) -> Result<Vec<Option<TransactionEffectsDigest>>, TypedStoreError> {
362        self.perpetual_tables.executed_effects.multi_get(digests)
363    }
364
365    /// Given a list of transaction digests, returns a list of the corresponding effects only if they have been
366    /// executed. For transactions that have not been executed, None is returned.
367    pub fn multi_get_executed_effects(
368        &self,
369        digests: &[TransactionDigest],
370    ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
371        let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
372        let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
373        let mut tx_to_effects_map = effects
374            .into_iter()
375            .flatten()
376            .map(|effects| (*effects.transaction_digest(), effects))
377            .collect::<HashMap<_, _>>();
378        Ok(digests
379            .iter()
380            .map(|digest| tx_to_effects_map.remove(digest))
381            .collect())
382    }
383
384    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> SuiResult<bool> {
385        Ok(self
386            .perpetual_tables
387            .executed_effects
388            .contains_key(digest)?)
389    }
390
391    pub fn get_marker_value(
392        &self,
393        object_key: FullObjectKey,
394        epoch_id: EpochId,
395    ) -> SuiResult<Option<MarkerValue>> {
396        Ok(self
397            .perpetual_tables
398            .object_per_epoch_marker_table_v2
399            .get(&EpochMarkerKey(epoch_id, object_key))?)
400    }
401
402    pub fn get_latest_marker(
403        &self,
404        object_id: FullObjectID,
405        epoch_id: EpochId,
406    ) -> SuiResult<Option<(SequenceNumber, MarkerValue)>> {
407        let min_key = EpochMarkerKey(epoch_id, FullObjectKey::min_for_id(&object_id));
408        let max_key = EpochMarkerKey(epoch_id, FullObjectKey::max_for_id(&object_id));
409
410        let marker_entry = self
411            .perpetual_tables
412            .object_per_epoch_marker_table_v2
413            .reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
414            .next();
415        match marker_entry {
416            Some(Ok((EpochMarkerKey(epoch, key), marker))) => {
417                // because of the iterator bounds these cannot fail
418                assert_eq!(epoch, epoch_id);
419                assert_eq!(key.id(), object_id);
420                Ok(Some((key.version(), marker)))
421            }
422            Some(Err(e)) => Err(e.into()),
423            None => Ok(None),
424        }
425    }
426
427    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
428    pub fn deprecated_insert_finalized_transactions(
429        &self,
430        digests: &[TransactionDigest],
431        epoch: EpochId,
432        sequence: CheckpointSequenceNumber,
433    ) -> SuiResult {
434        let mut batch = self
435            .perpetual_tables
436            .executed_transactions_to_checkpoint
437            .batch();
438        batch.insert_batch(
439            &self.perpetual_tables.executed_transactions_to_checkpoint,
440            digests.iter().map(|d| (*d, (epoch, sequence))),
441        )?;
442        batch.write()?;
443        trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
444        Ok(())
445    }
446
447    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
448    pub fn deprecated_get_transaction_checkpoint(
449        &self,
450        digest: &TransactionDigest,
451    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
452        Ok(self
453            .perpetual_tables
454            .executed_transactions_to_checkpoint
455            .get(digest)?)
456    }
457
458    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
459    pub fn deprecated_multi_get_transaction_checkpoint(
460        &self,
461        digests: &[TransactionDigest],
462    ) -> SuiResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
463        Ok(self
464            .perpetual_tables
465            .executed_transactions_to_checkpoint
466            .multi_get(digests)?
467            .into_iter()
468            .collect())
469    }
470
471    /// Returns true if there are no objects in the database
472    pub fn database_is_empty(&self) -> SuiResult<bool> {
473        self.perpetual_tables.database_is_empty()
474    }
475
476    pub fn object_exists_by_key(
477        &self,
478        object_id: &ObjectID,
479        version: VersionNumber,
480    ) -> SuiResult<bool> {
481        Ok(self
482            .perpetual_tables
483            .objects
484            .contains_key(&ObjectKey(*object_id, version))?)
485    }
486
487    pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<bool>> {
488        Ok(self
489            .perpetual_tables
490            .objects
491            .multi_contains_keys(object_keys.to_vec())?
492            .into_iter()
493            .collect())
494    }
495
496    fn get_object_ref_prior_to_key(
497        &self,
498        object_id: &ObjectID,
499        version: VersionNumber,
500    ) -> Result<Option<ObjectRef>, SuiError> {
501        let Some(prior_version) = version.one_before() else {
502            return Ok(None);
503        };
504        let mut iterator = self
505            .perpetual_tables
506            .objects
507            .reversed_safe_iter_with_bounds(
508                Some(ObjectKey::min_for_id(object_id)),
509                Some(ObjectKey(*object_id, prior_version)),
510            )?;
511
512        if let Some((object_key, value)) = iterator.next().transpose()?
513            && object_key.0 == *object_id
514        {
515            return Ok(Some(
516                self.perpetual_tables.object_reference(&object_key, value)?,
517            ));
518        }
519        Ok(None)
520    }
521
522    pub fn multi_get_objects_by_key(
523        &self,
524        object_keys: &[ObjectKey],
525    ) -> Result<Vec<Option<Object>>, SuiError> {
526        let wrappers = self
527            .perpetual_tables
528            .objects
529            .multi_get(object_keys.to_vec())?;
530        let mut ret = vec![];
531
532        for (idx, w) in wrappers.into_iter().enumerate() {
533            ret.push(
534                w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
535                    .transpose()?
536                    .flatten(),
537            );
538        }
539        Ok(ret)
540    }
541
542    /// Get many objects
543    pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, SuiError> {
544        let mut result = Vec::new();
545        for id in objects {
546            result.push(self.get_object(id));
547        }
548        Ok(result)
549    }
550
551    // Methods to mutate the store
552
553    /// Insert a genesis object.
554    /// TODO: delete this method entirely (still used by authority_tests.rs)
555    pub(crate) fn insert_genesis_object(&self, object: Object) -> SuiResult {
556        // We only side load objects with a genesis parent transaction.
557        debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
558        let object_ref = object.compute_object_reference();
559        self.insert_object_direct(object_ref, &object)
560    }
561
562    /// Insert an object directly into the store, and also update relevant tables
563    /// NOTE: does not handle transaction lock.
564    /// This is used to insert genesis objects
565    fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> SuiResult {
566        let mut write_batch = self.perpetual_tables.objects.batch();
567
568        // Insert object
569        let store_object = get_store_object(object.clone());
570        write_batch.insert_batch(
571            &self.perpetual_tables.objects,
572            std::iter::once((ObjectKey::from(object_ref), store_object)),
573        )?;
574
575        // Update the index
576        if object.get_single_owner().is_some() {
577            // Only initialize lock for address owned objects.
578            if !object.is_child_object() {
579                self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref], false)?;
580            }
581        }
582
583        write_batch.write()?;
584
585        Ok(())
586    }
587
588    /// This function should only be used for initializing genesis and should remain private.
589    #[instrument(level = "debug", skip_all)]
590    pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> SuiResult<()> {
591        let mut batch = self.perpetual_tables.objects.batch();
592        let ref_and_objects: Vec<_> = objects
593            .iter()
594            .map(|o| (o.compute_object_reference(), o))
595            .collect();
596
597        batch.insert_batch(
598            &self.perpetual_tables.objects,
599            ref_and_objects
600                .iter()
601                .map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))),
602        )?;
603
604        let non_child_object_refs: Vec<_> = ref_and_objects
605            .iter()
606            .filter(|(_, object)| !object.is_child_object())
607            .map(|(oref, _)| *oref)
608            .collect();
609
610        self.initialize_live_object_markers_impl(
611            &mut batch,
612            &non_child_object_refs,
613            false, // is_force_reset
614        )?;
615
616        batch.write()?;
617
618        Ok(())
619    }
620
621    pub async fn bulk_insert_live_objects(
622        perpetual_db: Arc<AuthorityPerpetualTables>,
623        objects: Vec<LiveObject>,
624        expected_sha3_digest: &[u8; 32],
625        num_parallel_chunks: usize,
626    ) -> SuiResult<()> {
627        // Verify SHA3 over the full object set before inserting.
628        let mut hasher = Sha3_256::default();
629        for object in &objects {
630            hasher.update(object.object_reference().2.inner());
631        }
632        let sha3_digest = hasher.finalize().digest;
633        if *expected_sha3_digest != sha3_digest {
634            error!(
635                "Sha does not match! expected: {:?}, actual: {:?}",
636                expected_sha3_digest, sha3_digest
637            );
638            return Err(SuiError::from("Sha does not match"));
639        }
640
641        let chunk_size = objects.len().div_ceil(num_parallel_chunks).max(1);
642        let mut remaining = objects;
643        let mut handles = Vec::new();
644        while !remaining.is_empty() {
645            let take = chunk_size.min(remaining.len());
646            let chunk: Vec<LiveObject> = remaining.drain(..take).collect();
647            let db = perpetual_db.clone();
648            handles.push(tokio::task::spawn_blocking(move || {
649                Self::insert_objects_chunk(db, chunk)
650            }));
651        }
652        for handle in handles {
653            handle.await.expect("insert task panicked")?;
654        }
655        Ok(())
656    }
657
658    fn insert_objects_chunk(
659        perpetual_db: Arc<AuthorityPerpetualTables>,
660        objects: Vec<LiveObject>,
661    ) -> SuiResult<()> {
662        let mut batch = perpetual_db.objects.batch();
663        let mut written = 0usize;
664        const MAX_BATCH_SIZE: usize = 100_000;
665        for object in objects {
666            match object {
667                LiveObject::Normal(object) => {
668                    let store_object_wrapper = get_store_object(object.clone());
669                    batch.insert_batch(
670                        &perpetual_db.objects,
671                        std::iter::once((
672                            ObjectKey::from(object.compute_object_reference()),
673                            store_object_wrapper,
674                        )),
675                    )?;
676                    if !object.is_child_object() {
677                        Self::initialize_live_object_markers(
678                            &perpetual_db.live_owned_object_markers,
679                            &mut batch,
680                            &[object.compute_object_reference()],
681                            true, // is_force_reset
682                        )?;
683                    }
684                }
685                LiveObject::Wrapped(object_key) => {
686                    batch.insert_batch(
687                        &perpetual_db.objects,
688                        std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
689                            object_key,
690                            StoreObject::Wrapped.into(),
691                        )),
692                    )?;
693                }
694            }
695            written += 1;
696            if written > MAX_BATCH_SIZE {
697                batch.write()?;
698                batch = perpetual_db.objects.batch();
699                written = 0;
700            }
701        }
702        batch.write()?;
703        Ok(())
704    }
705
706    pub fn set_epoch_start_configuration(
707        &self,
708        epoch_start_configuration: &EpochStartConfiguration,
709    ) -> SuiResult {
710        self.perpetual_tables
711            .set_epoch_start_configuration(epoch_start_configuration)?;
712        Ok(())
713    }
714
715    pub fn get_epoch_start_configuration(&self) -> SuiResult<Option<EpochStartConfiguration>> {
716        Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
717    }
718
719    /// Updates the state resulting from the execution of a certificate.
720    ///
721    /// Internally it checks that all locks for active inputs are at the correct
722    /// version, and then writes objects, certificates, parents and clean up locks atomically.
723    #[instrument(level = "debug", skip_all)]
724    pub fn build_db_batch(
725        &self,
726        epoch_id: EpochId,
727        tx_outputs: &[Arc<TransactionOutputs>],
728    ) -> SuiResult<DBBatch> {
729        let mut write_batch = self.perpetual_tables.transactions.batch();
730        for outputs in tx_outputs {
731            self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
732        }
733        // test crashing before writing the batch
734        fail_point!("crash");
735
736        trace!(
737            "built batch for committed transactions: {:?}",
738            tx_outputs
739                .iter()
740                .map(|tx| tx.transaction.digest())
741                .collect::<Vec<_>>()
742        );
743
744        // test crashing before notifying
745        fail_point!("crash");
746
747        Ok(write_batch)
748    }
749
750    fn write_one_transaction_outputs(
751        &self,
752        write_batch: &mut DBBatch,
753        epoch_id: EpochId,
754        tx_outputs: &TransactionOutputs,
755    ) -> SuiResult {
756        let TransactionOutputs {
757            transaction,
758            effects,
759            markers,
760            wrapped,
761            deleted,
762            written,
763            events,
764            unchanged_loaded_runtime_objects,
765            locks_to_delete,
766            new_locks_to_init,
767            ..
768        } = tx_outputs;
769
770        let effects_digest = effects.digest();
771        let transaction_digest = transaction.digest();
772        // effects must be inserted before the corresponding dependent entries
773        // because they carry epoch information necessary for correct pruning via relocation filters
774        write_batch
775            .insert_batch(
776                &self.perpetual_tables.effects,
777                [(effects_digest, effects.clone())],
778            )?
779            .insert_batch(
780                &self.perpetual_tables.executed_effects,
781                [(transaction_digest, effects_digest)],
782            )?;
783
784        // Store the certificate indexed by transaction digest
785        write_batch.insert_batch(
786            &self.perpetual_tables.transactions,
787            iter::once((transaction_digest, transaction.serializable_ref())),
788        )?;
789
790        write_batch.insert_batch(
791            &self.perpetual_tables.executed_transaction_digests,
792            [((epoch_id, *transaction_digest), ())],
793        )?;
794
795        // Add batched writes for objects and locks.
796        write_batch.insert_batch(
797            &self.perpetual_tables.object_per_epoch_marker_table_v2,
798            markers
799                .iter()
800                .map(|(key, marker_value)| (EpochMarkerKey(epoch_id, *key), *marker_value)),
801        )?;
802        write_batch.insert_batch(
803            &self.perpetual_tables.objects,
804            deleted
805                .iter()
806                .map(|key| (key, StoreObject::Deleted))
807                .chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
808                .map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
809        )?;
810
811        // Insert each output object into the stores
812        let new_objects = written.iter().map(|(id, new_object)| {
813            let version = new_object.version();
814            trace!(?id, ?version, "writing object");
815            let store_object = get_store_object(new_object.clone());
816            (ObjectKey(*id, version), store_object)
817        });
818
819        write_batch.insert_batch(&self.perpetual_tables.objects, new_objects)?;
820
821        // Write events into the new table keyed off of transaction_digest
822        if effects.events_digest().is_some() {
823            write_batch.insert_batch(
824                &self.perpetual_tables.events_2,
825                [(transaction_digest, events)],
826            )?;
827        }
828
829        // Write unchanged_loaded_runtime_objects
830        if !unchanged_loaded_runtime_objects.is_empty() {
831            write_batch.insert_batch(
832                &self.perpetual_tables.unchanged_loaded_runtime_objects,
833                [(transaction_digest, unchanged_loaded_runtime_objects)],
834            )?;
835        }
836
837        self.initialize_live_object_markers_impl(write_batch, new_locks_to_init, false)?;
838
839        // Note: deletes locks for received objects as well (but not for objects that were in
840        // `Receiving` arguments which were not received)
841        self.delete_live_object_markers(write_batch, locks_to_delete)?;
842
843        debug!(effects_digest = ?effects.digest(), "commit_certificate finished");
844
845        Ok(())
846    }
847
848    /// Commits transactions only (not effects or other transaction outputs) to the db.
849    /// See ExecutionCache::persist_transaction for more info
850    pub(crate) fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> SuiResult {
851        let mut batch = self.perpetual_tables.transactions.batch();
852        batch.insert_batch(
853            &self.perpetual_tables.transactions,
854            [(tx.digest(), tx.clone().into_unsigned().serializable_ref())],
855        )?;
856        batch.write()?;
857        Ok(())
858    }
859
860    /// Gets ObjectLockInfo that represents state of lock on an object.
861    /// Returns UserInputError::ObjectNotFound if cannot find lock record for this object
862    pub(crate) fn get_lock(
863        &self,
864        obj_ref: ObjectRef,
865        epoch_store: &AuthorityPerEpochStore,
866    ) -> SuiLockResult {
867        if self
868            .perpetual_tables
869            .live_owned_object_markers
870            .get(&obj_ref)?
871            .is_none()
872        {
873            return Ok(ObjectLockStatus::LockedAtDifferentVersion {
874                locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
875            });
876        }
877
878        let tables = epoch_store.tables()?;
879        let epoch_id = epoch_store.epoch();
880
881        if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
882            Ok(ObjectLockStatus::LockedToTx {
883                locked_by_tx: LockDetailsDeprecated {
884                    epoch: epoch_id,
885                    tx_digest,
886                },
887            })
888        } else {
889            Ok(ObjectLockStatus::Initialized)
890        }
891    }
892
893    /// Returns UserInputError::ObjectNotFound if no lock records found for this object.
894    pub(crate) fn get_latest_live_version_for_object_id(
895        &self,
896        object_id: ObjectID,
897    ) -> SuiResult<ObjectRef> {
898        let mut iterator = self
899            .perpetual_tables
900            .live_owned_object_markers
901            .reversed_safe_iter_with_bounds(
902                None,
903                Some((object_id, SequenceNumber::MAX, ObjectDigest::MAX)),
904            )?;
905        Ok(iterator
906            .next()
907            .transpose()?
908            .and_then(|value| {
909                if value.0.0 == object_id {
910                    Some(value)
911                } else {
912                    None
913                }
914            })
915            .ok_or_else(|| {
916                SuiError::from(UserInputError::ObjectNotFound {
917                    object_id,
918                    version: None,
919                })
920            })?
921            .0)
922    }
923
924    /// Checks multiple object locks exist.
925    /// Returns UserInputError::ObjectNotFound if cannot find lock record for at least one of the objects.
926    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if at least one object lock is not initialized
927    ///     at the given version.
928    pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> SuiResult {
929        let locks = self
930            .perpetual_tables
931            .live_owned_object_markers
932            .multi_get(objects)?;
933        for (lock, obj_ref) in locks.into_iter().zip_debug_eq(objects) {
934            if lock.is_none() {
935                let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
936                fp_bail!(
937                    UserInputError::ObjectVersionUnavailableForConsumption {
938                        provided_obj_ref: *obj_ref,
939                        current_version: latest_lock.1
940                    }
941                    .into()
942                );
943            }
944        }
945        Ok(())
946    }
947
948    /// Initialize a lock to None (but exists) for a given list of ObjectRefs.
949    /// Returns SuiErrorKind::ObjectLockAlreadyInitialized if the lock already exists and is locked to a transaction
950    fn initialize_live_object_markers_impl(
951        &self,
952        write_batch: &mut DBBatch,
953        objects: &[ObjectRef],
954        is_force_reset: bool,
955    ) -> SuiResult {
956        AuthorityStore::initialize_live_object_markers(
957            &self.perpetual_tables.live_owned_object_markers,
958            write_batch,
959            objects,
960            is_force_reset,
961        )
962    }
963
964    pub fn initialize_live_object_markers(
965        live_object_marker_table: &DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,
966        write_batch: &mut DBBatch,
967        objects: &[ObjectRef],
968        is_force_reset: bool,
969    ) -> SuiResult {
970        trace!(?objects, "initialize_locks");
971
972        if !is_force_reset {
973            let live_object_markers = live_object_marker_table.multi_get(objects)?;
974            // If any live_object_markers exist and are not None, return errors for them
975            // Note we don't check if there is a pre-existing lock. this is because initializing the live
976            // object marker will not overwrite the lock and cause the validator to equivocate.
977            let existing_live_object_markers: Vec<ObjectRef> = live_object_markers
978                .iter()
979                .zip_debug_eq(objects)
980                .filter_map(|(lock_opt, objref)| {
981                    lock_opt.clone().flatten().map(|_tx_digest| *objref)
982                })
983                .collect();
984            if !existing_live_object_markers.is_empty() {
985                info!(
986                    ?existing_live_object_markers,
987                    "Cannot initialize live_object_markers because some exist already"
988                );
989                return Err(SuiErrorKind::ObjectLockAlreadyInitialized {
990                    refs: existing_live_object_markers,
991                }
992                .into());
993            }
994        }
995
996        write_batch.insert_batch(
997            live_object_marker_table,
998            objects.iter().map(|obj_ref| (obj_ref, None)),
999        )?;
1000        Ok(())
1001    }
1002
1003    /// Removes locks for a given list of ObjectRefs.
1004    fn delete_live_object_markers(
1005        &self,
1006        write_batch: &mut DBBatch,
1007        objects: &[ObjectRef],
1008    ) -> SuiResult {
1009        trace!(?objects, "delete_locks");
1010        write_batch.delete_batch(
1011            &self.perpetual_tables.live_owned_object_markers,
1012            objects.iter(),
1013        )?;
1014        Ok(())
1015    }
1016
1017    /// Return the object with version less then or eq to the provided seq number.
1018    /// This is used by indexer to find the correct version of dynamic field child object.
1019    /// We do not store the version of the child object, but because of lamport timestamp,
1020    /// we know the child must have version number less then or eq to the parent.
1021    pub fn find_object_lt_or_eq_version(
1022        &self,
1023        object_id: ObjectID,
1024        version: SequenceNumber,
1025    ) -> SuiResult<Option<Object>> {
1026        self.perpetual_tables
1027            .find_object_lt_or_eq_version(object_id, version)
1028    }
1029
1030    /// Returns the latest object reference we have for this object_id in the objects table.
1031    ///
1032    /// The method may also return the reference to a deleted object with a digest of
1033    /// ObjectDigest::deleted() or ObjectDigest::wrapped() and lamport version
1034    /// of a transaction that deleted the object.
1035    /// Note that a deleted object may re-appear if the deletion was the result of the object
1036    /// being wrapped in another object.
1037    ///
1038    /// If no entry for the object_id is found, return None.
1039    pub fn get_latest_object_ref_or_tombstone(
1040        &self,
1041        object_id: ObjectID,
1042    ) -> Result<Option<ObjectRef>, SuiError> {
1043        self.perpetual_tables
1044            .get_latest_object_ref_or_tombstone(object_id)
1045    }
1046
1047    /// Returns the latest object we have for this object_id in the objects table.
1048    ///
1049    /// If no entry for the object_id is found, return None.
1050    pub fn get_latest_object_or_tombstone(
1051        &self,
1052        object_id: ObjectID,
1053    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, SuiError> {
1054        let Some((object_key, store_object)) = self
1055            .perpetual_tables
1056            .get_latest_object_or_tombstone(object_id)?
1057        else {
1058            return Ok(None);
1059        };
1060
1061        if let Some(object_ref) = self
1062            .perpetual_tables
1063            .tombstone_reference(&object_key, &store_object)?
1064        {
1065            return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
1066        }
1067
1068        let object = self
1069            .perpetual_tables
1070            .object(&object_key, store_object)?
1071            .expect("Non tombstone store object could not be converted to object");
1072
1073        Ok(Some((object_key, ObjectOrTombstone::Object(object))))
1074    }
1075
1076    pub fn insert_transaction_and_effects(
1077        &self,
1078        transaction: &VerifiedTransaction,
1079        transaction_effects: &TransactionEffects,
1080    ) -> Result<(), TypedStoreError> {
1081        let mut write_batch = self.perpetual_tables.transactions.batch();
1082        // effects must be inserted before the corresponding transaction entry
1083        // because they carry epoch information necessary for correct pruning via relocation filters
1084        write_batch
1085            .insert_batch(
1086                &self.perpetual_tables.effects,
1087                [(transaction_effects.digest(), transaction_effects)],
1088            )?
1089            .insert_batch(
1090                &self.perpetual_tables.transactions,
1091                [(transaction.digest(), transaction.serializable_ref())],
1092            )?;
1093
1094        write_batch.write()?;
1095        Ok(())
1096    }
1097
1098    pub fn multi_insert_transaction_and_effects<'a>(
1099        &self,
1100        transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
1101    ) -> Result<(), TypedStoreError> {
1102        let mut write_batch = self.perpetual_tables.transactions.batch();
1103        for tx in transactions {
1104            write_batch
1105                .insert_batch(
1106                    &self.perpetual_tables.effects,
1107                    [(tx.effects.digest(), &tx.effects)],
1108                )?
1109                .insert_batch(
1110                    &self.perpetual_tables.transactions,
1111                    [(tx.transaction.digest(), tx.transaction.serializable_ref())],
1112                )?;
1113        }
1114
1115        write_batch.write()?;
1116        Ok(())
1117    }
1118
1119    pub fn multi_get_transaction_blocks(
1120        &self,
1121        tx_digests: &[TransactionDigest],
1122    ) -> Result<Vec<Option<VerifiedTransaction>>, TypedStoreError> {
1123        self.perpetual_tables
1124            .transactions
1125            .multi_get(tx_digests)
1126            .map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())
1127    }
1128
1129    pub fn get_transaction_block(
1130        &self,
1131        tx_digest: &TransactionDigest,
1132    ) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
1133        self.perpetual_tables
1134            .transactions
1135            .get(tx_digest)
1136            .map(|v| v.map(|v| v.into()))
1137    }
1138
1139    pub fn list_transactions_from(
1140        &self,
1141        start: Option<TransactionDigest>,
1142        limit: usize,
1143    ) -> Result<Vec<TransactionDigest>, TypedStoreError> {
1144        self.perpetual_tables.list_transactions_from(start, limit)
1145    }
1146
1147    pub fn get_executed_effects_digest_for_tx(
1148        &self,
1149        tx_digest: &TransactionDigest,
1150    ) -> Result<Option<TransactionEffectsDigest>, TypedStoreError> {
1151        self.perpetual_tables.get_executed_effects_digest(tx_digest)
1152    }
1153
1154    /// This function reads the DB directly to get the system state object.
1155    /// If reconfiguration is happening at the same time, there is no guarantee whether we would be getting
1156    /// the old or the new system state object.
1157    /// Hence this function should only be called during RPC reads where data race is not a major concern.
1158    /// In general we should avoid this as much as possible.
1159    /// If the intent is for testing, you can use AuthorityState:: get_sui_system_state_object_for_testing.
1160    pub fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
1161        get_sui_system_state(self.perpetual_tables.as_ref())
1162    }
1163
    /// Runs the SUI conservation check over the live object set.
    ///
    /// Returns `Ok(())` immediately unless `enable_epoch_sui_conservation_check` is set.
    /// Otherwise it walks `iter_live_object_set(false)` and sums, per object:
    /// * `storage_rebate` into the total storage rebate, and
    /// * `get_total_sui(..) - storage_rebate` into the total SUI.
    ///
    /// Objects are handed off in batches of 1,000,000 to scoped threads; the final partial
    /// batch is processed on the calling thread once the scope has finished.
    ///
    /// The results are published as metrics and then compared with the values stored in
    /// `expected_storage_fund_imbalance` and `expected_network_sui_amount`. When one of those
    /// tables has no entry yet, the freshly computed value is stored there instead of being
    /// compared.
    ///
    /// # Errors
    /// The two `fp_ensure!` checks carry a `SuiError` describing the inconsistency; presumably
    /// the macro returns it when the condition is false (its definition is not in this file
    /// section).
    ///
    /// # Panics
    /// Panics if a worker thread panics, if `get_total_sui` fails (except inside worker
    /// threads on Testnet, where the error is logged and counted as 0), if the system state
    /// object cannot be read, or if reading/writing the expectation tables fails.
    pub fn expensive_check_sui_conservation<T>(
        self: &Arc<Self>,
        type_layout_store: T,
        old_epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult
    where
        T: TypeLayoutStore + Send + Copy,
    {
        if !self.enable_epoch_sui_conservation_check {
            return Ok(());
        }

        let executor = old_epoch_store.executor();
        info!("Starting SUI conservation check. This may take a while..");
        let cur_time = Instant::now();
        // Objects collected since the last hand-off to a worker thread.
        let mut pending_objects = vec![];
        let mut count = 0;
        let mut size = 0;
        let (mut total_sui, mut total_storage_rebate) = thread::scope(|s| {
            // Used purely as a container for the scoped join handles; they are joined via
            // `into_iter()` below.
            let pending_tasks = FuturesUnordered::new();
            for o in self.iter_live_object_set(false) {
                match o {
                    LiveObject::Normal(object) => {
                        size += object.object_size_for_gas_metering();
                        count += 1;
                        pending_objects.push(object);
                        // Every millionth object, move the accumulated batch into a worker.
                        if count % 1_000_000 == 0 {
                            let mut task_objects = vec![];
                            mem::swap(&mut pending_objects, &mut task_objects);
                            // `move` copies `count` as it is at spawn time, so the progress
                            // log inside the worker refers to this batch's boundary.
                            pending_tasks.push(s.spawn(move || {
                                let mut layout_resolver = executor.type_layout_resolver(
                                    old_epoch_store.protocol_config(),
                                    Box::new(type_layout_store),
                                );
                                let mut total_storage_rebate = 0;
                                let mut total_sui = 0;
                                for object in task_objects {
                                    total_storage_rebate += object.storage_rebate;
                                    // get_total_sui includes storage rebate, however all storage rebate is
                                    // also stored in the storage fund, so we need to subtract it here.
                                    let object_contained_sui = match object
                                        .get_total_sui(layout_resolver.as_mut())
                                    {
                                        Ok(sui) => sui,
                                        // On Testnet a failure is logged and the object
                                        // contributes 0; on any other chain it is fatal.
                                        Err(e)
                                            if old_epoch_store.get_chain()
                                                == sui_protocol_config::Chain::Testnet =>
                                        {
                                            error!(
                                                "Error calculating total SUI for object {:?}: {:?}",
                                                object.compute_object_reference(),
                                                e
                                            );
                                            0
                                        }
                                        Err(e) => panic!(
                                            "Error calculating total SUI for object {:?}: {:?}",
                                            object.compute_object_reference(),
                                            e
                                        ),
                                    };
                                    total_sui += object_contained_sui - object.storage_rebate;
                                }
                                if count % 50_000_000 == 0 {
                                    info!("Processed {} objects", count);
                                }
                                (total_sui, total_storage_rebate)
                            }));
                        }
                    }
                    LiveObject::Wrapped(_) => {
                        unreachable!("Explicitly asked to not include wrapped tombstones")
                    }
                }
            }
            // Join every worker and add up the per-batch totals.
            pending_tasks.into_iter().fold((0, 0), |init, result| {
                let result = result.join().unwrap();
                (init.0 + result.0, init.1 + result.1)
            })
        });
        // Process the final partial batch (fewer than 1,000,000 objects) on this thread.
        // NOTE(review): unlike the worker threads, this path unwraps `get_total_sui` on every
        // chain, including Testnet — confirm whether that difference is intended.
        let mut layout_resolver = executor.type_layout_resolver(
            old_epoch_store.protocol_config(),
            Box::new(type_layout_store),
        );
        for object in pending_objects {
            total_storage_rebate += object.storage_rebate;
            total_sui +=
                object.get_total_sui(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
        }
        info!(
            "Scanned {} live objects, took {:?}",
            count,
            cur_time.elapsed()
        );
        self.metrics
            .sui_conservation_live_object_count
            .set(count as i64);
        self.metrics
            .sui_conservation_live_object_size
            .set(size as i64);
        self.metrics
            .sui_conservation_check_latency
            .set(cur_time.elapsed().as_secs() as i64);

        // It is safe to call this function because we are in the middle of reconfiguration.
        let system_state = self
            .get_sui_system_state_object_unsafe()
            .expect("Reading sui system state object cannot fail")
            .into_sui_system_state_summary();
        let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
        info!(
            "Total SUI amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
            total_sui, storage_fund_balance, total_storage_rebate, system_state.epoch
        );

        // Difference between the storage fund's recorded rebates and the rebates summed over
        // the live objects.
        let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
        self.metrics
            .sui_conservation_storage_fund
            .set(storage_fund_balance as i64);
        self.metrics
            .sui_conservation_storage_fund_imbalance
            .set(imbalance);
        self.metrics
            .sui_conservation_imbalance
            .set((total_sui as i128 - TOTAL_SUPPLY_MIST as i128) as i64);

        // Compare against the stored imbalance if there is one; otherwise record this one.
        if let Some(expected_imbalance) = self
            .perpetual_tables
            .expected_storage_fund_imbalance
            .get(&())
            .expect("DB read cannot fail")
        {
            fp_ensure!(
                imbalance == expected_imbalance,
                SuiError::from(
                    format!(
                        "Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
                        system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
                    ).as_str()
                )
            );
        } else {
            self.perpetual_tables
                .expected_storage_fund_imbalance
                .insert(&(), &imbalance)
                .expect("DB write cannot fail");
        }

        // Same scheme for the total SUI amount: compare if stored, otherwise record.
        if let Some(expected_sui) = self
            .perpetual_tables
            .expected_network_sui_amount
            .get(&())
            .expect("DB read cannot fail")
        {
            fp_ensure!(
                total_sui == expected_sui,
                SuiError::from(
                    format!(
                        "Inconsistent state detected at epoch {}: total sui: {}, expecting {}",
                        system_state.epoch, total_sui, expected_sui
                    )
                    .as_str()
                )
            );
        } else {
            self.perpetual_tables
                .expected_network_sui_amount
                .insert(&(), &total_sui)
                .expect("DB write cannot fail");
        }

        Ok(())
    }
1337
    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
    ///
    /// The work only happens when the flag is off in `cur_epoch_store`'s protocol config and
    /// on in the config for `new_protocol_version`; in every other case the method returns
    /// without doing anything.
    ///
    /// When it runs, the objects table is scanned by 32 scoped threads, each covering one
    /// object ID range. For every object ID whose last scanned entry is a `Wrapped`
    /// tombstone, a `WrappedObject` is removed from the current epoch's root state hash, and
    /// the updated hash is stored back for the same epoch and checkpoint.
    ///
    /// # Panics
    /// Panics if the root state hash for the current epoch is missing or unreadable, if any
    /// scan task fails or panics, if BCS serialization fails, or if storing the updated hash
    /// fails.
    #[instrument(level = "error", skip_all)]
    pub fn maybe_reaccumulate_state_hash(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        new_protocol_version: ProtocolVersion,
    ) {
        let old_simplified_unwrap_then_delete = cur_epoch_store
            .protocol_config()
            .simplified_unwrap_then_delete();
        let new_simplified_unwrap_then_delete =
            ProtocolConfig::get_for_version(new_protocol_version, cur_epoch_store.get_chain())
                .simplified_unwrap_then_delete();
        // If in the new epoch the simplified_unwrap_then_delete is enabled for the first time,
        // we re-accumulate state root.
        let should_reaccumulate =
            !old_simplified_unwrap_then_delete && new_simplified_unwrap_then_delete;
        if !should_reaccumulate {
            return;
        }
        info!(
            "[Re-accumulate] simplified_unwrap_then_delete is enabled in the new protocol version, re-accumulating state hash"
        );
        let cur_time = Instant::now();
        std::thread::scope(|s| {
            // Used purely as a container for the scoped join handles; they are joined via
            // `into_iter()` below.
            let pending_tasks = FuturesUnordered::new();
            // Shard the object IDs into different ranges so that we can process them in parallel.
            // We divide the range into 2^BITS number of ranges. To do so we use the highest BITS bits
            // to mark the starting/ending point of the range. For example, when BITS = 5, we
            // divide the range into 32 ranges, and the first few ranges are:
            // 00000000_.... to 00000111_....
            // 00001000_.... to 00001111_....
            // 00010000_.... to 00010111_....
            // and etc.
            const BITS: u8 = 5;
            for index in 0u8..(1 << BITS) {
                pending_tasks.push(s.spawn(move || {
                    // Start of the range: `index` in the top BITS bits, all other bits zero.
                    let mut id_bytes = [0; ObjectID::LENGTH];
                    id_bytes[0] = index << (8 - BITS);
                    let start_id = ObjectID::new(id_bytes);

                    // End of the range: same top BITS bits, all other bits set to one.
                    id_bytes[0] |= (1 << (8 - BITS)) - 1;
                    for element in id_bytes.iter_mut().skip(1) {
                        *element = u8::MAX;
                    }
                    let end_id = ObjectID::new(id_bytes);

                    info!(
                        "[Re-accumulate] Scanning object ID range {:?}..{:?}",
                        start_id, end_id
                    );
                    // Previously scanned entry. It starts as a `Deleted` placeholder, which
                    // can never match the `Wrapped` checks below.
                    let mut prev = (
                        ObjectKey::min_for_id(&ObjectID::ZERO),
                        StoreObjectWrapper::V1(StoreObject::Deleted),
                    );
                    let mut object_scanned: u64 = 0;
                    let mut wrapped_objects_to_remove = vec![];
                    for db_result in self.perpetual_tables.objects.safe_range_iter(
                        ObjectKey::min_for_id(&start_id)..=ObjectKey::max_for_id(&end_id),
                    ) {
                        match db_result {
                            Ok((object_key, object)) => {
                                object_scanned += 1;
                                if object_scanned.is_multiple_of(100000) {
                                    info!(
                                        "[Re-accumulate] Task {}: object scanned: {}",
                                        index, object_scanned,
                                    );
                                }
                                // The object ID just changed, so `prev` was the last entry
                                // scanned for its ID; record it if it is a wrapped tombstone.
                                if matches!(prev.1.inner(), StoreObject::Wrapped)
                                    && object_key.0 != prev.0.0
                                {
                                    wrapped_objects_to_remove
                                        .push(WrappedObject::new(prev.0.0, prev.0.1));
                                }

                                prev = (object_key, object);
                            }
                            Err(err) => {
                                warn!("Object iterator encounter RocksDB error {:?}", err);
                                return Err(err);
                            }
                        }
                    }
                    // The very last entry of the range has no successor to trigger the check
                    // above, so it is handled here.
                    if matches!(prev.1.inner(), StoreObject::Wrapped) {
                        wrapped_objects_to_remove.push(WrappedObject::new(prev.0.0, prev.0.1));
                    }
                    info!(
                        "[Re-accumulate] Task {}: object scanned: {}, wrapped objects: {}",
                        index,
                        object_scanned,
                        wrapped_objects_to_remove.len(),
                    );
                    Ok((wrapped_objects_to_remove, object_scanned))
                }));
            }
            let (last_checkpoint_of_epoch, cur_accumulator) = self
                .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
                .expect("read cannot fail")
                .expect("accumulator must exist");
            // Join each task, remove its wrapped objects from the accumulator, and keep
            // running totals for the summary log.
            let (accumulator, total_objects_scanned, total_wrapped_objects) =
                pending_tasks.into_iter().fold(
                    (cur_accumulator, 0u64, 0usize),
                    |(mut accumulator, total_objects_scanned, total_wrapped_objects), task| {
                        let (wrapped_objects_to_remove, object_scanned) =
                            task.join().unwrap().unwrap();
                        accumulator.remove_all(
                            wrapped_objects_to_remove
                                .iter()
                                .map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
                                .collect::<Vec<Vec<u8>>>(),
                        );
                        (
                            accumulator,
                            total_objects_scanned + object_scanned,
                            total_wrapped_objects + wrapped_objects_to_remove.len(),
                        )
                    },
                );
            info!(
                "[Re-accumulate] Total objects scanned: {}, total wrapped objects: {}",
                total_objects_scanned, total_wrapped_objects,
            );
            info!(
                "[Re-accumulate] New accumulator value: {:?}",
                accumulator.digest()
            );
            // Store the updated hash under the same epoch and checkpoint it was read from.
            self.insert_state_hash_for_epoch(
                cur_epoch_store.epoch(),
                &last_checkpoint_of_epoch,
                &accumulator,
            )
            .unwrap();
        });
        info!(
            "[Re-accumulate] Re-accumulating took {}seconds",
            cur_time.elapsed().as_secs()
        );
    }
1478
1479    pub async fn prune_objects_and_compact_for_testing(
1480        &self,
1481        checkpoint_store: &Arc<CheckpointStore>,
1482        rpc_index: Option<&RpcIndexStore>,
1483    ) {
1484        let pruning_config = AuthorityStorePruningConfig {
1485            num_epochs_to_retain: 0,
1486            ..Default::default()
1487        };
1488        let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
1489            &self.perpetual_tables,
1490            checkpoint_store,
1491            rpc_index,
1492            None,
1493            pruning_config,
1494            AuthorityStorePruningMetrics::new_for_test(),
1495            EPOCH_DURATION_MS_FOR_TESTING,
1496        )
1497        .await;
1498        let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
1499    }
1500
1501    pub fn remove_executed_effects_for_testing(
1502        &self,
1503        tx_digest: &TransactionDigest,
1504    ) -> anyhow::Result<()> {
1505        let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
1506        if let Some(effects_digest) = effects_digest {
1507            self.perpetual_tables.executed_effects.remove(tx_digest)?;
1508            self.perpetual_tables.effects.remove(&effects_digest)?;
1509        }
1510        Ok(())
1511    }
1512
/// Test-only helper that deletes, through one write batch on the `objects` table, every
/// `(object id, version)` pair reported by `modified_at_versions()` of the given effects.
///
/// Each pair is logged at `info` level while the key list is assembled, before the batch
/// is written. Errors from building or writing the batch are propagated.
#[cfg(test)]
pub async fn prune_objects_immediately_for_testing(
    &self,
    transaction_effects: Vec<TransactionEffects>,
) -> anyhow::Result<()> {
    let objects = &self.perpetual_tables.objects;
    let mut batch = objects.batch();

    // Materialise the full key list up front so all logging happens before any delete
    // is queued.
    let stale_keys: Vec<ObjectKey> = transaction_effects
        .iter()
        .flat_map(|effects| effects.modified_at_versions())
        .map(|(object_id, seq_number)| {
            info!("Pruning object {:?} version {:?}", object_id, seq_number);
            ObjectKey(object_id, seq_number)
        })
        .collect();

    batch.delete_batch(objects, stale_keys)?;
    batch.write()?;
    Ok(())
}
1535
/// Counts how many versions of `object_id` exist in the object store, tombstones
/// included.
///
/// The scan is bounded by `ObjectKey(object_id, VersionNumber::MIN)` and
/// `ObjectKey(object_id, VersionNumber::MAX)`.
///
/// # Panics
///
/// Panics if the underlying iterator yields an error.
#[cfg(msim)]
pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
    self.perpetual_tables
        .objects
        .safe_iter_with_bounds(
            Some(ObjectKey(object_id, VersionNumber::MIN)),
            Some(ObjectKey(object_id, VersionNumber::MAX)),
        )
        // Count entries as they stream by instead of buffering them all in a `Vec`
        // just to read its length.
        .map(|entry| entry.unwrap())
        .count()
}
1549}
1550
1551impl GlobalStateHashStore for AuthorityStore {
1552    fn get_object_ref_prior_to_key_deprecated(
1553        &self,
1554        object_id: &ObjectID,
1555        version: VersionNumber,
1556    ) -> SuiResult<Option<ObjectRef>> {
1557        self.get_object_ref_prior_to_key(object_id, version)
1558    }
1559
1560    fn get_root_state_hash_for_epoch(
1561        &self,
1562        epoch: EpochId,
1563    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
1564        self.perpetual_tables
1565            .root_state_hash_by_epoch
1566            .get(&epoch)
1567            .map_err(Into::into)
1568    }
1569
1570    fn get_root_state_hash_for_highest_epoch(
1571        &self,
1572    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
1573        Ok(self
1574            .perpetual_tables
1575            .root_state_hash_by_epoch
1576            .reversed_safe_iter_with_bounds(None, None)?
1577            .next()
1578            .transpose()?)
1579    }
1580
1581    fn insert_state_hash_for_epoch(
1582        &self,
1583        epoch: EpochId,
1584        last_checkpoint_of_epoch: &CheckpointSequenceNumber,
1585        acc: &GlobalStateHash,
1586    ) -> SuiResult {
1587        self.perpetual_tables
1588            .root_state_hash_by_epoch
1589            .insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
1590        self.root_state_notify_read
1591            .notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
1592
1593        Ok(())
1594    }
1595
1596    fn iter_live_object_set(
1597        &self,
1598        include_wrapped_object: bool,
1599    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1600        Box::new(
1601            self.perpetual_tables
1602                .iter_live_object_set(include_wrapped_object),
1603        )
1604    }
1605}
1606
1607impl ObjectStore for AuthorityStore {
1608    /// Read an object and return it, or Ok(None) if the object was not found.
1609    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
1610        self.perpetual_tables.as_ref().get_object(object_id)
1611    }
1612
1613    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
1614        self.perpetual_tables.get_object_by_key(object_id, version)
1615    }
1616}
1617
1618/// A wrapper to make Orphan Rule happy
1619pub struct ResolverWrapper {
1620    pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1621    pub metrics: Arc<ResolverMetrics>,
1622}
1623
1624impl ResolverWrapper {
1625    pub fn new(
1626        resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1627        metrics: Arc<ResolverMetrics>,
1628    ) -> Self {
1629        metrics.module_cache_size.set(0);
1630        ResolverWrapper { resolver, metrics }
1631    }
1632
1633    fn inc_cache_size_gauge(&self) {
1634        // reset the gauge after a restart of the cache
1635        let current = self.metrics.module_cache_size.get();
1636        self.metrics.module_cache_size.set(current + 1);
1637    }
1638}
1639
1640impl ModuleResolver for ResolverWrapper {
1641    type Error = SuiError;
1642    fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
1643        self.inc_cache_size_gauge();
1644        get_module(&*self.resolver, module_id)
1645    }
1646
1647    fn get_packages_static<const N: usize>(
1648        &self,
1649        ids: [AccountAddress; N],
1650    ) -> Result<[Option<SerializedPackage>; N], Self::Error> {
1651        let mut packages = [const { None }; N];
1652        for (i, id) in ids.iter().enumerate() {
1653            packages[i] = get_package(&*self.resolver, &ObjectID::from(*id))?;
1654        }
1655        Ok(packages)
1656    }
1657
1658    fn get_packages<'a>(
1659        &self,
1660        ids: impl ExactSizeIterator<Item = &'a AccountAddress>,
1661    ) -> Result<Vec<Option<SerializedPackage>>, Self::Error> {
1662        ids.map(|id| get_package(&*self.resolver, &ObjectID::from(*id)))
1663            .collect()
1664    }
1665}
1666
1667pub type SuiLockResult = SuiResult<ObjectLockStatus>;
1668
1669#[derive(Debug, PartialEq, Eq)]
1670pub enum ObjectLockStatus {
1671    Initialized,
1672    LockedToTx { locked_by_tx: LockDetailsDeprecated },
1673    LockedAtDifferentVersion { locked_ref: ObjectRef },
1674}
1675
1676#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1677pub enum LockDetailsWrapperDeprecated {
1678    V1(LockDetailsV1Deprecated),
1679}
1680
1681#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1682pub struct LockDetailsV1Deprecated {
1683    pub epoch: EpochId,
1684    pub tx_digest: TransactionDigest,
1685}
1686
1687pub type LockDetailsDeprecated = LockDetailsV1Deprecated;