1use std::sync::Arc;
5use std::{iter, mem, thread};
6
7use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
8use crate::authority::authority_store_pruner::{
9 AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
10};
11use crate::authority::authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object};
12use crate::authority::epoch_marker_key::EpochMarkerKey;
13use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
14use crate::global_state_hasher::GlobalStateHashStore;
15use crate::rpc_index::RpcIndexStore;
16use crate::transaction_outputs::TransactionOutputs;
17use either::Either;
18use fastcrypto::hash::{HashFunction, MultisetHash, Sha3_256};
19use futures::stream::FuturesUnordered;
20use move_core_types::account_address::AccountAddress;
21use move_core_types::resolver::{ModuleResolver, SerializedPackage};
22use serde::{Deserialize, Serialize};
23use sui_config::node::AuthorityStorePruningConfig;
24use sui_macros::fail_point_arg;
25use sui_types::error::{SuiErrorKind, UserInputError};
26use sui_types::execution::TypeLayoutStore;
27use sui_types::global_state_hash::GlobalStateHash;
28use sui_types::message_envelope::Message;
29use sui_types::storage::{
30 BackingPackageStore, FullObjectKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore,
31 get_module, get_package,
32};
33use sui_types::sui_system_state::get_sui_system_state;
34use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure};
35use tokio::time::Instant;
36use tracing::{debug, info, trace};
37use typed_store::traits::Map;
38use typed_store::{
39 TypedStoreError,
40 rocks::{DBBatch, DBMap},
41};
42
43use super::authority_store_tables::LiveObject;
44use super::{authority_store_tables::AuthorityPerpetualTables, *};
45use mysten_common::ZipDebugEqIteratorExt;
46use mysten_common::sync::notify_read::NotifyRead;
47use sui_types::effects::{TransactionEffects, TransactionEvents};
48use sui_types::gas_coin::TOTAL_SUPPLY_MIST;
49
/// Prometheus gauges for the SUI conservation check and per-epoch flags.
/// Field meanings mirror the help strings registered in `new` below.
struct AuthorityStoreMetrics {
    // Seconds taken to scan all live objects for the conservation check.
    sui_conservation_check_latency: IntGauge,
    // Number of live objects in the store.
    sui_conservation_live_object_count: IntGauge,
    // Size in bytes of live objects in the store.
    sui_conservation_live_object_size: IntGauge,
    // Total SUI observed minus the expected total supply (10B * 10^9).
    sui_conservation_imbalance: IntGauge,
    // Storage fund pool balance (object-storage portion only).
    sui_conservation_storage_fund: IntGauge,
    // storage_fund_balance - total_object_storage_rebates.
    sui_conservation_storage_fund_imbalance: IntGauge,
    // 0/1 gauge per flag of the currently running epoch.
    epoch_flags: IntGaugeVec,
}
59
impl AuthorityStoreMetrics {
    /// Registers all gauges in `registry` and returns the metrics struct.
    ///
    /// The `unwrap()`s follow the usual metrics-initialization convention:
    /// registration only fails on a duplicate collector name, which would be
    /// a programming error at startup.
    pub fn new(registry: &Registry) -> Self {
        Self {
            sui_conservation_check_latency: register_int_gauge_with_registry!(
                "sui_conservation_check_latency",
                "Number of seconds took to scan all live objects in the store for SUI conservation check",
                registry,
            ).unwrap(),
            sui_conservation_live_object_count: register_int_gauge_with_registry!(
                "sui_conservation_live_object_count",
                "Number of live objects in the store",
                registry,
            ).unwrap(),
            sui_conservation_live_object_size: register_int_gauge_with_registry!(
                "sui_conservation_live_object_size",
                "Size in bytes of live objects in the store",
                registry,
            ).unwrap(),
            sui_conservation_imbalance: register_int_gauge_with_registry!(
                "sui_conservation_imbalance",
                "Total amount of SUI in the network - 10B * 10^9. This delta shows the amount of imbalance",
                registry,
            ).unwrap(),
            sui_conservation_storage_fund: register_int_gauge_with_registry!(
                "sui_conservation_storage_fund",
                "Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
                registry,
            ).unwrap(),
            sui_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
                "sui_conservation_storage_fund_imbalance",
                "Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
                registry,
            ).unwrap(),
            epoch_flags: register_int_gauge_vec_with_registry!(
                "epoch_flags",
                "Local flags of the currently running epoch",
                &["flag"],
                registry,
            ).unwrap(),
        }
    }
}
102
/// Persistent authority state that outlives epochs, backed by the
/// perpetual tables.
pub struct AuthorityStore {
    /// The cross-epoch tables: objects, transactions, effects, markers, etc.
    pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,

    /// Wakes tasks waiting on an epoch's root state hash
    /// (see `notify_read_root_state_hash`).
    pub(crate) root_state_notify_read:
        NotifyRead<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Whether the expensive end-of-epoch SUI conservation check is enabled
    /// (from the node's expensive safety check config).
    enable_epoch_sui_conservation_check: bool,

    metrics: AuthorityStoreMetrics,
}
120
// Guards over the execution lock; the guarded value is the current epoch id.
// NOTE(review): presumably readers execute transactions while the write guard
// is taken only at reconfiguration — confirm against callers.
pub type ExecutionLockReadGuard<'a> = tokio::sync::RwLockReadGuard<'a, EpochId>;
pub type ExecutionLockWriteGuard<'a> = tokio::sync::RwLockWriteGuard<'a, EpochId>;
123
124impl AuthorityStore {
    /// Opens the store: creates and persists the epoch start configuration
    /// from `genesis` when the database is empty, otherwise loads it from
    /// disk, then delegates construction to `open_inner`.
    pub async fn open(
        perpetual_tables: Arc<AuthorityPerpetualTables>,
        genesis: &Genesis,
        config: &NodeConfig,
        registry: &Registry,
    ) -> SuiResult<Arc<Self>> {
        let enable_epoch_sui_conservation_check = config
            .expensive_safety_check_config
            .enable_epoch_sui_conservation_check();

        let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
            info!("Creating new epoch start config from genesis");

            // `mut` is only needed when the fail point overrides the flags in tests.
            #[allow(unused_mut)]
            let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
            fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
                info!("Setting initial epoch flags to {:?}", flags);
                initial_epoch_flags = flags;
            });

            let epoch_start_configuration = EpochStartConfiguration::new(
                genesis.sui_system_object().into_epoch_start_state(),
                *genesis.checkpoint().digest(),
                &genesis.objects(),
                initial_epoch_flags,
            )?;
            // Persist so subsequent restarts take the load-from-DB branch below.
            perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
            epoch_start_configuration
        } else {
            info!("Loading epoch start config from DB");
            perpetual_tables
                .epoch_start_configuration
                .get(&())?
                .expect("Epoch start configuration must be set in non-empty DB")
        };
        let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
        info!("Epoch start config: {:?}", epoch_start_configuration);
        info!("Cur epoch: {:?}", cur_epoch);
        let this = Self::open_inner(
            genesis,
            perpetual_tables,
            enable_epoch_sui_conservation_check,
            registry,
        )
        .await?;
        // Export the active epoch flags as metrics; no previous flags to clear.
        this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
        Ok(this)
    }
175
176 pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
177 for flag in old {
178 self.metrics
179 .epoch_flags
180 .with_label_values(&[&flag.to_string()])
181 .set(0);
182 }
183 for flag in new {
184 self.metrics
185 .epoch_flags
186 .with_label_values(&[&flag.to_string()])
187 .set(1);
188 }
189 }
190
    /// Clears both per-epoch marker tables. Taking the execution write guard
    /// as a parameter proves no transaction is executing concurrently.
    pub fn clear_object_per_epoch_marker_table(
        &self,
        _execution_guard: &ExecutionLockWriteGuard<'_>,
    ) -> SuiResult<()> {
        // NOTE(review): `schedule_delete_all` presumably defers the actual
        // deletion to the storage layer — confirm semantics in typed_store.
        self.perpetual_tables
            .object_per_epoch_marker_table
            .schedule_delete_all()?;
        #[cfg(not(tidehunter))]
        {
            self.perpetual_tables
                .object_per_epoch_marker_table_v2
                .schedule_delete_all()?;
        }
        // The tidehunter backend clears the v2 table by dropping the full raw
        // key range instead.
        #[cfg(tidehunter)]
        {
            self.perpetual_tables
                .object_per_epoch_marker_table_v2
                .drop_cells_in_range_raw(&EpochMarkerKey::MIN_KEY, &EpochMarkerKey::MAX_KEY)?;
        }
        Ok(())
    }
217
218 pub async fn open_with_committee_for_testing(
219 perpetual_tables: Arc<AuthorityPerpetualTables>,
220 committee: &Committee,
221 genesis: &Genesis,
222 ) -> SuiResult<Arc<Self>> {
223 assert_eq!(committee.epoch, 0);
226 Self::open_inner(genesis, perpetual_tables, true, &Registry::new()).await
227 }
228
    /// Shared constructor: builds the store and, when the database is empty,
    /// seeds it with the genesis objects, transaction, effects, and events.
    async fn open_inner(
        genesis: &Genesis,
        perpetual_tables: Arc<AuthorityPerpetualTables>,
        enable_epoch_sui_conservation_check: bool,
        registry: &Registry,
    ) -> SuiResult<Arc<Self>> {
        let store = Arc::new(Self {
            perpetual_tables,
            root_state_notify_read: NotifyRead::<
                EpochId,
                (CheckpointSequenceNumber, GlobalStateHash),
            >::new(),
            enable_epoch_sui_conservation_check,
            metrics: AuthorityStoreMetrics::new(registry),
        });
        if store
            .database_is_empty()
            .expect("Database read should not fail at init.")
        {
            store
                .bulk_insert_genesis_objects(genesis.objects())
                .expect("Cannot bulk insert genesis objects");

            // The genesis transaction is trusted; signature verification is skipped.
            let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());

            store
                .perpetual_tables
                .transactions
                .insert(transaction.digest(), transaction.serializable_ref())
                .unwrap();

            store
                .perpetual_tables
                .effects
                .insert(&genesis.effects().digest(), genesis.effects())
                .unwrap();
            // Events are only written when the genesis effects reference them.
            if genesis.effects().events_digest().is_some() {
                store
                    .perpetual_tables
                    .events_2
                    .insert(transaction.digest(), genesis.events())
                    .unwrap();
            }
        }

        Ok(store)
    }
281
282 pub fn open_no_genesis(
286 perpetual_tables: Arc<AuthorityPerpetualTables>,
287 enable_epoch_sui_conservation_check: bool,
288 registry: &Registry,
289 ) -> SuiResult<Arc<Self>> {
290 let store = Arc::new(Self {
291 perpetual_tables,
292 root_state_notify_read: NotifyRead::<
293 EpochId,
294 (CheckpointSequenceNumber, GlobalStateHash),
295 >::new(),
296 enable_epoch_sui_conservation_check,
297 metrics: AuthorityStoreMetrics::new(registry),
298 });
299 Ok(store)
300 }
301
    /// Returns the epoch to resume from after a restart, as recorded in the
    /// perpetual tables.
    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
        self.perpetual_tables.get_recovery_epoch_at_restart()
    }
305
306 pub fn get_effects(
307 &self,
308 effects_digest: &TransactionEffectsDigest,
309 ) -> SuiResult<Option<TransactionEffects>> {
310 Ok(self.perpetual_tables.effects.get(effects_digest)?)
311 }
312
313 pub fn effects_exists(&self, effects_digest: &TransactionEffectsDigest) -> SuiResult<bool> {
315 self.perpetual_tables
316 .effects
317 .contains_key(effects_digest)
318 .map_err(|e| e.into())
319 }
320
    /// Returns the events recorded for the transaction with `digest`, if any.
    pub fn get_events(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<TransactionEvents>, TypedStoreError> {
        self.perpetual_tables.events_2.get(digest)
    }
327
328 pub fn multi_get_events(
329 &self,
330 event_digests: &[TransactionDigest],
331 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
332 Ok(event_digests
333 .iter()
334 .map(|digest| self.get_events(digest))
335 .collect::<Result<Vec<_>, _>>()?)
336 }
337
    /// Returns the keys of objects that the given transaction loaded at
    /// runtime without changing, if that set was recorded.
    pub fn get_unchanged_loaded_runtime_objects(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<Vec<ObjectKey>>, TypedStoreError> {
        self.perpetual_tables
            .unchanged_loaded_runtime_objects
            .get(digest)
    }
346
    /// Batched effects lookup by effects digest; one entry per input digest.
    pub fn multi_get_effects<'a>(
        &self,
        effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
    ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
        self.perpetual_tables.effects.multi_get(effects_digests)
    }
353
354 pub fn get_executed_effects(
355 &self,
356 tx_digest: &TransactionDigest,
357 ) -> Result<Option<TransactionEffects>, TypedStoreError> {
358 let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
359 match effects_digest {
360 Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
361 None => Ok(None),
362 }
363 }
364
    /// Batched lookup of effects digests for executed transactions; one entry
    /// per input transaction digest.
    pub fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> Result<Vec<Option<TransactionEffectsDigest>>, TypedStoreError> {
        self.perpetual_tables.executed_effects.multi_get(digests)
    }
373
    /// Batched version of `get_executed_effects`: returns the execution
    /// effects for each input transaction digest (None when not executed).
    pub fn multi_get_executed_effects(
        &self,
        digests: &[TransactionDigest],
    ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
        let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
        // `flatten` drops the missing entries here; they are reintroduced by
        // the map lookup below, which realigns results to input order.
        let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
        let mut tx_to_effects_map = effects
            .into_iter()
            .flatten()
            .map(|effects| (*effects.transaction_digest(), effects))
            .collect::<HashMap<_, _>>();
        Ok(digests
            .iter()
            .map(|digest| tx_to_effects_map.remove(digest))
            .collect())
    }
392
393 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> SuiResult<bool> {
394 Ok(self
395 .perpetual_tables
396 .executed_effects
397 .contains_key(digest)?)
398 }
399
    /// Returns the marker (if any) recorded for `object_key` in `epoch_id`.
    pub fn get_marker_value(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> SuiResult<Option<MarkerValue>> {
        Ok(self
            .perpetual_tables
            .object_per_epoch_marker_table_v2
            .get(&EpochMarkerKey(epoch_id, object_key))?)
    }
410
    /// Returns the highest-versioned marker for `object_id` in `epoch_id`,
    /// found via a reverse scan over that object's key range.
    pub fn get_latest_marker(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> SuiResult<Option<(SequenceNumber, MarkerValue)>> {
        let min_key = EpochMarkerKey(epoch_id, FullObjectKey::min_for_id(&object_id));
        let max_key = EpochMarkerKey(epoch_id, FullObjectKey::max_for_id(&object_id));

        let marker_entry = self
            .perpetual_tables
            .object_per_epoch_marker_table_v2
            .reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
            .next();
        match marker_entry {
            Some(Ok((EpochMarkerKey(epoch, key), marker))) => {
                // The scan bounds pin both epoch and object id, so any hit
                // must match them; a mismatch would indicate a store bug.
                assert_eq!(epoch, epoch_id);
                assert_eq!(key.id(), object_id);
                Ok(Some((key.version(), marker)))
            }
            Some(Err(e)) => Err(e.into()),
            None => Ok(None),
        }
    }
435
    /// Waits until the root state hash for `epoch` is available and returns
    /// it together with its checkpoint sequence number.
    pub async fn notify_read_root_state_hash(
        &self,
        epoch: EpochId,
    ) -> SuiResult<(CheckpointSequenceNumber, GlobalStateHash)> {
        // Register the waiter BEFORE reading the table so a write landing
        // between the read and the await cannot be missed.
        let registration = self.root_state_notify_read.register_one(&epoch);
        let hash = self.perpetual_tables.root_state_hash_by_epoch.get(&epoch)?;

        let result = match hash {
            // Already present: resolve immediately.
            Some(ready) => Either::Left(futures::future::ready(ready)),
            // Not yet written: wait for the notification.
            None => Either::Right(registration),
        }
        .await;

        Ok(result)
    }
455
    /// Records which checkpoint (and epoch) each of `digests` was finalized
    /// in. Deprecated per the name; kept for the legacy
    /// `executed_transactions_to_checkpoint` table.
    pub fn deprecated_insert_finalized_transactions(
        &self,
        digests: &[TransactionDigest],
        epoch: EpochId,
        sequence: CheckpointSequenceNumber,
    ) -> SuiResult {
        let mut batch = self
            .perpetual_tables
            .executed_transactions_to_checkpoint
            .batch();
        batch.insert_batch(
            &self.perpetual_tables.executed_transactions_to_checkpoint,
            digests.iter().map(|d| (*d, (epoch, sequence))),
        )?;
        batch.write()?;
        trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
        Ok(())
    }
475
    /// Returns the (epoch, checkpoint) a transaction was finalized in, if
    /// recorded in the legacy table.
    pub fn deprecated_get_transaction_checkpoint(
        &self,
        digest: &TransactionDigest,
    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
        Ok(self
            .perpetual_tables
            .executed_transactions_to_checkpoint
            .get(digest)?)
    }
486
487 pub fn deprecated_multi_get_transaction_checkpoint(
489 &self,
490 digests: &[TransactionDigest],
491 ) -> SuiResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
492 Ok(self
493 .perpetual_tables
494 .executed_transactions_to_checkpoint
495 .multi_get(digests)?
496 .into_iter()
497 .collect())
498 }
499
    /// Returns whether the perpetual tables hold no data yet (fresh node).
    pub fn database_is_empty(&self) -> SuiResult<bool> {
        self.perpetual_tables.database_is_empty()
    }
504
    /// Returns whether the exact (id, version) pair exists in the objects
    /// table.
    pub fn object_exists_by_key(
        &self,
        object_id: &ObjectID,
        version: VersionNumber,
    ) -> SuiResult<bool> {
        Ok(self
            .perpetual_tables
            .objects
            .contains_key(&ObjectKey(*object_id, version))?)
    }
515
516 pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<bool>> {
517 Ok(self
518 .perpetual_tables
519 .objects
520 .multi_contains_keys(object_keys.to_vec())?
521 .into_iter()
522 .collect())
523 }
524
    /// Returns the object ref of the newest version of `object_id` that is
    /// strictly before `version`, or None when `version` is the first version
    /// or no earlier entry exists.
    fn get_object_ref_prior_to_key(
        &self,
        object_id: &ObjectID,
        version: VersionNumber,
    ) -> Result<Option<ObjectRef>, SuiError> {
        // The earliest possible version has no predecessor.
        let Some(prior_version) = version.one_before() else {
            return Ok(None);
        };
        let mut iterator = self
            .perpetual_tables
            .objects
            .reversed_safe_iter_with_bounds(
                Some(ObjectKey::min_for_id(object_id)),
                Some(ObjectKey(*object_id, prior_version)),
            )?;

        // First entry of the reverse scan is the highest version <= prior_version;
        // the id guard rejects a hit belonging to a different object.
        if let Some((object_key, value)) = iterator.next().transpose()?
            && object_key.0 == *object_id
        {
            return Ok(Some(
                self.perpetual_tables.object_reference(&object_key, value)?,
            ));
        }
        Ok(None)
    }
550
551 pub fn multi_get_objects_by_key(
552 &self,
553 object_keys: &[ObjectKey],
554 ) -> Result<Vec<Option<Object>>, SuiError> {
555 let wrappers = self
556 .perpetual_tables
557 .objects
558 .multi_get(object_keys.to_vec())?;
559 let mut ret = vec![];
560
561 for (idx, w) in wrappers.into_iter().enumerate() {
562 ret.push(
563 w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
564 .transpose()?
565 .flatten(),
566 );
567 }
568 Ok(ret)
569 }
570
571 pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, SuiError> {
573 let mut result = Vec::new();
574 for id in objects {
575 result.push(self.get_object(id));
576 }
577 Ok(result)
578 }
579
    /// Inserts a single genesis object directly, bypassing the normal
    /// transaction-output write path.
    pub(crate) fn insert_genesis_object(&self, object: Object) -> SuiResult {
        // Genesis objects must carry the genesis marker as their previous tx.
        debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
        let object_ref = object.compute_object_reference();
        self.insert_object_direct(object_ref, &object)
    }
590
591 fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> SuiResult {
595 let mut write_batch = self.perpetual_tables.objects.batch();
596
597 let store_object = get_store_object(object.clone());
599 write_batch.insert_batch(
600 &self.perpetual_tables.objects,
601 std::iter::once((ObjectKey::from(object_ref), store_object)),
602 )?;
603
604 if object.get_single_owner().is_some() {
606 if !object.is_child_object() {
608 self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref], false)?;
609 }
610 }
611
612 write_batch.write()?;
613
614 Ok(())
615 }
616
    /// Bulk-inserts the genesis objects and initializes live markers for all
    /// non-child objects, in a single batch write.
    #[instrument(level = "debug", skip_all)]
    pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> SuiResult<()> {
        let mut batch = self.perpetual_tables.objects.batch();
        let ref_and_objects: Vec<_> = objects
            .iter()
            .map(|o| (o.compute_object_reference(), o))
            .collect();

        batch.insert_batch(
            &self.perpetual_tables.objects,
            ref_and_objects
                .iter()
                .map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))),
        )?;

        // Child objects are tracked via their parents and get no live marker.
        let non_child_object_refs: Vec<_> = ref_and_objects
            .iter()
            .filter(|(_, object)| !object.is_child_object())
            .map(|(oref, _)| *oref)
            .collect();

        self.initialize_live_object_markers_impl(
            &mut batch,
            &non_child_object_refs,
            false, // is_force_reset: fail if a marker already holds lock details
        )?;

        batch.write()?;

        Ok(())
    }
649
    /// Bulk-inserts live objects (e.g. restored from a snapshot), verifying a
    /// SHA3-256 checksum over their object digests first, then fanning writes
    /// out across up to `num_parallel_chunks` blocking tasks.
    pub async fn bulk_insert_live_objects(
        perpetual_db: Arc<AuthorityPerpetualTables>,
        objects: Vec<LiveObject>,
        expected_sha3_digest: &[u8; 32],
        num_parallel_chunks: usize,
    ) -> SuiResult<()> {
        // The checksum covers each object's digest (third element of its ref).
        let mut hasher = Sha3_256::default();
        for object in &objects {
            hasher.update(object.object_reference().2.inner());
        }
        let sha3_digest = hasher.finalize().digest;
        if *expected_sha3_digest != sha3_digest {
            error!(
                "Sha does not match! expected: {:?}, actual: {:?}",
                expected_sha3_digest, sha3_digest
            );
            return Err(SuiError::from("Sha does not match"));
        }

        // Roughly equal chunks; `max(1)` guards the empty-input case.
        let chunk_size = objects.len().div_ceil(num_parallel_chunks).max(1);
        let mut remaining = objects;
        let mut handles = Vec::new();
        while !remaining.is_empty() {
            let take = chunk_size.min(remaining.len());
            let chunk: Vec<LiveObject> = remaining.drain(..take).collect();
            let db = perpetual_db.clone();
            // DB writes are blocking work; keep them off the async runtime.
            handles.push(tokio::task::spawn_blocking(move || {
                Self::insert_objects_chunk(db, chunk)
            }));
        }
        for handle in handles {
            handle.await.expect("insert task panicked")?;
        }
        Ok(())
    }
686
687 fn insert_objects_chunk(
688 perpetual_db: Arc<AuthorityPerpetualTables>,
689 objects: Vec<LiveObject>,
690 ) -> SuiResult<()> {
691 let mut batch = perpetual_db.objects.batch();
692 let mut written = 0usize;
693 const MAX_BATCH_SIZE: usize = 100_000;
694 for object in objects {
695 match object {
696 LiveObject::Normal(object) => {
697 let store_object_wrapper = get_store_object(object.clone());
698 batch.insert_batch(
699 &perpetual_db.objects,
700 std::iter::once((
701 ObjectKey::from(object.compute_object_reference()),
702 store_object_wrapper,
703 )),
704 )?;
705 if !object.is_child_object() {
706 Self::initialize_live_object_markers(
707 &perpetual_db.live_owned_object_markers,
708 &mut batch,
709 &[object.compute_object_reference()],
710 true, )?;
712 }
713 }
714 LiveObject::Wrapped(object_key) => {
715 batch.insert_batch(
716 &perpetual_db.objects,
717 std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
718 object_key,
719 StoreObject::Wrapped.into(),
720 )),
721 )?;
722 }
723 }
724 written += 1;
725 if written > MAX_BATCH_SIZE {
726 batch.write()?;
727 batch = perpetual_db.objects.batch();
728 written = 0;
729 }
730 }
731 batch.write()?;
732 Ok(())
733 }
734
    /// Persists the epoch start configuration to the perpetual tables.
    pub fn set_epoch_start_configuration(
        &self,
        epoch_start_configuration: &EpochStartConfiguration,
    ) -> SuiResult {
        self.perpetual_tables
            .set_epoch_start_configuration(epoch_start_configuration)?;
        Ok(())
    }
743
    /// Loads the persisted epoch start configuration, if one has been set.
    pub fn get_epoch_start_configuration(&self) -> SuiResult<Option<EpochStartConfiguration>> {
        Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
    }
747
    /// Builds (but does not write) a single DB batch containing the outputs
    /// of all the given executed transactions for `epoch_id`.
    #[instrument(level = "debug", skip_all)]
    pub fn build_db_batch(
        &self,
        epoch_id: EpochId,
        tx_outputs: &[Arc<TransactionOutputs>],
    ) -> SuiResult<DBBatch> {
        let mut write_batch = self.perpetual_tables.transactions.batch();
        for outputs in tx_outputs {
            self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
        }
        // Crash-injection point for recovery testing.
        fail_point!("crash");

        trace!(
            "built batch for committed transactions: {:?}",
            tx_outputs
                .iter()
                .map(|tx| tx.transaction.digest())
                .collect::<Vec<_>>()
        );

        // Crash-injection point for recovery testing.
        fail_point!("crash");

        Ok(write_batch)
    }
778
    /// Appends one transaction's outputs (effects, objects, markers, events,
    /// locks) to `write_batch`. Nothing is written until the caller commits
    /// the batch.
    fn write_one_transaction_outputs(
        &self,
        write_batch: &mut DBBatch,
        epoch_id: EpochId,
        tx_outputs: &TransactionOutputs,
    ) -> SuiResult {
        let TransactionOutputs {
            transaction,
            effects,
            markers,
            wrapped,
            deleted,
            written,
            events,
            unchanged_loaded_runtime_objects,
            locks_to_delete,
            new_locks_to_init,
            ..
        } = tx_outputs;

        let effects_digest = effects.digest();
        let transaction_digest = transaction.digest();
        // Effects plus the tx-digest -> effects-digest pointer.
        write_batch
            .insert_batch(
                &self.perpetual_tables.effects,
                [(effects_digest, effects.clone())],
            )?
            .insert_batch(
                &self.perpetual_tables.executed_effects,
                [(transaction_digest, effects_digest)],
            )?;

        write_batch.insert_batch(
            &self.perpetual_tables.transactions,
            iter::once((transaction_digest, transaction.serializable_ref())),
        )?;

        // Record execution keyed by (epoch, digest).
        write_batch.insert_batch(
            &self.perpetual_tables.executed_transaction_digests,
            [((epoch_id, *transaction_digest), ())],
        )?;

        // Per-epoch object markers produced by this transaction.
        write_batch.insert_batch(
            &self.perpetual_tables.object_per_epoch_marker_table_v2,
            markers
                .iter()
                .map(|(key, marker_value)| (EpochMarkerKey(epoch_id, *key), *marker_value)),
        )?;
        // Tombstones for deleted and wrapped objects.
        write_batch.insert_batch(
            &self.perpetual_tables.objects,
            deleted
                .iter()
                .map(|key| (key, StoreObject::Deleted))
                .chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
                .map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
        )?;

        // Newly written object versions.
        let new_objects = written.iter().map(|(id, new_object)| {
            let version = new_object.version();
            trace!(?id, ?version, "writing object");
            let store_object = get_store_object(new_object.clone());
            (ObjectKey(*id, version), store_object)
        });

        write_batch.insert_batch(&self.perpetual_tables.objects, new_objects)?;

        // Events are only stored when the effects reference an events digest.
        if effects.events_digest().is_some() {
            write_batch.insert_batch(
                &self.perpetual_tables.events_2,
                [(transaction_digest, events)],
            )?;
        }

        if !unchanged_loaded_runtime_objects.is_empty() {
            write_batch.insert_batch(
                &self.perpetual_tables.unchanged_loaded_runtime_objects,
                [(transaction_digest, unchanged_loaded_runtime_objects)],
            )?;
        }

        // Create markers for output objects, then remove those consumed as inputs.
        self.initialize_live_object_markers_impl(write_batch, new_locks_to_init, false)?;

        self.delete_live_object_markers(write_batch, locks_to_delete)?;

        debug!(effects_digest = ?effects.digest(), "commit_certificate finished");

        Ok(())
    }
876
877 pub(crate) fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> SuiResult {
880 let mut batch = self.perpetual_tables.transactions.batch();
881 batch.insert_batch(
882 &self.perpetual_tables.transactions,
883 [(tx.digest(), tx.clone().into_unsigned().serializable_ref())],
884 )?;
885 batch.write()?;
886 Ok(())
887 }
888
    /// Returns the lock status of an owned object ref: live at a different
    /// version, locked to a transaction this epoch, or initialized but
    /// unlocked.
    pub(crate) fn get_lock(
        &self,
        obj_ref: ObjectRef,
        epoch_store: &AuthorityPerEpochStore,
    ) -> SuiLockResult {
        // No live-marker entry at this exact ref means the object is live at
        // some other version; report that version to the caller.
        if self
            .perpetual_tables
            .live_owned_object_markers
            .get(&obj_ref)?
            .is_none()
        {
            return Ok(ObjectLockStatus::LockedAtDifferentVersion {
                locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
            });
        }

        let tables = epoch_store.tables()?;
        let epoch_id = epoch_store.epoch();

        // The per-epoch tables record which transaction (if any) holds the lock.
        if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
            Ok(ObjectLockStatus::LockedToTx {
                locked_by_tx: LockDetailsDeprecated {
                    epoch: epoch_id,
                    tx_digest,
                },
            })
        } else {
            Ok(ObjectLockStatus::Initialized)
        }
    }
921
    /// Returns the ref of the live version of `object_id` according to the
    /// live-owned-object marker table, or `ObjectNotFound` if no marker
    /// exists for that id.
    pub(crate) fn get_latest_live_version_for_object_id(
        &self,
        object_id: ObjectID,
    ) -> SuiResult<ObjectRef> {
        // Reverse scan from the maximal possible key for this id; the first
        // hit is the highest entry, which may belong to a different object.
        let mut iterator = self
            .perpetual_tables
            .live_owned_object_markers
            .reversed_safe_iter_with_bounds(
                None,
                Some((object_id, SequenceNumber::MAX, ObjectDigest::MAX)),
            )?;
        Ok(iterator
            .next()
            .transpose()?
            .and_then(|value| {
                // Keep the entry only if it is actually for `object_id`.
                if value.0.0 == object_id {
                    Some(value)
                } else {
                    None
                }
            })
            .ok_or_else(|| {
                SuiError::from(UserInputError::ObjectNotFound {
                    object_id,
                    version: None,
                })
            })?
            .0)
    }
952
    /// Verifies that each of `objects` has a live marker at exactly the given
    /// ref; fails with `ObjectVersionUnavailableForConsumption` (reporting the
    /// currently live version) on the first one that does not.
    pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> SuiResult {
        let locks = self
            .perpetual_tables
            .live_owned_object_markers
            .multi_get(objects)?;
        for (lock, obj_ref) in locks.into_iter().zip_debug_eq(objects) {
            if lock.is_none() {
                // Not live at this ref — find the version that is, for the error.
                let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
                fp_bail!(
                    UserInputError::ObjectVersionUnavailableForConsumption {
                        provided_obj_ref: *obj_ref,
                        current_version: latest_lock.1
                    }
                    .into()
                );
            }
        }
        Ok(())
    }
976
    /// Convenience wrapper over `initialize_live_object_markers` bound to
    /// this store's own marker table.
    fn initialize_live_object_markers_impl(
        &self,
        write_batch: &mut DBBatch,
        objects: &[ObjectRef],
        is_force_reset: bool,
    ) -> SuiResult {
        AuthorityStore::initialize_live_object_markers(
            &self.perpetual_tables.live_owned_object_markers,
            write_batch,
            objects,
            is_force_reset,
        )
    }
992
    /// Adds live-object markers (value `None`, i.e. no lock details) for
    /// `objects` to `write_batch`.
    ///
    /// Unless `is_force_reset` is set, fails with
    /// `ObjectLockAlreadyInitialized` when any object already has a marker
    /// carrying lock details.
    pub fn initialize_live_object_markers(
        live_object_marker_table: &DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,
        write_batch: &mut DBBatch,
        objects: &[ObjectRef],
        is_force_reset: bool,
    ) -> SuiResult {
        trace!(?objects, "initialize_locks");

        if !is_force_reset {
            let live_object_markers = live_object_marker_table.multi_get(objects)?;
            // `flatten()` keeps only entries whose marker exists AND holds
            // inner lock details; those must not be silently overwritten.
            let existing_live_object_markers: Vec<ObjectRef> = live_object_markers
                .iter()
                .zip_debug_eq(objects)
                .filter_map(|(lock_opt, objref)| {
                    lock_opt.clone().flatten().map(|_tx_digest| *objref)
                })
                .collect();
            if !existing_live_object_markers.is_empty() {
                info!(
                    ?existing_live_object_markers,
                    "Cannot initialize live_object_markers because some exist already"
                );
                return Err(SuiErrorKind::ObjectLockAlreadyInitialized {
                    refs: existing_live_object_markers,
                }
                .into());
            }
        }

        write_batch.insert_batch(
            live_object_marker_table,
            objects.iter().map(|obj_ref| (obj_ref, None)),
        )?;
        Ok(())
    }
1031
    /// Queues deletion of the live-object markers for `objects` (used when
    /// their versions are consumed by a transaction).
    fn delete_live_object_markers(
        &self,
        write_batch: &mut DBBatch,
        objects: &[ObjectRef],
    ) -> SuiResult {
        trace!(?objects, "delete_locks");
        write_batch.delete_batch(
            &self.perpetual_tables.live_owned_object_markers,
            objects.iter(),
        )?;
        Ok(())
    }
1045
    /// Returns the newest version of `object_id` that is <= `version`, if any.
    pub fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<Option<Object>> {
        self.perpetual_tables
            .find_object_lt_or_eq_version(object_id, version)
    }
1058
    /// Returns the latest ref for `object_id`, which may be a tombstone
    /// (deleted/wrapped) reference rather than a live object's.
    pub fn get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<ObjectRef>, SuiError> {
        self.perpetual_tables
            .get_latest_object_ref_or_tombstone(object_id)
    }
1075
1076 pub fn get_latest_object_ref_if_alive(
1079 &self,
1080 object_id: ObjectID,
1081 ) -> Result<Option<ObjectRef>, SuiError> {
1082 match self.get_latest_object_ref_or_tombstone(object_id)? {
1083 Some(objref) if objref.2.is_alive() => Ok(Some(objref)),
1084 _ => Ok(None),
1085 }
1086 }
1087
    /// Returns the latest entry for `object_id` as either a live object or a
    /// tombstone reference, together with its key.
    pub fn get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, SuiError> {
        let Some((object_key, store_object)) = self
            .perpetual_tables
            .get_latest_object_or_tombstone(object_id)?
        else {
            return Ok(None);
        };

        // `tombstone_reference` yields Some only for tombstone entries.
        if let Some(object_ref) = self
            .perpetual_tables
            .tombstone_reference(&object_key, &store_object)?
        {
            return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
        }

        // Not a tombstone, so it must deserialize into a full object.
        let object = self
            .perpetual_tables
            .object(&object_key, store_object)?
            .expect("Non tombstone store object could not be converted to object");

        Ok(Some((object_key, ObjectOrTombstone::Object(object))))
    }
1116
    /// Atomically stores a transaction and its effects in one batch write.
    pub fn insert_transaction_and_effects(
        &self,
        transaction: &VerifiedTransaction,
        transaction_effects: &TransactionEffects,
    ) -> Result<(), TypedStoreError> {
        let mut write_batch = self.perpetual_tables.transactions.batch();
        write_batch
            .insert_batch(
                &self.perpetual_tables.effects,
                [(transaction_effects.digest(), transaction_effects)],
            )?
            .insert_batch(
                &self.perpetual_tables.transactions,
                [(transaction.digest(), transaction.serializable_ref())],
            )?;

        write_batch.write()?;
        Ok(())
    }
1138
    /// Stores several transaction/effects pairs in a single batch write.
    pub fn multi_insert_transaction_and_effects<'a>(
        &self,
        transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
    ) -> Result<(), TypedStoreError> {
        let mut write_batch = self.perpetual_tables.transactions.batch();
        for tx in transactions {
            write_batch
                .insert_batch(
                    &self.perpetual_tables.effects,
                    [(tx.effects.digest(), &tx.effects)],
                )?
                .insert_batch(
                    &self.perpetual_tables.transactions,
                    [(tx.transaction.digest(), tx.transaction.serializable_ref())],
                )?;
        }

        write_batch.write()?;
        Ok(())
    }
1159
1160 pub fn multi_get_transaction_blocks(
1161 &self,
1162 tx_digests: &[TransactionDigest],
1163 ) -> Result<Vec<Option<VerifiedTransaction>>, TypedStoreError> {
1164 self.perpetual_tables
1165 .transactions
1166 .multi_get(tx_digests)
1167 .map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())
1168 }
1169
1170 pub fn get_transaction_block(
1171 &self,
1172 tx_digest: &TransactionDigest,
1173 ) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
1174 self.perpetual_tables
1175 .transactions
1176 .get(tx_digest)
1177 .map(|v| v.map(|v| v.into()))
1178 }
1179
    /// Reads the SuiSystemState object directly from the perpetual tables.
    /// NOTE(review): "unsafe" per the name — presumably because the read is
    /// not tied to any epoch-consistent snapshot; confirm against callers
    /// before reusing.
    pub fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
        get_sui_system_state(self.perpetual_tables.as_ref())
    }
1189
    /// Verifies SUI conservation at an epoch boundary by scanning every live
    /// object: the aggregate storage rebate is compared against the storage
    /// fund balance reported by the system state object, and the total SUI in
    /// objects is compared against `TOTAL_SUPPLY_MIST`. On the first run the
    /// observed imbalance/total are recorded in the perpetual tables; later
    /// runs fail with a `SuiError` if the values have drifted.
    ///
    /// This is expensive (full live-object scan) and is therefore gated on
    /// `enable_epoch_sui_conservation_check`.
    pub fn expensive_check_sui_conservation<T>(
        self: &Arc<Self>,
        type_layout_store: T,
        old_epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult
    where
        T: TypeLayoutStore + Send + Copy,
    {
        if !self.enable_epoch_sui_conservation_check {
            return Ok(());
        }

        let executor = old_epoch_store.executor();
        info!("Starting SUI conservation check. This may take a while..");
        let cur_time = Instant::now();
        let mut pending_objects = vec![];
        let mut count = 0;
        let mut size = 0;
        // Scan the live object set on this thread, handing off SUI-summation
        // work to scoped worker threads in batches of 1M objects. Each task
        // returns its own (total_sui, total_storage_rebate) partial sums.
        let (mut total_sui, mut total_storage_rebate) = thread::scope(|s| {
            let pending_tasks = FuturesUnordered::new();
            for o in self.iter_live_object_set(false) {
                match o {
                    LiveObject::Normal(object) => {
                        size += object.object_size_for_gas_metering();
                        count += 1;
                        pending_objects.push(object);
                        if count % 1_000_000 == 0 {
                            // Move the accumulated batch into the task,
                            // leaving `pending_objects` empty for the next one.
                            let mut task_objects = vec![];
                            mem::swap(&mut pending_objects, &mut task_objects);
                            pending_tasks.push(s.spawn(move || {
                                // Each worker needs its own layout resolver;
                                // `type_layout_store` is `Copy` so it can be
                                // moved into every closure.
                                let mut layout_resolver = executor.type_layout_resolver(
                                    old_epoch_store.protocol_config(),
                                    Box::new(type_layout_store),
                                );
                                let mut total_storage_rebate = 0;
                                let mut total_sui = 0;
                                for object in task_objects {
                                    total_storage_rebate += object.storage_rebate;
                                    let object_contained_sui = match object
                                        .get_total_sui(layout_resolver.as_mut())
                                    {
                                        Ok(sui) => sui,
                                        // On Testnet only, tolerate objects whose
                                        // SUI total cannot be computed: log and
                                        // count them as 0. Elsewhere this is fatal.
                                        Err(e)
                                            if old_epoch_store.get_chain()
                                                == sui_protocol_config::Chain::Testnet =>
                                        {
                                            error!(
                                                "Error calculating total SUI for object {:?}: {:?}",
                                                object.compute_object_reference(),
                                                e
                                            );
                                            0
                                        }
                                        Err(e) => panic!(
                                            "Error calculating total SUI for object {:?}: {:?}",
                                            object.compute_object_reference(),
                                            e
                                        ),
                                    };
                                    // `get_total_sui` includes the storage rebate,
                                    // so subtract it to avoid double counting.
                                    total_sui += object_contained_sui - object.storage_rebate;
                                }
                                // `count` is the value captured when this batch was
                                // spawned; logs progress every 50M objects.
                                if count % 50_000_000 == 0 {
                                    info!("Processed {} objects", count);
                                }
                                (total_sui, total_storage_rebate)
                            }));
                        }
                    }
                    LiveObject::Wrapped(_) => {
                        unreachable!("Explicitly asked to not include wrapped tombstones")
                    }
                }
            }
            // Join all workers and sum their partial results.
            pending_tasks.into_iter().fold((0, 0), |init, result| {
                let result = result.join().unwrap();
                (init.0 + result.0, init.1 + result.1)
            })
        });
        // Process the remainder (< 1M objects) on the current thread.
        let mut layout_resolver = executor.type_layout_resolver(
            old_epoch_store.protocol_config(),
            Box::new(type_layout_store),
        );
        for object in pending_objects {
            total_storage_rebate += object.storage_rebate;
            total_sui +=
                object.get_total_sui(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
        }
        info!(
            "Scanned {} live objects, took {:?}",
            count,
            cur_time.elapsed()
        );
        self.metrics
            .sui_conservation_live_object_count
            .set(count as i64);
        self.metrics
            .sui_conservation_live_object_size
            .set(size as i64);
        self.metrics
            .sui_conservation_check_latency
            .set(cur_time.elapsed().as_secs() as i64);

        let system_state = self
            .get_sui_system_state_object_unsafe()
            .expect("Reading sui system state object cannot fail")
            .into_sui_system_state_summary();
        let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
        info!(
            "Total SUI amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
            total_sui, storage_fund_balance, total_storage_rebate, system_state.epoch
        );

        // Difference between what the storage fund holds and what objects
        // claim as rebates; expected to stay constant across epochs.
        let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
        self.metrics
            .sui_conservation_storage_fund
            .set(storage_fund_balance as i64);
        self.metrics
            .sui_conservation_storage_fund_imbalance
            .set(imbalance);
        self.metrics
            .sui_conservation_imbalance
            .set((total_sui as i128 - TOTAL_SUPPLY_MIST as i128) as i64);

        // First run records the imbalance; subsequent runs assert it is stable.
        if let Some(expected_imbalance) = self
            .perpetual_tables
            .expected_storage_fund_imbalance
            .get(&())
            .expect("DB read cannot fail")
        {
            fp_ensure!(
                imbalance == expected_imbalance,
                SuiError::from(
                    format!(
                        "Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
                        system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
                    ).as_str()
                )
            );
        } else {
            self.perpetual_tables
                .expected_storage_fund_imbalance
                .insert(&(), &imbalance)
                .expect("DB write cannot fail");
        }

        // Likewise for the total network SUI amount.
        if let Some(expected_sui) = self
            .perpetual_tables
            .expected_network_sui_amount
            .get(&())
            .expect("DB read cannot fail")
        {
            fp_ensure!(
                total_sui == expected_sui,
                SuiError::from(
                    format!(
                        "Inconsistent state detected at epoch {}: total sui: {}, expecting {}",
                        system_state.epoch, total_sui, expected_sui
                    )
                    .as_str()
                )
            );
        } else {
            self.perpetual_tables
                .expected_network_sui_amount
                .insert(&(), &total_sui)
                .expect("DB write cannot fail");
        }

        Ok(())
    }
1363
    /// Re-computes the epoch's root state hash when the upcoming protocol
    /// version enables `simplified_unwrap_then_delete` while the current one
    /// does not: all live wrapped-object tombstones are removed from the
    /// accumulator and the updated hash is written back for the current
    /// epoch. No-op in every other case.
    #[instrument(level = "error", skip_all)]
    pub fn maybe_reaccumulate_state_hash(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        new_protocol_version: ProtocolVersion,
    ) {
        let old_simplified_unwrap_then_delete = cur_epoch_store
            .protocol_config()
            .simplified_unwrap_then_delete();
        let new_simplified_unwrap_then_delete =
            ProtocolConfig::get_for_version(new_protocol_version, cur_epoch_store.get_chain())
                .simplified_unwrap_then_delete();
        // Only the off -> on transition requires fixing up the accumulator.
        let should_reaccumulate =
            !old_simplified_unwrap_then_delete && new_simplified_unwrap_then_delete;
        if !should_reaccumulate {
            return;
        }
        info!(
            "[Re-accumulate] simplified_unwrap_then_delete is enabled in the new protocol version, re-accumulating state hash"
        );
        let cur_time = Instant::now();
        std::thread::scope(|s| {
            let pending_tasks = FuturesUnordered::new();
            // Partition the ObjectID key space into 2^BITS = 32 contiguous
            // ranges by the top BITS bits of the first byte; each range is
            // scanned by its own scoped thread.
            const BITS: u8 = 5;
            for index in 0u8..(1 << BITS) {
                pending_tasks.push(s.spawn(move || {
                    // Smallest ObjectID in this shard: top bits = index, rest 0.
                    let mut id_bytes = [0; ObjectID::LENGTH];
                    id_bytes[0] = index << (8 - BITS);
                    let start_id = ObjectID::new(id_bytes);

                    // Largest ObjectID in this shard: fill the remaining bits
                    // of byte 0 and set every other byte to 0xFF.
                    id_bytes[0] |= (1 << (8 - BITS)) - 1;
                    for element in id_bytes.iter_mut().skip(1) {
                        *element = u8::MAX;
                    }
                    let end_id = ObjectID::new(id_bytes);

                    info!(
                        "[Re-accumulate] Scanning object ID range {:?}..{:?}",
                        start_id, end_id
                    );
                    // Sentinel previous entry; `Deleted` never matches the
                    // `Wrapped` check below.
                    let mut prev = (
                        ObjectKey::min_for_id(&ObjectID::ZERO),
                        StoreObjectWrapper::V1(StoreObject::Deleted),
                    );
                    let mut object_scanned: u64 = 0;
                    let mut wrapped_objects_to_remove = vec![];
                    for db_result in self.perpetual_tables.objects.safe_range_iter(
                        ObjectKey::min_for_id(&start_id)..=ObjectKey::max_for_id(&end_id),
                    ) {
                        match db_result {
                            Ok((object_key, object)) => {
                                object_scanned += 1;
                                if object_scanned.is_multiple_of(100000) {
                                    info!(
                                        "[Re-accumulate] Task {}: object scanned: {}",
                                        index, object_scanned,
                                    );
                                }
                                // Keys iterate in (id, version) order, so when
                                // the id changes, `prev` was the newest record
                                // for its id; if that record is a wrapped
                                // tombstone, it is live and must be removed
                                // from the accumulator.
                                if matches!(prev.1.inner(), StoreObject::Wrapped)
                                    && object_key.0 != prev.0.0
                                {
                                    wrapped_objects_to_remove
                                        .push(WrappedObject::new(prev.0.0, prev.0.1));
                                }

                                prev = (object_key, object);
                            }
                            Err(err) => {
                                warn!("Object iterator encounter RocksDB error {:?}", err);
                                return Err(err);
                            }
                        }
                    }
                    // Handle the final record of the range the same way.
                    if matches!(prev.1.inner(), StoreObject::Wrapped) {
                        wrapped_objects_to_remove.push(WrappedObject::new(prev.0.0, prev.0.1));
                    }
                    info!(
                        "[Re-accumulate] Task {}: object scanned: {}, wrapped objects: {}",
                        index,
                        object_scanned,
                        wrapped_objects_to_remove.len(),
                    );
                    Ok((wrapped_objects_to_remove, object_scanned))
                }));
            }
            let (last_checkpoint_of_epoch, cur_accumulator) = self
                .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
                .expect("read cannot fail")
                .expect("accumulator must exist");
            // Join the shard tasks, removing every discovered wrapped
            // tombstone from the current accumulator and tallying counters.
            let (accumulator, total_objects_scanned, total_wrapped_objects) =
                pending_tasks.into_iter().fold(
                    (cur_accumulator, 0u64, 0usize),
                    |(mut accumulator, total_objects_scanned, total_wrapped_objects), task| {
                        let (wrapped_objects_to_remove, object_scanned) =
                            task.join().unwrap().unwrap();
                        accumulator.remove_all(
                            wrapped_objects_to_remove
                                .iter()
                                .map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
                                .collect::<Vec<Vec<u8>>>(),
                        );
                        (
                            accumulator,
                            total_objects_scanned + object_scanned,
                            total_wrapped_objects + wrapped_objects_to_remove.len(),
                        )
                    },
                );
            info!(
                "[Re-accumulate] Total objects scanned: {}, total wrapped objects: {}",
                total_objects_scanned, total_wrapped_objects,
            );
            info!(
                "[Re-accumulate] New accumulator value: {:?}",
                accumulator.digest()
            );
            // Overwrite the stored root state hash for the current epoch.
            self.insert_state_hash_for_epoch(
                cur_epoch_store.epoch(),
                &last_checkpoint_of_epoch,
                &accumulator,
            )
            .unwrap();
        });
        info!(
            "[Re-accumulate] Re-accumulating took {}seconds",
            cur_time.elapsed().as_secs()
        );
    }
1504
1505 pub async fn prune_objects_and_compact_for_testing(
1506 &self,
1507 checkpoint_store: &Arc<CheckpointStore>,
1508 rpc_index: Option<&RpcIndexStore>,
1509 ) {
1510 let pruning_config = AuthorityStorePruningConfig {
1511 num_epochs_to_retain: 0,
1512 ..Default::default()
1513 };
1514 let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
1515 &self.perpetual_tables,
1516 checkpoint_store,
1517 rpc_index,
1518 pruning_config,
1519 AuthorityStorePruningMetrics::new_for_test(),
1520 EPOCH_DURATION_MS_FOR_TESTING,
1521 )
1522 .await;
1523 let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
1524 }
1525
1526 pub fn remove_executed_effects_for_testing(
1527 &self,
1528 tx_digest: &TransactionDigest,
1529 ) -> anyhow::Result<()> {
1530 let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
1531 if let Some(effects_digest) = effects_digest {
1532 self.perpetual_tables.executed_effects.remove(tx_digest)?;
1533 self.perpetual_tables.effects.remove(&effects_digest)?;
1534 }
1535 Ok(())
1536 }
1537
1538 #[cfg(test)]
1539 pub async fn prune_objects_immediately_for_testing(
1540 &self,
1541 transaction_effects: Vec<TransactionEffects>,
1542 ) -> anyhow::Result<()> {
1543 let mut wb = self.perpetual_tables.objects.batch();
1544
1545 let mut object_keys_to_prune = vec![];
1546 for effects in &transaction_effects {
1547 for (object_id, seq_number) in effects.modified_at_versions() {
1548 info!("Pruning object {:?} version {:?}", object_id, seq_number);
1549 object_keys_to_prune.push(ObjectKey(object_id, seq_number));
1550 }
1551 }
1552
1553 wb.delete_batch(
1554 &self.perpetual_tables.objects,
1555 object_keys_to_prune.into_iter(),
1556 )?;
1557 wb.write()?;
1558 Ok(())
1559 }
1560
1561 #[cfg(msim)]
1563 pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
1564 self.perpetual_tables
1565 .objects
1566 .safe_iter_with_bounds(
1567 Some(ObjectKey(object_id, VersionNumber::MIN)),
1568 Some(ObjectKey(object_id, VersionNumber::MAX)),
1569 )
1570 .collect::<Result<Vec<_>, _>>()
1571 .unwrap()
1572 .len()
1573 }
1574}
1575
1576impl GlobalStateHashStore for AuthorityStore {
1577 fn get_object_ref_prior_to_key_deprecated(
1578 &self,
1579 object_id: &ObjectID,
1580 version: VersionNumber,
1581 ) -> SuiResult<Option<ObjectRef>> {
1582 self.get_object_ref_prior_to_key(object_id, version)
1583 }
1584
1585 fn get_root_state_hash_for_epoch(
1586 &self,
1587 epoch: EpochId,
1588 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
1589 self.perpetual_tables
1590 .root_state_hash_by_epoch
1591 .get(&epoch)
1592 .map_err(Into::into)
1593 }
1594
1595 fn get_root_state_hash_for_highest_epoch(
1596 &self,
1597 ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
1598 Ok(self
1599 .perpetual_tables
1600 .root_state_hash_by_epoch
1601 .reversed_safe_iter_with_bounds(None, None)?
1602 .next()
1603 .transpose()?)
1604 }
1605
1606 fn insert_state_hash_for_epoch(
1607 &self,
1608 epoch: EpochId,
1609 last_checkpoint_of_epoch: &CheckpointSequenceNumber,
1610 acc: &GlobalStateHash,
1611 ) -> SuiResult {
1612 self.perpetual_tables
1613 .root_state_hash_by_epoch
1614 .insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
1615 self.root_state_notify_read
1616 .notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
1617
1618 Ok(())
1619 }
1620
1621 fn iter_live_object_set(
1622 &self,
1623 include_wrapped_object: bool,
1624 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1625 Box::new(
1626 self.perpetual_tables
1627 .iter_live_object_set(include_wrapped_object),
1628 )
1629 }
1630}
1631
1632impl ObjectStore for AuthorityStore {
1633 fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
1635 self.perpetual_tables.as_ref().get_object(object_id)
1636 }
1637
1638 fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
1639 self.perpetual_tables.get_object_by_key(object_id, version)
1640 }
1641}
1642
/// Adapts a `BackingPackageStore` to the Move `ModuleResolver` interface
/// while recording resolver activity in metrics.
pub struct ResolverWrapper {
    // Underlying package store that modules/packages are fetched from.
    pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
    // Metrics updated on resolver construction and each module lookup.
    pub metrics: Arc<ResolverMetrics>,
}
1648
1649impl ResolverWrapper {
1650 pub fn new(
1651 resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1652 metrics: Arc<ResolverMetrics>,
1653 ) -> Self {
1654 metrics.module_cache_size.set(0);
1655 ResolverWrapper { resolver, metrics }
1656 }
1657
1658 fn inc_cache_size_gauge(&self) {
1659 let current = self.metrics.module_cache_size.get();
1661 self.metrics.module_cache_size.set(current + 1);
1662 }
1663}
1664
1665impl ModuleResolver for ResolverWrapper {
1666 type Error = SuiError;
1667 fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
1668 self.inc_cache_size_gauge();
1669 get_module(&*self.resolver, module_id)
1670 }
1671
1672 fn get_packages_static<const N: usize>(
1673 &self,
1674 ids: [AccountAddress; N],
1675 ) -> Result<[Option<SerializedPackage>; N], Self::Error> {
1676 let mut packages = [const { None }; N];
1677 for (i, id) in ids.iter().enumerate() {
1678 packages[i] = get_package(&*self.resolver, &ObjectID::from(*id))?;
1679 }
1680 Ok(packages)
1681 }
1682
1683 fn get_packages<'a>(
1684 &self,
1685 ids: impl ExactSizeIterator<Item = &'a AccountAddress>,
1686 ) -> Result<Vec<Option<SerializedPackage>>, Self::Error> {
1687 ids.map(|id| get_package(&*self.resolver, &ObjectID::from(*id)))
1688 .collect()
1689 }
1690}
1691
/// Distinguishes the origin of a store update.
pub enum UpdateType {
    // Update produced by executing a transaction; carries its effects digest.
    Transaction(TransactionEffectsDigest),
    // Update applied at genesis, with no associated transaction effects.
    Genesis,
}
1696
/// Result type returned by owned-object lock queries.
pub type SuiLockResult = SuiResult<ObjectLockStatus>;
1698
/// State of the lock for a specific object reference.
#[derive(Debug, PartialEq, Eq)]
pub enum ObjectLockStatus {
    // The lock exists but no transaction has acquired it yet.
    Initialized,
    // The queried object version is locked by the given transaction.
    LockedToTx { locked_by_tx: LockDetailsDeprecated },
    // A lock exists for this object, but at a different version/digest.
    LockedAtDifferentVersion { locked_ref: ObjectRef },
}
1705
/// Versioned wrapper for serialized lock details (deprecated storage format).
/// New versions would be added as additional variants.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LockDetailsWrapperDeprecated {
    V1(LockDetailsV1Deprecated),
}
1710
impl LockDetailsWrapperDeprecated {
    /// Upgrades the wrapper to the latest version. `V1` is currently the
    /// newest variant, so this is the identity; a future version would
    /// convert older variants here.
    pub fn migrate(self) -> Self {
        self
    }

    /// Borrows the inner lock details. The catch-all arm is unreachable
    /// today (only `V1` exists) but guards against a future variant being
    /// read without `migrate` having been applied first.
    pub fn inner(&self) -> &LockDetailsDeprecated {
        match self {
            Self::V1(v1) => v1,

            #[allow(unreachable_patterns)]
            _ => panic!("lock details should have been migrated to latest version at read time"),
        }
    }
    /// Consumes the wrapper and returns the inner lock details; same
    /// versioning caveat as `inner`.
    pub fn into_inner(self) -> LockDetailsDeprecated {
        match self {
            Self::V1(v1) => v1,

            #[allow(unreachable_patterns)]
            _ => panic!("lock details should have been migrated to latest version at read time"),
        }
    }
}
1739
/// Version 1 of the serialized lock details (deprecated storage format).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LockDetailsV1Deprecated {
    // Epoch in which the lock was taken.
    pub epoch: EpochId,
    // Digest of the transaction holding the lock.
    pub tx_digest: TransactionDigest,
}
1745
/// Alias for the current (and only) lock-details version.
pub type LockDetailsDeprecated = LockDetailsV1Deprecated;
1747
1748impl From<LockDetailsDeprecated> for LockDetailsWrapperDeprecated {
1749 fn from(details: LockDetailsDeprecated) -> Self {
1750 LockDetailsWrapperDeprecated::V1(details)
1752 }
1753}