1use std::sync::Arc;
5use std::{iter, mem, thread};
6
7use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
8use crate::authority::authority_store_pruner::{
9 AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
10};
11use crate::authority::authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object};
12use crate::authority::epoch_marker_key::EpochMarkerKey;
13use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
14use crate::global_state_hasher::GlobalStateHashStore;
15use crate::rpc_index::RpcIndexStore;
16use crate::transaction_outputs::TransactionOutputs;
17use fastcrypto::hash::{HashFunction, MultisetHash, Sha3_256};
18use futures::stream::FuturesUnordered;
19use move_core_types::account_address::AccountAddress;
20use move_core_types::resolver::{ModuleResolver, SerializedPackage};
21use serde::{Deserialize, Serialize};
22use sui_config::node::AuthorityStorePruningConfig;
23use sui_macros::fail_point_arg;
24use sui_types::error::{SuiErrorKind, UserInputError};
25use sui_types::execution::TypeLayoutStore;
26use sui_types::global_state_hash::GlobalStateHash;
27use sui_types::message_envelope::Message;
28use sui_types::storage::{
29 BackingPackageStore, FullObjectKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore,
30 get_module, get_package,
31};
32use sui_types::sui_system_state::get_sui_system_state;
33use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure};
34use tokio::time::Instant;
35use tracing::{debug, info, trace};
36use typed_store::traits::Map;
37use typed_store::{
38 TypedStoreError,
39 rocks::{DBBatch, DBMap},
40};
41
42use super::authority_store_tables::LiveObject;
43use super::{authority_store_tables::AuthorityPerpetualTables, *};
44use mysten_common::ZipDebugEqIteratorExt;
45use mysten_common::sync::notify_read::NotifyRead;
46use sui_types::effects::{TransactionEffects, TransactionEvents};
47use sui_types::gas_coin::TOTAL_SUPPLY_MIST;
48
/// Prometheus gauges reported by [`AuthorityStore`].
///
/// Each gauge is registered in [`AuthorityStoreMetrics::new`] under a metric
/// name identical to its field name.
struct AuthorityStoreMetrics {
    /// Seconds taken to scan all live objects for the SUI conservation check
    /// (per the registered help text).
    sui_conservation_check_latency: IntGauge,
    /// Number of live objects in the store.
    sui_conservation_live_object_count: IntGauge,
    /// Size in bytes of live objects in the store.
    sui_conservation_live_object_size: IntGauge,
    /// Total SUI in the network minus `10B * 10^9`, i.e. the imbalance
    /// (per the registered help text).
    sui_conservation_imbalance: IntGauge,
    /// Storage fund pool balance, limited to the part that represents object
    /// storage (per the registered help text).
    sui_conservation_storage_fund: IntGauge,
    /// `storage_fund_balance - total_object_storage_rebates`
    /// (per the registered help text).
    sui_conservation_storage_fund_imbalance: IntGauge,
    /// Gauge vector labelled by `flag`; `update_epoch_flags_metrics` sets a
    /// label to 1 for flags in the new set and 0 for flags in the old set.
    epoch_flags: IntGaugeVec,
}
58
59impl AuthorityStoreMetrics {
60 pub fn new(registry: &Registry) -> Self {
61 Self {
62 sui_conservation_check_latency: register_int_gauge_with_registry!(
63 "sui_conservation_check_latency",
64 "Number of seconds took to scan all live objects in the store for SUI conservation check",
65 registry,
66 ).unwrap(),
67 sui_conservation_live_object_count: register_int_gauge_with_registry!(
68 "sui_conservation_live_object_count",
69 "Number of live objects in the store",
70 registry,
71 ).unwrap(),
72 sui_conservation_live_object_size: register_int_gauge_with_registry!(
73 "sui_conservation_live_object_size",
74 "Size in bytes of live objects in the store",
75 registry,
76 ).unwrap(),
77 sui_conservation_imbalance: register_int_gauge_with_registry!(
78 "sui_conservation_imbalance",
79 "Total amount of SUI in the network - 10B * 10^9. This delta shows the amount of imbalance",
80 registry,
81 ).unwrap(),
82 sui_conservation_storage_fund: register_int_gauge_with_registry!(
83 "sui_conservation_storage_fund",
84 "Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
85 registry,
86 ).unwrap(),
87 sui_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
88 "sui_conservation_storage_fund_imbalance",
89 "Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
90 registry,
91 ).unwrap(),
92 epoch_flags: register_int_gauge_vec_with_registry!(
93 "epoch_flags",
94 "Local flags of the currently running epoch",
95 &["flag"],
96 registry,
97 ).unwrap(),
98 }
99 }
100}
101
/// Store built on top of the shared [`AuthorityPerpetualTables`].
///
/// The methods of this type read from and write to the perpetual tables and
/// update the store's own metrics.
pub struct AuthorityStore {
    /// Shared handle to the perpetual tables that back every read and write
    /// issued through this store.
    pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,

    /// Notification channel keyed by epoch and carrying a checkpoint sequence
    /// number together with a global state hash.
    // NOTE(review): producers/consumers are not visible in this part of the file.
    pub(crate) root_state_notify_read:
        NotifyRead<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// When `false`, `expensive_check_sui_conservation` returns immediately.
    enable_epoch_sui_conservation_check: bool,

    /// Gauges registered for this store instance.
    metrics: AuthorityStoreMetrics,
}
119
/// Read guard of a `tokio::sync::RwLock` protecting an [`EpochId`].
pub type ExecutionLockReadGuard<'a> = tokio::sync::RwLockReadGuard<'a, EpochId>;
/// Write guard of a `tokio::sync::RwLock` protecting an [`EpochId`]; taken by
/// reference as a witness by `AuthorityStore::clear_object_per_epoch_marker_table`.
pub type ExecutionLockWriteGuard<'a> = tokio::sync::RwLockWriteGuard<'a, EpochId>;
122
123impl AuthorityStore {
124 pub async fn open(
127 perpetual_tables: Arc<AuthorityPerpetualTables>,
128 genesis: &Genesis,
129 config: &NodeConfig,
130 registry: &Registry,
131 ) -> SuiResult<Arc<Self>> {
132 let enable_epoch_sui_conservation_check = config
133 .expensive_safety_check_config
134 .enable_epoch_sui_conservation_check();
135
136 let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
137 info!("Creating new epoch start config from genesis");
138
139 #[allow(unused_mut)]
140 let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
141 fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
142 info!("Setting initial epoch flags to {:?}", flags);
143 initial_epoch_flags = flags;
144 });
145
146 let epoch_start_configuration = EpochStartConfiguration::new(
147 genesis.sui_system_object().into_epoch_start_state(),
148 *genesis.checkpoint().digest(),
149 &genesis.objects(),
150 initial_epoch_flags,
151 )?;
152 perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
153 epoch_start_configuration
154 } else {
155 info!("Loading epoch start config from DB");
156 perpetual_tables
157 .epoch_start_configuration
158 .get(&())?
159 .expect("Epoch start configuration must be set in non-empty DB")
160 };
161 let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
162 info!("Epoch start config: {:?}", epoch_start_configuration);
163 info!("Cur epoch: {:?}", cur_epoch);
164 let this = Self::open_inner(
165 genesis,
166 perpetual_tables,
167 enable_epoch_sui_conservation_check,
168 registry,
169 )
170 .await?;
171 this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
172 Ok(this)
173 }
174
175 pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
176 for flag in old {
177 self.metrics
178 .epoch_flags
179 .with_label_values(&[&flag.to_string()])
180 .set(0);
181 }
182 for flag in new {
183 self.metrics
184 .epoch_flags
185 .with_label_values(&[&flag.to_string()])
186 .set(1);
187 }
188 }
189
190 pub fn clear_object_per_epoch_marker_table(
193 &self,
194 _execution_guard: &ExecutionLockWriteGuard<'_>,
195 ) -> SuiResult<()> {
196 self.perpetual_tables
200 .object_per_epoch_marker_table
201 .schedule_delete_all()?;
202 #[cfg(not(tidehunter))]
203 {
204 self.perpetual_tables
205 .object_per_epoch_marker_table_v2
206 .schedule_delete_all()?;
207 }
208 #[cfg(tidehunter)]
209 {
210 self.perpetual_tables
211 .object_per_epoch_marker_table_v2
212 .drop_cells_in_range_raw(&EpochMarkerKey::MIN_KEY, &EpochMarkerKey::MAX_KEY)?;
213 }
214 Ok(())
215 }
216
217 pub async fn open_with_committee_for_testing(
218 perpetual_tables: Arc<AuthorityPerpetualTables>,
219 committee: &Committee,
220 genesis: &Genesis,
221 ) -> SuiResult<Arc<Self>> {
222 assert_eq!(committee.epoch, 0);
225 Self::open_inner(genesis, perpetual_tables, true, &Registry::new()).await
226 }
227
228 async fn open_inner(
229 genesis: &Genesis,
230 perpetual_tables: Arc<AuthorityPerpetualTables>,
231 enable_epoch_sui_conservation_check: bool,
232 registry: &Registry,
233 ) -> SuiResult<Arc<Self>> {
234 let store = Arc::new(Self {
235 perpetual_tables,
236 root_state_notify_read: NotifyRead::<
237 EpochId,
238 (CheckpointSequenceNumber, GlobalStateHash),
239 >::new(),
240 enable_epoch_sui_conservation_check,
241 metrics: AuthorityStoreMetrics::new(registry),
242 });
243 if store
245 .database_is_empty()
246 .expect("Database read should not fail at init.")
247 {
248 store
249 .bulk_insert_genesis_objects(genesis.objects())
250 .expect("Cannot bulk insert genesis objects");
251
252 let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());
254
255 store
256 .perpetual_tables
257 .transactions
258 .insert(transaction.digest(), transaction.serializable_ref())
259 .unwrap();
260
261 store
262 .perpetual_tables
263 .effects
264 .insert(&genesis.effects().digest(), genesis.effects())
265 .unwrap();
266 if genesis.effects().events_digest().is_some() {
270 store
271 .perpetual_tables
272 .events_2
273 .insert(transaction.digest(), genesis.events())
274 .unwrap();
275 }
276 }
277
278 Ok(store)
279 }
280
281 pub fn open_no_genesis(
285 perpetual_tables: Arc<AuthorityPerpetualTables>,
286 enable_epoch_sui_conservation_check: bool,
287 registry: &Registry,
288 ) -> SuiResult<Arc<Self>> {
289 let store = Arc::new(Self {
290 perpetual_tables,
291 root_state_notify_read: NotifyRead::<
292 EpochId,
293 (CheckpointSequenceNumber, GlobalStateHash),
294 >::new(),
295 enable_epoch_sui_conservation_check,
296 metrics: AuthorityStoreMetrics::new(registry),
297 });
298 Ok(store)
299 }
300
    /// Returns the recovery epoch reported by the perpetual tables; pure
    /// delegation to `AuthorityPerpetualTables::get_recovery_epoch_at_restart`.
    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
        self.perpetual_tables.get_recovery_epoch_at_restart()
    }
304
305 pub fn get_effects(
306 &self,
307 effects_digest: &TransactionEffectsDigest,
308 ) -> SuiResult<Option<TransactionEffects>> {
309 Ok(self.perpetual_tables.effects.get(effects_digest)?)
310 }
311
    /// Looks up the events stored under the transaction `digest` in the
    /// `events_2` table.
    pub fn get_events(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<TransactionEvents>, TypedStoreError> {
        self.perpetual_tables.events_2.get(digest)
    }
318
319 pub fn multi_get_events(
320 &self,
321 event_digests: &[TransactionDigest],
322 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
323 Ok(event_digests
324 .iter()
325 .map(|digest| self.get_events(digest))
326 .collect::<Result<Vec<_>, _>>()?)
327 }
328
    /// Returns the object keys stored under the transaction `digest` in the
    /// `unchanged_loaded_runtime_objects` table, if any.
    pub fn get_unchanged_loaded_runtime_objects(
        &self,
        digest: &TransactionDigest,
    ) -> Result<Option<Vec<ObjectKey>>, TypedStoreError> {
        self.perpetual_tables
            .unchanged_loaded_runtime_objects
            .get(digest)
    }
337
    /// Batch lookup of `effects_digests` in the `effects` table; delegates to
    /// the table's `multi_get`.
    pub fn multi_get_effects<'a>(
        &self,
        effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
    ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
        self.perpetual_tables.effects.multi_get(effects_digests)
    }
344
345 pub fn get_executed_effects(
346 &self,
347 tx_digest: &TransactionDigest,
348 ) -> Result<Option<TransactionEffects>, TypedStoreError> {
349 let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
350 match effects_digest {
351 Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
352 None => Ok(None),
353 }
354 }
355
    /// Batch lookup of `digests` in the `executed_effects` table; delegates to
    /// the table's `multi_get`.
    pub fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> Result<Vec<Option<TransactionEffectsDigest>>, TypedStoreError> {
        self.perpetual_tables.executed_effects.multi_get(digests)
    }
364
365 pub fn multi_get_executed_effects(
368 &self,
369 digests: &[TransactionDigest],
370 ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
371 let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
372 let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
373 let mut tx_to_effects_map = effects
374 .into_iter()
375 .flatten()
376 .map(|effects| (*effects.transaction_digest(), effects))
377 .collect::<HashMap<_, _>>();
378 Ok(digests
379 .iter()
380 .map(|digest| tx_to_effects_map.remove(digest))
381 .collect())
382 }
383
384 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> SuiResult<bool> {
385 Ok(self
386 .perpetual_tables
387 .executed_effects
388 .contains_key(digest)?)
389 }
390
391 pub fn get_marker_value(
392 &self,
393 object_key: FullObjectKey,
394 epoch_id: EpochId,
395 ) -> SuiResult<Option<MarkerValue>> {
396 Ok(self
397 .perpetual_tables
398 .object_per_epoch_marker_table_v2
399 .get(&EpochMarkerKey(epoch_id, object_key))?)
400 }
401
402 pub fn get_latest_marker(
403 &self,
404 object_id: FullObjectID,
405 epoch_id: EpochId,
406 ) -> SuiResult<Option<(SequenceNumber, MarkerValue)>> {
407 let min_key = EpochMarkerKey(epoch_id, FullObjectKey::min_for_id(&object_id));
408 let max_key = EpochMarkerKey(epoch_id, FullObjectKey::max_for_id(&object_id));
409
410 let marker_entry = self
411 .perpetual_tables
412 .object_per_epoch_marker_table_v2
413 .reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
414 .next();
415 match marker_entry {
416 Some(Ok((EpochMarkerKey(epoch, key), marker))) => {
417 assert_eq!(epoch, epoch_id);
419 assert_eq!(key.id(), object_id);
420 Ok(Some((key.version(), marker)))
421 }
422 Some(Err(e)) => Err(e.into()),
423 None => Ok(None),
424 }
425 }
426
427 pub fn deprecated_insert_finalized_transactions(
429 &self,
430 digests: &[TransactionDigest],
431 epoch: EpochId,
432 sequence: CheckpointSequenceNumber,
433 ) -> SuiResult {
434 let mut batch = self
435 .perpetual_tables
436 .executed_transactions_to_checkpoint
437 .batch();
438 batch.insert_batch(
439 &self.perpetual_tables.executed_transactions_to_checkpoint,
440 digests.iter().map(|d| (*d, (epoch, sequence))),
441 )?;
442 batch.write()?;
443 trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
444 Ok(())
445 }
446
447 pub fn deprecated_get_transaction_checkpoint(
449 &self,
450 digest: &TransactionDigest,
451 ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
452 Ok(self
453 .perpetual_tables
454 .executed_transactions_to_checkpoint
455 .get(digest)?)
456 }
457
458 pub fn deprecated_multi_get_transaction_checkpoint(
460 &self,
461 digests: &[TransactionDigest],
462 ) -> SuiResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
463 Ok(self
464 .perpetual_tables
465 .executed_transactions_to_checkpoint
466 .multi_get(digests)?
467 .into_iter()
468 .collect())
469 }
470
    /// Reports whether the perpetual tables consider the database empty; pure
    /// delegation to `AuthorityPerpetualTables::database_is_empty`.
    pub fn database_is_empty(&self) -> SuiResult<bool> {
        self.perpetual_tables.database_is_empty()
    }
475
476 pub fn object_exists_by_key(
477 &self,
478 object_id: &ObjectID,
479 version: VersionNumber,
480 ) -> SuiResult<bool> {
481 Ok(self
482 .perpetual_tables
483 .objects
484 .contains_key(&ObjectKey(*object_id, version))?)
485 }
486
487 pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<bool>> {
488 Ok(self
489 .perpetual_tables
490 .objects
491 .multi_contains_keys(object_keys.to_vec())?
492 .into_iter()
493 .collect())
494 }
495
496 fn get_object_ref_prior_to_key(
497 &self,
498 object_id: &ObjectID,
499 version: VersionNumber,
500 ) -> Result<Option<ObjectRef>, SuiError> {
501 let Some(prior_version) = version.one_before() else {
502 return Ok(None);
503 };
504 let mut iterator = self
505 .perpetual_tables
506 .objects
507 .reversed_safe_iter_with_bounds(
508 Some(ObjectKey::min_for_id(object_id)),
509 Some(ObjectKey(*object_id, prior_version)),
510 )?;
511
512 if let Some((object_key, value)) = iterator.next().transpose()?
513 && object_key.0 == *object_id
514 {
515 return Ok(Some(
516 self.perpetual_tables.object_reference(&object_key, value)?,
517 ));
518 }
519 Ok(None)
520 }
521
522 pub fn multi_get_objects_by_key(
523 &self,
524 object_keys: &[ObjectKey],
525 ) -> Result<Vec<Option<Object>>, SuiError> {
526 let wrappers = self
527 .perpetual_tables
528 .objects
529 .multi_get(object_keys.to_vec())?;
530 let mut ret = vec![];
531
532 for (idx, w) in wrappers.into_iter().enumerate() {
533 ret.push(
534 w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
535 .transpose()?
536 .flatten(),
537 );
538 }
539 Ok(ret)
540 }
541
542 pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, SuiError> {
544 let mut result = Vec::new();
545 for id in objects {
546 result.push(self.get_object(id));
547 }
548 Ok(result)
549 }
550
551 pub(crate) fn insert_genesis_object(&self, object: Object) -> SuiResult {
556 debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
558 let object_ref = object.compute_object_reference();
559 self.insert_object_direct(object_ref, &object)
560 }
561
562 fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> SuiResult {
566 let mut write_batch = self.perpetual_tables.objects.batch();
567
568 let store_object = get_store_object(object.clone());
570 write_batch.insert_batch(
571 &self.perpetual_tables.objects,
572 std::iter::once((ObjectKey::from(object_ref), store_object)),
573 )?;
574
575 if object.get_single_owner().is_some() {
577 if !object.is_child_object() {
579 self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref], false)?;
580 }
581 }
582
583 write_batch.write()?;
584
585 Ok(())
586 }
587
588 #[instrument(level = "debug", skip_all)]
590 pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> SuiResult<()> {
591 let mut batch = self.perpetual_tables.objects.batch();
592 let ref_and_objects: Vec<_> = objects
593 .iter()
594 .map(|o| (o.compute_object_reference(), o))
595 .collect();
596
597 batch.insert_batch(
598 &self.perpetual_tables.objects,
599 ref_and_objects
600 .iter()
601 .map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))),
602 )?;
603
604 let non_child_object_refs: Vec<_> = ref_and_objects
605 .iter()
606 .filter(|(_, object)| !object.is_child_object())
607 .map(|(oref, _)| *oref)
608 .collect();
609
610 self.initialize_live_object_markers_impl(
611 &mut batch,
612 &non_child_object_refs,
613 false, )?;
615
616 batch.write()?;
617
618 Ok(())
619 }
620
621 pub async fn bulk_insert_live_objects(
622 perpetual_db: Arc<AuthorityPerpetualTables>,
623 objects: Vec<LiveObject>,
624 expected_sha3_digest: &[u8; 32],
625 num_parallel_chunks: usize,
626 ) -> SuiResult<()> {
627 let mut hasher = Sha3_256::default();
629 for object in &objects {
630 hasher.update(object.object_reference().2.inner());
631 }
632 let sha3_digest = hasher.finalize().digest;
633 if *expected_sha3_digest != sha3_digest {
634 error!(
635 "Sha does not match! expected: {:?}, actual: {:?}",
636 expected_sha3_digest, sha3_digest
637 );
638 return Err(SuiError::from("Sha does not match"));
639 }
640
641 let chunk_size = objects.len().div_ceil(num_parallel_chunks).max(1);
642 let mut remaining = objects;
643 let mut handles = Vec::new();
644 while !remaining.is_empty() {
645 let take = chunk_size.min(remaining.len());
646 let chunk: Vec<LiveObject> = remaining.drain(..take).collect();
647 let db = perpetual_db.clone();
648 handles.push(tokio::task::spawn_blocking(move || {
649 Self::insert_objects_chunk(db, chunk)
650 }));
651 }
652 for handle in handles {
653 handle.await.expect("insert task panicked")?;
654 }
655 Ok(())
656 }
657
658 fn insert_objects_chunk(
659 perpetual_db: Arc<AuthorityPerpetualTables>,
660 objects: Vec<LiveObject>,
661 ) -> SuiResult<()> {
662 let mut batch = perpetual_db.objects.batch();
663 let mut written = 0usize;
664 const MAX_BATCH_SIZE: usize = 100_000;
665 for object in objects {
666 match object {
667 LiveObject::Normal(object) => {
668 let store_object_wrapper = get_store_object(object.clone());
669 batch.insert_batch(
670 &perpetual_db.objects,
671 std::iter::once((
672 ObjectKey::from(object.compute_object_reference()),
673 store_object_wrapper,
674 )),
675 )?;
676 if !object.is_child_object() {
677 Self::initialize_live_object_markers(
678 &perpetual_db.live_owned_object_markers,
679 &mut batch,
680 &[object.compute_object_reference()],
681 true, )?;
683 }
684 }
685 LiveObject::Wrapped(object_key) => {
686 batch.insert_batch(
687 &perpetual_db.objects,
688 std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
689 object_key,
690 StoreObject::Wrapped.into(),
691 )),
692 )?;
693 }
694 }
695 written += 1;
696 if written > MAX_BATCH_SIZE {
697 batch.write()?;
698 batch = perpetual_db.objects.batch();
699 written = 0;
700 }
701 }
702 batch.write()?;
703 Ok(())
704 }
705
    /// Persists `epoch_start_configuration` through the perpetual tables and
    /// discards whatever value that call returns on success.
    pub fn set_epoch_start_configuration(
        &self,
        epoch_start_configuration: &EpochStartConfiguration,
    ) -> SuiResult {
        self.perpetual_tables
            .set_epoch_start_configuration(epoch_start_configuration)?;
        Ok(())
    }
714
715 pub fn get_epoch_start_configuration(&self) -> SuiResult<Option<EpochStartConfiguration>> {
716 Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
717 }
718
719 #[instrument(level = "debug", skip_all)]
724 pub fn build_db_batch(
725 &self,
726 epoch_id: EpochId,
727 tx_outputs: &[Arc<TransactionOutputs>],
728 ) -> SuiResult<DBBatch> {
729 let mut write_batch = self.perpetual_tables.transactions.batch();
730 for outputs in tx_outputs {
731 self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
732 }
733 fail_point!("crash");
735
736 trace!(
737 "built batch for committed transactions: {:?}",
738 tx_outputs
739 .iter()
740 .map(|tx| tx.transaction.digest())
741 .collect::<Vec<_>>()
742 );
743
744 fail_point!("crash");
746
747 Ok(write_batch)
748 }
749
    /// Stages every table write for one executed transaction into `write_batch`.
    ///
    /// Nothing is written to the database here: the function only appends to
    /// `write_batch`, and the caller decides when to write it.
    ///
    /// Staged, in this order:
    /// 1. effects (keyed by effects digest) and the transaction digest →
    ///    effects digest mapping in `executed_effects`;
    /// 2. the transaction itself, keyed by its digest;
    /// 3. an `(epoch_id, transaction digest)` entry in `executed_transaction_digests`;
    /// 4. per-epoch markers, keyed by `EpochMarkerKey(epoch_id, key)`;
    /// 5. `Deleted` / `Wrapped` entries for deleted and wrapped objects;
    /// 6. the written objects, keyed by `(id, version)`;
    /// 7. events, only when the effects carry an events digest;
    /// 8. unchanged loaded runtime objects, only when non-empty;
    /// 9. live object markers: initialize `new_locks_to_init` (non-forced),
    ///    then delete `locks_to_delete`.
    // NOTE(review): whether this ordering is required by consumers of the
    // batch is not visible here — confirm before reordering any step.
    fn write_one_transaction_outputs(
        &self,
        write_batch: &mut DBBatch,
        epoch_id: EpochId,
        tx_outputs: &TransactionOutputs,
    ) -> SuiResult {
        let TransactionOutputs {
            transaction,
            effects,
            markers,
            wrapped,
            deleted,
            written,
            events,
            unchanged_loaded_runtime_objects,
            locks_to_delete,
            new_locks_to_init,
            ..
        } = tx_outputs;

        let effects_digest = effects.digest();
        let transaction_digest = transaction.digest();
        // Effects and the tx-digest -> effects-digest mapping are staged first.
        write_batch
            .insert_batch(
                &self.perpetual_tables.effects,
                [(effects_digest, effects.clone())],
            )?
            .insert_batch(
                &self.perpetual_tables.executed_effects,
                [(transaction_digest, effects_digest)],
            )?;

        // The transaction, indexed by its digest.
        write_batch.insert_batch(
            &self.perpetual_tables.transactions,
            iter::once((transaction_digest, transaction.serializable_ref())),
        )?;

        // Record the digest as executed in this epoch (unit value).
        write_batch.insert_batch(
            &self.perpetual_tables.executed_transaction_digests,
            [((epoch_id, *transaction_digest), ())],
        )?;

        // Per-epoch markers, scoped to `epoch_id`.
        write_batch.insert_batch(
            &self.perpetual_tables.object_per_epoch_marker_table_v2,
            markers
                .iter()
                .map(|(key, marker_value)| (EpochMarkerKey(epoch_id, *key), *marker_value)),
        )?;
        // Deleted and wrapped objects are stored as `Deleted` / `Wrapped` entries.
        write_batch.insert_batch(
            &self.perpetual_tables.objects,
            deleted
                .iter()
                .map(|key| (key, StoreObject::Deleted))
                .chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
                .map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
        )?;

        // Each written object is stored under (id, version-of-the-new-object).
        let new_objects = written.iter().map(|(id, new_object)| {
            let version = new_object.version();
            trace!(?id, ?version, "writing object");
            let store_object = get_store_object(new_object.clone());
            (ObjectKey(*id, version), store_object)
        });

        write_batch.insert_batch(&self.perpetual_tables.objects, new_objects)?;

        // Events are keyed by transaction digest and only stored when the
        // effects reference an events digest.
        if effects.events_digest().is_some() {
            write_batch.insert_batch(
                &self.perpetual_tables.events_2,
                [(transaction_digest, events)],
            )?;
        }

        // Skip the table entirely when there is nothing to record.
        if !unchanged_loaded_runtime_objects.is_empty() {
            write_batch.insert_batch(
                &self.perpetual_tables.unchanged_loaded_runtime_objects,
                [(transaction_digest, unchanged_loaded_runtime_objects)],
            )?;
        }

        // Non-forced initialization: fails if any of these markers already
        // holds lock details.
        self.initialize_live_object_markers_impl(write_batch, new_locks_to_init, false)?;

        self.delete_live_object_markers(write_batch, locks_to_delete)?;

        debug!(effects_digest = ?effects.digest(), "commit_certificate finished");

        Ok(())
    }
847
848 pub(crate) fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> SuiResult {
851 let mut batch = self.perpetual_tables.transactions.batch();
852 batch.insert_batch(
853 &self.perpetual_tables.transactions,
854 [(tx.digest(), tx.clone().into_unsigned().serializable_ref())],
855 )?;
856 batch.write()?;
857 Ok(())
858 }
859
860 pub(crate) fn get_lock(
863 &self,
864 obj_ref: ObjectRef,
865 epoch_store: &AuthorityPerEpochStore,
866 ) -> SuiLockResult {
867 if self
868 .perpetual_tables
869 .live_owned_object_markers
870 .get(&obj_ref)?
871 .is_none()
872 {
873 return Ok(ObjectLockStatus::LockedAtDifferentVersion {
874 locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
875 });
876 }
877
878 let tables = epoch_store.tables()?;
879 let epoch_id = epoch_store.epoch();
880
881 if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
882 Ok(ObjectLockStatus::LockedToTx {
883 locked_by_tx: LockDetailsDeprecated {
884 epoch: epoch_id,
885 tx_digest,
886 },
887 })
888 } else {
889 Ok(ObjectLockStatus::Initialized)
890 }
891 }
892
893 pub(crate) fn get_latest_live_version_for_object_id(
895 &self,
896 object_id: ObjectID,
897 ) -> SuiResult<ObjectRef> {
898 let mut iterator = self
899 .perpetual_tables
900 .live_owned_object_markers
901 .reversed_safe_iter_with_bounds(
902 None,
903 Some((object_id, SequenceNumber::MAX, ObjectDigest::MAX)),
904 )?;
905 Ok(iterator
906 .next()
907 .transpose()?
908 .and_then(|value| {
909 if value.0.0 == object_id {
910 Some(value)
911 } else {
912 None
913 }
914 })
915 .ok_or_else(|| {
916 SuiError::from(UserInputError::ObjectNotFound {
917 object_id,
918 version: None,
919 })
920 })?
921 .0)
922 }
923
924 pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> SuiResult {
929 let locks = self
930 .perpetual_tables
931 .live_owned_object_markers
932 .multi_get(objects)?;
933 for (lock, obj_ref) in locks.into_iter().zip_debug_eq(objects) {
934 if lock.is_none() {
935 let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
936 fp_bail!(
937 UserInputError::ObjectVersionUnavailableForConsumption {
938 provided_obj_ref: *obj_ref,
939 current_version: latest_lock.1
940 }
941 .into()
942 );
943 }
944 }
945 Ok(())
946 }
947
948 fn initialize_live_object_markers_impl(
951 &self,
952 write_batch: &mut DBBatch,
953 objects: &[ObjectRef],
954 is_force_reset: bool,
955 ) -> SuiResult {
956 AuthorityStore::initialize_live_object_markers(
957 &self.perpetual_tables.live_owned_object_markers,
958 write_batch,
959 objects,
960 is_force_reset,
961 )
962 }
963
964 pub fn initialize_live_object_markers(
965 live_object_marker_table: &DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,
966 write_batch: &mut DBBatch,
967 objects: &[ObjectRef],
968 is_force_reset: bool,
969 ) -> SuiResult {
970 trace!(?objects, "initialize_locks");
971
972 if !is_force_reset {
973 let live_object_markers = live_object_marker_table.multi_get(objects)?;
974 let existing_live_object_markers: Vec<ObjectRef> = live_object_markers
978 .iter()
979 .zip_debug_eq(objects)
980 .filter_map(|(lock_opt, objref)| {
981 lock_opt.clone().flatten().map(|_tx_digest| *objref)
982 })
983 .collect();
984 if !existing_live_object_markers.is_empty() {
985 info!(
986 ?existing_live_object_markers,
987 "Cannot initialize live_object_markers because some exist already"
988 );
989 return Err(SuiErrorKind::ObjectLockAlreadyInitialized {
990 refs: existing_live_object_markers,
991 }
992 .into());
993 }
994 }
995
996 write_batch.insert_batch(
997 live_object_marker_table,
998 objects.iter().map(|obj_ref| (obj_ref, None)),
999 )?;
1000 Ok(())
1001 }
1002
1003 fn delete_live_object_markers(
1005 &self,
1006 write_batch: &mut DBBatch,
1007 objects: &[ObjectRef],
1008 ) -> SuiResult {
1009 trace!(?objects, "delete_locks");
1010 write_batch.delete_batch(
1011 &self.perpetual_tables.live_owned_object_markers,
1012 objects.iter(),
1013 )?;
1014 Ok(())
1015 }
1016
    /// Pure delegation to `AuthorityPerpetualTables::find_object_lt_or_eq_version`
    /// for `object_id` and `version`.
    // NOTE(review): going by the name, this presumably returns the newest
    // object at or below `version` — confirm against the perpetual tables.
    pub fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<Option<Object>> {
        self.perpetual_tables
            .find_object_lt_or_eq_version(object_id, version)
    }
1029
    /// Pure delegation to
    /// `AuthorityPerpetualTables::get_latest_object_ref_or_tombstone` for
    /// `object_id`.
    pub fn get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<ObjectRef>, SuiError> {
        self.perpetual_tables
            .get_latest_object_ref_or_tombstone(object_id)
    }
1046
1047 pub fn get_latest_object_or_tombstone(
1051 &self,
1052 object_id: ObjectID,
1053 ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, SuiError> {
1054 let Some((object_key, store_object)) = self
1055 .perpetual_tables
1056 .get_latest_object_or_tombstone(object_id)?
1057 else {
1058 return Ok(None);
1059 };
1060
1061 if let Some(object_ref) = self
1062 .perpetual_tables
1063 .tombstone_reference(&object_key, &store_object)?
1064 {
1065 return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
1066 }
1067
1068 let object = self
1069 .perpetual_tables
1070 .object(&object_key, store_object)?
1071 .expect("Non tombstone store object could not be converted to object");
1072
1073 Ok(Some((object_key, ObjectOrTombstone::Object(object))))
1074 }
1075
1076 pub fn insert_transaction_and_effects(
1077 &self,
1078 transaction: &VerifiedTransaction,
1079 transaction_effects: &TransactionEffects,
1080 ) -> Result<(), TypedStoreError> {
1081 let mut write_batch = self.perpetual_tables.transactions.batch();
1082 write_batch
1085 .insert_batch(
1086 &self.perpetual_tables.effects,
1087 [(transaction_effects.digest(), transaction_effects)],
1088 )?
1089 .insert_batch(
1090 &self.perpetual_tables.transactions,
1091 [(transaction.digest(), transaction.serializable_ref())],
1092 )?;
1093
1094 write_batch.write()?;
1095 Ok(())
1096 }
1097
1098 pub fn multi_insert_transaction_and_effects<'a>(
1099 &self,
1100 transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
1101 ) -> Result<(), TypedStoreError> {
1102 let mut write_batch = self.perpetual_tables.transactions.batch();
1103 for tx in transactions {
1104 write_batch
1105 .insert_batch(
1106 &self.perpetual_tables.effects,
1107 [(tx.effects.digest(), &tx.effects)],
1108 )?
1109 .insert_batch(
1110 &self.perpetual_tables.transactions,
1111 [(tx.transaction.digest(), tx.transaction.serializable_ref())],
1112 )?;
1113 }
1114
1115 write_batch.write()?;
1116 Ok(())
1117 }
1118
1119 pub fn multi_get_transaction_blocks(
1120 &self,
1121 tx_digests: &[TransactionDigest],
1122 ) -> Result<Vec<Option<VerifiedTransaction>>, TypedStoreError> {
1123 self.perpetual_tables
1124 .transactions
1125 .multi_get(tx_digests)
1126 .map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())
1127 }
1128
1129 pub fn get_transaction_block(
1130 &self,
1131 tx_digest: &TransactionDigest,
1132 ) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
1133 self.perpetual_tables
1134 .transactions
1135 .get(tx_digest)
1136 .map(|v| v.map(|v| v.into()))
1137 }
1138
    /// Pure delegation to `AuthorityPerpetualTables::list_transactions_from`
    /// with the given `start` digest and `limit`.
    // NOTE(review): whether `start` is inclusive is not visible here — confirm
    // against the perpetual tables implementation.
    pub fn list_transactions_from(
        &self,
        start: Option<TransactionDigest>,
        limit: usize,
    ) -> Result<Vec<TransactionDigest>, TypedStoreError> {
        self.perpetual_tables.list_transactions_from(start, limit)
    }
1146
    /// Returns the effects digest recorded for `tx_digest`; pure delegation to
    /// `AuthorityPerpetualTables::get_executed_effects_digest`.
    pub fn get_executed_effects_digest_for_tx(
        &self,
        tx_digest: &TransactionDigest,
    ) -> Result<Option<TransactionEffectsDigest>, TypedStoreError> {
        self.perpetual_tables.get_executed_effects_digest(tx_digest)
    }
1153
    /// Reads the Sui system state by calling `get_sui_system_state` on the
    /// perpetual tables.
    // NOTE(review): the reason for the `_unsafe` suffix is not visible in this
    // part of the file — presumably the read is not synchronized with
    // execution; confirm before relying on it.
    pub fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
        get_sui_system_state(self.perpetual_tables.as_ref())
    }
1163
1164 pub fn expensive_check_sui_conservation<T>(
1165 self: &Arc<Self>,
1166 type_layout_store: T,
1167 old_epoch_store: &AuthorityPerEpochStore,
1168 ) -> SuiResult
1169 where
1170 T: TypeLayoutStore + Send + Copy,
1171 {
1172 if !self.enable_epoch_sui_conservation_check {
1173 return Ok(());
1174 }
1175
1176 let executor = old_epoch_store.executor();
1177 info!("Starting SUI conservation check. This may take a while..");
1178 let cur_time = Instant::now();
1179 let mut pending_objects = vec![];
1180 let mut count = 0;
1181 let mut size = 0;
1182 let (mut total_sui, mut total_storage_rebate) = thread::scope(|s| {
1183 let pending_tasks = FuturesUnordered::new();
1184 for o in self.iter_live_object_set(false) {
1185 match o {
1186 LiveObject::Normal(object) => {
1187 size += object.object_size_for_gas_metering();
1188 count += 1;
1189 pending_objects.push(object);
1190 if count % 1_000_000 == 0 {
1191 let mut task_objects = vec![];
1192 mem::swap(&mut pending_objects, &mut task_objects);
1193 pending_tasks.push(s.spawn(move || {
1194 let mut layout_resolver = executor.type_layout_resolver(
1195 old_epoch_store.protocol_config(),
1196 Box::new(type_layout_store),
1197 );
1198 let mut total_storage_rebate = 0;
1199 let mut total_sui = 0;
1200 for object in task_objects {
1201 total_storage_rebate += object.storage_rebate;
1202 let object_contained_sui = match object
1205 .get_total_sui(layout_resolver.as_mut())
1206 {
1207 Ok(sui) => sui,
1208 Err(e)
1209 if old_epoch_store.get_chain()
1210 == sui_protocol_config::Chain::Testnet =>
1211 {
1212 error!(
1213 "Error calculating total SUI for object {:?}: {:?}",
1214 object.compute_object_reference(),
1215 e
1216 );
1217 0
1218 }
1219 Err(e) => panic!(
1220 "Error calculating total SUI for object {:?}: {:?}",
1221 object.compute_object_reference(),
1222 e
1223 ),
1224 };
1225 total_sui += object_contained_sui - object.storage_rebate;
1226 }
1227 if count % 50_000_000 == 0 {
1228 info!("Processed {} objects", count);
1229 }
1230 (total_sui, total_storage_rebate)
1231 }));
1232 }
1233 }
1234 LiveObject::Wrapped(_) => {
1235 unreachable!("Explicitly asked to not include wrapped tombstones")
1236 }
1237 }
1238 }
1239 pending_tasks.into_iter().fold((0, 0), |init, result| {
1240 let result = result.join().unwrap();
1241 (init.0 + result.0, init.1 + result.1)
1242 })
1243 });
1244 let mut layout_resolver = executor.type_layout_resolver(
1245 old_epoch_store.protocol_config(),
1246 Box::new(type_layout_store),
1247 );
1248 for object in pending_objects {
1249 total_storage_rebate += object.storage_rebate;
1250 total_sui +=
1251 object.get_total_sui(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
1252 }
1253 info!(
1254 "Scanned {} live objects, took {:?}",
1255 count,
1256 cur_time.elapsed()
1257 );
1258 self.metrics
1259 .sui_conservation_live_object_count
1260 .set(count as i64);
1261 self.metrics
1262 .sui_conservation_live_object_size
1263 .set(size as i64);
1264 self.metrics
1265 .sui_conservation_check_latency
1266 .set(cur_time.elapsed().as_secs() as i64);
1267
1268 let system_state = self
1270 .get_sui_system_state_object_unsafe()
1271 .expect("Reading sui system state object cannot fail")
1272 .into_sui_system_state_summary();
1273 let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
1274 info!(
1275 "Total SUI amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
1276 total_sui, storage_fund_balance, total_storage_rebate, system_state.epoch
1277 );
1278
1279 let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
1280 self.metrics
1281 .sui_conservation_storage_fund
1282 .set(storage_fund_balance as i64);
1283 self.metrics
1284 .sui_conservation_storage_fund_imbalance
1285 .set(imbalance);
1286 self.metrics
1287 .sui_conservation_imbalance
1288 .set((total_sui as i128 - TOTAL_SUPPLY_MIST as i128) as i64);
1289
1290 if let Some(expected_imbalance) = self
1291 .perpetual_tables
1292 .expected_storage_fund_imbalance
1293 .get(&())
1294 .expect("DB read cannot fail")
1295 {
1296 fp_ensure!(
1297 imbalance == expected_imbalance,
1298 SuiError::from(
1299 format!(
1300 "Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
1301 system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
1302 ).as_str()
1303 )
1304 );
1305 } else {
1306 self.perpetual_tables
1307 .expected_storage_fund_imbalance
1308 .insert(&(), &imbalance)
1309 .expect("DB write cannot fail");
1310 }
1311
1312 if let Some(expected_sui) = self
1313 .perpetual_tables
1314 .expected_network_sui_amount
1315 .get(&())
1316 .expect("DB read cannot fail")
1317 {
1318 fp_ensure!(
1319 total_sui == expected_sui,
1320 SuiError::from(
1321 format!(
1322 "Inconsistent state detected at epoch {}: total sui: {}, expecting {}",
1323 system_state.epoch, total_sui, expected_sui
1324 )
1325 .as_str()
1326 )
1327 );
1328 } else {
1329 self.perpetual_tables
1330 .expected_network_sui_amount
1331 .insert(&(), &total_sui)
1332 .expect("DB write cannot fail");
1333 }
1334
1335 Ok(())
1336 }
1337
1338 #[instrument(level = "error", skip_all)]
1341 pub fn maybe_reaccumulate_state_hash(
1342 &self,
1343 cur_epoch_store: &AuthorityPerEpochStore,
1344 new_protocol_version: ProtocolVersion,
1345 ) {
1346 let old_simplified_unwrap_then_delete = cur_epoch_store
1347 .protocol_config()
1348 .simplified_unwrap_then_delete();
1349 let new_simplified_unwrap_then_delete =
1350 ProtocolConfig::get_for_version(new_protocol_version, cur_epoch_store.get_chain())
1351 .simplified_unwrap_then_delete();
1352 let should_reaccumulate =
1355 !old_simplified_unwrap_then_delete && new_simplified_unwrap_then_delete;
1356 if !should_reaccumulate {
1357 return;
1358 }
1359 info!(
1360 "[Re-accumulate] simplified_unwrap_then_delete is enabled in the new protocol version, re-accumulating state hash"
1361 );
1362 let cur_time = Instant::now();
1363 std::thread::scope(|s| {
1364 let pending_tasks = FuturesUnordered::new();
1365 const BITS: u8 = 5;
1374 for index in 0u8..(1 << BITS) {
1375 pending_tasks.push(s.spawn(move || {
1376 let mut id_bytes = [0; ObjectID::LENGTH];
1377 id_bytes[0] = index << (8 - BITS);
1378 let start_id = ObjectID::new(id_bytes);
1379
1380 id_bytes[0] |= (1 << (8 - BITS)) - 1;
1381 for element in id_bytes.iter_mut().skip(1) {
1382 *element = u8::MAX;
1383 }
1384 let end_id = ObjectID::new(id_bytes);
1385
1386 info!(
1387 "[Re-accumulate] Scanning object ID range {:?}..{:?}",
1388 start_id, end_id
1389 );
1390 let mut prev = (
1391 ObjectKey::min_for_id(&ObjectID::ZERO),
1392 StoreObjectWrapper::V1(StoreObject::Deleted),
1393 );
1394 let mut object_scanned: u64 = 0;
1395 let mut wrapped_objects_to_remove = vec![];
1396 for db_result in self.perpetual_tables.objects.safe_range_iter(
1397 ObjectKey::min_for_id(&start_id)..=ObjectKey::max_for_id(&end_id),
1398 ) {
1399 match db_result {
1400 Ok((object_key, object)) => {
1401 object_scanned += 1;
1402 if object_scanned.is_multiple_of(100000) {
1403 info!(
1404 "[Re-accumulate] Task {}: object scanned: {}",
1405 index, object_scanned,
1406 );
1407 }
1408 if matches!(prev.1.inner(), StoreObject::Wrapped)
1409 && object_key.0 != prev.0.0
1410 {
1411 wrapped_objects_to_remove
1412 .push(WrappedObject::new(prev.0.0, prev.0.1));
1413 }
1414
1415 prev = (object_key, object);
1416 }
1417 Err(err) => {
1418 warn!("Object iterator encounter RocksDB error {:?}", err);
1419 return Err(err);
1420 }
1421 }
1422 }
1423 if matches!(prev.1.inner(), StoreObject::Wrapped) {
1424 wrapped_objects_to_remove.push(WrappedObject::new(prev.0.0, prev.0.1));
1425 }
1426 info!(
1427 "[Re-accumulate] Task {}: object scanned: {}, wrapped objects: {}",
1428 index,
1429 object_scanned,
1430 wrapped_objects_to_remove.len(),
1431 );
1432 Ok((wrapped_objects_to_remove, object_scanned))
1433 }));
1434 }
1435 let (last_checkpoint_of_epoch, cur_accumulator) = self
1436 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
1437 .expect("read cannot fail")
1438 .expect("accumulator must exist");
1439 let (accumulator, total_objects_scanned, total_wrapped_objects) =
1440 pending_tasks.into_iter().fold(
1441 (cur_accumulator, 0u64, 0usize),
1442 |(mut accumulator, total_objects_scanned, total_wrapped_objects), task| {
1443 let (wrapped_objects_to_remove, object_scanned) =
1444 task.join().unwrap().unwrap();
1445 accumulator.remove_all(
1446 wrapped_objects_to_remove
1447 .iter()
1448 .map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
1449 .collect::<Vec<Vec<u8>>>(),
1450 );
1451 (
1452 accumulator,
1453 total_objects_scanned + object_scanned,
1454 total_wrapped_objects + wrapped_objects_to_remove.len(),
1455 )
1456 },
1457 );
1458 info!(
1459 "[Re-accumulate] Total objects scanned: {}, total wrapped objects: {}",
1460 total_objects_scanned, total_wrapped_objects,
1461 );
1462 info!(
1463 "[Re-accumulate] New accumulator value: {:?}",
1464 accumulator.digest()
1465 );
1466 self.insert_state_hash_for_epoch(
1467 cur_epoch_store.epoch(),
1468 &last_checkpoint_of_epoch,
1469 &accumulator,
1470 )
1471 .unwrap();
1472 });
1473 info!(
1474 "[Re-accumulate] Re-accumulating took {}seconds",
1475 cur_time.elapsed().as_secs()
1476 );
1477 }
1478
1479 pub async fn prune_objects_and_compact_for_testing(
1480 &self,
1481 checkpoint_store: &Arc<CheckpointStore>,
1482 rpc_index: Option<&RpcIndexStore>,
1483 ) {
1484 let pruning_config = AuthorityStorePruningConfig {
1485 num_epochs_to_retain: 0,
1486 ..Default::default()
1487 };
1488 let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
1489 &self.perpetual_tables,
1490 checkpoint_store,
1491 rpc_index,
1492 None,
1493 pruning_config,
1494 AuthorityStorePruningMetrics::new_for_test(),
1495 EPOCH_DURATION_MS_FOR_TESTING,
1496 )
1497 .await;
1498 let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
1499 }
1500
1501 pub fn remove_executed_effects_for_testing(
1502 &self,
1503 tx_digest: &TransactionDigest,
1504 ) -> anyhow::Result<()> {
1505 let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
1506 if let Some(effects_digest) = effects_digest {
1507 self.perpetual_tables.executed_effects.remove(tx_digest)?;
1508 self.perpetual_tables.effects.remove(&effects_digest)?;
1509 }
1510 Ok(())
1511 }
1512
/// Test-only helper: deletes from the `objects` table every `(object id, version)` pair
/// returned by `modified_at_versions()` for the given effects, using a single write batch.
///
/// Each pair is logged at `info` level before it is queued for deletion.
///
/// # Errors
/// Returns the underlying store error if building or writing the batch fails.
#[cfg(test)]
pub async fn prune_objects_immediately_for_testing(
    &self,
    transaction_effects: Vec<TransactionEffects>,
) -> anyhow::Result<()> {
    let objects_table = &self.perpetual_tables.objects;
    let mut wb = objects_table.batch();

    let object_keys_to_prune: Vec<ObjectKey> = transaction_effects
        .iter()
        .flat_map(|effects| effects.modified_at_versions())
        .map(|(object_id, seq_number)| {
            info!("Pruning object {:?} version {:?}", object_id, seq_number);
            ObjectKey(object_id, seq_number)
        })
        .collect();

    wb.delete_batch(objects_table, object_keys_to_prune.into_iter())?;
    wb.write()?;
    Ok(())
}
1535
/// Simulator-only helper: counts the entries of the `objects` table found when iterating
/// with the bounds `ObjectKey(object_id, VersionNumber::MIN)` and
/// `ObjectKey(object_id, VersionNumber::MAX)`.
///
/// Entries are counted as they are read instead of first being collected into a vector.
///
/// # Panics
/// Panics on the first entry the iterator reports as an error.
#[cfg(msim)]
pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
    let entries = self.perpetual_tables.objects.safe_iter_with_bounds(
        Some(ObjectKey(object_id, VersionNumber::MIN)),
        Some(ObjectKey(object_id, VersionNumber::MAX)),
    );
    let mut versions = 0;
    for entry in entries {
        entry.expect("DB read cannot fail");
        versions += 1;
    }
    versions
}
1549}
1550
1551impl GlobalStateHashStore for AuthorityStore {
1552 fn get_object_ref_prior_to_key_deprecated(
1553 &self,
1554 object_id: &ObjectID,
1555 version: VersionNumber,
1556 ) -> SuiResult<Option<ObjectRef>> {
1557 self.get_object_ref_prior_to_key(object_id, version)
1558 }
1559
1560 fn get_root_state_hash_for_epoch(
1561 &self,
1562 epoch: EpochId,
1563 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
1564 self.perpetual_tables
1565 .root_state_hash_by_epoch
1566 .get(&epoch)
1567 .map_err(Into::into)
1568 }
1569
1570 fn get_root_state_hash_for_highest_epoch(
1571 &self,
1572 ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
1573 Ok(self
1574 .perpetual_tables
1575 .root_state_hash_by_epoch
1576 .reversed_safe_iter_with_bounds(None, None)?
1577 .next()
1578 .transpose()?)
1579 }
1580
1581 fn insert_state_hash_for_epoch(
1582 &self,
1583 epoch: EpochId,
1584 last_checkpoint_of_epoch: &CheckpointSequenceNumber,
1585 acc: &GlobalStateHash,
1586 ) -> SuiResult {
1587 self.perpetual_tables
1588 .root_state_hash_by_epoch
1589 .insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
1590 self.root_state_notify_read
1591 .notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
1592
1593 Ok(())
1594 }
1595
1596 fn iter_live_object_set(
1597 &self,
1598 include_wrapped_object: bool,
1599 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1600 Box::new(
1601 self.perpetual_tables
1602 .iter_live_object_set(include_wrapped_object),
1603 )
1604 }
1605}
1606
1607impl ObjectStore for AuthorityStore {
1608 fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
1610 self.perpetual_tables.as_ref().get_object(object_id)
1611 }
1612
1613 fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
1614 self.perpetual_tables.get_object_by_key(object_id, version)
1615 }
1616}
1617
1618pub struct ResolverWrapper {
1620 pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1621 pub metrics: Arc<ResolverMetrics>,
1622}
1623
1624impl ResolverWrapper {
1625 pub fn new(
1626 resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1627 metrics: Arc<ResolverMetrics>,
1628 ) -> Self {
1629 metrics.module_cache_size.set(0);
1630 ResolverWrapper { resolver, metrics }
1631 }
1632
1633 fn inc_cache_size_gauge(&self) {
1634 let current = self.metrics.module_cache_size.get();
1636 self.metrics.module_cache_size.set(current + 1);
1637 }
1638}
1639
1640impl ModuleResolver for ResolverWrapper {
1641 type Error = SuiError;
1642 fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
1643 self.inc_cache_size_gauge();
1644 get_module(&*self.resolver, module_id)
1645 }
1646
1647 fn get_packages_static<const N: usize>(
1648 &self,
1649 ids: [AccountAddress; N],
1650 ) -> Result<[Option<SerializedPackage>; N], Self::Error> {
1651 let mut packages = [const { None }; N];
1652 for (i, id) in ids.iter().enumerate() {
1653 packages[i] = get_package(&*self.resolver, &ObjectID::from(*id))?;
1654 }
1655 Ok(packages)
1656 }
1657
1658 fn get_packages<'a>(
1659 &self,
1660 ids: impl ExactSizeIterator<Item = &'a AccountAddress>,
1661 ) -> Result<Vec<Option<SerializedPackage>>, Self::Error> {
1662 ids.map(|id| get_package(&*self.resolver, &ObjectID::from(*id)))
1663 .collect()
1664 }
1665}
1666
1667pub type SuiLockResult = SuiResult<ObjectLockStatus>;
1668
1669#[derive(Debug, PartialEq, Eq)]
1670pub enum ObjectLockStatus {
1671 Initialized,
1672 LockedToTx { locked_by_tx: LockDetailsDeprecated },
1673 LockedAtDifferentVersion { locked_ref: ObjectRef },
1674}
1675
1676#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1677pub enum LockDetailsWrapperDeprecated {
1678 V1(LockDetailsV1Deprecated),
1679}
1680
1681#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1682pub struct LockDetailsV1Deprecated {
1683 pub epoch: EpochId,
1684 pub tx_digest: TransactionDigest,
1685}
1686
1687pub type LockDetailsDeprecated = LockDetailsV1Deprecated;