1use std::sync::Arc;
5use std::{iter, mem, thread};
6
7use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
8use crate::authority::authority_store_pruner::{
9 AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
10};
11use crate::authority::authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object};
12use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
13use crate::global_state_hasher::GlobalStateHashStore;
14use crate::rpc_index::RpcIndexStore;
15use crate::transaction_outputs::TransactionOutputs;
16use either::Either;
17use fastcrypto::hash::{HashFunction, MultisetHash, Sha3_256};
18use futures::stream::FuturesUnordered;
19use move_core_types::resolver::ModuleResolver;
20use serde::{Deserialize, Serialize};
21use sui_config::node::AuthorityStorePruningConfig;
22use sui_macros::fail_point_arg;
23use sui_types::error::{SuiErrorKind, UserInputError};
24use sui_types::execution::TypeLayoutStore;
25use sui_types::global_state_hash::GlobalStateHash;
26use sui_types::message_envelope::Message;
27use sui_types::storage::{
28 BackingPackageStore, FullObjectKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore,
29 get_module,
30};
31use sui_types::sui_system_state::get_sui_system_state;
32use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure};
33use tokio::time::Instant;
34use tracing::{debug, info, trace};
35use typed_store::traits::Map;
36use typed_store::{
37 TypedStoreError,
38 rocks::{DBBatch, DBMap},
39};
40
41use super::authority_store_tables::LiveObject;
42use super::{authority_store_tables::AuthorityPerpetualTables, *};
43use mysten_common::sync::notify_read::NotifyRead;
44use sui_types::effects::{TransactionEffects, TransactionEvents};
45use sui_types::gas_coin::TOTAL_SUPPLY_MIST;
46
47struct AuthorityStoreMetrics {
48 sui_conservation_check_latency: IntGauge,
49 sui_conservation_live_object_count: IntGauge,
50 sui_conservation_live_object_size: IntGauge,
51 sui_conservation_imbalance: IntGauge,
52 sui_conservation_storage_fund: IntGauge,
53 sui_conservation_storage_fund_imbalance: IntGauge,
54 epoch_flags: IntGaugeVec,
55}
56
57impl AuthorityStoreMetrics {
58 pub fn new(registry: &Registry) -> Self {
59 Self {
60 sui_conservation_check_latency: register_int_gauge_with_registry!(
61 "sui_conservation_check_latency",
62 "Number of seconds took to scan all live objects in the store for SUI conservation check",
63 registry,
64 ).unwrap(),
65 sui_conservation_live_object_count: register_int_gauge_with_registry!(
66 "sui_conservation_live_object_count",
67 "Number of live objects in the store",
68 registry,
69 ).unwrap(),
70 sui_conservation_live_object_size: register_int_gauge_with_registry!(
71 "sui_conservation_live_object_size",
72 "Size in bytes of live objects in the store",
73 registry,
74 ).unwrap(),
75 sui_conservation_imbalance: register_int_gauge_with_registry!(
76 "sui_conservation_imbalance",
77 "Total amount of SUI in the network - 10B * 10^9. This delta shows the amount of imbalance",
78 registry,
79 ).unwrap(),
80 sui_conservation_storage_fund: register_int_gauge_with_registry!(
81 "sui_conservation_storage_fund",
82 "Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
83 registry,
84 ).unwrap(),
85 sui_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
86 "sui_conservation_storage_fund_imbalance",
87 "Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
88 registry,
89 ).unwrap(),
90 epoch_flags: register_int_gauge_vec_with_registry!(
91 "epoch_flags",
92 "Local flags of the currently running epoch",
93 &["flag"],
94 registry,
95 ).unwrap(),
96 }
97 }
98}
99
100pub struct AuthorityStore {
107 pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,
108
109 pub(crate) root_state_notify_read:
110 NotifyRead<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,
111
112 enable_epoch_sui_conservation_check: bool,
114
115 metrics: AuthorityStoreMetrics,
116}
117
118pub type ExecutionLockReadGuard<'a> = tokio::sync::RwLockReadGuard<'a, EpochId>;
119pub type ExecutionLockWriteGuard<'a> = tokio::sync::RwLockWriteGuard<'a, EpochId>;
120
121impl AuthorityStore {
122 pub async fn open(
125 perpetual_tables: Arc<AuthorityPerpetualTables>,
126 genesis: &Genesis,
127 config: &NodeConfig,
128 registry: &Registry,
129 ) -> SuiResult<Arc<Self>> {
130 let enable_epoch_sui_conservation_check = config
131 .expensive_safety_check_config
132 .enable_epoch_sui_conservation_check();
133
134 let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
135 info!("Creating new epoch start config from genesis");
136
137 #[allow(unused_mut)]
138 let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
139 fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
140 info!("Setting initial epoch flags to {:?}", flags);
141 initial_epoch_flags = flags;
142 });
143
144 let epoch_start_configuration = EpochStartConfiguration::new(
145 genesis.sui_system_object().into_epoch_start_state(),
146 *genesis.checkpoint().digest(),
147 &genesis.objects(),
148 initial_epoch_flags,
149 )?;
150 perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
151 epoch_start_configuration
152 } else {
153 info!("Loading epoch start config from DB");
154 perpetual_tables
155 .epoch_start_configuration
156 .get(&())?
157 .expect("Epoch start configuration must be set in non-empty DB")
158 };
159 let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
160 info!("Epoch start config: {:?}", epoch_start_configuration);
161 info!("Cur epoch: {:?}", cur_epoch);
162 let this = Self::open_inner(
163 genesis,
164 perpetual_tables,
165 enable_epoch_sui_conservation_check,
166 registry,
167 )
168 .await?;
169 this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
170 Ok(this)
171 }
172
173 pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
174 for flag in old {
175 self.metrics
176 .epoch_flags
177 .with_label_values(&[&flag.to_string()])
178 .set(0);
179 }
180 for flag in new {
181 self.metrics
182 .epoch_flags
183 .with_label_values(&[&flag.to_string()])
184 .set(1);
185 }
186 }
187
188 pub fn clear_object_per_epoch_marker_table(
191 &self,
192 _execution_guard: &ExecutionLockWriteGuard<'_>,
193 ) -> SuiResult<()> {
194 self.perpetual_tables
198 .object_per_epoch_marker_table
199 .schedule_delete_all()?;
200 Ok(self
201 .perpetual_tables
202 .object_per_epoch_marker_table_v2
203 .schedule_delete_all()?)
204 }
205
206 pub async fn open_with_committee_for_testing(
207 perpetual_tables: Arc<AuthorityPerpetualTables>,
208 committee: &Committee,
209 genesis: &Genesis,
210 ) -> SuiResult<Arc<Self>> {
211 assert_eq!(committee.epoch, 0);
214 Self::open_inner(genesis, perpetual_tables, true, &Registry::new()).await
215 }
216
217 async fn open_inner(
218 genesis: &Genesis,
219 perpetual_tables: Arc<AuthorityPerpetualTables>,
220 enable_epoch_sui_conservation_check: bool,
221 registry: &Registry,
222 ) -> SuiResult<Arc<Self>> {
223 let store = Arc::new(Self {
224 perpetual_tables,
225 root_state_notify_read: NotifyRead::<
226 EpochId,
227 (CheckpointSequenceNumber, GlobalStateHash),
228 >::new(),
229 enable_epoch_sui_conservation_check,
230 metrics: AuthorityStoreMetrics::new(registry),
231 });
232 if store
234 .database_is_empty()
235 .expect("Database read should not fail at init.")
236 {
237 store
238 .bulk_insert_genesis_objects(genesis.objects())
239 .expect("Cannot bulk insert genesis objects");
240
241 let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());
243
244 store
245 .perpetual_tables
246 .transactions
247 .insert(transaction.digest(), transaction.serializable_ref())
248 .unwrap();
249
250 store
251 .perpetual_tables
252 .effects
253 .insert(&genesis.effects().digest(), genesis.effects())
254 .unwrap();
255 if genesis.effects().events_digest().is_some() {
259 store
260 .perpetual_tables
261 .events_2
262 .insert(transaction.digest(), genesis.events())
263 .unwrap();
264 }
265 }
266
267 Ok(store)
268 }
269
270 pub fn open_no_genesis(
274 perpetual_tables: Arc<AuthorityPerpetualTables>,
275 enable_epoch_sui_conservation_check: bool,
276 registry: &Registry,
277 ) -> SuiResult<Arc<Self>> {
278 let store = Arc::new(Self {
279 perpetual_tables,
280 root_state_notify_read: NotifyRead::<
281 EpochId,
282 (CheckpointSequenceNumber, GlobalStateHash),
283 >::new(),
284 enable_epoch_sui_conservation_check,
285 metrics: AuthorityStoreMetrics::new(registry),
286 });
287 Ok(store)
288 }
289
290 pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
291 self.perpetual_tables.get_recovery_epoch_at_restart()
292 }
293
294 pub fn get_effects(
295 &self,
296 effects_digest: &TransactionEffectsDigest,
297 ) -> SuiResult<Option<TransactionEffects>> {
298 Ok(self.perpetual_tables.effects.get(effects_digest)?)
299 }
300
301 pub fn effects_exists(&self, effects_digest: &TransactionEffectsDigest) -> SuiResult<bool> {
303 self.perpetual_tables
304 .effects
305 .contains_key(effects_digest)
306 .map_err(|e| e.into())
307 }
308
309 pub fn get_events(
310 &self,
311 digest: &TransactionDigest,
312 ) -> Result<Option<TransactionEvents>, TypedStoreError> {
313 self.perpetual_tables.events_2.get(digest)
314 }
315
316 pub fn multi_get_events(
317 &self,
318 event_digests: &[TransactionDigest],
319 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
320 Ok(event_digests
321 .iter()
322 .map(|digest| self.get_events(digest))
323 .collect::<Result<Vec<_>, _>>()?)
324 }
325
326 pub fn get_unchanged_loaded_runtime_objects(
327 &self,
328 digest: &TransactionDigest,
329 ) -> Result<Option<Vec<ObjectKey>>, TypedStoreError> {
330 self.perpetual_tables
331 .unchanged_loaded_runtime_objects
332 .get(digest)
333 }
334
335 pub fn multi_get_effects<'a>(
336 &self,
337 effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
338 ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
339 self.perpetual_tables.effects.multi_get(effects_digests)
340 }
341
342 pub fn get_executed_effects(
343 &self,
344 tx_digest: &TransactionDigest,
345 ) -> Result<Option<TransactionEffects>, TypedStoreError> {
346 let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
347 match effects_digest {
348 Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
349 None => Ok(None),
350 }
351 }
352
353 pub fn multi_get_executed_effects_digests(
356 &self,
357 digests: &[TransactionDigest],
358 ) -> Result<Vec<Option<TransactionEffectsDigest>>, TypedStoreError> {
359 self.perpetual_tables.executed_effects.multi_get(digests)
360 }
361
362 pub fn multi_get_executed_effects(
365 &self,
366 digests: &[TransactionDigest],
367 ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
368 let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
369 let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
370 let mut tx_to_effects_map = effects
371 .into_iter()
372 .flatten()
373 .map(|effects| (*effects.transaction_digest(), effects))
374 .collect::<HashMap<_, _>>();
375 Ok(digests
376 .iter()
377 .map(|digest| tx_to_effects_map.remove(digest))
378 .collect())
379 }
380
381 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> SuiResult<bool> {
382 Ok(self
383 .perpetual_tables
384 .executed_effects
385 .contains_key(digest)?)
386 }
387
388 pub fn get_marker_value(
389 &self,
390 object_key: FullObjectKey,
391 epoch_id: EpochId,
392 ) -> SuiResult<Option<MarkerValue>> {
393 Ok(self
394 .perpetual_tables
395 .object_per_epoch_marker_table_v2
396 .get(&(epoch_id, object_key))?)
397 }
398
399 pub fn get_latest_marker(
400 &self,
401 object_id: FullObjectID,
402 epoch_id: EpochId,
403 ) -> SuiResult<Option<(SequenceNumber, MarkerValue)>> {
404 let min_key = (epoch_id, FullObjectKey::min_for_id(&object_id));
405 let max_key = (epoch_id, FullObjectKey::max_for_id(&object_id));
406
407 let marker_entry = self
408 .perpetual_tables
409 .object_per_epoch_marker_table_v2
410 .reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
411 .next();
412 match marker_entry {
413 Some(Ok(((epoch, key), marker))) => {
414 assert_eq!(epoch, epoch_id);
416 assert_eq!(key.id(), object_id);
417 Ok(Some((key.version(), marker)))
418 }
419 Some(Err(e)) => Err(e.into()),
420 None => Ok(None),
421 }
422 }
423
424 pub async fn notify_read_root_state_hash(
427 &self,
428 epoch: EpochId,
429 ) -> SuiResult<(CheckpointSequenceNumber, GlobalStateHash)> {
430 let registration = self.root_state_notify_read.register_one(&epoch);
432 let hash = self.perpetual_tables.root_state_hash_by_epoch.get(&epoch)?;
433
434 let result = match hash {
435 Some(ready) => Either::Left(futures::future::ready(ready)),
437 None => Either::Right(registration),
438 }
439 .await;
440
441 Ok(result)
442 }
443
444 pub fn deprecated_insert_finalized_transactions(
446 &self,
447 digests: &[TransactionDigest],
448 epoch: EpochId,
449 sequence: CheckpointSequenceNumber,
450 ) -> SuiResult {
451 let mut batch = self
452 .perpetual_tables
453 .executed_transactions_to_checkpoint
454 .batch();
455 batch.insert_batch(
456 &self.perpetual_tables.executed_transactions_to_checkpoint,
457 digests.iter().map(|d| (*d, (epoch, sequence))),
458 )?;
459 batch.write()?;
460 trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
461 Ok(())
462 }
463
464 pub fn deprecated_get_transaction_checkpoint(
466 &self,
467 digest: &TransactionDigest,
468 ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
469 Ok(self
470 .perpetual_tables
471 .executed_transactions_to_checkpoint
472 .get(digest)?)
473 }
474
475 pub fn deprecated_multi_get_transaction_checkpoint(
477 &self,
478 digests: &[TransactionDigest],
479 ) -> SuiResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
480 Ok(self
481 .perpetual_tables
482 .executed_transactions_to_checkpoint
483 .multi_get(digests)?
484 .into_iter()
485 .collect())
486 }
487
488 pub fn database_is_empty(&self) -> SuiResult<bool> {
490 self.perpetual_tables.database_is_empty()
491 }
492
493 pub fn object_exists_by_key(
494 &self,
495 object_id: &ObjectID,
496 version: VersionNumber,
497 ) -> SuiResult<bool> {
498 Ok(self
499 .perpetual_tables
500 .objects
501 .contains_key(&ObjectKey(*object_id, version))?)
502 }
503
504 pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<bool>> {
505 Ok(self
506 .perpetual_tables
507 .objects
508 .multi_contains_keys(object_keys.to_vec())?
509 .into_iter()
510 .collect())
511 }
512
513 fn get_object_ref_prior_to_key(
514 &self,
515 object_id: &ObjectID,
516 version: VersionNumber,
517 ) -> Result<Option<ObjectRef>, SuiError> {
518 let Some(prior_version) = version.one_before() else {
519 return Ok(None);
520 };
521 let mut iterator = self
522 .perpetual_tables
523 .objects
524 .reversed_safe_iter_with_bounds(
525 Some(ObjectKey::min_for_id(object_id)),
526 Some(ObjectKey(*object_id, prior_version)),
527 )?;
528
529 if let Some((object_key, value)) = iterator.next().transpose()?
530 && object_key.0 == *object_id
531 {
532 return Ok(Some(
533 self.perpetual_tables.object_reference(&object_key, value)?,
534 ));
535 }
536 Ok(None)
537 }
538
539 pub fn multi_get_objects_by_key(
540 &self,
541 object_keys: &[ObjectKey],
542 ) -> Result<Vec<Option<Object>>, SuiError> {
543 let wrappers = self
544 .perpetual_tables
545 .objects
546 .multi_get(object_keys.to_vec())?;
547 let mut ret = vec![];
548
549 for (idx, w) in wrappers.into_iter().enumerate() {
550 ret.push(
551 w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
552 .transpose()?
553 .flatten(),
554 );
555 }
556 Ok(ret)
557 }
558
559 pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, SuiError> {
561 let mut result = Vec::new();
562 for id in objects {
563 result.push(self.get_object(id));
564 }
565 Ok(result)
566 }
567
568 pub(crate) fn insert_genesis_object(&self, object: Object) -> SuiResult {
573 debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
575 let object_ref = object.compute_object_reference();
576 self.insert_object_direct(object_ref, &object)
577 }
578
579 fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> SuiResult {
583 let mut write_batch = self.perpetual_tables.objects.batch();
584
585 let store_object = get_store_object(object.clone());
587 write_batch.insert_batch(
588 &self.perpetual_tables.objects,
589 std::iter::once((ObjectKey::from(object_ref), store_object)),
590 )?;
591
592 if object.get_single_owner().is_some() {
594 if !object.is_child_object() {
596 self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref], false)?;
597 }
598 }
599
600 write_batch.write()?;
601
602 Ok(())
603 }
604
605 #[instrument(level = "debug", skip_all)]
607 pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> SuiResult<()> {
608 let mut batch = self.perpetual_tables.objects.batch();
609 let ref_and_objects: Vec<_> = objects
610 .iter()
611 .map(|o| (o.compute_object_reference(), o))
612 .collect();
613
614 batch.insert_batch(
615 &self.perpetual_tables.objects,
616 ref_and_objects
617 .iter()
618 .map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))),
619 )?;
620
621 let non_child_object_refs: Vec<_> = ref_and_objects
622 .iter()
623 .filter(|(_, object)| !object.is_child_object())
624 .map(|(oref, _)| *oref)
625 .collect();
626
627 self.initialize_live_object_markers_impl(
628 &mut batch,
629 &non_child_object_refs,
630 false, )?;
632
633 batch.write()?;
634
635 Ok(())
636 }
637
638 pub fn bulk_insert_live_objects(
639 perpetual_db: &AuthorityPerpetualTables,
640 live_objects: impl Iterator<Item = LiveObject>,
641 expected_sha3_digest: &[u8; 32],
642 ) -> SuiResult<()> {
643 let mut hasher = Sha3_256::default();
644 let mut batch = perpetual_db.objects.batch();
645 let mut written = 0usize;
646 const MAX_BATCH_SIZE: usize = 100_000;
647 for object in live_objects {
648 hasher.update(object.object_reference().2.inner());
649 match object {
650 LiveObject::Normal(object) => {
651 let store_object_wrapper = get_store_object(object.clone());
652 batch.insert_batch(
653 &perpetual_db.objects,
654 std::iter::once((
655 ObjectKey::from(object.compute_object_reference()),
656 store_object_wrapper,
657 )),
658 )?;
659 if !object.is_child_object() {
660 Self::initialize_live_object_markers(
661 &perpetual_db.live_owned_object_markers,
662 &mut batch,
663 &[object.compute_object_reference()],
664 false, )?;
666 }
667 }
668 LiveObject::Wrapped(object_key) => {
669 batch.insert_batch(
670 &perpetual_db.objects,
671 std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
672 object_key,
673 StoreObject::Wrapped.into(),
674 )),
675 )?;
676 }
677 }
678 written += 1;
679 if written > MAX_BATCH_SIZE {
680 batch.write()?;
681 batch = perpetual_db.objects.batch();
682 written = 0;
683 }
684 }
685 let sha3_digest = hasher.finalize().digest;
686 if *expected_sha3_digest != sha3_digest {
687 error!(
688 "Sha does not match! expected: {:?}, actual: {:?}",
689 expected_sha3_digest, sha3_digest
690 );
691 return Err(SuiError::from("Sha does not match"));
692 }
693 batch.write()?;
694 Ok(())
695 }
696
697 pub fn set_epoch_start_configuration(
698 &self,
699 epoch_start_configuration: &EpochStartConfiguration,
700 ) -> SuiResult {
701 self.perpetual_tables
702 .set_epoch_start_configuration(epoch_start_configuration)?;
703 Ok(())
704 }
705
706 pub fn get_epoch_start_configuration(&self) -> SuiResult<Option<EpochStartConfiguration>> {
707 Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
708 }
709
710 #[instrument(level = "debug", skip_all)]
715 pub fn build_db_batch(
716 &self,
717 epoch_id: EpochId,
718 tx_outputs: &[Arc<TransactionOutputs>],
719 ) -> SuiResult<DBBatch> {
720 let mut written = Vec::with_capacity(tx_outputs.len());
721 for outputs in tx_outputs {
722 written.extend(outputs.written.values().cloned());
723 }
724
725 let mut write_batch = self.perpetual_tables.transactions.batch();
726 for outputs in tx_outputs {
727 self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
728 }
729 fail_point!("crash");
731
732 trace!(
733 "built batch for committed transactions: {:?}",
734 tx_outputs
735 .iter()
736 .map(|tx| tx.transaction.digest())
737 .collect::<Vec<_>>()
738 );
739
740 fail_point!("crash");
742
743 Ok(write_batch)
744 }
745
746 fn write_one_transaction_outputs(
747 &self,
748 write_batch: &mut DBBatch,
749 epoch_id: EpochId,
750 tx_outputs: &TransactionOutputs,
751 ) -> SuiResult {
752 let TransactionOutputs {
753 transaction,
754 effects,
755 markers,
756 wrapped,
757 deleted,
758 written,
759 events,
760 unchanged_loaded_runtime_objects,
761 locks_to_delete,
762 new_locks_to_init,
763 ..
764 } = tx_outputs;
765
766 let effects_digest = effects.digest();
767 let transaction_digest = transaction.digest();
768 write_batch
771 .insert_batch(
772 &self.perpetual_tables.effects,
773 [(effects_digest, effects.clone())],
774 )?
775 .insert_batch(
776 &self.perpetual_tables.executed_effects,
777 [(transaction_digest, effects_digest)],
778 )?;
779
780 write_batch.insert_batch(
782 &self.perpetual_tables.transactions,
783 iter::once((transaction_digest, transaction.serializable_ref())),
784 )?;
785
786 write_batch.insert_batch(
787 &self.perpetual_tables.executed_transaction_digests,
788 [((epoch_id, *transaction_digest), ())],
789 )?;
790
791 write_batch.insert_batch(
793 &self.perpetual_tables.object_per_epoch_marker_table_v2,
794 markers
795 .iter()
796 .map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
797 )?;
798 write_batch.insert_batch(
799 &self.perpetual_tables.objects,
800 deleted
801 .iter()
802 .map(|key| (key, StoreObject::Deleted))
803 .chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
804 .map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
805 )?;
806
807 let new_objects = written.iter().map(|(id, new_object)| {
809 let version = new_object.version();
810 trace!(?id, ?version, "writing object");
811 let store_object = get_store_object(new_object.clone());
812 (ObjectKey(*id, version), store_object)
813 });
814
815 write_batch.insert_batch(&self.perpetual_tables.objects, new_objects)?;
816
817 if effects.events_digest().is_some() {
819 write_batch.insert_batch(
820 &self.perpetual_tables.events_2,
821 [(transaction_digest, events)],
822 )?;
823 }
824
825 if !unchanged_loaded_runtime_objects.is_empty() {
827 write_batch.insert_batch(
828 &self.perpetual_tables.unchanged_loaded_runtime_objects,
829 [(transaction_digest, unchanged_loaded_runtime_objects)],
830 )?;
831 }
832
833 self.initialize_live_object_markers_impl(write_batch, new_locks_to_init, false)?;
834
835 self.delete_live_object_markers(write_batch, locks_to_delete)?;
838
839 debug!(effects_digest = ?effects.digest(), "commit_certificate finished");
840
841 Ok(())
842 }
843
844 pub(crate) fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> SuiResult {
847 let mut batch = self.perpetual_tables.transactions.batch();
848 batch.insert_batch(
849 &self.perpetual_tables.transactions,
850 [(tx.digest(), tx.clone().into_unsigned().serializable_ref())],
851 )?;
852 batch.write()?;
853 Ok(())
854 }
855
856 pub(crate) fn get_lock(
859 &self,
860 obj_ref: ObjectRef,
861 epoch_store: &AuthorityPerEpochStore,
862 ) -> SuiLockResult {
863 if self
864 .perpetual_tables
865 .live_owned_object_markers
866 .get(&obj_ref)?
867 .is_none()
868 {
869 return Ok(ObjectLockStatus::LockedAtDifferentVersion {
870 locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
871 });
872 }
873
874 let tables = epoch_store.tables()?;
875 let epoch_id = epoch_store.epoch();
876
877 if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
878 Ok(ObjectLockStatus::LockedToTx {
879 locked_by_tx: LockDetailsDeprecated {
880 epoch: epoch_id,
881 tx_digest,
882 },
883 })
884 } else {
885 Ok(ObjectLockStatus::Initialized)
886 }
887 }
888
889 pub(crate) fn get_latest_live_version_for_object_id(
891 &self,
892 object_id: ObjectID,
893 ) -> SuiResult<ObjectRef> {
894 let mut iterator = self
895 .perpetual_tables
896 .live_owned_object_markers
897 .reversed_safe_iter_with_bounds(
898 None,
899 Some((object_id, SequenceNumber::MAX, ObjectDigest::MAX)),
900 )?;
901 Ok(iterator
902 .next()
903 .transpose()?
904 .and_then(|value| {
905 if value.0.0 == object_id {
906 Some(value)
907 } else {
908 None
909 }
910 })
911 .ok_or_else(|| {
912 SuiError::from(UserInputError::ObjectNotFound {
913 object_id,
914 version: None,
915 })
916 })?
917 .0)
918 }
919
920 pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> SuiResult {
925 let locks = self
926 .perpetual_tables
927 .live_owned_object_markers
928 .multi_get(objects)?;
929 for (lock, obj_ref) in locks.into_iter().zip(objects) {
930 if lock.is_none() {
931 let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
932 fp_bail!(
933 UserInputError::ObjectVersionUnavailableForConsumption {
934 provided_obj_ref: *obj_ref,
935 current_version: latest_lock.1
936 }
937 .into()
938 );
939 }
940 }
941 Ok(())
942 }
943
944 fn initialize_live_object_markers_impl(
947 &self,
948 write_batch: &mut DBBatch,
949 objects: &[ObjectRef],
950 is_force_reset: bool,
951 ) -> SuiResult {
952 AuthorityStore::initialize_live_object_markers(
953 &self.perpetual_tables.live_owned_object_markers,
954 write_batch,
955 objects,
956 is_force_reset,
957 )
958 }
959
960 pub fn initialize_live_object_markers(
961 live_object_marker_table: &DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,
962 write_batch: &mut DBBatch,
963 objects: &[ObjectRef],
964 is_force_reset: bool,
965 ) -> SuiResult {
966 trace!(?objects, "initialize_locks");
967
968 let live_object_markers = live_object_marker_table.multi_get(objects)?;
969
970 if !is_force_reset {
971 let existing_live_object_markers: Vec<ObjectRef> = live_object_markers
975 .iter()
976 .zip(objects)
977 .filter_map(|(lock_opt, objref)| {
978 lock_opt.clone().flatten().map(|_tx_digest| *objref)
979 })
980 .collect();
981 if !existing_live_object_markers.is_empty() {
982 info!(
983 ?existing_live_object_markers,
984 "Cannot initialize live_object_markers because some exist already"
985 );
986 return Err(SuiErrorKind::ObjectLockAlreadyInitialized {
987 refs: existing_live_object_markers,
988 }
989 .into());
990 }
991 }
992
993 write_batch.insert_batch(
994 live_object_marker_table,
995 objects.iter().map(|obj_ref| (obj_ref, None)),
996 )?;
997 Ok(())
998 }
999
1000 fn delete_live_object_markers(
1002 &self,
1003 write_batch: &mut DBBatch,
1004 objects: &[ObjectRef],
1005 ) -> SuiResult {
1006 trace!(?objects, "delete_locks");
1007 write_batch.delete_batch(
1008 &self.perpetual_tables.live_owned_object_markers,
1009 objects.iter(),
1010 )?;
1011 Ok(())
1012 }
1013
1014 #[cfg(test)]
1015 pub(crate) fn reset_locks_for_test(
1016 &self,
1017 transactions: &[TransactionDigest],
1018 objects: &[ObjectRef],
1019 epoch_store: &AuthorityPerEpochStore,
1020 ) {
1021 for tx in transactions {
1022 epoch_store.delete_signed_transaction_for_test(tx);
1023 epoch_store.delete_object_locks_for_test(objects);
1024 }
1025
1026 let mut batch = self.perpetual_tables.live_owned_object_markers.batch();
1027 batch
1028 .delete_batch(
1029 &self.perpetual_tables.live_owned_object_markers,
1030 objects.iter(),
1031 )
1032 .unwrap();
1033 batch.write().unwrap();
1034
1035 let mut batch = self.perpetual_tables.live_owned_object_markers.batch();
1036 self.initialize_live_object_markers_impl(&mut batch, objects, false)
1037 .unwrap();
1038 batch.write().unwrap();
1039 }
1040
1041 pub fn find_object_lt_or_eq_version(
1046 &self,
1047 object_id: ObjectID,
1048 version: SequenceNumber,
1049 ) -> SuiResult<Option<Object>> {
1050 self.perpetual_tables
1051 .find_object_lt_or_eq_version(object_id, version)
1052 }
1053
1054 pub fn get_latest_object_ref_or_tombstone(
1064 &self,
1065 object_id: ObjectID,
1066 ) -> Result<Option<ObjectRef>, SuiError> {
1067 self.perpetual_tables
1068 .get_latest_object_ref_or_tombstone(object_id)
1069 }
1070
1071 pub fn get_latest_object_ref_if_alive(
1074 &self,
1075 object_id: ObjectID,
1076 ) -> Result<Option<ObjectRef>, SuiError> {
1077 match self.get_latest_object_ref_or_tombstone(object_id)? {
1078 Some(objref) if objref.2.is_alive() => Ok(Some(objref)),
1079 _ => Ok(None),
1080 }
1081 }
1082
1083 pub fn get_latest_object_or_tombstone(
1087 &self,
1088 object_id: ObjectID,
1089 ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, SuiError> {
1090 let Some((object_key, store_object)) = self
1091 .perpetual_tables
1092 .get_latest_object_or_tombstone(object_id)?
1093 else {
1094 return Ok(None);
1095 };
1096
1097 if let Some(object_ref) = self
1098 .perpetual_tables
1099 .tombstone_reference(&object_key, &store_object)?
1100 {
1101 return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
1102 }
1103
1104 let object = self
1105 .perpetual_tables
1106 .object(&object_key, store_object)?
1107 .expect("Non tombstone store object could not be converted to object");
1108
1109 Ok(Some((object_key, ObjectOrTombstone::Object(object))))
1110 }
1111
1112 pub fn insert_transaction_and_effects(
1113 &self,
1114 transaction: &VerifiedTransaction,
1115 transaction_effects: &TransactionEffects,
1116 ) -> Result<(), TypedStoreError> {
1117 let mut write_batch = self.perpetual_tables.transactions.batch();
1118 write_batch
1121 .insert_batch(
1122 &self.perpetual_tables.effects,
1123 [(transaction_effects.digest(), transaction_effects)],
1124 )?
1125 .insert_batch(
1126 &self.perpetual_tables.transactions,
1127 [(transaction.digest(), transaction.serializable_ref())],
1128 )?;
1129
1130 write_batch.write()?;
1131 Ok(())
1132 }
1133
1134 pub fn multi_insert_transaction_and_effects<'a>(
1135 &self,
1136 transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
1137 ) -> Result<(), TypedStoreError> {
1138 let mut write_batch = self.perpetual_tables.transactions.batch();
1139 for tx in transactions {
1140 write_batch
1141 .insert_batch(
1142 &self.perpetual_tables.effects,
1143 [(tx.effects.digest(), &tx.effects)],
1144 )?
1145 .insert_batch(
1146 &self.perpetual_tables.transactions,
1147 [(tx.transaction.digest(), tx.transaction.serializable_ref())],
1148 )?;
1149 }
1150
1151 write_batch.write()?;
1152 Ok(())
1153 }
1154
1155 pub fn multi_get_transaction_blocks(
1156 &self,
1157 tx_digests: &[TransactionDigest],
1158 ) -> Result<Vec<Option<VerifiedTransaction>>, TypedStoreError> {
1159 self.perpetual_tables
1160 .transactions
1161 .multi_get(tx_digests)
1162 .map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())
1163 }
1164
1165 pub fn get_transaction_block(
1166 &self,
1167 tx_digest: &TransactionDigest,
1168 ) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
1169 self.perpetual_tables
1170 .transactions
1171 .get(tx_digest)
1172 .map(|v| v.map(|v| v.into()))
1173 }
1174
1175 pub fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
1182 get_sui_system_state(self.perpetual_tables.as_ref())
1183 }
1184
1185 pub fn expensive_check_sui_conservation<T>(
1186 self: &Arc<Self>,
1187 type_layout_store: T,
1188 old_epoch_store: &AuthorityPerEpochStore,
1189 ) -> SuiResult
1190 where
1191 T: TypeLayoutStore + Send + Copy,
1192 {
1193 if !self.enable_epoch_sui_conservation_check {
1194 return Ok(());
1195 }
1196
1197 let executor = old_epoch_store.executor();
1198 info!("Starting SUI conservation check. This may take a while..");
1199 let cur_time = Instant::now();
1200 let mut pending_objects = vec![];
1201 let mut count = 0;
1202 let mut size = 0;
1203 let (mut total_sui, mut total_storage_rebate) = thread::scope(|s| {
1204 let pending_tasks = FuturesUnordered::new();
1205 for o in self.iter_live_object_set(false) {
1206 match o {
1207 LiveObject::Normal(object) => {
1208 size += object.object_size_for_gas_metering();
1209 count += 1;
1210 pending_objects.push(object);
1211 if count % 1_000_000 == 0 {
1212 let mut task_objects = vec![];
1213 mem::swap(&mut pending_objects, &mut task_objects);
1214 pending_tasks.push(s.spawn(move || {
1215 let mut layout_resolver =
1216 executor.type_layout_resolver(Box::new(type_layout_store));
1217 let mut total_storage_rebate = 0;
1218 let mut total_sui = 0;
1219 for object in task_objects {
1220 total_storage_rebate += object.storage_rebate;
1221 total_sui +=
1224 object.get_total_sui(layout_resolver.as_mut()).unwrap()
1225 - object.storage_rebate;
1226 }
1227 if count % 50_000_000 == 0 {
1228 info!("Processed {} objects", count);
1229 }
1230 (total_sui, total_storage_rebate)
1231 }));
1232 }
1233 }
1234 LiveObject::Wrapped(_) => {
1235 unreachable!("Explicitly asked to not include wrapped tombstones")
1236 }
1237 }
1238 }
1239 pending_tasks.into_iter().fold((0, 0), |init, result| {
1240 let result = result.join().unwrap();
1241 (init.0 + result.0, init.1 + result.1)
1242 })
1243 });
1244 let mut layout_resolver = executor.type_layout_resolver(Box::new(type_layout_store));
1245 for object in pending_objects {
1246 total_storage_rebate += object.storage_rebate;
1247 total_sui +=
1248 object.get_total_sui(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
1249 }
1250 info!(
1251 "Scanned {} live objects, took {:?}",
1252 count,
1253 cur_time.elapsed()
1254 );
1255 self.metrics
1256 .sui_conservation_live_object_count
1257 .set(count as i64);
1258 self.metrics
1259 .sui_conservation_live_object_size
1260 .set(size as i64);
1261 self.metrics
1262 .sui_conservation_check_latency
1263 .set(cur_time.elapsed().as_secs() as i64);
1264
1265 let system_state = self
1267 .get_sui_system_state_object_unsafe()
1268 .expect("Reading sui system state object cannot fail")
1269 .into_sui_system_state_summary();
1270 let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
1271 info!(
1272 "Total SUI amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
1273 total_sui, storage_fund_balance, total_storage_rebate, system_state.epoch
1274 );
1275
1276 let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
1277 self.metrics
1278 .sui_conservation_storage_fund
1279 .set(storage_fund_balance as i64);
1280 self.metrics
1281 .sui_conservation_storage_fund_imbalance
1282 .set(imbalance);
1283 self.metrics
1284 .sui_conservation_imbalance
1285 .set((total_sui as i128 - TOTAL_SUPPLY_MIST as i128) as i64);
1286
1287 if let Some(expected_imbalance) = self
1288 .perpetual_tables
1289 .expected_storage_fund_imbalance
1290 .get(&())
1291 .expect("DB read cannot fail")
1292 {
1293 fp_ensure!(
1294 imbalance == expected_imbalance,
1295 SuiError::from(
1296 format!(
1297 "Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
1298 system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
1299 ).as_str()
1300 )
1301 );
1302 } else {
1303 self.perpetual_tables
1304 .expected_storage_fund_imbalance
1305 .insert(&(), &imbalance)
1306 .expect("DB write cannot fail");
1307 }
1308
1309 if let Some(expected_sui) = self
1310 .perpetual_tables
1311 .expected_network_sui_amount
1312 .get(&())
1313 .expect("DB read cannot fail")
1314 {
1315 fp_ensure!(
1316 total_sui == expected_sui,
1317 SuiError::from(
1318 format!(
1319 "Inconsistent state detected at epoch {}: total sui: {}, expecting {}",
1320 system_state.epoch, total_sui, expected_sui
1321 )
1322 .as_str()
1323 )
1324 );
1325 } else {
1326 self.perpetual_tables
1327 .expected_network_sui_amount
1328 .insert(&(), &total_sui)
1329 .expect("DB write cannot fail");
1330 }
1331
1332 Ok(())
1333 }
1334
1335 #[instrument(level = "error", skip_all)]
1338 pub fn maybe_reaccumulate_state_hash(
1339 &self,
1340 cur_epoch_store: &AuthorityPerEpochStore,
1341 new_protocol_version: ProtocolVersion,
1342 ) {
1343 let old_simplified_unwrap_then_delete = cur_epoch_store
1344 .protocol_config()
1345 .simplified_unwrap_then_delete();
1346 let new_simplified_unwrap_then_delete =
1347 ProtocolConfig::get_for_version(new_protocol_version, cur_epoch_store.get_chain())
1348 .simplified_unwrap_then_delete();
1349 let should_reaccumulate =
1352 !old_simplified_unwrap_then_delete && new_simplified_unwrap_then_delete;
1353 if !should_reaccumulate {
1354 return;
1355 }
1356 info!(
1357 "[Re-accumulate] simplified_unwrap_then_delete is enabled in the new protocol version, re-accumulating state hash"
1358 );
1359 let cur_time = Instant::now();
1360 std::thread::scope(|s| {
1361 let pending_tasks = FuturesUnordered::new();
1362 const BITS: u8 = 5;
1371 for index in 0u8..(1 << BITS) {
1372 pending_tasks.push(s.spawn(move || {
1373 let mut id_bytes = [0; ObjectID::LENGTH];
1374 id_bytes[0] = index << (8 - BITS);
1375 let start_id = ObjectID::new(id_bytes);
1376
1377 id_bytes[0] |= (1 << (8 - BITS)) - 1;
1378 for element in id_bytes.iter_mut().skip(1) {
1379 *element = u8::MAX;
1380 }
1381 let end_id = ObjectID::new(id_bytes);
1382
1383 info!(
1384 "[Re-accumulate] Scanning object ID range {:?}..{:?}",
1385 start_id, end_id
1386 );
1387 let mut prev = (
1388 ObjectKey::min_for_id(&ObjectID::ZERO),
1389 StoreObjectWrapper::V1(StoreObject::Deleted),
1390 );
1391 let mut object_scanned: u64 = 0;
1392 let mut wrapped_objects_to_remove = vec![];
1393 for db_result in self.perpetual_tables.objects.safe_range_iter(
1394 ObjectKey::min_for_id(&start_id)..=ObjectKey::max_for_id(&end_id),
1395 ) {
1396 match db_result {
1397 Ok((object_key, object)) => {
1398 object_scanned += 1;
1399 if object_scanned.is_multiple_of(100000) {
1400 info!(
1401 "[Re-accumulate] Task {}: object scanned: {}",
1402 index, object_scanned,
1403 );
1404 }
1405 if matches!(prev.1.inner(), StoreObject::Wrapped)
1406 && object_key.0 != prev.0.0
1407 {
1408 wrapped_objects_to_remove
1409 .push(WrappedObject::new(prev.0.0, prev.0.1));
1410 }
1411
1412 prev = (object_key, object);
1413 }
1414 Err(err) => {
1415 warn!("Object iterator encounter RocksDB error {:?}", err);
1416 return Err(err);
1417 }
1418 }
1419 }
1420 if matches!(prev.1.inner(), StoreObject::Wrapped) {
1421 wrapped_objects_to_remove.push(WrappedObject::new(prev.0.0, prev.0.1));
1422 }
1423 info!(
1424 "[Re-accumulate] Task {}: object scanned: {}, wrapped objects: {}",
1425 index,
1426 object_scanned,
1427 wrapped_objects_to_remove.len(),
1428 );
1429 Ok((wrapped_objects_to_remove, object_scanned))
1430 }));
1431 }
1432 let (last_checkpoint_of_epoch, cur_accumulator) = self
1433 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
1434 .expect("read cannot fail")
1435 .expect("accumulator must exist");
1436 let (accumulator, total_objects_scanned, total_wrapped_objects) =
1437 pending_tasks.into_iter().fold(
1438 (cur_accumulator, 0u64, 0usize),
1439 |(mut accumulator, total_objects_scanned, total_wrapped_objects), task| {
1440 let (wrapped_objects_to_remove, object_scanned) =
1441 task.join().unwrap().unwrap();
1442 accumulator.remove_all(
1443 wrapped_objects_to_remove
1444 .iter()
1445 .map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
1446 .collect::<Vec<Vec<u8>>>(),
1447 );
1448 (
1449 accumulator,
1450 total_objects_scanned + object_scanned,
1451 total_wrapped_objects + wrapped_objects_to_remove.len(),
1452 )
1453 },
1454 );
1455 info!(
1456 "[Re-accumulate] Total objects scanned: {}, total wrapped objects: {}",
1457 total_objects_scanned, total_wrapped_objects,
1458 );
1459 info!(
1460 "[Re-accumulate] New accumulator value: {:?}",
1461 accumulator.digest()
1462 );
1463 self.insert_state_hash_for_epoch(
1464 cur_epoch_store.epoch(),
1465 &last_checkpoint_of_epoch,
1466 &accumulator,
1467 )
1468 .unwrap();
1469 });
1470 info!(
1471 "[Re-accumulate] Re-accumulating took {}seconds",
1472 cur_time.elapsed().as_secs()
1473 );
1474 }
1475
1476 pub async fn prune_objects_and_compact_for_testing(
1477 &self,
1478 checkpoint_store: &Arc<CheckpointStore>,
1479 rpc_index: Option<&RpcIndexStore>,
1480 ) {
1481 let pruning_config = AuthorityStorePruningConfig {
1482 num_epochs_to_retain: 0,
1483 ..Default::default()
1484 };
1485 let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
1486 &self.perpetual_tables,
1487 checkpoint_store,
1488 rpc_index,
1489 None,
1490 pruning_config,
1491 AuthorityStorePruningMetrics::new_for_test(),
1492 EPOCH_DURATION_MS_FOR_TESTING,
1493 )
1494 .await;
1495 let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
1496 }
1497
1498 pub fn remove_executed_effects_for_testing(
1499 &self,
1500 tx_digest: &TransactionDigest,
1501 ) -> anyhow::Result<()> {
1502 let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
1503 if let Some(effects_digest) = effects_digest {
1504 self.perpetual_tables.executed_effects.remove(tx_digest)?;
1505 self.perpetual_tables.effects.remove(&effects_digest)?;
1506 }
1507 Ok(())
1508 }
1509
1510 #[cfg(test)]
1511 pub async fn prune_objects_immediately_for_testing(
1512 &self,
1513 transaction_effects: Vec<TransactionEffects>,
1514 ) -> anyhow::Result<()> {
1515 let mut wb = self.perpetual_tables.objects.batch();
1516
1517 let mut object_keys_to_prune = vec![];
1518 for effects in &transaction_effects {
1519 for (object_id, seq_number) in effects.modified_at_versions() {
1520 info!("Pruning object {:?} version {:?}", object_id, seq_number);
1521 object_keys_to_prune.push(ObjectKey(object_id, seq_number));
1522 }
1523 }
1524
1525 wb.delete_batch(
1526 &self.perpetual_tables.objects,
1527 object_keys_to_prune.into_iter(),
1528 )?;
1529 wb.write()?;
1530 Ok(())
1531 }
1532
1533 #[cfg(msim)]
1535 pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
1536 self.perpetual_tables
1537 .objects
1538 .safe_iter_with_bounds(
1539 Some(ObjectKey(object_id, VersionNumber::MIN)),
1540 Some(ObjectKey(object_id, VersionNumber::MAX)),
1541 )
1542 .collect::<Result<Vec<_>, _>>()
1543 .unwrap()
1544 .len()
1545 }
1546}
1547
1548impl GlobalStateHashStore for AuthorityStore {
1549 fn get_object_ref_prior_to_key_deprecated(
1550 &self,
1551 object_id: &ObjectID,
1552 version: VersionNumber,
1553 ) -> SuiResult<Option<ObjectRef>> {
1554 self.get_object_ref_prior_to_key(object_id, version)
1555 }
1556
1557 fn get_root_state_hash_for_epoch(
1558 &self,
1559 epoch: EpochId,
1560 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
1561 self.perpetual_tables
1562 .root_state_hash_by_epoch
1563 .get(&epoch)
1564 .map_err(Into::into)
1565 }
1566
1567 fn get_root_state_hash_for_highest_epoch(
1568 &self,
1569 ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
1570 Ok(self
1571 .perpetual_tables
1572 .root_state_hash_by_epoch
1573 .reversed_safe_iter_with_bounds(None, None)?
1574 .next()
1575 .transpose()?)
1576 }
1577
1578 fn insert_state_hash_for_epoch(
1579 &self,
1580 epoch: EpochId,
1581 last_checkpoint_of_epoch: &CheckpointSequenceNumber,
1582 acc: &GlobalStateHash,
1583 ) -> SuiResult {
1584 self.perpetual_tables
1585 .root_state_hash_by_epoch
1586 .insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
1587 self.root_state_notify_read
1588 .notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
1589
1590 Ok(())
1591 }
1592
1593 fn iter_live_object_set(
1594 &self,
1595 include_wrapped_object: bool,
1596 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1597 Box::new(
1598 self.perpetual_tables
1599 .iter_live_object_set(include_wrapped_object),
1600 )
1601 }
1602}
1603
1604impl ObjectStore for AuthorityStore {
1605 fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
1607 self.perpetual_tables.as_ref().get_object(object_id)
1608 }
1609
1610 fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
1611 self.perpetual_tables.get_object_by_key(object_id, version)
1612 }
1613}
1614
1615pub struct ResolverWrapper {
1617 pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1618 pub metrics: Arc<ResolverMetrics>,
1619}
1620
1621impl ResolverWrapper {
1622 pub fn new(
1623 resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1624 metrics: Arc<ResolverMetrics>,
1625 ) -> Self {
1626 metrics.module_cache_size.set(0);
1627 ResolverWrapper { resolver, metrics }
1628 }
1629
1630 fn inc_cache_size_gauge(&self) {
1631 let current = self.metrics.module_cache_size.get();
1633 self.metrics.module_cache_size.set(current + 1);
1634 }
1635}
1636
1637impl ModuleResolver for ResolverWrapper {
1638 type Error = SuiError;
1639 fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
1640 self.inc_cache_size_gauge();
1641 get_module(&*self.resolver, module_id)
1642 }
1643}
1644
1645pub enum UpdateType {
1646 Transaction(TransactionEffectsDigest),
1647 Genesis,
1648}
1649
1650pub type SuiLockResult = SuiResult<ObjectLockStatus>;
1651
1652#[derive(Debug, PartialEq, Eq)]
1653pub enum ObjectLockStatus {
1654 Initialized,
1655 LockedToTx { locked_by_tx: LockDetailsDeprecated },
1656 LockedAtDifferentVersion { locked_ref: ObjectRef },
1657}
1658
1659#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1660pub enum LockDetailsWrapperDeprecated {
1661 V1(LockDetailsV1Deprecated),
1662}
1663
1664impl LockDetailsWrapperDeprecated {
1665 pub fn migrate(self) -> Self {
1666 self
1669 }
1670
1671 pub fn inner(&self) -> &LockDetailsDeprecated {
1674 match self {
1675 Self::V1(v1) => v1,
1676
1677 #[allow(unreachable_patterns)]
1679 _ => panic!("lock details should have been migrated to latest version at read time"),
1680 }
1681 }
1682 pub fn into_inner(self) -> LockDetailsDeprecated {
1683 match self {
1684 Self::V1(v1) => v1,
1685
1686 #[allow(unreachable_patterns)]
1688 _ => panic!("lock details should have been migrated to latest version at read time"),
1689 }
1690 }
1691}
1692
1693#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1694pub struct LockDetailsV1Deprecated {
1695 pub epoch: EpochId,
1696 pub tx_digest: TransactionDigest,
1697}
1698
1699pub type LockDetailsDeprecated = LockDetailsV1Deprecated;
1700
1701impl From<LockDetailsDeprecated> for LockDetailsWrapperDeprecated {
1702 fn from(details: LockDetailsDeprecated) -> Self {
1703 LockDetailsWrapperDeprecated::V1(details)
1705 }
1706}