sui_core/authority/
authority_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::{iter, mem, thread};
6
7use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
8use crate::authority::authority_store_pruner::{
9    AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
10};
11use crate::authority::authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object};
12use crate::authority::epoch_marker_key::EpochMarkerKey;
13use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
14use crate::global_state_hasher::GlobalStateHashStore;
15use crate::rpc_index::RpcIndexStore;
16use crate::transaction_outputs::TransactionOutputs;
17use either::Either;
18use fastcrypto::hash::{HashFunction, MultisetHash, Sha3_256};
19use futures::stream::FuturesUnordered;
20use move_core_types::account_address::AccountAddress;
21use move_core_types::resolver::{ModuleResolver, SerializedPackage};
22use serde::{Deserialize, Serialize};
23use sui_config::node::AuthorityStorePruningConfig;
24use sui_macros::fail_point_arg;
25use sui_types::error::{SuiErrorKind, UserInputError};
26use sui_types::execution::TypeLayoutStore;
27use sui_types::global_state_hash::GlobalStateHash;
28use sui_types::message_envelope::Message;
29use sui_types::storage::{
30    BackingPackageStore, FullObjectKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore,
31    get_module, get_package,
32};
33use sui_types::sui_system_state::get_sui_system_state;
34use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure};
35use tokio::time::Instant;
36use tracing::{debug, info, trace};
37use typed_store::traits::Map;
38use typed_store::{
39    TypedStoreError,
40    rocks::{DBBatch, DBMap},
41};
42
43use super::authority_store_tables::LiveObject;
44use super::{authority_store_tables::AuthorityPerpetualTables, *};
45use mysten_common::ZipDebugEqIteratorExt;
46use mysten_common::sync::notify_read::NotifyRead;
47use sui_types::effects::{TransactionEffects, TransactionEvents};
48use sui_types::gas_coin::TOTAL_SUPPLY_MIST;
49
/// Prometheus gauges owned by [`AuthorityStore`], covering the SUI
/// conservation checks and the per-epoch feature flags.
struct AuthorityStoreMetrics {
    // Seconds spent scanning all live objects for the conservation check.
    sui_conservation_check_latency: IntGauge,
    // Number of live objects counted during the last conservation scan.
    sui_conservation_live_object_count: IntGauge,
    // Total byte size of live objects from the last conservation scan.
    sui_conservation_live_object_size: IntGauge,
    // Delta between total SUI observed and the expected fixed supply.
    sui_conservation_imbalance: IntGauge,
    // Storage fund pool balance (object-storage portion only).
    sui_conservation_storage_fund: IntGauge,
    // storage_fund_balance - total_object_storage_rebates.
    sui_conservation_storage_fund_imbalance: IntGauge,
    // One gauge per epoch flag; 1 = active in the current epoch, 0 = not.
    epoch_flags: IntGaugeVec,
}
59
60impl AuthorityStoreMetrics {
61    pub fn new(registry: &Registry) -> Self {
62        Self {
63            sui_conservation_check_latency: register_int_gauge_with_registry!(
64                "sui_conservation_check_latency",
65                "Number of seconds took to scan all live objects in the store for SUI conservation check",
66                registry,
67            ).unwrap(),
68            sui_conservation_live_object_count: register_int_gauge_with_registry!(
69                "sui_conservation_live_object_count",
70                "Number of live objects in the store",
71                registry,
72            ).unwrap(),
73            sui_conservation_live_object_size: register_int_gauge_with_registry!(
74                "sui_conservation_live_object_size",
75                "Size in bytes of live objects in the store",
76                registry,
77            ).unwrap(),
78            sui_conservation_imbalance: register_int_gauge_with_registry!(
79                "sui_conservation_imbalance",
80                "Total amount of SUI in the network - 10B * 10^9. This delta shows the amount of imbalance",
81                registry,
82            ).unwrap(),
83            sui_conservation_storage_fund: register_int_gauge_with_registry!(
84                "sui_conservation_storage_fund",
85                "Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
86                registry,
87            ).unwrap(),
88            sui_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
89                "sui_conservation_storage_fund_imbalance",
90                "Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
91                registry,
92            ).unwrap(),
93            epoch_flags: register_int_gauge_vec_with_registry!(
94                "epoch_flags",
95                "Local flags of the currently running epoch",
96                &["flag"],
97                registry,
98            ).unwrap(),
99        }
100    }
101}
102
/// Persistent (cross-epoch) storage for the authority, backed by
/// `AuthorityPerpetualTables`.
///
/// NOTE(review): the previous doc here described an `ALL_OBJ_VER` option and
/// an `S` signature-state type parameter; neither exists on this struct any
/// more — it is not generic.
pub struct AuthorityStore {
    // Tables that live for the lifetime of the node (objects, transactions,
    // effects, markers, ...), as opposed to per-epoch tables.
    pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,

    // Waiters for an epoch's root state hash; fulfilled once the hash is
    // written (see `notify_read_root_state_hash`).
    pub(crate) root_state_notify_read:
        NotifyRead<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Whether to enable expensive SUI conservation check at epoch boundaries.
    enable_epoch_sui_conservation_check: bool,

    // Prometheus gauges for conservation checks and epoch flags.
    metrics: AuthorityStoreMetrics,
}
120
// Guards over the execution lock, which protects the current epoch id.
// The write guard is taken exclusively during reconfiguration (see
// `clear_object_per_epoch_marker_table`); readers hold the read guard.
pub type ExecutionLockReadGuard<'a> = tokio::sync::RwLockReadGuard<'a, EpochId>;
pub type ExecutionLockWriteGuard<'a> = tokio::sync::RwLockWriteGuard<'a, EpochId>;
123
124impl AuthorityStore {
    /// Open an authority store by directory path.
    /// If the store is empty, initialize it using genesis.
    pub async fn open(
        perpetual_tables: Arc<AuthorityPerpetualTables>,
        genesis: &Genesis,
        config: &NodeConfig,
        registry: &Registry,
    ) -> SuiResult<Arc<Self>> {
        let enable_epoch_sui_conservation_check = config
            .expensive_safety_check_config
            .enable_epoch_sui_conservation_check();

        // A fresh database bootstraps its epoch-start configuration from
        // genesis and persists it; otherwise the stored config is loaded.
        let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
            info!("Creating new epoch start config from genesis");

            #[allow(unused_mut)]
            let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
            // Test-only hook: a failpoint may override the initial flags.
            fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
                info!("Setting initial epoch flags to {:?}", flags);
                initial_epoch_flags = flags;
            });

            let epoch_start_configuration = EpochStartConfiguration::new(
                genesis.sui_system_object().into_epoch_start_state(),
                *genesis.checkpoint().digest(),
                &genesis.objects(),
                initial_epoch_flags,
            )?;
            // Persist so subsequent restarts take the non-empty-DB path below.
            perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
            epoch_start_configuration
        } else {
            info!("Loading epoch start config from DB");
            perpetual_tables
                .epoch_start_configuration
                .get(&())?
                .expect("Epoch start configuration must be set in non-empty DB")
        };
        let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
        info!("Epoch start config: {:?}", epoch_start_configuration);
        info!("Cur epoch: {:?}", cur_epoch);
        let this = Self::open_inner(
            genesis,
            perpetual_tables,
            enable_epoch_sui_conservation_check,
            registry,
        )
        .await?;
        // Raise the epoch-flag gauges for the flags active in this epoch
        // (no previous flags to clear on a fresh open).
        this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
        Ok(this)
    }
175
176    pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
177        for flag in old {
178            self.metrics
179                .epoch_flags
180                .with_label_values(&[&flag.to_string()])
181                .set(0);
182        }
183        for flag in new {
184            self.metrics
185                .epoch_flags
186                .with_label_values(&[&flag.to_string()])
187                .set(1);
188        }
189    }
190
    // NB: This must only be called at time of reconfiguration. We take the execution lock write
    // guard as an argument to ensure that this is the case.
    pub fn clear_object_per_epoch_marker_table(
        &self,
        _execution_guard: &ExecutionLockWriteGuard<'_>,
    ) -> SuiResult<()> {
        // We can safely delete all entries in the per epoch marker table since this is only called
        // at epoch boundaries (during reconfiguration). Therefore any entries that currently
        // exist can be removed. Because of this we can use the `schedule_delete_all` method.
        self.perpetual_tables
            .object_per_epoch_marker_table
            .schedule_delete_all()?;
        // The v2 table is cleared with a backend-specific mechanism: the
        // default backend uses `schedule_delete_all`, while the tidehunter
        // build drops the full epoch-marker key range directly.
        #[cfg(not(tidehunter))]
        {
            self.perpetual_tables
                .object_per_epoch_marker_table_v2
                .schedule_delete_all()?;
        }
        #[cfg(tidehunter)]
        {
            self.perpetual_tables
                .object_per_epoch_marker_table_v2
                .drop_cells_in_range_raw(&EpochMarkerKey::MIN_KEY, &EpochMarkerKey::MAX_KEY)?;
        }
        Ok(())
    }
217
218    pub async fn open_with_committee_for_testing(
219        perpetual_tables: Arc<AuthorityPerpetualTables>,
220        committee: &Committee,
221        genesis: &Genesis,
222    ) -> SuiResult<Arc<Self>> {
223        // TODO: Since we always start at genesis, the committee should be technically the same
224        // as the genesis committee.
225        assert_eq!(committee.epoch, 0);
226        Self::open_inner(genesis, perpetual_tables, true, &Registry::new()).await
227    }
228
    /// Construct the store; if the database is empty, seed it with the genesis
    /// objects, transaction, effects and (when present) events.
    async fn open_inner(
        genesis: &Genesis,
        perpetual_tables: Arc<AuthorityPerpetualTables>,
        enable_epoch_sui_conservation_check: bool,
        registry: &Registry,
    ) -> SuiResult<Arc<Self>> {
        let store = Arc::new(Self {
            perpetual_tables,
            root_state_notify_read: NotifyRead::<
                EpochId,
                (CheckpointSequenceNumber, GlobalStateHash),
            >::new(),
            enable_epoch_sui_conservation_check,
            metrics: AuthorityStoreMetrics::new(registry),
        });
        // Only initialize an empty database.
        if store
            .database_is_empty()
            .expect("Database read should not fail at init.")
        {
            store
                .bulk_insert_genesis_objects(genesis.objects())
                .expect("Cannot bulk insert genesis objects");

            // insert txn and effects of genesis
            let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());

            store
                .perpetual_tables
                .transactions
                .insert(transaction.digest(), transaction.serializable_ref())
                .unwrap();

            store
                .perpetual_tables
                .effects
                .insert(&genesis.effects().digest(), genesis.effects())
                .unwrap();
            // We don't insert the effects to executed_effects yet because the genesis tx hasn't but will be executed.
            // This is important for fullnodes to be able to generate indexing data right now.

            // Events are only stored when the genesis effects reference them.
            if genesis.effects().events_digest().is_some() {
                store
                    .perpetual_tables
                    .events_2
                    .insert(transaction.digest(), genesis.events())
                    .unwrap();
            }
        }

        Ok(store)
    }
281
282    /// Open authority store without any operations that require
283    /// genesis, such as constructing EpochStartConfiguration
284    /// or inserting genesis objects.
285    pub fn open_no_genesis(
286        perpetual_tables: Arc<AuthorityPerpetualTables>,
287        enable_epoch_sui_conservation_check: bool,
288        registry: &Registry,
289    ) -> SuiResult<Arc<Self>> {
290        let store = Arc::new(Self {
291            perpetual_tables,
292            root_state_notify_read: NotifyRead::<
293                EpochId,
294                (CheckpointSequenceNumber, GlobalStateHash),
295            >::new(),
296            enable_epoch_sui_conservation_check,
297            metrics: AuthorityStoreMetrics::new(registry),
298        });
299        Ok(store)
300    }
301
302    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
303        self.perpetual_tables.get_recovery_epoch_at_restart()
304    }
305
306    pub fn get_effects(
307        &self,
308        effects_digest: &TransactionEffectsDigest,
309    ) -> SuiResult<Option<TransactionEffects>> {
310        Ok(self.perpetual_tables.effects.get(effects_digest)?)
311    }
312
313    /// Returns true if we have an effects structure for this transaction digest
314    pub fn effects_exists(&self, effects_digest: &TransactionEffectsDigest) -> SuiResult<bool> {
315        self.perpetual_tables
316            .effects
317            .contains_key(effects_digest)
318            .map_err(|e| e.into())
319    }
320
321    pub fn get_events(
322        &self,
323        digest: &TransactionDigest,
324    ) -> Result<Option<TransactionEvents>, TypedStoreError> {
325        self.perpetual_tables.events_2.get(digest)
326    }
327
328    pub fn multi_get_events(
329        &self,
330        event_digests: &[TransactionDigest],
331    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
332        Ok(event_digests
333            .iter()
334            .map(|digest| self.get_events(digest))
335            .collect::<Result<Vec<_>, _>>()?)
336    }
337
338    pub fn get_unchanged_loaded_runtime_objects(
339        &self,
340        digest: &TransactionDigest,
341    ) -> Result<Option<Vec<ObjectKey>>, TypedStoreError> {
342        self.perpetual_tables
343            .unchanged_loaded_runtime_objects
344            .get(digest)
345    }
346
347    pub fn multi_get_effects<'a>(
348        &self,
349        effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
350    ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
351        self.perpetual_tables.effects.multi_get(effects_digests)
352    }
353
354    pub fn get_executed_effects(
355        &self,
356        tx_digest: &TransactionDigest,
357    ) -> Result<Option<TransactionEffects>, TypedStoreError> {
358        let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
359        match effects_digest {
360            Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
361            None => Ok(None),
362        }
363    }
364
    /// Given a list of transaction digests, returns a list of the corresponding effects
    /// *digests* only if the transactions have been executed. For transactions that have
    /// not been executed, None is returned.
    /// (The previous comment said "effects"; this method returns digests only.)
    pub fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> Result<Vec<Option<TransactionEffectsDigest>>, TypedStoreError> {
        self.perpetual_tables.executed_effects.multi_get(digests)
    }
373
374    /// Given a list of transaction digests, returns a list of the corresponding effects only if they have been
375    /// executed. For transactions that have not been executed, None is returned.
376    pub fn multi_get_executed_effects(
377        &self,
378        digests: &[TransactionDigest],
379    ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
380        let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
381        let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
382        let mut tx_to_effects_map = effects
383            .into_iter()
384            .flatten()
385            .map(|effects| (*effects.transaction_digest(), effects))
386            .collect::<HashMap<_, _>>();
387        Ok(digests
388            .iter()
389            .map(|digest| tx_to_effects_map.remove(digest))
390            .collect())
391    }
392
393    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> SuiResult<bool> {
394        Ok(self
395            .perpetual_tables
396            .executed_effects
397            .contains_key(digest)?)
398    }
399
400    pub fn get_marker_value(
401        &self,
402        object_key: FullObjectKey,
403        epoch_id: EpochId,
404    ) -> SuiResult<Option<MarkerValue>> {
405        Ok(self
406            .perpetual_tables
407            .object_per_epoch_marker_table_v2
408            .get(&EpochMarkerKey(epoch_id, object_key))?)
409    }
410
    /// Return the highest-versioned marker entry for `object_id` within
    /// `epoch_id`, or `None` if the object has no markers this epoch.
    pub fn get_latest_marker(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> SuiResult<Option<(SequenceNumber, MarkerValue)>> {
        // Bound the scan to this (epoch, object id) pair; within the bounds,
        // keys differ only by version.
        let min_key = EpochMarkerKey(epoch_id, FullObjectKey::min_for_id(&object_id));
        let max_key = EpochMarkerKey(epoch_id, FullObjectKey::max_for_id(&object_id));

        // Reversed iteration: the first entry is the highest version in range.
        let marker_entry = self
            .perpetual_tables
            .object_per_epoch_marker_table_v2
            .reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
            .next();
        match marker_entry {
            Some(Ok((EpochMarkerKey(epoch, key), marker))) => {
                // because of the iterator bounds these cannot fail
                assert_eq!(epoch, epoch_id);
                assert_eq!(key.id(), object_id);
                Ok(Some((key.version(), marker)))
            }
            Some(Err(e)) => Err(e.into()),
            None => Ok(None),
        }
    }
435
    /// Returns future containing the state hash for the given epoch
    /// once available
    pub async fn notify_read_root_state_hash(
        &self,
        epoch: EpochId,
    ) -> SuiResult<(CheckpointSequenceNumber, GlobalStateHash)> {
        // We need to register waiters _before_ reading from the database to avoid race conditions
        // (a write landing between the read and the registration would be missed).
        let registration = self.root_state_notify_read.register_one(&epoch);
        let hash = self.perpetual_tables.root_state_hash_by_epoch.get(&epoch)?;

        let result = match hash {
            // Note that Some() clause also drops registration that is already fulfilled
            Some(ready) => Either::Left(futures::future::ready(ready)),
            None => Either::Right(registration),
        }
        .await;

        Ok(result)
    }
455
456    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
457    pub fn deprecated_insert_finalized_transactions(
458        &self,
459        digests: &[TransactionDigest],
460        epoch: EpochId,
461        sequence: CheckpointSequenceNumber,
462    ) -> SuiResult {
463        let mut batch = self
464            .perpetual_tables
465            .executed_transactions_to_checkpoint
466            .batch();
467        batch.insert_batch(
468            &self.perpetual_tables.executed_transactions_to_checkpoint,
469            digests.iter().map(|d| (*d, (epoch, sequence))),
470        )?;
471        batch.write()?;
472        trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
473        Ok(())
474    }
475
476    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
477    pub fn deprecated_get_transaction_checkpoint(
478        &self,
479        digest: &TransactionDigest,
480    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
481        Ok(self
482            .perpetual_tables
483            .executed_transactions_to_checkpoint
484            .get(digest)?)
485    }
486
487    // DEPRECATED -- use function of same name in AuthorityPerEpochStore
488    pub fn deprecated_multi_get_transaction_checkpoint(
489        &self,
490        digests: &[TransactionDigest],
491    ) -> SuiResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
492        Ok(self
493            .perpetual_tables
494            .executed_transactions_to_checkpoint
495            .multi_get(digests)?
496            .into_iter()
497            .collect())
498    }
499
500    /// Returns true if there are no objects in the database
501    pub fn database_is_empty(&self) -> SuiResult<bool> {
502        self.perpetual_tables.database_is_empty()
503    }
504
505    pub fn object_exists_by_key(
506        &self,
507        object_id: &ObjectID,
508        version: VersionNumber,
509    ) -> SuiResult<bool> {
510        Ok(self
511            .perpetual_tables
512            .objects
513            .contains_key(&ObjectKey(*object_id, version))?)
514    }
515
516    pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<bool>> {
517        Ok(self
518            .perpetual_tables
519            .objects
520            .multi_contains_keys(object_keys.to_vec())?
521            .into_iter()
522            .collect())
523    }
524
    /// Return the object reference of the newest version of `object_id` that is
    /// strictly older than `version`, or `None` if no earlier version exists.
    fn get_object_ref_prior_to_key(
        &self,
        object_id: &ObjectID,
        version: VersionNumber,
    ) -> Result<Option<ObjectRef>, SuiError> {
        // The minimum version has no predecessor.
        let Some(prior_version) = version.one_before() else {
            return Ok(None);
        };
        // Scan backwards from (object_id, prior_version); the first hit is the
        // newest version <= prior_version.
        let mut iterator = self
            .perpetual_tables
            .objects
            .reversed_safe_iter_with_bounds(
                Some(ObjectKey::min_for_id(object_id)),
                Some(ObjectKey(*object_id, prior_version)),
            )?;

        // Guard on the id: the lower bound is id-specific, but an empty range
        // for this id must not leak a neighboring object's entry.
        if let Some((object_key, value)) = iterator.next().transpose()?
            && object_key.0 == *object_id
        {
            return Ok(Some(
                self.perpetual_tables.object_reference(&object_key, value)?,
            ));
        }
        Ok(None)
    }
550
551    pub fn multi_get_objects_by_key(
552        &self,
553        object_keys: &[ObjectKey],
554    ) -> Result<Vec<Option<Object>>, SuiError> {
555        let wrappers = self
556            .perpetual_tables
557            .objects
558            .multi_get(object_keys.to_vec())?;
559        let mut ret = vec![];
560
561        for (idx, w) in wrappers.into_iter().enumerate() {
562            ret.push(
563                w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
564                    .transpose()?
565                    .flatten(),
566            );
567        }
568        Ok(ret)
569    }
570
571    /// Get many objects
572    pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, SuiError> {
573        let mut result = Vec::new();
574        for id in objects {
575            result.push(self.get_object(id));
576        }
577        Ok(result)
578    }
579
580    // Methods to mutate the store
581
582    /// Insert a genesis object.
583    /// TODO: delete this method entirely (still used by authority_tests.rs)
584    pub(crate) fn insert_genesis_object(&self, object: Object) -> SuiResult {
585        // We only side load objects with a genesis parent transaction.
586        debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
587        let object_ref = object.compute_object_reference();
588        self.insert_object_direct(object_ref, &object)
589    }
590
591    /// Insert an object directly into the store, and also update relevant tables
592    /// NOTE: does not handle transaction lock.
593    /// This is used to insert genesis objects
594    fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> SuiResult {
595        let mut write_batch = self.perpetual_tables.objects.batch();
596
597        // Insert object
598        let store_object = get_store_object(object.clone());
599        write_batch.insert_batch(
600            &self.perpetual_tables.objects,
601            std::iter::once((ObjectKey::from(object_ref), store_object)),
602        )?;
603
604        // Update the index
605        if object.get_single_owner().is_some() {
606            // Only initialize lock for address owned objects.
607            if !object.is_child_object() {
608                self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref], false)?;
609            }
610        }
611
612        write_batch.write()?;
613
614        Ok(())
615    }
616
    /// This function should only be used for initializing genesis and should remain private.
    #[instrument(level = "debug", skip_all)]
    pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> SuiResult<()> {
        let mut batch = self.perpetual_tables.objects.batch();
        // Compute each object's reference once; reused for both writes below.
        let ref_and_objects: Vec<_> = objects
            .iter()
            .map(|o| (o.compute_object_reference(), o))
            .collect();

        batch.insert_batch(
            &self.perpetual_tables.objects,
            ref_and_objects
                .iter()
                .map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))),
        )?;

        // Live-object markers are only initialized for non-child objects.
        let non_child_object_refs: Vec<_> = ref_and_objects
            .iter()
            .filter(|(_, object)| !object.is_child_object())
            .map(|(oref, _)| *oref)
            .collect();

        self.initialize_live_object_markers_impl(
            &mut batch,
            &non_child_object_refs,
            false, // is_force_reset
        )?;

        // Single atomic write for objects + markers.
        batch.write()?;

        Ok(())
    }
649
650    pub async fn bulk_insert_live_objects(
651        perpetual_db: Arc<AuthorityPerpetualTables>,
652        objects: Vec<LiveObject>,
653        expected_sha3_digest: &[u8; 32],
654        num_parallel_chunks: usize,
655    ) -> SuiResult<()> {
656        // Verify SHA3 over the full object set before inserting.
657        let mut hasher = Sha3_256::default();
658        for object in &objects {
659            hasher.update(object.object_reference().2.inner());
660        }
661        let sha3_digest = hasher.finalize().digest;
662        if *expected_sha3_digest != sha3_digest {
663            error!(
664                "Sha does not match! expected: {:?}, actual: {:?}",
665                expected_sha3_digest, sha3_digest
666            );
667            return Err(SuiError::from("Sha does not match"));
668        }
669
670        let chunk_size = objects.len().div_ceil(num_parallel_chunks).max(1);
671        let mut remaining = objects;
672        let mut handles = Vec::new();
673        while !remaining.is_empty() {
674            let take = chunk_size.min(remaining.len());
675            let chunk: Vec<LiveObject> = remaining.drain(..take).collect();
676            let db = perpetual_db.clone();
677            handles.push(tokio::task::spawn_blocking(move || {
678                Self::insert_objects_chunk(db, chunk)
679            }));
680        }
681        for handle in handles {
682            handle.await.expect("insert task panicked")?;
683        }
684        Ok(())
685    }
686
    /// Write one chunk of live objects into the objects table, flushing the
    /// batch periodically to bound its memory footprint.
    fn insert_objects_chunk(
        perpetual_db: Arc<AuthorityPerpetualTables>,
        objects: Vec<LiveObject>,
    ) -> SuiResult<()> {
        let mut batch = perpetual_db.objects.batch();
        let mut written = 0usize;
        // Flush threshold. Note the `>` below: a batch may accumulate
        // MAX_BATCH_SIZE + 1 entries before being written.
        const MAX_BATCH_SIZE: usize = 100_000;
        for object in objects {
            match object {
                LiveObject::Normal(object) => {
                    let store_object_wrapper = get_store_object(object.clone());
                    batch.insert_batch(
                        &perpetual_db.objects,
                        std::iter::once((
                            ObjectKey::from(object.compute_object_reference()),
                            store_object_wrapper,
                        )),
                    )?;
                    // Only non-child objects get live-object (owner lock) markers.
                    if !object.is_child_object() {
                        Self::initialize_live_object_markers(
                            &perpetual_db.live_owned_object_markers,
                            &mut batch,
                            &[object.compute_object_reference()],
                            true, // is_force_reset
                        )?;
                    }
                }
                LiveObject::Wrapped(object_key) => {
                    // Wrapped objects are stored as payload-less `Wrapped` entries.
                    batch.insert_batch(
                        &perpetual_db.objects,
                        std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
                            object_key,
                            StoreObject::Wrapped.into(),
                        )),
                    )?;
                }
            }
            written += 1;
            if written > MAX_BATCH_SIZE {
                // Flush and start a fresh batch.
                batch.write()?;
                batch = perpetual_db.objects.batch();
                written = 0;
            }
        }
        // Write whatever remains (possibly an empty batch).
        batch.write()?;
        Ok(())
    }
734
735    pub fn set_epoch_start_configuration(
736        &self,
737        epoch_start_configuration: &EpochStartConfiguration,
738    ) -> SuiResult {
739        self.perpetual_tables
740            .set_epoch_start_configuration(epoch_start_configuration)?;
741        Ok(())
742    }
743
744    pub fn get_epoch_start_configuration(&self) -> SuiResult<Option<EpochStartConfiguration>> {
745        Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
746    }
747
    /// Updates the state resulting from the execution of a certificate.
    ///
    /// Internally it checks that all locks for active inputs are at the correct
    /// version, and then writes objects, certificates, parents and clean up locks atomically.
    #[instrument(level = "debug", skip_all)]
    pub fn build_db_batch(
        &self,
        epoch_id: EpochId,
        tx_outputs: &[Arc<TransactionOutputs>],
    ) -> SuiResult<DBBatch> {
        // Accumulate every transaction's outputs into one batch; the caller
        // is responsible for committing it.
        let mut write_batch = self.perpetual_tables.transactions.batch();
        for outputs in tx_outputs {
            self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
        }
        // test crashing before writing the batch
        fail_point!("crash");

        trace!(
            "built batch for committed transactions: {:?}",
            tx_outputs
                .iter()
                .map(|tx| tx.transaction.digest())
                .collect::<Vec<_>>()
        );

        // test crashing before notifying
        fail_point!("crash");

        Ok(write_batch)
    }
778
    /// Stages every side effect of one executed transaction into `write_batch`:
    /// effects and the executed-effects index, the transaction itself, the
    /// per-epoch executed-transaction digest, per-epoch object markers,
    /// deleted/wrapped tombstones, newly written object versions, events,
    /// unchanged loaded runtime objects, and live owned-object marker (lock)
    /// initialization/removal.
    ///
    /// Nothing is persisted here; the caller commits the batch.
    fn write_one_transaction_outputs(
        &self,
        write_batch: &mut DBBatch,
        epoch_id: EpochId,
        tx_outputs: &TransactionOutputs,
    ) -> SuiResult {
        let TransactionOutputs {
            transaction,
            effects,
            markers,
            wrapped,
            deleted,
            written,
            events,
            unchanged_loaded_runtime_objects,
            locks_to_delete,
            new_locks_to_init,
            ..
        } = tx_outputs;

        let effects_digest = effects.digest();
        let transaction_digest = transaction.digest();
        // effects must be inserted before the corresponding dependent entries
        // because they carry epoch information necessary for correct pruning via relocation filters
        write_batch
            .insert_batch(
                &self.perpetual_tables.effects,
                [(effects_digest, effects.clone())],
            )?
            .insert_batch(
                &self.perpetual_tables.executed_effects,
                [(transaction_digest, effects_digest)],
            )?;

        // Store the certificate indexed by transaction digest
        write_batch.insert_batch(
            &self.perpetual_tables.transactions,
            iter::once((transaction_digest, transaction.serializable_ref())),
        )?;

        // Record the digest as executed in this epoch.
        write_batch.insert_batch(
            &self.perpetual_tables.executed_transaction_digests,
            [((epoch_id, *transaction_digest), ())],
        )?;

        // Add batched writes for objects and locks.
        write_batch.insert_batch(
            &self.perpetual_tables.object_per_epoch_marker_table_v2,
            markers
                .iter()
                .map(|(key, marker_value)| (EpochMarkerKey(epoch_id, *key), *marker_value)),
        )?;
        // Tombstone entries for objects that were deleted or wrapped into another object.
        write_batch.insert_batch(
            &self.perpetual_tables.objects,
            deleted
                .iter()
                .map(|key| (key, StoreObject::Deleted))
                .chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
                .map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
        )?;

        // Insert each output object into the stores
        let new_objects = written.iter().map(|(id, new_object)| {
            let version = new_object.version();
            trace!(?id, ?version, "writing object");
            let store_object = get_store_object(new_object.clone());
            (ObjectKey(*id, version), store_object)
        });

        write_batch.insert_batch(&self.perpetual_tables.objects, new_objects)?;

        // Write events into the new table keyed off of transaction_digest
        // (only when the effects actually reference events).
        if effects.events_digest().is_some() {
            write_batch.insert_batch(
                &self.perpetual_tables.events_2,
                [(transaction_digest, events)],
            )?;
        }

        // Write unchanged_loaded_runtime_objects
        if !unchanged_loaded_runtime_objects.is_empty() {
            write_batch.insert_batch(
                &self.perpetual_tables.unchanged_loaded_runtime_objects,
                [(transaction_digest, unchanged_loaded_runtime_objects)],
            )?;
        }

        self.initialize_live_object_markers_impl(write_batch, new_locks_to_init, false)?;

        // Note: deletes locks for received objects as well (but not for objects that were in
        // `Receiving` arguments which were not received)
        self.delete_live_object_markers(write_batch, locks_to_delete)?;

        debug!(effects_digest = ?effects.digest(), "commit_certificate finished");

        Ok(())
    }
876
877    /// Commits transactions only (not effects or other transaction outputs) to the db.
878    /// See ExecutionCache::persist_transaction for more info
879    pub(crate) fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> SuiResult {
880        let mut batch = self.perpetual_tables.transactions.batch();
881        batch.insert_batch(
882            &self.perpetual_tables.transactions,
883            [(tx.digest(), tx.clone().into_unsigned().serializable_ref())],
884        )?;
885        batch.write()?;
886        Ok(())
887    }
888
889    /// Gets ObjectLockInfo that represents state of lock on an object.
890    /// Returns UserInputError::ObjectNotFound if cannot find lock record for this object
891    pub(crate) fn get_lock(
892        &self,
893        obj_ref: ObjectRef,
894        epoch_store: &AuthorityPerEpochStore,
895    ) -> SuiLockResult {
896        if self
897            .perpetual_tables
898            .live_owned_object_markers
899            .get(&obj_ref)?
900            .is_none()
901        {
902            return Ok(ObjectLockStatus::LockedAtDifferentVersion {
903                locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
904            });
905        }
906
907        let tables = epoch_store.tables()?;
908        let epoch_id = epoch_store.epoch();
909
910        if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
911            Ok(ObjectLockStatus::LockedToTx {
912                locked_by_tx: LockDetailsDeprecated {
913                    epoch: epoch_id,
914                    tx_digest,
915                },
916            })
917        } else {
918            Ok(ObjectLockStatus::Initialized)
919        }
920    }
921
922    /// Returns UserInputError::ObjectNotFound if no lock records found for this object.
923    pub(crate) fn get_latest_live_version_for_object_id(
924        &self,
925        object_id: ObjectID,
926    ) -> SuiResult<ObjectRef> {
927        let mut iterator = self
928            .perpetual_tables
929            .live_owned_object_markers
930            .reversed_safe_iter_with_bounds(
931                None,
932                Some((object_id, SequenceNumber::MAX, ObjectDigest::MAX)),
933            )?;
934        Ok(iterator
935            .next()
936            .transpose()?
937            .and_then(|value| {
938                if value.0.0 == object_id {
939                    Some(value)
940                } else {
941                    None
942                }
943            })
944            .ok_or_else(|| {
945                SuiError::from(UserInputError::ObjectNotFound {
946                    object_id,
947                    version: None,
948                })
949            })?
950            .0)
951    }
952
953    /// Checks multiple object locks exist.
954    /// Returns UserInputError::ObjectNotFound if cannot find lock record for at least one of the objects.
955    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if at least one object lock is not initialized
956    ///     at the given version.
957    pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> SuiResult {
958        let locks = self
959            .perpetual_tables
960            .live_owned_object_markers
961            .multi_get(objects)?;
962        for (lock, obj_ref) in locks.into_iter().zip_debug_eq(objects) {
963            if lock.is_none() {
964                let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
965                fp_bail!(
966                    UserInputError::ObjectVersionUnavailableForConsumption {
967                        provided_obj_ref: *obj_ref,
968                        current_version: latest_lock.1
969                    }
970                    .into()
971                );
972            }
973        }
974        Ok(())
975    }
976
977    /// Initialize a lock to None (but exists) for a given list of ObjectRefs.
978    /// Returns SuiErrorKind::ObjectLockAlreadyInitialized if the lock already exists and is locked to a transaction
979    fn initialize_live_object_markers_impl(
980        &self,
981        write_batch: &mut DBBatch,
982        objects: &[ObjectRef],
983        is_force_reset: bool,
984    ) -> SuiResult {
985        AuthorityStore::initialize_live_object_markers(
986            &self.perpetual_tables.live_owned_object_markers,
987            write_batch,
988            objects,
989            is_force_reset,
990        )
991    }
992
993    pub fn initialize_live_object_markers(
994        live_object_marker_table: &DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,
995        write_batch: &mut DBBatch,
996        objects: &[ObjectRef],
997        is_force_reset: bool,
998    ) -> SuiResult {
999        trace!(?objects, "initialize_locks");
1000
1001        if !is_force_reset {
1002            let live_object_markers = live_object_marker_table.multi_get(objects)?;
1003            // If any live_object_markers exist and are not None, return errors for them
1004            // Note we don't check if there is a pre-existing lock. this is because initializing the live
1005            // object marker will not overwrite the lock and cause the validator to equivocate.
1006            let existing_live_object_markers: Vec<ObjectRef> = live_object_markers
1007                .iter()
1008                .zip_debug_eq(objects)
1009                .filter_map(|(lock_opt, objref)| {
1010                    lock_opt.clone().flatten().map(|_tx_digest| *objref)
1011                })
1012                .collect();
1013            if !existing_live_object_markers.is_empty() {
1014                info!(
1015                    ?existing_live_object_markers,
1016                    "Cannot initialize live_object_markers because some exist already"
1017                );
1018                return Err(SuiErrorKind::ObjectLockAlreadyInitialized {
1019                    refs: existing_live_object_markers,
1020                }
1021                .into());
1022            }
1023        }
1024
1025        write_batch.insert_batch(
1026            live_object_marker_table,
1027            objects.iter().map(|obj_ref| (obj_ref, None)),
1028        )?;
1029        Ok(())
1030    }
1031
1032    /// Removes locks for a given list of ObjectRefs.
1033    fn delete_live_object_markers(
1034        &self,
1035        write_batch: &mut DBBatch,
1036        objects: &[ObjectRef],
1037    ) -> SuiResult {
1038        trace!(?objects, "delete_locks");
1039        write_batch.delete_batch(
1040            &self.perpetual_tables.live_owned_object_markers,
1041            objects.iter(),
1042        )?;
1043        Ok(())
1044    }
1045
1046    /// Return the object with version less then or eq to the provided seq number.
1047    /// This is used by indexer to find the correct version of dynamic field child object.
1048    /// We do not store the version of the child object, but because of lamport timestamp,
1049    /// we know the child must have version number less then or eq to the parent.
1050    pub fn find_object_lt_or_eq_version(
1051        &self,
1052        object_id: ObjectID,
1053        version: SequenceNumber,
1054    ) -> SuiResult<Option<Object>> {
1055        self.perpetual_tables
1056            .find_object_lt_or_eq_version(object_id, version)
1057    }
1058
1059    /// Returns the latest object reference we have for this object_id in the objects table.
1060    ///
1061    /// The method may also return the reference to a deleted object with a digest of
1062    /// ObjectDigest::deleted() or ObjectDigest::wrapped() and lamport version
1063    /// of a transaction that deleted the object.
1064    /// Note that a deleted object may re-appear if the deletion was the result of the object
1065    /// being wrapped in another object.
1066    ///
1067    /// If no entry for the object_id is found, return None.
1068    pub fn get_latest_object_ref_or_tombstone(
1069        &self,
1070        object_id: ObjectID,
1071    ) -> Result<Option<ObjectRef>, SuiError> {
1072        self.perpetual_tables
1073            .get_latest_object_ref_or_tombstone(object_id)
1074    }
1075
1076    /// Returns the latest object reference if and only if the object is still live (i.e. it does
1077    /// not return tombstones)
1078    pub fn get_latest_object_ref_if_alive(
1079        &self,
1080        object_id: ObjectID,
1081    ) -> Result<Option<ObjectRef>, SuiError> {
1082        match self.get_latest_object_ref_or_tombstone(object_id)? {
1083            Some(objref) if objref.2.is_alive() => Ok(Some(objref)),
1084            _ => Ok(None),
1085        }
1086    }
1087
1088    /// Returns the latest object we have for this object_id in the objects table.
1089    ///
1090    /// If no entry for the object_id is found, return None.
1091    pub fn get_latest_object_or_tombstone(
1092        &self,
1093        object_id: ObjectID,
1094    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, SuiError> {
1095        let Some((object_key, store_object)) = self
1096            .perpetual_tables
1097            .get_latest_object_or_tombstone(object_id)?
1098        else {
1099            return Ok(None);
1100        };
1101
1102        if let Some(object_ref) = self
1103            .perpetual_tables
1104            .tombstone_reference(&object_key, &store_object)?
1105        {
1106            return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
1107        }
1108
1109        let object = self
1110            .perpetual_tables
1111            .object(&object_key, store_object)?
1112            .expect("Non tombstone store object could not be converted to object");
1113
1114        Ok(Some((object_key, ObjectOrTombstone::Object(object))))
1115    }
1116
1117    pub fn insert_transaction_and_effects(
1118        &self,
1119        transaction: &VerifiedTransaction,
1120        transaction_effects: &TransactionEffects,
1121    ) -> Result<(), TypedStoreError> {
1122        let mut write_batch = self.perpetual_tables.transactions.batch();
1123        // effects must be inserted before the corresponding transaction entry
1124        // because they carry epoch information necessary for correct pruning via relocation filters
1125        write_batch
1126            .insert_batch(
1127                &self.perpetual_tables.effects,
1128                [(transaction_effects.digest(), transaction_effects)],
1129            )?
1130            .insert_batch(
1131                &self.perpetual_tables.transactions,
1132                [(transaction.digest(), transaction.serializable_ref())],
1133            )?;
1134
1135        write_batch.write()?;
1136        Ok(())
1137    }
1138
1139    pub fn multi_insert_transaction_and_effects<'a>(
1140        &self,
1141        transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
1142    ) -> Result<(), TypedStoreError> {
1143        let mut write_batch = self.perpetual_tables.transactions.batch();
1144        for tx in transactions {
1145            write_batch
1146                .insert_batch(
1147                    &self.perpetual_tables.effects,
1148                    [(tx.effects.digest(), &tx.effects)],
1149                )?
1150                .insert_batch(
1151                    &self.perpetual_tables.transactions,
1152                    [(tx.transaction.digest(), tx.transaction.serializable_ref())],
1153                )?;
1154        }
1155
1156        write_batch.write()?;
1157        Ok(())
1158    }
1159
1160    pub fn multi_get_transaction_blocks(
1161        &self,
1162        tx_digests: &[TransactionDigest],
1163    ) -> Result<Vec<Option<VerifiedTransaction>>, TypedStoreError> {
1164        self.perpetual_tables
1165            .transactions
1166            .multi_get(tx_digests)
1167            .map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())
1168    }
1169
1170    pub fn get_transaction_block(
1171        &self,
1172        tx_digest: &TransactionDigest,
1173    ) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
1174        self.perpetual_tables
1175            .transactions
1176            .get(tx_digest)
1177            .map(|v| v.map(|v| v.into()))
1178    }
1179
1180    /// This function reads the DB directly to get the system state object.
1181    /// If reconfiguration is happening at the same time, there is no guarantee whether we would be getting
1182    /// the old or the new system state object.
1183    /// Hence this function should only be called during RPC reads where data race is not a major concern.
1184    /// In general we should avoid this as much as possible.
1185    /// If the intent is for testing, you can use AuthorityState:: get_sui_system_state_object_for_testing.
1186    pub fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
1187        get_sui_system_state(self.perpetual_tables.as_ref())
1188    }
1189
    /// Expensive sanity check run around reconfiguration: scans the entire
    /// live object set and verifies SUI conservation — the total SUI held in
    /// objects and the storage fund imbalance must match the values previously
    /// recorded in the perpetual tables (recording them on first run).
    /// No-op unless `enable_epoch_sui_conservation_check` is set.
    pub fn expensive_check_sui_conservation<T>(
        self: &Arc<Self>,
        type_layout_store: T,
        old_epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult
    where
        T: TypeLayoutStore + Send + Copy,
    {
        if !self.enable_epoch_sui_conservation_check {
            return Ok(());
        }

        let executor = old_epoch_store.executor();
        info!("Starting SUI conservation check. This may take a while..");
        let cur_time = Instant::now();
        let mut pending_objects = vec![];
        let mut count = 0;
        let mut size = 0;
        // Scan the live object set on this thread, handing off chunks of
        // 1M objects to scoped worker threads. Each worker sums the storage
        // rebates and total contained SUI of its chunk; the results are
        // folded together below.
        let (mut total_sui, mut total_storage_rebate) = thread::scope(|s| {
            let pending_tasks = FuturesUnordered::new();
            for o in self.iter_live_object_set(false) {
                match o {
                    LiveObject::Normal(object) => {
                        size += object.object_size_for_gas_metering();
                        count += 1;
                        pending_objects.push(object);
                        if count % 1_000_000 == 0 {
                            // Move the accumulated chunk into the task,
                            // leaving `pending_objects` empty for the next one.
                            let mut task_objects = vec![];
                            mem::swap(&mut pending_objects, &mut task_objects);
                            pending_tasks.push(s.spawn(move || {
                                let mut layout_resolver = executor.type_layout_resolver(
                                    old_epoch_store.protocol_config(),
                                    Box::new(type_layout_store),
                                );
                                let mut total_storage_rebate = 0;
                                let mut total_sui = 0;
                                for object in task_objects {
                                    total_storage_rebate += object.storage_rebate;
                                    // get_total_sui includes storage rebate, however all storage rebate is
                                    // also stored in the storage fund, so we need to subtract it here.
                                    let object_contained_sui = match object
                                        .get_total_sui(layout_resolver.as_mut())
                                    {
                                        Ok(sui) => sui,
                                        // On Testnet a failure is logged and treated
                                        // as 0 SUI; on other chains it is fatal.
                                        Err(e)
                                            if old_epoch_store.get_chain()
                                                == sui_protocol_config::Chain::Testnet =>
                                        {
                                            error!(
                                                "Error calculating total SUI for object {:?}: {:?}",
                                                object.compute_object_reference(),
                                                e
                                            );
                                            0
                                        }
                                        Err(e) => panic!(
                                            "Error calculating total SUI for object {:?}: {:?}",
                                            object.compute_object_reference(),
                                            e
                                        ),
                                    };
                                    total_sui += object_contained_sui - object.storage_rebate;
                                }
                                // `count` (Copy) is captured by value at spawn time,
                                // so this logs progress for every 50th spawned chunk.
                                if count % 50_000_000 == 0 {
                                    info!("Processed {} objects", count);
                                }
                                (total_sui, total_storage_rebate)
                            }));
                        }
                    }
                    LiveObject::Wrapped(_) => {
                        unreachable!("Explicitly asked to not include wrapped tombstones")
                    }
                }
            }
            // Join all workers and sum their (total_sui, total_storage_rebate) pairs.
            pending_tasks.into_iter().fold((0, 0), |init, result| {
                let result = result.join().unwrap();
                (init.0 + result.0, init.1 + result.1)
            })
        });
        // Fold in the final partial chunk (< 1M objects) on this thread.
        let mut layout_resolver = executor.type_layout_resolver(
            old_epoch_store.protocol_config(),
            Box::new(type_layout_store),
        );
        for object in pending_objects {
            total_storage_rebate += object.storage_rebate;
            total_sui +=
                object.get_total_sui(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
        }
        info!(
            "Scanned {} live objects, took {:?}",
            count,
            cur_time.elapsed()
        );
        self.metrics
            .sui_conservation_live_object_count
            .set(count as i64);
        self.metrics
            .sui_conservation_live_object_size
            .set(size as i64);
        self.metrics
            .sui_conservation_check_latency
            .set(cur_time.elapsed().as_secs() as i64);

        // It is safe to call this function because we are in the middle of reconfiguration.
        let system_state = self
            .get_sui_system_state_object_unsafe()
            .expect("Reading sui system state object cannot fail")
            .into_sui_system_state_summary();
        let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
        info!(
            "Total SUI amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
            total_sui, storage_fund_balance, total_storage_rebate, system_state.epoch
        );

        let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
        self.metrics
            .sui_conservation_storage_fund
            .set(storage_fund_balance as i64);
        self.metrics
            .sui_conservation_storage_fund_imbalance
            .set(imbalance);
        self.metrics
            .sui_conservation_imbalance
            .set((total_sui as i128 - TOTAL_SUPPLY_MIST as i128) as i64);

        // Compare against the previously recorded imbalance; record it on first run.
        if let Some(expected_imbalance) = self
            .perpetual_tables
            .expected_storage_fund_imbalance
            .get(&())
            .expect("DB read cannot fail")
        {
            fp_ensure!(
                imbalance == expected_imbalance,
                SuiError::from(
                    format!(
                        "Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
                        system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
                    ).as_str()
                )
            );
        } else {
            self.perpetual_tables
                .expected_storage_fund_imbalance
                .insert(&(), &imbalance)
                .expect("DB write cannot fail");
        }

        // Same check/record pattern for the total network SUI amount.
        if let Some(expected_sui) = self
            .perpetual_tables
            .expected_network_sui_amount
            .get(&())
            .expect("DB read cannot fail")
        {
            fp_ensure!(
                total_sui == expected_sui,
                SuiError::from(
                    format!(
                        "Inconsistent state detected at epoch {}: total sui: {}, expecting {}",
                        system_state.epoch, total_sui, expected_sui
                    )
                    .as_str()
                )
            );
        } else {
            self.perpetual_tables
                .expected_network_sui_amount
                .insert(&(), &total_sui)
                .expect("DB write cannot fail");
        }

        Ok(())
    }
1363
    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
    ///
    /// The objects table is scanned in 2^5 = 32 parallel shards (by object-id
    /// prefix); every object whose newest record is a `Wrapped` tombstone is
    /// removed from the epoch's root state accumulator, and the updated
    /// accumulator is written back.
    #[instrument(level = "error", skip_all)]
    pub fn maybe_reaccumulate_state_hash(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        new_protocol_version: ProtocolVersion,
    ) {
        let old_simplified_unwrap_then_delete = cur_epoch_store
            .protocol_config()
            .simplified_unwrap_then_delete();
        let new_simplified_unwrap_then_delete =
            ProtocolConfig::get_for_version(new_protocol_version, cur_epoch_store.get_chain())
                .simplified_unwrap_then_delete();
        // If in the new epoch the simplified_unwrap_then_delete is enabled for the first time,
        // we re-accumulate state root.
        let should_reaccumulate =
            !old_simplified_unwrap_then_delete && new_simplified_unwrap_then_delete;
        if !should_reaccumulate {
            return;
        }
        info!(
            "[Re-accumulate] simplified_unwrap_then_delete is enabled in the new protocol version, re-accumulating state hash"
        );
        let cur_time = Instant::now();
        std::thread::scope(|s| {
            let pending_tasks = FuturesUnordered::new();
            // Shard the object IDs into different ranges so that we can process them in parallel.
            // We divide the range into 2^BITS number of ranges. To do so we use the highest BITS bits
            // to mark the starting/ending point of the range. For example, when BITS = 5, we
            // divide the range into 32 ranges, and the first few ranges are:
            // 00000000_.... to 00000111_....
            // 00001000_.... to 00001111_....
            // 00010000_.... to 00010111_....
            // and etc.
            const BITS: u8 = 5;
            for index in 0u8..(1 << BITS) {
                pending_tasks.push(s.spawn(move || {
                    // Lowest id in this shard: the shard prefix followed by zeros.
                    let mut id_bytes = [0; ObjectID::LENGTH];
                    id_bytes[0] = index << (8 - BITS);
                    let start_id = ObjectID::new(id_bytes);

                    // Highest id in this shard: the shard prefix followed by all ones.
                    id_bytes[0] |= (1 << (8 - BITS)) - 1;
                    for element in id_bytes.iter_mut().skip(1) {
                        *element = u8::MAX;
                    }
                    let end_id = ObjectID::new(id_bytes);

                    info!(
                        "[Re-accumulate] Scanning object ID range {:?}..{:?}",
                        start_id, end_id
                    );
                    // Previously visited record; seeded with a sentinel whose
                    // store object is `Deleted`, so the wrapped-check below can
                    // never fire on the first iteration.
                    let mut prev = (
                        ObjectKey::min_for_id(&ObjectID::ZERO),
                        StoreObjectWrapper::V1(StoreObject::Deleted),
                    );
                    let mut object_scanned: u64 = 0;
                    let mut wrapped_objects_to_remove = vec![];
                    for db_result in self.perpetual_tables.objects.safe_range_iter(
                        ObjectKey::min_for_id(&start_id)..=ObjectKey::max_for_id(&end_id),
                    ) {
                        match db_result {
                            Ok((object_key, object)) => {
                                object_scanned += 1;
                                if object_scanned.is_multiple_of(100000) {
                                    info!(
                                        "[Re-accumulate] Task {}: object scanned: {}",
                                        index, object_scanned,
                                    );
                                }
                                // Keys arrive ordered by (id, version), so when the id
                                // changes, `prev` held the newest record of the previous
                                // id. If that record was `Wrapped`, the object's latest
                                // state is a wrapped tombstone and must be removed from
                                // the accumulator.
                                if matches!(prev.1.inner(), StoreObject::Wrapped)
                                    && object_key.0 != prev.0.0
                                {
                                    wrapped_objects_to_remove
                                        .push(WrappedObject::new(prev.0.0, prev.0.1));
                                }

                                prev = (object_key, object);
                            }
                            Err(err) => {
                                warn!("Object iterator encounter RocksDB error {:?}", err);
                                return Err(err);
                            }
                        }
                    }
                    // Also handle the very last record of the shard.
                    if matches!(prev.1.inner(), StoreObject::Wrapped) {
                        wrapped_objects_to_remove.push(WrappedObject::new(prev.0.0, prev.0.1));
                    }
                    info!(
                        "[Re-accumulate] Task {}: object scanned: {}, wrapped objects: {}",
                        index,
                        object_scanned,
                        wrapped_objects_to_remove.len(),
                    );
                    Ok((wrapped_objects_to_remove, object_scanned))
                }));
            }
            // Start from the accumulator recorded for the epoch's last checkpoint,
            // then remove every wrapped tombstone found by the shard tasks.
            let (last_checkpoint_of_epoch, cur_accumulator) = self
                .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
                .expect("read cannot fail")
                .expect("accumulator must exist");
            let (accumulator, total_objects_scanned, total_wrapped_objects) =
                pending_tasks.into_iter().fold(
                    (cur_accumulator, 0u64, 0usize),
                    |(mut accumulator, total_objects_scanned, total_wrapped_objects), task| {
                        let (wrapped_objects_to_remove, object_scanned) =
                            task.join().unwrap().unwrap();
                        accumulator.remove_all(
                            wrapped_objects_to_remove
                                .iter()
                                .map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
                                .collect::<Vec<Vec<u8>>>(),
                        );
                        (
                            accumulator,
                            total_objects_scanned + object_scanned,
                            total_wrapped_objects + wrapped_objects_to_remove.len(),
                        )
                    },
                );
            info!(
                "[Re-accumulate] Total objects scanned: {}, total wrapped objects: {}",
                total_objects_scanned, total_wrapped_objects,
            );
            info!(
                "[Re-accumulate] New accumulator value: {:?}",
                accumulator.digest()
            );
            self.insert_state_hash_for_epoch(
                cur_epoch_store.epoch(),
                &last_checkpoint_of_epoch,
                &accumulator,
            )
            .unwrap();
        });
        info!(
            "[Re-accumulate] Re-accumulating took {}seconds",
            cur_time.elapsed().as_secs()
        );
    }
1504
1505    pub async fn prune_objects_and_compact_for_testing(
1506        &self,
1507        checkpoint_store: &Arc<CheckpointStore>,
1508        rpc_index: Option<&RpcIndexStore>,
1509    ) {
1510        let pruning_config = AuthorityStorePruningConfig {
1511            num_epochs_to_retain: 0,
1512            ..Default::default()
1513        };
1514        let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
1515            &self.perpetual_tables,
1516            checkpoint_store,
1517            rpc_index,
1518            pruning_config,
1519            AuthorityStorePruningMetrics::new_for_test(),
1520            EPOCH_DURATION_MS_FOR_TESTING,
1521        )
1522        .await;
1523        let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
1524    }
1525
1526    pub fn remove_executed_effects_for_testing(
1527        &self,
1528        tx_digest: &TransactionDigest,
1529    ) -> anyhow::Result<()> {
1530        let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
1531        if let Some(effects_digest) = effects_digest {
1532            self.perpetual_tables.executed_effects.remove(tx_digest)?;
1533            self.perpetual_tables.effects.remove(&effects_digest)?;
1534        }
1535        Ok(())
1536    }
1537
1538    #[cfg(test)]
1539    pub async fn prune_objects_immediately_for_testing(
1540        &self,
1541        transaction_effects: Vec<TransactionEffects>,
1542    ) -> anyhow::Result<()> {
1543        let mut wb = self.perpetual_tables.objects.batch();
1544
1545        let mut object_keys_to_prune = vec![];
1546        for effects in &transaction_effects {
1547            for (object_id, seq_number) in effects.modified_at_versions() {
1548                info!("Pruning object {:?} version {:?}", object_id, seq_number);
1549                object_keys_to_prune.push(ObjectKey(object_id, seq_number));
1550            }
1551        }
1552
1553        wb.delete_batch(
1554            &self.perpetual_tables.objects,
1555            object_keys_to_prune.into_iter(),
1556        )?;
1557        wb.write()?;
1558        Ok(())
1559    }
1560
1561    // Counts the number of versions exist in object store for `object_id`. This includes tombstone.
1562    #[cfg(msim)]
1563    pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
1564        self.perpetual_tables
1565            .objects
1566            .safe_iter_with_bounds(
1567                Some(ObjectKey(object_id, VersionNumber::MIN)),
1568                Some(ObjectKey(object_id, VersionNumber::MAX)),
1569            )
1570            .collect::<Result<Vec<_>, _>>()
1571            .unwrap()
1572            .len()
1573    }
1574}
1575
1576impl GlobalStateHashStore for AuthorityStore {
1577    fn get_object_ref_prior_to_key_deprecated(
1578        &self,
1579        object_id: &ObjectID,
1580        version: VersionNumber,
1581    ) -> SuiResult<Option<ObjectRef>> {
1582        self.get_object_ref_prior_to_key(object_id, version)
1583    }
1584
1585    fn get_root_state_hash_for_epoch(
1586        &self,
1587        epoch: EpochId,
1588    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
1589        self.perpetual_tables
1590            .root_state_hash_by_epoch
1591            .get(&epoch)
1592            .map_err(Into::into)
1593    }
1594
1595    fn get_root_state_hash_for_highest_epoch(
1596        &self,
1597    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
1598        Ok(self
1599            .perpetual_tables
1600            .root_state_hash_by_epoch
1601            .reversed_safe_iter_with_bounds(None, None)?
1602            .next()
1603            .transpose()?)
1604    }
1605
1606    fn insert_state_hash_for_epoch(
1607        &self,
1608        epoch: EpochId,
1609        last_checkpoint_of_epoch: &CheckpointSequenceNumber,
1610        acc: &GlobalStateHash,
1611    ) -> SuiResult {
1612        self.perpetual_tables
1613            .root_state_hash_by_epoch
1614            .insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
1615        self.root_state_notify_read
1616            .notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
1617
1618        Ok(())
1619    }
1620
1621    fn iter_live_object_set(
1622        &self,
1623        include_wrapped_object: bool,
1624    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1625        Box::new(
1626            self.perpetual_tables
1627                .iter_live_object_set(include_wrapped_object),
1628        )
1629    }
1630}
1631
impl ObjectStore for AuthorityStore {
    /// Read an object and return it, or Ok(None) if the object was not found.
    // Both methods delegate to the perpetual tables, which hold the canonical
    // object store for this authority.
    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        self.perpetual_tables.as_ref().get_object(object_id)
    }

    /// Read the object at the exact (id, version) pair, or `None` if that
    /// specific version is not present.
    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
        self.perpetual_tables.get_object_by_key(object_id, version)
    }
}
1642
/// A wrapper to make Orphan Rule happy: lets this crate implement foreign
/// traits (e.g. `ModuleResolver`) over a `BackingPackageStore` trait object.
pub struct ResolverWrapper {
    // The backing store used to resolve modules and packages.
    pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
    // Metrics for module-cache activity; updated on every module lookup.
    pub metrics: Arc<ResolverMetrics>,
}
1648
1649impl ResolverWrapper {
1650    pub fn new(
1651        resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1652        metrics: Arc<ResolverMetrics>,
1653    ) -> Self {
1654        metrics.module_cache_size.set(0);
1655        ResolverWrapper { resolver, metrics }
1656    }
1657
1658    fn inc_cache_size_gauge(&self) {
1659        // reset the gauge after a restart of the cache
1660        let current = self.metrics.module_cache_size.get();
1661        self.metrics.module_cache_size.set(current + 1);
1662    }
1663}
1664
1665impl ModuleResolver for ResolverWrapper {
1666    type Error = SuiError;
1667    fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
1668        self.inc_cache_size_gauge();
1669        get_module(&*self.resolver, module_id)
1670    }
1671
1672    fn get_packages_static<const N: usize>(
1673        &self,
1674        ids: [AccountAddress; N],
1675    ) -> Result<[Option<SerializedPackage>; N], Self::Error> {
1676        let mut packages = [const { None }; N];
1677        for (i, id) in ids.iter().enumerate() {
1678            packages[i] = get_package(&*self.resolver, &ObjectID::from(*id))?;
1679        }
1680        Ok(packages)
1681    }
1682
1683    fn get_packages<'a>(
1684        &self,
1685        ids: impl ExactSizeIterator<Item = &'a AccountAddress>,
1686    ) -> Result<Vec<Option<SerializedPackage>>, Self::Error> {
1687        ids.map(|id| get_package(&*self.resolver, &ObjectID::from(*id)))
1688            .collect()
1689    }
1690}
1691
/// Distinguishes how a batch of store updates was produced.
pub enum UpdateType {
    /// Updates produced by executing a transaction; carries the digest of
    /// the resulting effects.
    Transaction(TransactionEffectsDigest),
    /// Updates produced while loading the genesis state.
    Genesis,
}
1696
/// Result type returned by owned-object lock queries.
pub type SuiLockResult = SuiResult<ObjectLockStatus>;
1698
/// Status of an owned-object lock for a queried object reference.
#[derive(Debug, PartialEq, Eq)]
pub enum ObjectLockStatus {
    /// The lock exists but no transaction has acquired it yet.
    Initialized,
    /// The lock at the queried reference is held by the given transaction.
    LockedToTx { locked_by_tx: LockDetailsDeprecated },
    /// A lock exists for this object, but at a different object reference
    /// (e.g. a different version) than the one queried.
    LockedAtDifferentVersion { locked_ref: ObjectRef },
}
1705
/// Versioned on-disk wrapper for lock details (deprecated). The enum exists
/// so the serialized format can evolve; see `migrate`/`inner` below.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LockDetailsWrapperDeprecated {
    /// Version 1 — currently the only version.
    V1(LockDetailsV1Deprecated),
}
1710
impl LockDetailsWrapperDeprecated {
    /// Upgrade `self` to the latest wrapper version. Currently the identity
    /// function because only V1 exists.
    pub fn migrate(self) -> Self {
        // TODO: when there are multiple versions, we must iteratively migrate from version N to
        // N+1 until we arrive at the latest version
        self
    }

    // Always returns the most recent version. Older versions are migrated to the latest version at
    // read time, so there is never a need to access older versions.
    pub fn inner(&self) -> &LockDetailsDeprecated {
        match self {
            Self::V1(v1) => v1,

            // can remove #[allow] when there are multiple versions
            #[allow(unreachable_patterns)]
            _ => panic!("lock details should have been migrated to latest version at read time"),
        }
    }

    /// Consume the wrapper and return the (latest-version) details by value.
    /// Like `inner`, panics if an un-migrated older version is encountered.
    pub fn into_inner(self) -> LockDetailsDeprecated {
        match self {
            Self::V1(v1) => v1,

            // can remove #[allow] when there are multiple versions
            #[allow(unreachable_patterns)]
            _ => panic!("lock details should have been migrated to latest version at read time"),
        }
    }
}
1739
/// Version 1 of the (deprecated) lock details: records which transaction
/// acquired an owned-object lock and in which epoch.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LockDetailsV1Deprecated {
    // Epoch in which the lock was taken.
    pub epoch: EpochId,
    // Digest of the transaction holding the lock.
    pub tx_digest: TransactionDigest,
}
1745
/// Alias for the latest lock-details version.
pub type LockDetailsDeprecated = LockDetailsV1Deprecated;
1747
1748impl From<LockDetailsDeprecated> for LockDetailsWrapperDeprecated {
1749    fn from(details: LockDetailsDeprecated) -> Self {
1750        // always use latest version.
1751        LockDetailsWrapperDeprecated::V1(details)
1752    }
1753}