sui_core/authority/
authority_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::{iter, mem, thread};
6
7use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
8use crate::authority::authority_store_pruner::{
9    AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
10};
11use crate::authority::authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object};
12use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
13use crate::global_state_hasher::GlobalStateHashStore;
14use crate::rpc_index::RpcIndexStore;
15use crate::transaction_outputs::TransactionOutputs;
16use either::Either;
17use fastcrypto::hash::{HashFunction, MultisetHash, Sha3_256};
18use futures::stream::FuturesUnordered;
19use move_core_types::resolver::ModuleResolver;
20use serde::{Deserialize, Serialize};
21use sui_config::node::AuthorityStorePruningConfig;
22use sui_macros::fail_point_arg;
23use sui_types::error::{SuiErrorKind, UserInputError};
24use sui_types::execution::TypeLayoutStore;
25use sui_types::global_state_hash::GlobalStateHash;
26use sui_types::message_envelope::Message;
27use sui_types::storage::{
28    BackingPackageStore, FullObjectKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore,
29    get_module,
30};
31use sui_types::sui_system_state::get_sui_system_state;
32use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure};
33use tokio::time::Instant;
34use tracing::{debug, info, trace};
35use typed_store::traits::Map;
36use typed_store::{
37    TypedStoreError,
38    rocks::{DBBatch, DBMap},
39};
40
41use super::authority_store_tables::LiveObject;
42use super::{authority_store_tables::AuthorityPerpetualTables, *};
43use mysten_common::sync::notify_read::NotifyRead;
44use sui_types::effects::{TransactionEffects, TransactionEvents};
45use sui_types::gas_coin::TOTAL_SUPPLY_MIST;
46
/// Gauges published by the authority store: results of the SUI conservation
/// check plus one labelled gauge per epoch flag.
struct AuthorityStoreMetrics {
    /// Seconds taken to scan all live objects for the SUI conservation check.
    sui_conservation_check_latency: IntGauge,
    /// Number of live objects in the store.
    sui_conservation_live_object_count: IntGauge,
    /// Size in bytes of live objects in the store.
    sui_conservation_live_object_size: IntGauge,
    /// Difference between the total SUI in the network and the expected total supply.
    sui_conservation_imbalance: IntGauge,
    /// Storage fund pool balance (the part that represents object storage).
    sui_conservation_storage_fund: IntGauge,
    /// Storage fund balance minus the total object storage rebates.
    sui_conservation_storage_fund_imbalance: IntGauge,
    /// Local flags of the currently running epoch, labelled by `flag`.
    epoch_flags: IntGaugeVec,
}
56
57impl AuthorityStoreMetrics {
58    pub fn new(registry: &Registry) -> Self {
59        Self {
60            sui_conservation_check_latency: register_int_gauge_with_registry!(
61                "sui_conservation_check_latency",
62                "Number of seconds took to scan all live objects in the store for SUI conservation check",
63                registry,
64            ).unwrap(),
65            sui_conservation_live_object_count: register_int_gauge_with_registry!(
66                "sui_conservation_live_object_count",
67                "Number of live objects in the store",
68                registry,
69            ).unwrap(),
70            sui_conservation_live_object_size: register_int_gauge_with_registry!(
71                "sui_conservation_live_object_size",
72                "Size in bytes of live objects in the store",
73                registry,
74            ).unwrap(),
75            sui_conservation_imbalance: register_int_gauge_with_registry!(
76                "sui_conservation_imbalance",
77                "Total amount of SUI in the network - 10B * 10^9. This delta shows the amount of imbalance",
78                registry,
79            ).unwrap(),
80            sui_conservation_storage_fund: register_int_gauge_with_registry!(
81                "sui_conservation_storage_fund",
82                "Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
83                registry,
84            ).unwrap(),
85            sui_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
86                "sui_conservation_storage_fund_imbalance",
87                "Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
88                registry,
89            ).unwrap(),
90            epoch_flags: register_int_gauge_vec_with_registry!(
91                "epoch_flags",
92                "Local flags of the currently running epoch",
93                &["flag"],
94                registry,
95            ).unwrap(),
96        }
97    }
98}
99
/// Store used by the authority: bundles the shared perpetual tables, a
/// notifier for per-epoch root state hashes, and the store's metrics.
///
/// NOTE(review): this struct has no type or const parameters, so any
/// description of `ALL_OBJ_VER` / `S` parameters or of a `SuiDataStore` type
/// does not apply to it as written.
pub struct AuthorityStore {
    /// Shared handle to the perpetual tables that back this store.
    pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,

    /// Notifier keyed by epoch, carrying that epoch's
    /// `(CheckpointSequenceNumber, GlobalStateHash)` pair.
    pub(crate) root_state_notify_read:
        NotifyRead<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Whether to enable expensive SUI conservation check at epoch boundaries.
    enable_epoch_sui_conservation_check: bool,

    /// Gauges for the conservation check and for epoch flags.
    metrics: AuthorityStoreMetrics,
}
117
/// Read guard of a tokio `RwLock` that protects an `EpochId`.
pub type ExecutionLockReadGuard<'a> = tokio::sync::RwLockReadGuard<'a, EpochId>;
/// Write guard of a tokio `RwLock` that protects an `EpochId`.
pub type ExecutionLockWriteGuard<'a> = tokio::sync::RwLockWriteGuard<'a, EpochId>;
120
121impl AuthorityStore {
122    /// Open an authority store by directory path.
123    /// If the store is empty, initialize it using genesis.
124    pub async fn open(
125        perpetual_tables: Arc<AuthorityPerpetualTables>,
126        genesis: &Genesis,
127        config: &NodeConfig,
128        registry: &Registry,
129    ) -> SuiResult<Arc<Self>> {
130        let enable_epoch_sui_conservation_check = config
131            .expensive_safety_check_config
132            .enable_epoch_sui_conservation_check();
133
134        let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
135            info!("Creating new epoch start config from genesis");
136
137            #[allow(unused_mut)]
138            let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
139            fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
140                info!("Setting initial epoch flags to {:?}", flags);
141                initial_epoch_flags = flags;
142            });
143
144            let epoch_start_configuration = EpochStartConfiguration::new(
145                genesis.sui_system_object().into_epoch_start_state(),
146                *genesis.checkpoint().digest(),
147                &genesis.objects(),
148                initial_epoch_flags,
149            )?;
150            perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
151            epoch_start_configuration
152        } else {
153            info!("Loading epoch start config from DB");
154            perpetual_tables
155                .epoch_start_configuration
156                .get(&())?
157                .expect("Epoch start configuration must be set in non-empty DB")
158        };
159        let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
160        info!("Epoch start config: {:?}", epoch_start_configuration);
161        info!("Cur epoch: {:?}", cur_epoch);
162        let this = Self::open_inner(
163            genesis,
164            perpetual_tables,
165            enable_epoch_sui_conservation_check,
166            registry,
167        )
168        .await?;
169        this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
170        Ok(this)
171    }
172
173    pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
174        for flag in old {
175            self.metrics
176                .epoch_flags
177                .with_label_values(&[&flag.to_string()])
178                .set(0);
179        }
180        for flag in new {
181            self.metrics
182                .epoch_flags
183                .with_label_values(&[&flag.to_string()])
184                .set(1);
185        }
186    }
187
188    // NB: This must only be called at time of reconfiguration. We take the execution lock write
189    // guard as an argument to ensure that this is the case.
190    pub fn clear_object_per_epoch_marker_table(
191        &self,
192        _execution_guard: &ExecutionLockWriteGuard<'_>,
193    ) -> SuiResult<()> {
194        // We can safely delete all entries in the per epoch marker table since this is only called
195        // at epoch boundaries (during reconfiguration). Therefore any entries that currently
196        // exist can be removed. Because of this we can use the `schedule_delete_all` method.
197        self.perpetual_tables
198            .object_per_epoch_marker_table
199            .schedule_delete_all()?;
200        Ok(self
201            .perpetual_tables
202            .object_per_epoch_marker_table_v2
203            .schedule_delete_all()?)
204    }
205
206    pub async fn open_with_committee_for_testing(
207        perpetual_tables: Arc<AuthorityPerpetualTables>,
208        committee: &Committee,
209        genesis: &Genesis,
210    ) -> SuiResult<Arc<Self>> {
211        // TODO: Since we always start at genesis, the committee should be technically the same
212        // as the genesis committee.
213        assert_eq!(committee.epoch, 0);
214        Self::open_inner(genesis, perpetual_tables, true, &Registry::new()).await
215    }
216
217    async fn open_inner(
218        genesis: &Genesis,
219        perpetual_tables: Arc<AuthorityPerpetualTables>,
220        enable_epoch_sui_conservation_check: bool,
221        registry: &Registry,
222    ) -> SuiResult<Arc<Self>> {
223        let store = Arc::new(Self {
224            perpetual_tables,
225            root_state_notify_read: NotifyRead::<
226                EpochId,
227                (CheckpointSequenceNumber, GlobalStateHash),
228            >::new(),
229            enable_epoch_sui_conservation_check,
230            metrics: AuthorityStoreMetrics::new(registry),
231        });
232        // Only initialize an empty database.
233        if store
234            .database_is_empty()
235            .expect("Database read should not fail at init.")
236        {
237            store
238                .bulk_insert_genesis_objects(genesis.objects())
239                .expect("Cannot bulk insert genesis objects");
240
241            // insert txn and effects of genesis
242            let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());
243
244            store
245                .perpetual_tables
246                .transactions
247                .insert(transaction.digest(), transaction.serializable_ref())
248                .unwrap();
249
250            store
251                .perpetual_tables
252                .effects
253                .insert(&genesis.effects().digest(), genesis.effects())
254                .unwrap();
255            // We don't insert the effects to executed_effects yet because the genesis tx hasn't but will be executed.
256            // This is important for fullnodes to be able to generate indexing data right now.
257
258            if genesis.effects().events_digest().is_some() {
259                store
260                    .perpetual_tables
261                    .events_2
262                    .insert(transaction.digest(), genesis.events())
263                    .unwrap();
264            }
265        }
266
267        Ok(store)
268    }
269
270    /// Open authority store without any operations that require
271    /// genesis, such as constructing EpochStartConfiguration
272    /// or inserting genesis objects.
273    pub fn open_no_genesis(
274        perpetual_tables: Arc<AuthorityPerpetualTables>,
275        enable_epoch_sui_conservation_check: bool,
276        registry: &Registry,
277    ) -> SuiResult<Arc<Self>> {
278        let store = Arc::new(Self {
279            perpetual_tables,
280            root_state_notify_read: NotifyRead::<
281                EpochId,
282                (CheckpointSequenceNumber, GlobalStateHash),
283            >::new(),
284            enable_epoch_sui_conservation_check,
285            metrics: AuthorityStoreMetrics::new(registry),
286        });
287        Ok(store)
288    }
289
290    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
291        self.perpetual_tables.get_recovery_epoch_at_restart()
292    }
293
294    pub fn get_effects(
295        &self,
296        effects_digest: &TransactionEffectsDigest,
297    ) -> SuiResult<Option<TransactionEffects>> {
298        Ok(self.perpetual_tables.effects.get(effects_digest)?)
299    }
300
301    /// Returns true if we have an effects structure for this transaction digest
302    pub fn effects_exists(&self, effects_digest: &TransactionEffectsDigest) -> SuiResult<bool> {
303        self.perpetual_tables
304            .effects
305            .contains_key(effects_digest)
306            .map_err(|e| e.into())
307    }
308
309    pub fn get_events(
310        &self,
311        digest: &TransactionDigest,
312    ) -> Result<Option<TransactionEvents>, TypedStoreError> {
313        self.perpetual_tables.events_2.get(digest)
314    }
315
316    pub fn multi_get_events(
317        &self,
318        event_digests: &[TransactionDigest],
319    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
320        Ok(event_digests
321            .iter()
322            .map(|digest| self.get_events(digest))
323            .collect::<Result<Vec<_>, _>>()?)
324    }
325
326    pub fn get_unchanged_loaded_runtime_objects(
327        &self,
328        digest: &TransactionDigest,
329    ) -> Result<Option<Vec<ObjectKey>>, TypedStoreError> {
330        self.perpetual_tables
331            .unchanged_loaded_runtime_objects
332            .get(digest)
333    }
334
335    pub fn multi_get_effects<'a>(
336        &self,
337        effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
338    ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
339        self.perpetual_tables.effects.multi_get(effects_digests)
340    }
341
342    pub fn get_executed_effects(
343        &self,
344        tx_digest: &TransactionDigest,
345    ) -> Result<Option<TransactionEffects>, TypedStoreError> {
346        let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
347        match effects_digest {
348            Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
349            None => Ok(None),
350        }
351    }
352
353    /// Given a list of transaction digests, returns a list of the corresponding effects only if they have been
354    /// executed. For transactions that have not been executed, None is returned.
355    pub fn multi_get_executed_effects_digests(
356        &self,
357        digests: &[TransactionDigest],
358    ) -> Result<Vec<Option<TransactionEffectsDigest>>, TypedStoreError> {
359        self.perpetual_tables.executed_effects.multi_get(digests)
360    }
361
362    /// Given a list of transaction digests, returns a list of the corresponding effects only if they have been
363    /// executed. For transactions that have not been executed, None is returned.
364    pub fn multi_get_executed_effects(
365        &self,
366        digests: &[TransactionDigest],
367    ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
368        let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
369        let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
370        let mut tx_to_effects_map = effects
371            .into_iter()
372            .flatten()
373            .map(|effects| (*effects.transaction_digest(), effects))
374            .collect::<HashMap<_, _>>();
375        Ok(digests
376            .iter()
377            .map(|digest| tx_to_effects_map.remove(digest))
378            .collect())
379    }
380
381    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> SuiResult<bool> {
382        Ok(self
383            .perpetual_tables
384            .executed_effects
385            .contains_key(digest)?)
386    }
387
388    pub fn get_marker_value(
389        &self,
390        object_key: FullObjectKey,
391        epoch_id: EpochId,
392    ) -> SuiResult<Option<MarkerValue>> {
393        Ok(self
394            .perpetual_tables
395            .object_per_epoch_marker_table_v2
396            .get(&(epoch_id, object_key))?)
397    }
398
399    pub fn get_latest_marker(
400        &self,
401        object_id: FullObjectID,
402        epoch_id: EpochId,
403    ) -> SuiResult<Option<(SequenceNumber, MarkerValue)>> {
404        let min_key = (epoch_id, FullObjectKey::min_for_id(&object_id));
405        let max_key = (epoch_id, FullObjectKey::max_for_id(&object_id));
406
407        let marker_entry = self
408            .perpetual_tables
409            .object_per_epoch_marker_table_v2
410            .reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
411            .next();
412        match marker_entry {
413            Some(Ok(((epoch, key), marker))) => {
414                // because of the iterator bounds these cannot fail
415                assert_eq!(epoch, epoch_id);
416                assert_eq!(key.id(), object_id);
417                Ok(Some((key.version(), marker)))
418            }
419            Some(Err(e)) => Err(e.into()),
420            None => Ok(None),
421        }
422    }
423
424    /// Returns future containing the state hash for the given epoch
425    /// once available
426    pub async fn notify_read_root_state_hash(
427        &self,
428        epoch: EpochId,
429    ) -> SuiResult<(CheckpointSequenceNumber, GlobalStateHash)> {
430        // We need to register waiters _before_ reading from the database to avoid race conditions
431        let registration = self.root_state_notify_read.register_one(&epoch);
432        let hash = self.perpetual_tables.root_state_hash_by_epoch.get(&epoch)?;
433
434        let result = match hash {
435            // Note that Some() clause also drops registration that is already fulfilled
436            Some(ready) => Either::Left(futures::future::ready(ready)),
437            None => Either::Right(registration),
438        }
439        .await;
440
441        Ok(result)
442    }
443
444    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
445    pub fn deprecated_insert_finalized_transactions(
446        &self,
447        digests: &[TransactionDigest],
448        epoch: EpochId,
449        sequence: CheckpointSequenceNumber,
450    ) -> SuiResult {
451        let mut batch = self
452            .perpetual_tables
453            .executed_transactions_to_checkpoint
454            .batch();
455        batch.insert_batch(
456            &self.perpetual_tables.executed_transactions_to_checkpoint,
457            digests.iter().map(|d| (*d, (epoch, sequence))),
458        )?;
459        batch.write()?;
460        trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
461        Ok(())
462    }
463
464    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
465    pub fn deprecated_get_transaction_checkpoint(
466        &self,
467        digest: &TransactionDigest,
468    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
469        Ok(self
470            .perpetual_tables
471            .executed_transactions_to_checkpoint
472            .get(digest)?)
473    }
474
475    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
476    pub fn deprecated_multi_get_transaction_checkpoint(
477        &self,
478        digests: &[TransactionDigest],
479    ) -> SuiResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
480        Ok(self
481            .perpetual_tables
482            .executed_transactions_to_checkpoint
483            .multi_get(digests)?
484            .into_iter()
485            .collect())
486    }
487
488    /// Returns true if there are no objects in the database
489    pub fn database_is_empty(&self) -> SuiResult<bool> {
490        self.perpetual_tables.database_is_empty()
491    }
492
493    pub fn object_exists_by_key(
494        &self,
495        object_id: &ObjectID,
496        version: VersionNumber,
497    ) -> SuiResult<bool> {
498        Ok(self
499            .perpetual_tables
500            .objects
501            .contains_key(&ObjectKey(*object_id, version))?)
502    }
503
504    pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<bool>> {
505        Ok(self
506            .perpetual_tables
507            .objects
508            .multi_contains_keys(object_keys.to_vec())?
509            .into_iter()
510            .collect())
511    }
512
513    fn get_object_ref_prior_to_key(
514        &self,
515        object_id: &ObjectID,
516        version: VersionNumber,
517    ) -> Result<Option<ObjectRef>, SuiError> {
518        let Some(prior_version) = version.one_before() else {
519            return Ok(None);
520        };
521        let mut iterator = self
522            .perpetual_tables
523            .objects
524            .reversed_safe_iter_with_bounds(
525                Some(ObjectKey::min_for_id(object_id)),
526                Some(ObjectKey(*object_id, prior_version)),
527            )?;
528
529        if let Some((object_key, value)) = iterator.next().transpose()?
530            && object_key.0 == *object_id
531        {
532            return Ok(Some(
533                self.perpetual_tables.object_reference(&object_key, value)?,
534            ));
535        }
536        Ok(None)
537    }
538
539    pub fn multi_get_objects_by_key(
540        &self,
541        object_keys: &[ObjectKey],
542    ) -> Result<Vec<Option<Object>>, SuiError> {
543        let wrappers = self
544            .perpetual_tables
545            .objects
546            .multi_get(object_keys.to_vec())?;
547        let mut ret = vec![];
548
549        for (idx, w) in wrappers.into_iter().enumerate() {
550            ret.push(
551                w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
552                    .transpose()?
553                    .flatten(),
554            );
555        }
556        Ok(ret)
557    }
558
559    /// Get many objects
560    pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, SuiError> {
561        let mut result = Vec::new();
562        for id in objects {
563            result.push(self.get_object(id));
564        }
565        Ok(result)
566    }
567
568    // Methods to mutate the store
569
570    /// Insert a genesis object.
571    /// TODO: delete this method entirely (still used by authority_tests.rs)
572    pub(crate) fn insert_genesis_object(&self, object: Object) -> SuiResult {
573        // We only side load objects with a genesis parent transaction.
574        debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
575        let object_ref = object.compute_object_reference();
576        self.insert_object_direct(object_ref, &object)
577    }
578
579    /// Insert an object directly into the store, and also update relevant tables
580    /// NOTE: does not handle transaction lock.
581    /// This is used to insert genesis objects
582    fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> SuiResult {
583        let mut write_batch = self.perpetual_tables.objects.batch();
584
585        // Insert object
586        let store_object = get_store_object(object.clone());
587        write_batch.insert_batch(
588            &self.perpetual_tables.objects,
589            std::iter::once((ObjectKey::from(object_ref), store_object)),
590        )?;
591
592        // Update the index
593        if object.get_single_owner().is_some() {
594            // Only initialize lock for address owned objects.
595            if !object.is_child_object() {
596                self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref], false)?;
597            }
598        }
599
600        write_batch.write()?;
601
602        Ok(())
603    }
604
605    /// This function should only be used for initializing genesis and should remain private.
606    #[instrument(level = "debug", skip_all)]
607    pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> SuiResult<()> {
608        let mut batch = self.perpetual_tables.objects.batch();
609        let ref_and_objects: Vec<_> = objects
610            .iter()
611            .map(|o| (o.compute_object_reference(), o))
612            .collect();
613
614        batch.insert_batch(
615            &self.perpetual_tables.objects,
616            ref_and_objects
617                .iter()
618                .map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))),
619        )?;
620
621        let non_child_object_refs: Vec<_> = ref_and_objects
622            .iter()
623            .filter(|(_, object)| !object.is_child_object())
624            .map(|(oref, _)| *oref)
625            .collect();
626
627        self.initialize_live_object_markers_impl(
628            &mut batch,
629            &non_child_object_refs,
630            false, // is_force_reset
631        )?;
632
633        batch.write()?;
634
635        Ok(())
636    }
637
638    pub fn bulk_insert_live_objects(
639        perpetual_db: &AuthorityPerpetualTables,
640        live_objects: impl Iterator<Item = LiveObject>,
641        expected_sha3_digest: &[u8; 32],
642    ) -> SuiResult<()> {
643        let mut hasher = Sha3_256::default();
644        let mut batch = perpetual_db.objects.batch();
645        let mut written = 0usize;
646        const MAX_BATCH_SIZE: usize = 100_000;
647        for object in live_objects {
648            hasher.update(object.object_reference().2.inner());
649            match object {
650                LiveObject::Normal(object) => {
651                    let store_object_wrapper = get_store_object(object.clone());
652                    batch.insert_batch(
653                        &perpetual_db.objects,
654                        std::iter::once((
655                            ObjectKey::from(object.compute_object_reference()),
656                            store_object_wrapper,
657                        )),
658                    )?;
659                    if !object.is_child_object() {
660                        Self::initialize_live_object_markers(
661                            &perpetual_db.live_owned_object_markers,
662                            &mut batch,
663                            &[object.compute_object_reference()],
664                            false, // is_force_reset
665                        )?;
666                    }
667                }
668                LiveObject::Wrapped(object_key) => {
669                    batch.insert_batch(
670                        &perpetual_db.objects,
671                        std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
672                            object_key,
673                            StoreObject::Wrapped.into(),
674                        )),
675                    )?;
676                }
677            }
678            written += 1;
679            if written > MAX_BATCH_SIZE {
680                batch.write()?;
681                batch = perpetual_db.objects.batch();
682                written = 0;
683            }
684        }
685        let sha3_digest = hasher.finalize().digest;
686        if *expected_sha3_digest != sha3_digest {
687            error!(
688                "Sha does not match! expected: {:?}, actual: {:?}",
689                expected_sha3_digest, sha3_digest
690            );
691            return Err(SuiError::from("Sha does not match"));
692        }
693        batch.write()?;
694        Ok(())
695    }
696
697    pub fn set_epoch_start_configuration(
698        &self,
699        epoch_start_configuration: &EpochStartConfiguration,
700    ) -> SuiResult {
701        self.perpetual_tables
702            .set_epoch_start_configuration(epoch_start_configuration)?;
703        Ok(())
704    }
705
706    pub fn get_epoch_start_configuration(&self) -> SuiResult<Option<EpochStartConfiguration>> {
707        Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
708    }
709
710    /// Updates the state resulting from the execution of a certificate.
711    ///
712    /// Internally it checks that all locks for active inputs are at the correct
713    /// version, and then writes objects, certificates, parents and clean up locks atomically.
714    #[instrument(level = "debug", skip_all)]
715    pub fn build_db_batch(
716        &self,
717        epoch_id: EpochId,
718        tx_outputs: &[Arc<TransactionOutputs>],
719    ) -> SuiResult<DBBatch> {
720        let mut written = Vec::with_capacity(tx_outputs.len());
721        for outputs in tx_outputs {
722            written.extend(outputs.written.values().cloned());
723        }
724
725        let mut write_batch = self.perpetual_tables.transactions.batch();
726        for outputs in tx_outputs {
727            self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
728        }
729        // test crashing before writing the batch
730        fail_point!("crash");
731
732        trace!(
733            "built batch for committed transactions: {:?}",
734            tx_outputs
735                .iter()
736                .map(|tx| tx.transaction.digest())
737                .collect::<Vec<_>>()
738        );
739
740        // test crashing before notifying
741        fail_point!("crash");
742
743        Ok(write_batch)
744    }
745
746    fn write_one_transaction_outputs(
747        &self,
748        write_batch: &mut DBBatch,
749        epoch_id: EpochId,
750        tx_outputs: &TransactionOutputs,
751    ) -> SuiResult {
752        let TransactionOutputs {
753            transaction,
754            effects,
755            markers,
756            wrapped,
757            deleted,
758            written,
759            events,
760            unchanged_loaded_runtime_objects,
761            locks_to_delete,
762            new_locks_to_init,
763            ..
764        } = tx_outputs;
765
766        let effects_digest = effects.digest();
767        let transaction_digest = transaction.digest();
768        // effects must be inserted before the corresponding dependent entries
769        // because they carry epoch information necessary for correct pruning via relocation filters
770        write_batch
771            .insert_batch(
772                &self.perpetual_tables.effects,
773                [(effects_digest, effects.clone())],
774            )?
775            .insert_batch(
776                &self.perpetual_tables.executed_effects,
777                [(transaction_digest, effects_digest)],
778            )?;
779
780        // Store the certificate indexed by transaction digest
781        write_batch.insert_batch(
782            &self.perpetual_tables.transactions,
783            iter::once((transaction_digest, transaction.serializable_ref())),
784        )?;
785
786        write_batch.insert_batch(
787            &self.perpetual_tables.executed_transaction_digests,
788            [((epoch_id, *transaction_digest), ())],
789        )?;
790
791        // Add batched writes for objects and locks.
792        write_batch.insert_batch(
793            &self.perpetual_tables.object_per_epoch_marker_table_v2,
794            markers
795                .iter()
796                .map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
797        )?;
798        write_batch.insert_batch(
799            &self.perpetual_tables.objects,
800            deleted
801                .iter()
802                .map(|key| (key, StoreObject::Deleted))
803                .chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
804                .map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
805        )?;
806
807        // Insert each output object into the stores
808        let new_objects = written.iter().map(|(id, new_object)| {
809            let version = new_object.version();
810            trace!(?id, ?version, "writing object");
811            let store_object = get_store_object(new_object.clone());
812            (ObjectKey(*id, version), store_object)
813        });
814
815        write_batch.insert_batch(&self.perpetual_tables.objects, new_objects)?;
816
817        // Write events into the new table keyed off of transaction_digest
818        if effects.events_digest().is_some() {
819            write_batch.insert_batch(
820                &self.perpetual_tables.events_2,
821                [(transaction_digest, events)],
822            )?;
823        }
824
825        // Write unchanged_loaded_runtime_objects
826        if !unchanged_loaded_runtime_objects.is_empty() {
827            write_batch.insert_batch(
828                &self.perpetual_tables.unchanged_loaded_runtime_objects,
829                [(transaction_digest, unchanged_loaded_runtime_objects)],
830            )?;
831        }
832
833        self.initialize_live_object_markers_impl(write_batch, new_locks_to_init, false)?;
834
835        // Note: deletes locks for received objects as well (but not for objects that were in
836        // `Receiving` arguments which were not received)
837        self.delete_live_object_markers(write_batch, locks_to_delete)?;
838
839        debug!(effects_digest = ?effects.digest(), "commit_certificate finished");
840
841        Ok(())
842    }
843
844    /// Commits transactions only (not effects or other transaction outputs) to the db.
845    /// See ExecutionCache::persist_transaction for more info
846    pub(crate) fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> SuiResult {
847        let mut batch = self.perpetual_tables.transactions.batch();
848        batch.insert_batch(
849            &self.perpetual_tables.transactions,
850            [(tx.digest(), tx.clone().into_unsigned().serializable_ref())],
851        )?;
852        batch.write()?;
853        Ok(())
854    }
855
856    /// Gets ObjectLockInfo that represents state of lock on an object.
857    /// Returns UserInputError::ObjectNotFound if cannot find lock record for this object
858    pub(crate) fn get_lock(
859        &self,
860        obj_ref: ObjectRef,
861        epoch_store: &AuthorityPerEpochStore,
862    ) -> SuiLockResult {
863        if self
864            .perpetual_tables
865            .live_owned_object_markers
866            .get(&obj_ref)?
867            .is_none()
868        {
869            return Ok(ObjectLockStatus::LockedAtDifferentVersion {
870                locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
871            });
872        }
873
874        let tables = epoch_store.tables()?;
875        let epoch_id = epoch_store.epoch();
876
877        if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
878            Ok(ObjectLockStatus::LockedToTx {
879                locked_by_tx: LockDetailsDeprecated {
880                    epoch: epoch_id,
881                    tx_digest,
882                },
883            })
884        } else {
885            Ok(ObjectLockStatus::Initialized)
886        }
887    }
888
889    /// Returns UserInputError::ObjectNotFound if no lock records found for this object.
890    pub(crate) fn get_latest_live_version_for_object_id(
891        &self,
892        object_id: ObjectID,
893    ) -> SuiResult<ObjectRef> {
894        let mut iterator = self
895            .perpetual_tables
896            .live_owned_object_markers
897            .reversed_safe_iter_with_bounds(
898                None,
899                Some((object_id, SequenceNumber::MAX, ObjectDigest::MAX)),
900            )?;
901        Ok(iterator
902            .next()
903            .transpose()?
904            .and_then(|value| {
905                if value.0.0 == object_id {
906                    Some(value)
907                } else {
908                    None
909                }
910            })
911            .ok_or_else(|| {
912                SuiError::from(UserInputError::ObjectNotFound {
913                    object_id,
914                    version: None,
915                })
916            })?
917            .0)
918    }
919
920    /// Checks multiple object locks exist.
921    /// Returns UserInputError::ObjectNotFound if cannot find lock record for at least one of the objects.
922    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if at least one object lock is not initialized
923    ///     at the given version.
924    pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> SuiResult {
925        let locks = self
926            .perpetual_tables
927            .live_owned_object_markers
928            .multi_get(objects)?;
929        for (lock, obj_ref) in locks.into_iter().zip(objects) {
930            if lock.is_none() {
931                let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
932                fp_bail!(
933                    UserInputError::ObjectVersionUnavailableForConsumption {
934                        provided_obj_ref: *obj_ref,
935                        current_version: latest_lock.1
936                    }
937                    .into()
938                );
939            }
940        }
941        Ok(())
942    }
943
944    /// Initialize a lock to None (but exists) for a given list of ObjectRefs.
945    /// Returns SuiErrorKind::ObjectLockAlreadyInitialized if the lock already exists and is locked to a transaction
946    fn initialize_live_object_markers_impl(
947        &self,
948        write_batch: &mut DBBatch,
949        objects: &[ObjectRef],
950        is_force_reset: bool,
951    ) -> SuiResult {
952        AuthorityStore::initialize_live_object_markers(
953            &self.perpetual_tables.live_owned_object_markers,
954            write_batch,
955            objects,
956            is_force_reset,
957        )
958    }
959
960    pub fn initialize_live_object_markers(
961        live_object_marker_table: &DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,
962        write_batch: &mut DBBatch,
963        objects: &[ObjectRef],
964        is_force_reset: bool,
965    ) -> SuiResult {
966        trace!(?objects, "initialize_locks");
967
968        let live_object_markers = live_object_marker_table.multi_get(objects)?;
969
970        if !is_force_reset {
971            // If any live_object_markers exist and are not None, return errors for them
972            // Note we don't check if there is a pre-existing lock. this is because initializing the live
973            // object marker will not overwrite the lock and cause the validator to equivocate.
974            let existing_live_object_markers: Vec<ObjectRef> = live_object_markers
975                .iter()
976                .zip(objects)
977                .filter_map(|(lock_opt, objref)| {
978                    lock_opt.clone().flatten().map(|_tx_digest| *objref)
979                })
980                .collect();
981            if !existing_live_object_markers.is_empty() {
982                info!(
983                    ?existing_live_object_markers,
984                    "Cannot initialize live_object_markers because some exist already"
985                );
986                return Err(SuiErrorKind::ObjectLockAlreadyInitialized {
987                    refs: existing_live_object_markers,
988                }
989                .into());
990            }
991        }
992
993        write_batch.insert_batch(
994            live_object_marker_table,
995            objects.iter().map(|obj_ref| (obj_ref, None)),
996        )?;
997        Ok(())
998    }
999
1000    /// Removes locks for a given list of ObjectRefs.
1001    fn delete_live_object_markers(
1002        &self,
1003        write_batch: &mut DBBatch,
1004        objects: &[ObjectRef],
1005    ) -> SuiResult {
1006        trace!(?objects, "delete_locks");
1007        write_batch.delete_batch(
1008            &self.perpetual_tables.live_owned_object_markers,
1009            objects.iter(),
1010        )?;
1011        Ok(())
1012    }
1013
1014    #[cfg(test)]
1015    pub(crate) fn reset_locks_for_test(
1016        &self,
1017        transactions: &[TransactionDigest],
1018        objects: &[ObjectRef],
1019        epoch_store: &AuthorityPerEpochStore,
1020    ) {
1021        for tx in transactions {
1022            epoch_store.delete_signed_transaction_for_test(tx);
1023            epoch_store.delete_object_locks_for_test(objects);
1024        }
1025
1026        let mut batch = self.perpetual_tables.live_owned_object_markers.batch();
1027        batch
1028            .delete_batch(
1029                &self.perpetual_tables.live_owned_object_markers,
1030                objects.iter(),
1031            )
1032            .unwrap();
1033        batch.write().unwrap();
1034
1035        let mut batch = self.perpetual_tables.live_owned_object_markers.batch();
1036        self.initialize_live_object_markers_impl(&mut batch, objects, false)
1037            .unwrap();
1038        batch.write().unwrap();
1039    }
1040
1041    /// Return the object with version less then or eq to the provided seq number.
1042    /// This is used by indexer to find the correct version of dynamic field child object.
1043    /// We do not store the version of the child object, but because of lamport timestamp,
1044    /// we know the child must have version number less then or eq to the parent.
1045    pub fn find_object_lt_or_eq_version(
1046        &self,
1047        object_id: ObjectID,
1048        version: SequenceNumber,
1049    ) -> SuiResult<Option<Object>> {
1050        self.perpetual_tables
1051            .find_object_lt_or_eq_version(object_id, version)
1052    }
1053
1054    /// Returns the latest object reference we have for this object_id in the objects table.
1055    ///
1056    /// The method may also return the reference to a deleted object with a digest of
1057    /// ObjectDigest::deleted() or ObjectDigest::wrapped() and lamport version
1058    /// of a transaction that deleted the object.
1059    /// Note that a deleted object may re-appear if the deletion was the result of the object
1060    /// being wrapped in another object.
1061    ///
1062    /// If no entry for the object_id is found, return None.
1063    pub fn get_latest_object_ref_or_tombstone(
1064        &self,
1065        object_id: ObjectID,
1066    ) -> Result<Option<ObjectRef>, SuiError> {
1067        self.perpetual_tables
1068            .get_latest_object_ref_or_tombstone(object_id)
1069    }
1070
1071    /// Returns the latest object reference if and only if the object is still live (i.e. it does
1072    /// not return tombstones)
1073    pub fn get_latest_object_ref_if_alive(
1074        &self,
1075        object_id: ObjectID,
1076    ) -> Result<Option<ObjectRef>, SuiError> {
1077        match self.get_latest_object_ref_or_tombstone(object_id)? {
1078            Some(objref) if objref.2.is_alive() => Ok(Some(objref)),
1079            _ => Ok(None),
1080        }
1081    }
1082
1083    /// Returns the latest object we have for this object_id in the objects table.
1084    ///
1085    /// If no entry for the object_id is found, return None.
1086    pub fn get_latest_object_or_tombstone(
1087        &self,
1088        object_id: ObjectID,
1089    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, SuiError> {
1090        let Some((object_key, store_object)) = self
1091            .perpetual_tables
1092            .get_latest_object_or_tombstone(object_id)?
1093        else {
1094            return Ok(None);
1095        };
1096
1097        if let Some(object_ref) = self
1098            .perpetual_tables
1099            .tombstone_reference(&object_key, &store_object)?
1100        {
1101            return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
1102        }
1103
1104        let object = self
1105            .perpetual_tables
1106            .object(&object_key, store_object)?
1107            .expect("Non tombstone store object could not be converted to object");
1108
1109        Ok(Some((object_key, ObjectOrTombstone::Object(object))))
1110    }
1111
1112    pub fn insert_transaction_and_effects(
1113        &self,
1114        transaction: &VerifiedTransaction,
1115        transaction_effects: &TransactionEffects,
1116    ) -> Result<(), TypedStoreError> {
1117        let mut write_batch = self.perpetual_tables.transactions.batch();
1118        // effects must be inserted before the corresponding transaction entry
1119        // because they carry epoch information necessary for correct pruning via relocation filters
1120        write_batch
1121            .insert_batch(
1122                &self.perpetual_tables.effects,
1123                [(transaction_effects.digest(), transaction_effects)],
1124            )?
1125            .insert_batch(
1126                &self.perpetual_tables.transactions,
1127                [(transaction.digest(), transaction.serializable_ref())],
1128            )?;
1129
1130        write_batch.write()?;
1131        Ok(())
1132    }
1133
1134    pub fn multi_insert_transaction_and_effects<'a>(
1135        &self,
1136        transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
1137    ) -> Result<(), TypedStoreError> {
1138        let mut write_batch = self.perpetual_tables.transactions.batch();
1139        for tx in transactions {
1140            write_batch
1141                .insert_batch(
1142                    &self.perpetual_tables.effects,
1143                    [(tx.effects.digest(), &tx.effects)],
1144                )?
1145                .insert_batch(
1146                    &self.perpetual_tables.transactions,
1147                    [(tx.transaction.digest(), tx.transaction.serializable_ref())],
1148                )?;
1149        }
1150
1151        write_batch.write()?;
1152        Ok(())
1153    }
1154
1155    pub fn multi_get_transaction_blocks(
1156        &self,
1157        tx_digests: &[TransactionDigest],
1158    ) -> Result<Vec<Option<VerifiedTransaction>>, TypedStoreError> {
1159        self.perpetual_tables
1160            .transactions
1161            .multi_get(tx_digests)
1162            .map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())
1163    }
1164
1165    pub fn get_transaction_block(
1166        &self,
1167        tx_digest: &TransactionDigest,
1168    ) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
1169        self.perpetual_tables
1170            .transactions
1171            .get(tx_digest)
1172            .map(|v| v.map(|v| v.into()))
1173    }
1174
1175    /// This function reads the DB directly to get the system state object.
1176    /// If reconfiguration is happening at the same time, there is no guarantee whether we would be getting
1177    /// the old or the new system state object.
1178    /// Hence this function should only be called during RPC reads where data race is not a major concern.
1179    /// In general we should avoid this as much as possible.
1180    /// If the intent is for testing, you can use AuthorityState:: get_sui_system_state_object_for_testing.
1181    pub fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
1182        get_sui_system_state(self.perpetual_tables.as_ref())
1183    }
1184
1185    pub fn expensive_check_sui_conservation<T>(
1186        self: &Arc<Self>,
1187        type_layout_store: T,
1188        old_epoch_store: &AuthorityPerEpochStore,
1189    ) -> SuiResult
1190    where
1191        T: TypeLayoutStore + Send + Copy,
1192    {
1193        if !self.enable_epoch_sui_conservation_check {
1194            return Ok(());
1195        }
1196
1197        let executor = old_epoch_store.executor();
1198        info!("Starting SUI conservation check. This may take a while..");
1199        let cur_time = Instant::now();
1200        let mut pending_objects = vec![];
1201        let mut count = 0;
1202        let mut size = 0;
1203        let (mut total_sui, mut total_storage_rebate) = thread::scope(|s| {
1204            let pending_tasks = FuturesUnordered::new();
1205            for o in self.iter_live_object_set(false) {
1206                match o {
1207                    LiveObject::Normal(object) => {
1208                        size += object.object_size_for_gas_metering();
1209                        count += 1;
1210                        pending_objects.push(object);
1211                        if count % 1_000_000 == 0 {
1212                            let mut task_objects = vec![];
1213                            mem::swap(&mut pending_objects, &mut task_objects);
1214                            pending_tasks.push(s.spawn(move || {
1215                                let mut layout_resolver =
1216                                    executor.type_layout_resolver(Box::new(type_layout_store));
1217                                let mut total_storage_rebate = 0;
1218                                let mut total_sui = 0;
1219                                for object in task_objects {
1220                                    total_storage_rebate += object.storage_rebate;
1221                                    // get_total_sui includes storage rebate, however all storage rebate is
1222                                    // also stored in the storage fund, so we need to subtract it here.
1223                                    total_sui +=
1224                                        object.get_total_sui(layout_resolver.as_mut()).unwrap()
1225                                            - object.storage_rebate;
1226                                }
1227                                if count % 50_000_000 == 0 {
1228                                    info!("Processed {} objects", count);
1229                                }
1230                                (total_sui, total_storage_rebate)
1231                            }));
1232                        }
1233                    }
1234                    LiveObject::Wrapped(_) => {
1235                        unreachable!("Explicitly asked to not include wrapped tombstones")
1236                    }
1237                }
1238            }
1239            pending_tasks.into_iter().fold((0, 0), |init, result| {
1240                let result = result.join().unwrap();
1241                (init.0 + result.0, init.1 + result.1)
1242            })
1243        });
1244        let mut layout_resolver = executor.type_layout_resolver(Box::new(type_layout_store));
1245        for object in pending_objects {
1246            total_storage_rebate += object.storage_rebate;
1247            total_sui +=
1248                object.get_total_sui(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
1249        }
1250        info!(
1251            "Scanned {} live objects, took {:?}",
1252            count,
1253            cur_time.elapsed()
1254        );
1255        self.metrics
1256            .sui_conservation_live_object_count
1257            .set(count as i64);
1258        self.metrics
1259            .sui_conservation_live_object_size
1260            .set(size as i64);
1261        self.metrics
1262            .sui_conservation_check_latency
1263            .set(cur_time.elapsed().as_secs() as i64);
1264
1265        // It is safe to call this function because we are in the middle of reconfiguration.
1266        let system_state = self
1267            .get_sui_system_state_object_unsafe()
1268            .expect("Reading sui system state object cannot fail")
1269            .into_sui_system_state_summary();
1270        let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
1271        info!(
1272            "Total SUI amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
1273            total_sui, storage_fund_balance, total_storage_rebate, system_state.epoch
1274        );
1275
1276        let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
1277        self.metrics
1278            .sui_conservation_storage_fund
1279            .set(storage_fund_balance as i64);
1280        self.metrics
1281            .sui_conservation_storage_fund_imbalance
1282            .set(imbalance);
1283        self.metrics
1284            .sui_conservation_imbalance
1285            .set((total_sui as i128 - TOTAL_SUPPLY_MIST as i128) as i64);
1286
1287        if let Some(expected_imbalance) = self
1288            .perpetual_tables
1289            .expected_storage_fund_imbalance
1290            .get(&())
1291            .expect("DB read cannot fail")
1292        {
1293            fp_ensure!(
1294                imbalance == expected_imbalance,
1295                SuiError::from(
1296                    format!(
1297                        "Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
1298                        system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
1299                    ).as_str()
1300                )
1301            );
1302        } else {
1303            self.perpetual_tables
1304                .expected_storage_fund_imbalance
1305                .insert(&(), &imbalance)
1306                .expect("DB write cannot fail");
1307        }
1308
1309        if let Some(expected_sui) = self
1310            .perpetual_tables
1311            .expected_network_sui_amount
1312            .get(&())
1313            .expect("DB read cannot fail")
1314        {
1315            fp_ensure!(
1316                total_sui == expected_sui,
1317                SuiError::from(
1318                    format!(
1319                        "Inconsistent state detected at epoch {}: total sui: {}, expecting {}",
1320                        system_state.epoch, total_sui, expected_sui
1321                    )
1322                    .as_str()
1323                )
1324            );
1325        } else {
1326            self.perpetual_tables
1327                .expected_network_sui_amount
1328                .insert(&(), &total_sui)
1329                .expect("DB write cannot fail");
1330        }
1331
1332        Ok(())
1333    }
1334
1335    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
1336    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
1337    #[instrument(level = "error", skip_all)]
1338    pub fn maybe_reaccumulate_state_hash(
1339        &self,
1340        cur_epoch_store: &AuthorityPerEpochStore,
1341        new_protocol_version: ProtocolVersion,
1342    ) {
1343        let old_simplified_unwrap_then_delete = cur_epoch_store
1344            .protocol_config()
1345            .simplified_unwrap_then_delete();
1346        let new_simplified_unwrap_then_delete =
1347            ProtocolConfig::get_for_version(new_protocol_version, cur_epoch_store.get_chain())
1348                .simplified_unwrap_then_delete();
1349        // If in the new epoch the simplified_unwrap_then_delete is enabled for the first time,
1350        // we re-accumulate state root.
1351        let should_reaccumulate =
1352            !old_simplified_unwrap_then_delete && new_simplified_unwrap_then_delete;
1353        if !should_reaccumulate {
1354            return;
1355        }
1356        info!(
1357            "[Re-accumulate] simplified_unwrap_then_delete is enabled in the new protocol version, re-accumulating state hash"
1358        );
1359        let cur_time = Instant::now();
1360        std::thread::scope(|s| {
1361            let pending_tasks = FuturesUnordered::new();
1362            // Shard the object IDs into different ranges so that we can process them in parallel.
1363            // We divide the range into 2^BITS number of ranges. To do so we use the highest BITS bits
1364            // to mark the starting/ending point of the range. For example, when BITS = 5, we
1365            // divide the range into 32 ranges, and the first few ranges are:
1366            // 00000000_.... to 00000111_....
1367            // 00001000_.... to 00001111_....
1368            // 00010000_.... to 00010111_....
1369            // and etc.
1370            const BITS: u8 = 5;
1371            for index in 0u8..(1 << BITS) {
1372                pending_tasks.push(s.spawn(move || {
1373                    let mut id_bytes = [0; ObjectID::LENGTH];
1374                    id_bytes[0] = index << (8 - BITS);
1375                    let start_id = ObjectID::new(id_bytes);
1376
1377                    id_bytes[0] |= (1 << (8 - BITS)) - 1;
1378                    for element in id_bytes.iter_mut().skip(1) {
1379                        *element = u8::MAX;
1380                    }
1381                    let end_id = ObjectID::new(id_bytes);
1382
1383                    info!(
1384                        "[Re-accumulate] Scanning object ID range {:?}..{:?}",
1385                        start_id, end_id
1386                    );
1387                    let mut prev = (
1388                        ObjectKey::min_for_id(&ObjectID::ZERO),
1389                        StoreObjectWrapper::V1(StoreObject::Deleted),
1390                    );
1391                    let mut object_scanned: u64 = 0;
1392                    let mut wrapped_objects_to_remove = vec![];
1393                    for db_result in self.perpetual_tables.objects.safe_range_iter(
1394                        ObjectKey::min_for_id(&start_id)..=ObjectKey::max_for_id(&end_id),
1395                    ) {
1396                        match db_result {
1397                            Ok((object_key, object)) => {
1398                                object_scanned += 1;
1399                                if object_scanned.is_multiple_of(100000) {
1400                                    info!(
1401                                        "[Re-accumulate] Task {}: object scanned: {}",
1402                                        index, object_scanned,
1403                                    );
1404                                }
1405                                if matches!(prev.1.inner(), StoreObject::Wrapped)
1406                                    && object_key.0 != prev.0.0
1407                                {
1408                                    wrapped_objects_to_remove
1409                                        .push(WrappedObject::new(prev.0.0, prev.0.1));
1410                                }
1411
1412                                prev = (object_key, object);
1413                            }
1414                            Err(err) => {
1415                                warn!("Object iterator encounter RocksDB error {:?}", err);
1416                                return Err(err);
1417                            }
1418                        }
1419                    }
1420                    if matches!(prev.1.inner(), StoreObject::Wrapped) {
1421                        wrapped_objects_to_remove.push(WrappedObject::new(prev.0.0, prev.0.1));
1422                    }
1423                    info!(
1424                        "[Re-accumulate] Task {}: object scanned: {}, wrapped objects: {}",
1425                        index,
1426                        object_scanned,
1427                        wrapped_objects_to_remove.len(),
1428                    );
1429                    Ok((wrapped_objects_to_remove, object_scanned))
1430                }));
1431            }
1432            let (last_checkpoint_of_epoch, cur_accumulator) = self
1433                .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
1434                .expect("read cannot fail")
1435                .expect("accumulator must exist");
1436            let (accumulator, total_objects_scanned, total_wrapped_objects) =
1437                pending_tasks.into_iter().fold(
1438                    (cur_accumulator, 0u64, 0usize),
1439                    |(mut accumulator, total_objects_scanned, total_wrapped_objects), task| {
1440                        let (wrapped_objects_to_remove, object_scanned) =
1441                            task.join().unwrap().unwrap();
1442                        accumulator.remove_all(
1443                            wrapped_objects_to_remove
1444                                .iter()
1445                                .map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
1446                                .collect::<Vec<Vec<u8>>>(),
1447                        );
1448                        (
1449                            accumulator,
1450                            total_objects_scanned + object_scanned,
1451                            total_wrapped_objects + wrapped_objects_to_remove.len(),
1452                        )
1453                    },
1454                );
1455            info!(
1456                "[Re-accumulate] Total objects scanned: {}, total wrapped objects: {}",
1457                total_objects_scanned, total_wrapped_objects,
1458            );
1459            info!(
1460                "[Re-accumulate] New accumulator value: {:?}",
1461                accumulator.digest()
1462            );
1463            self.insert_state_hash_for_epoch(
1464                cur_epoch_store.epoch(),
1465                &last_checkpoint_of_epoch,
1466                &accumulator,
1467            )
1468            .unwrap();
1469        });
1470        info!(
1471            "[Re-accumulate] Re-accumulating took {}seconds",
1472            cur_time.elapsed().as_secs()
1473        );
1474    }
1475
1476    pub async fn prune_objects_and_compact_for_testing(
1477        &self,
1478        checkpoint_store: &Arc<CheckpointStore>,
1479        rpc_index: Option<&RpcIndexStore>,
1480    ) {
1481        let pruning_config = AuthorityStorePruningConfig {
1482            num_epochs_to_retain: 0,
1483            ..Default::default()
1484        };
1485        let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
1486            &self.perpetual_tables,
1487            checkpoint_store,
1488            rpc_index,
1489            None,
1490            pruning_config,
1491            AuthorityStorePruningMetrics::new_for_test(),
1492            EPOCH_DURATION_MS_FOR_TESTING,
1493        )
1494        .await;
1495        let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
1496    }
1497
1498    pub fn remove_executed_effects_for_testing(
1499        &self,
1500        tx_digest: &TransactionDigest,
1501    ) -> anyhow::Result<()> {
1502        let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
1503        if let Some(effects_digest) = effects_digest {
1504            self.perpetual_tables.executed_effects.remove(tx_digest)?;
1505            self.perpetual_tables.effects.remove(&effects_digest)?;
1506        }
1507        Ok(())
1508    }
1509
1510    #[cfg(test)]
1511    pub async fn prune_objects_immediately_for_testing(
1512        &self,
1513        transaction_effects: Vec<TransactionEffects>,
1514    ) -> anyhow::Result<()> {
1515        let mut wb = self.perpetual_tables.objects.batch();
1516
1517        let mut object_keys_to_prune = vec![];
1518        for effects in &transaction_effects {
1519            for (object_id, seq_number) in effects.modified_at_versions() {
1520                info!("Pruning object {:?} version {:?}", object_id, seq_number);
1521                object_keys_to_prune.push(ObjectKey(object_id, seq_number));
1522            }
1523        }
1524
1525        wb.delete_batch(
1526            &self.perpetual_tables.objects,
1527            object_keys_to_prune.into_iter(),
1528        )?;
1529        wb.write()?;
1530        Ok(())
1531    }
1532
1533    // Counts the number of versions exist in object store for `object_id`. This includes tombstone.
1534    #[cfg(msim)]
1535    pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
1536        self.perpetual_tables
1537            .objects
1538            .safe_iter_with_bounds(
1539                Some(ObjectKey(object_id, VersionNumber::MIN)),
1540                Some(ObjectKey(object_id, VersionNumber::MAX)),
1541            )
1542            .collect::<Result<Vec<_>, _>>()
1543            .unwrap()
1544            .len()
1545    }
1546}
1547
1548impl GlobalStateHashStore for AuthorityStore {
1549    fn get_object_ref_prior_to_key_deprecated(
1550        &self,
1551        object_id: &ObjectID,
1552        version: VersionNumber,
1553    ) -> SuiResult<Option<ObjectRef>> {
1554        self.get_object_ref_prior_to_key(object_id, version)
1555    }
1556
1557    fn get_root_state_hash_for_epoch(
1558        &self,
1559        epoch: EpochId,
1560    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
1561        self.perpetual_tables
1562            .root_state_hash_by_epoch
1563            .get(&epoch)
1564            .map_err(Into::into)
1565    }
1566
1567    fn get_root_state_hash_for_highest_epoch(
1568        &self,
1569    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
1570        Ok(self
1571            .perpetual_tables
1572            .root_state_hash_by_epoch
1573            .reversed_safe_iter_with_bounds(None, None)?
1574            .next()
1575            .transpose()?)
1576    }
1577
1578    fn insert_state_hash_for_epoch(
1579        &self,
1580        epoch: EpochId,
1581        last_checkpoint_of_epoch: &CheckpointSequenceNumber,
1582        acc: &GlobalStateHash,
1583    ) -> SuiResult {
1584        self.perpetual_tables
1585            .root_state_hash_by_epoch
1586            .insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
1587        self.root_state_notify_read
1588            .notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
1589
1590        Ok(())
1591    }
1592
1593    fn iter_live_object_set(
1594        &self,
1595        include_wrapped_object: bool,
1596    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1597        Box::new(
1598            self.perpetual_tables
1599                .iter_live_object_set(include_wrapped_object),
1600        )
1601    }
1602}
1603
/// `ObjectStore` implementation that forwards both lookups to the perpetual
/// tables.
impl ObjectStore for AuthorityStore {
    /// Read an object and return it, or `None` if the object was not found.
    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        self.perpetual_tables.as_ref().get_object(object_id)
    }

    /// Read the object stored for `object_id` at exactly `version`, as
    /// reported by the perpetual tables.
    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
        self.perpetual_tables.get_object_by_key(object_id, version)
    }
}
1614
/// A wrapper to make Orphan Rule happy: a local type around a
/// `BackingPackageStore` so that `ModuleResolver` can be implemented for it.
pub struct ResolverWrapper {
    /// Package store that module lookups are forwarded to.
    pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
    /// Metrics handle; its `module_cache_size` gauge is updated by this wrapper.
    pub metrics: Arc<ResolverMetrics>,
}
1620
1621impl ResolverWrapper {
1622    pub fn new(
1623        resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1624        metrics: Arc<ResolverMetrics>,
1625    ) -> Self {
1626        metrics.module_cache_size.set(0);
1627        ResolverWrapper { resolver, metrics }
1628    }
1629
1630    fn inc_cache_size_gauge(&self) {
1631        // reset the gauge after a restart of the cache
1632        let current = self.metrics.module_cache_size.get();
1633        self.metrics.module_cache_size.set(current + 1);
1634    }
1635}
1636
1637impl ModuleResolver for ResolverWrapper {
1638    type Error = SuiError;
1639    fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
1640        self.inc_cache_size_gauge();
1641        get_module(&*self.resolver, module_id)
1642    }
1643}
1644
/// Kind of update being applied.
/// NOTE(review): the users of this enum are outside this chunk — confirm the
/// intended meaning against callers.
pub enum UpdateType {
    /// Update tied to a transaction; carries that transaction's effects digest.
    Transaction(TransactionEffectsDigest),
    /// Update tied to genesis (presumably the initial state load — verify).
    Genesis,
}
1649
/// Shorthand for a `SuiResult` whose success value is an `ObjectLockStatus`.
pub type SuiLockResult = SuiResult<ObjectLockStatus>;
1651
/// Lock state reported for an object.
/// NOTE(review): variant meanings below are inferred from names and payloads
/// only — confirm against the code that produces them.
#[derive(Debug, PartialEq, Eq)]
pub enum ObjectLockStatus {
    /// Presumably: a lock entry exists but is not held by any transaction.
    Initialized,
    /// A transaction holds the lock; `locked_by_tx` carries its lock details.
    LockedToTx { locked_by_tx: LockDetailsDeprecated },
    /// A lock was found for a different reference, given in `locked_ref`.
    LockedAtDifferentVersion { locked_ref: ObjectRef },
}
1658
/// Versioned, serializable wrapper around lock details. `V1` is the only
/// version that exists at present.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LockDetailsWrapperDeprecated {
    /// Version 1 payload.
    V1(LockDetailsV1Deprecated),
}
1663
1664impl LockDetailsWrapperDeprecated {
1665    pub fn migrate(self) -> Self {
1666        // TODO: when there are multiple versions, we must iteratively migrate from version N to
1667        // N+1 until we arrive at the latest version
1668        self
1669    }
1670
1671    // Always returns the most recent version. Older versions are migrated to the latest version at
1672    // read time, so there is never a need to access older versions.
1673    pub fn inner(&self) -> &LockDetailsDeprecated {
1674        match self {
1675            Self::V1(v1) => v1,
1676
1677            // can remove #[allow] when there are multiple versions
1678            #[allow(unreachable_patterns)]
1679            _ => panic!("lock details should have been migrated to latest version at read time"),
1680        }
1681    }
1682    pub fn into_inner(self) -> LockDetailsDeprecated {
1683        match self {
1684            Self::V1(v1) => v1,
1685
1686            // can remove #[allow] when there are multiple versions
1687            #[allow(unreachable_patterns)]
1688            _ => panic!("lock details should have been migrated to latest version at read time"),
1689        }
1690    }
1691}
1692
/// Version 1 of the lock details payload: an epoch paired with a transaction
/// digest.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LockDetailsV1Deprecated {
    /// Epoch recorded with the lock (presumably the epoch in which it was
    /// taken — confirm against the writer).
    pub epoch: EpochId,
    /// Digest of the transaction associated with the lock.
    pub tx_digest: TransactionDigest,
}
1698
/// Alias for the current lock-details version, `LockDetailsV1Deprecated`.
pub type LockDetailsDeprecated = LockDetailsV1Deprecated;
1700
1701impl From<LockDetailsDeprecated> for LockDetailsWrapperDeprecated {
1702    fn from(details: LockDetailsDeprecated) -> Self {
1703        // always use latest version.
1704        LockDetailsWrapperDeprecated::V1(details)
1705    }
1706}