1use std::sync::Arc;
5use std::{iter, mem, thread};
6
7use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
8use crate::authority::authority_store_pruner::{
9 AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
10};
11use crate::authority::authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object};
12use crate::authority::epoch_marker_key::EpochMarkerKey;
13use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
14use crate::global_state_hasher::GlobalStateHashStore;
15use crate::rpc_index::RpcIndexStore;
16use crate::transaction_outputs::TransactionOutputs;
17use either::Either;
18use fastcrypto::hash::{HashFunction, MultisetHash, Sha3_256};
19use futures::stream::FuturesUnordered;
20use move_core_types::account_address::AccountAddress;
21use move_core_types::resolver::{ModuleResolver, SerializedPackage};
22use serde::{Deserialize, Serialize};
23use sui_config::node::AuthorityStorePruningConfig;
24use sui_macros::fail_point_arg;
25use sui_types::error::{SuiErrorKind, UserInputError};
26use sui_types::execution::TypeLayoutStore;
27use sui_types::global_state_hash::GlobalStateHash;
28use sui_types::message_envelope::Message;
29use sui_types::storage::{
30 BackingPackageStore, FullObjectKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore,
31 get_module, get_package,
32};
33use sui_types::sui_system_state::get_sui_system_state;
34use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure};
35use tokio::time::Instant;
36use tracing::{debug, info, trace};
37use typed_store::traits::Map;
38use typed_store::{
39 TypedStoreError,
40 rocks::{DBBatch, DBMap},
41};
42
43use super::authority_store_tables::LiveObject;
44use super::{authority_store_tables::AuthorityPerpetualTables, *};
45use mysten_common::sync::notify_read::NotifyRead;
46use sui_types::effects::{TransactionEffects, TransactionEvents};
47use sui_types::gas_coin::TOTAL_SUPPLY_MIST;
48
49struct AuthorityStoreMetrics {
50 sui_conservation_check_latency: IntGauge,
51 sui_conservation_live_object_count: IntGauge,
52 sui_conservation_live_object_size: IntGauge,
53 sui_conservation_imbalance: IntGauge,
54 sui_conservation_storage_fund: IntGauge,
55 sui_conservation_storage_fund_imbalance: IntGauge,
56 epoch_flags: IntGaugeVec,
57}
58
59impl AuthorityStoreMetrics {
60 pub fn new(registry: &Registry) -> Self {
61 Self {
62 sui_conservation_check_latency: register_int_gauge_with_registry!(
63 "sui_conservation_check_latency",
64 "Number of seconds took to scan all live objects in the store for SUI conservation check",
65 registry,
66 ).unwrap(),
67 sui_conservation_live_object_count: register_int_gauge_with_registry!(
68 "sui_conservation_live_object_count",
69 "Number of live objects in the store",
70 registry,
71 ).unwrap(),
72 sui_conservation_live_object_size: register_int_gauge_with_registry!(
73 "sui_conservation_live_object_size",
74 "Size in bytes of live objects in the store",
75 registry,
76 ).unwrap(),
77 sui_conservation_imbalance: register_int_gauge_with_registry!(
78 "sui_conservation_imbalance",
79 "Total amount of SUI in the network - 10B * 10^9. This delta shows the amount of imbalance",
80 registry,
81 ).unwrap(),
82 sui_conservation_storage_fund: register_int_gauge_with_registry!(
83 "sui_conservation_storage_fund",
84 "Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
85 registry,
86 ).unwrap(),
87 sui_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
88 "sui_conservation_storage_fund_imbalance",
89 "Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
90 registry,
91 ).unwrap(),
92 epoch_flags: register_int_gauge_vec_with_registry!(
93 "epoch_flags",
94 "Local flags of the currently running epoch",
95 &["flag"],
96 registry,
97 ).unwrap(),
98 }
99 }
100}
101
102pub struct AuthorityStore {
109 pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,
110
111 pub(crate) root_state_notify_read:
112 NotifyRead<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,
113
114 enable_epoch_sui_conservation_check: bool,
116
117 metrics: AuthorityStoreMetrics,
118}
119
120pub type ExecutionLockReadGuard<'a> = tokio::sync::RwLockReadGuard<'a, EpochId>;
121pub type ExecutionLockWriteGuard<'a> = tokio::sync::RwLockWriteGuard<'a, EpochId>;
122
123impl AuthorityStore {
124 pub async fn open(
127 perpetual_tables: Arc<AuthorityPerpetualTables>,
128 genesis: &Genesis,
129 config: &NodeConfig,
130 registry: &Registry,
131 ) -> SuiResult<Arc<Self>> {
132 let enable_epoch_sui_conservation_check = config
133 .expensive_safety_check_config
134 .enable_epoch_sui_conservation_check();
135
136 let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
137 info!("Creating new epoch start config from genesis");
138
139 #[allow(unused_mut)]
140 let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
141 fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
142 info!("Setting initial epoch flags to {:?}", flags);
143 initial_epoch_flags = flags;
144 });
145
146 let epoch_start_configuration = EpochStartConfiguration::new(
147 genesis.sui_system_object().into_epoch_start_state(),
148 *genesis.checkpoint().digest(),
149 &genesis.objects(),
150 initial_epoch_flags,
151 )?;
152 perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
153 epoch_start_configuration
154 } else {
155 info!("Loading epoch start config from DB");
156 perpetual_tables
157 .epoch_start_configuration
158 .get(&())?
159 .expect("Epoch start configuration must be set in non-empty DB")
160 };
161 let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
162 info!("Epoch start config: {:?}", epoch_start_configuration);
163 info!("Cur epoch: {:?}", cur_epoch);
164 let this = Self::open_inner(
165 genesis,
166 perpetual_tables,
167 enable_epoch_sui_conservation_check,
168 registry,
169 )
170 .await?;
171 this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
172 Ok(this)
173 }
174
175 pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
176 for flag in old {
177 self.metrics
178 .epoch_flags
179 .with_label_values(&[&flag.to_string()])
180 .set(0);
181 }
182 for flag in new {
183 self.metrics
184 .epoch_flags
185 .with_label_values(&[&flag.to_string()])
186 .set(1);
187 }
188 }
189
190 pub fn clear_object_per_epoch_marker_table(
193 &self,
194 _execution_guard: &ExecutionLockWriteGuard<'_>,
195 ) -> SuiResult<()> {
196 self.perpetual_tables
200 .object_per_epoch_marker_table
201 .schedule_delete_all()?;
202 #[cfg(not(tidehunter))]
203 {
204 self.perpetual_tables
205 .object_per_epoch_marker_table_v2
206 .schedule_delete_all()?;
207 }
208 #[cfg(tidehunter)]
209 {
210 self.perpetual_tables
211 .object_per_epoch_marker_table_v2
212 .drop_cells_in_range_raw(&EpochMarkerKey::MIN_KEY, &EpochMarkerKey::MAX_KEY)?;
213 }
214 Ok(())
215 }
216
217 pub async fn open_with_committee_for_testing(
218 perpetual_tables: Arc<AuthorityPerpetualTables>,
219 committee: &Committee,
220 genesis: &Genesis,
221 ) -> SuiResult<Arc<Self>> {
222 assert_eq!(committee.epoch, 0);
225 Self::open_inner(genesis, perpetual_tables, true, &Registry::new()).await
226 }
227
228 async fn open_inner(
229 genesis: &Genesis,
230 perpetual_tables: Arc<AuthorityPerpetualTables>,
231 enable_epoch_sui_conservation_check: bool,
232 registry: &Registry,
233 ) -> SuiResult<Arc<Self>> {
234 let store = Arc::new(Self {
235 perpetual_tables,
236 root_state_notify_read: NotifyRead::<
237 EpochId,
238 (CheckpointSequenceNumber, GlobalStateHash),
239 >::new(),
240 enable_epoch_sui_conservation_check,
241 metrics: AuthorityStoreMetrics::new(registry),
242 });
243 if store
245 .database_is_empty()
246 .expect("Database read should not fail at init.")
247 {
248 store
249 .bulk_insert_genesis_objects(genesis.objects())
250 .expect("Cannot bulk insert genesis objects");
251
252 let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());
254
255 store
256 .perpetual_tables
257 .transactions
258 .insert(transaction.digest(), transaction.serializable_ref())
259 .unwrap();
260
261 store
262 .perpetual_tables
263 .effects
264 .insert(&genesis.effects().digest(), genesis.effects())
265 .unwrap();
266 if genesis.effects().events_digest().is_some() {
270 store
271 .perpetual_tables
272 .events_2
273 .insert(transaction.digest(), genesis.events())
274 .unwrap();
275 }
276 }
277
278 Ok(store)
279 }
280
281 pub fn open_no_genesis(
285 perpetual_tables: Arc<AuthorityPerpetualTables>,
286 enable_epoch_sui_conservation_check: bool,
287 registry: &Registry,
288 ) -> SuiResult<Arc<Self>> {
289 let store = Arc::new(Self {
290 perpetual_tables,
291 root_state_notify_read: NotifyRead::<
292 EpochId,
293 (CheckpointSequenceNumber, GlobalStateHash),
294 >::new(),
295 enable_epoch_sui_conservation_check,
296 metrics: AuthorityStoreMetrics::new(registry),
297 });
298 Ok(store)
299 }
300
301 pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
302 self.perpetual_tables.get_recovery_epoch_at_restart()
303 }
304
305 pub fn get_effects(
306 &self,
307 effects_digest: &TransactionEffectsDigest,
308 ) -> SuiResult<Option<TransactionEffects>> {
309 Ok(self.perpetual_tables.effects.get(effects_digest)?)
310 }
311
312 pub fn effects_exists(&self, effects_digest: &TransactionEffectsDigest) -> SuiResult<bool> {
314 self.perpetual_tables
315 .effects
316 .contains_key(effects_digest)
317 .map_err(|e| e.into())
318 }
319
320 pub fn get_events(
321 &self,
322 digest: &TransactionDigest,
323 ) -> Result<Option<TransactionEvents>, TypedStoreError> {
324 self.perpetual_tables.events_2.get(digest)
325 }
326
327 pub fn multi_get_events(
328 &self,
329 event_digests: &[TransactionDigest],
330 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
331 Ok(event_digests
332 .iter()
333 .map(|digest| self.get_events(digest))
334 .collect::<Result<Vec<_>, _>>()?)
335 }
336
337 pub fn get_unchanged_loaded_runtime_objects(
338 &self,
339 digest: &TransactionDigest,
340 ) -> Result<Option<Vec<ObjectKey>>, TypedStoreError> {
341 self.perpetual_tables
342 .unchanged_loaded_runtime_objects
343 .get(digest)
344 }
345
346 pub fn multi_get_effects<'a>(
347 &self,
348 effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
349 ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
350 self.perpetual_tables.effects.multi_get(effects_digests)
351 }
352
353 pub fn get_executed_effects(
354 &self,
355 tx_digest: &TransactionDigest,
356 ) -> Result<Option<TransactionEffects>, TypedStoreError> {
357 let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
358 match effects_digest {
359 Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
360 None => Ok(None),
361 }
362 }
363
364 pub fn multi_get_executed_effects_digests(
367 &self,
368 digests: &[TransactionDigest],
369 ) -> Result<Vec<Option<TransactionEffectsDigest>>, TypedStoreError> {
370 self.perpetual_tables.executed_effects.multi_get(digests)
371 }
372
373 pub fn multi_get_executed_effects(
376 &self,
377 digests: &[TransactionDigest],
378 ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
379 let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
380 let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
381 let mut tx_to_effects_map = effects
382 .into_iter()
383 .flatten()
384 .map(|effects| (*effects.transaction_digest(), effects))
385 .collect::<HashMap<_, _>>();
386 Ok(digests
387 .iter()
388 .map(|digest| tx_to_effects_map.remove(digest))
389 .collect())
390 }
391
392 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> SuiResult<bool> {
393 Ok(self
394 .perpetual_tables
395 .executed_effects
396 .contains_key(digest)?)
397 }
398
399 pub fn get_marker_value(
400 &self,
401 object_key: FullObjectKey,
402 epoch_id: EpochId,
403 ) -> SuiResult<Option<MarkerValue>> {
404 Ok(self
405 .perpetual_tables
406 .object_per_epoch_marker_table_v2
407 .get(&EpochMarkerKey(epoch_id, object_key))?)
408 }
409
410 pub fn get_latest_marker(
411 &self,
412 object_id: FullObjectID,
413 epoch_id: EpochId,
414 ) -> SuiResult<Option<(SequenceNumber, MarkerValue)>> {
415 let min_key = EpochMarkerKey(epoch_id, FullObjectKey::min_for_id(&object_id));
416 let max_key = EpochMarkerKey(epoch_id, FullObjectKey::max_for_id(&object_id));
417
418 let marker_entry = self
419 .perpetual_tables
420 .object_per_epoch_marker_table_v2
421 .reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
422 .next();
423 match marker_entry {
424 Some(Ok((EpochMarkerKey(epoch, key), marker))) => {
425 assert_eq!(epoch, epoch_id);
427 assert_eq!(key.id(), object_id);
428 Ok(Some((key.version(), marker)))
429 }
430 Some(Err(e)) => Err(e.into()),
431 None => Ok(None),
432 }
433 }
434
435 pub async fn notify_read_root_state_hash(
438 &self,
439 epoch: EpochId,
440 ) -> SuiResult<(CheckpointSequenceNumber, GlobalStateHash)> {
441 let registration = self.root_state_notify_read.register_one(&epoch);
443 let hash = self.perpetual_tables.root_state_hash_by_epoch.get(&epoch)?;
444
445 let result = match hash {
446 Some(ready) => Either::Left(futures::future::ready(ready)),
448 None => Either::Right(registration),
449 }
450 .await;
451
452 Ok(result)
453 }
454
455 pub fn deprecated_insert_finalized_transactions(
457 &self,
458 digests: &[TransactionDigest],
459 epoch: EpochId,
460 sequence: CheckpointSequenceNumber,
461 ) -> SuiResult {
462 let mut batch = self
463 .perpetual_tables
464 .executed_transactions_to_checkpoint
465 .batch();
466 batch.insert_batch(
467 &self.perpetual_tables.executed_transactions_to_checkpoint,
468 digests.iter().map(|d| (*d, (epoch, sequence))),
469 )?;
470 batch.write()?;
471 trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
472 Ok(())
473 }
474
475 pub fn deprecated_get_transaction_checkpoint(
477 &self,
478 digest: &TransactionDigest,
479 ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
480 Ok(self
481 .perpetual_tables
482 .executed_transactions_to_checkpoint
483 .get(digest)?)
484 }
485
486 pub fn deprecated_multi_get_transaction_checkpoint(
488 &self,
489 digests: &[TransactionDigest],
490 ) -> SuiResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
491 Ok(self
492 .perpetual_tables
493 .executed_transactions_to_checkpoint
494 .multi_get(digests)?
495 .into_iter()
496 .collect())
497 }
498
499 pub fn database_is_empty(&self) -> SuiResult<bool> {
501 self.perpetual_tables.database_is_empty()
502 }
503
504 pub fn object_exists_by_key(
505 &self,
506 object_id: &ObjectID,
507 version: VersionNumber,
508 ) -> SuiResult<bool> {
509 Ok(self
510 .perpetual_tables
511 .objects
512 .contains_key(&ObjectKey(*object_id, version))?)
513 }
514
515 pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<bool>> {
516 Ok(self
517 .perpetual_tables
518 .objects
519 .multi_contains_keys(object_keys.to_vec())?
520 .into_iter()
521 .collect())
522 }
523
524 fn get_object_ref_prior_to_key(
525 &self,
526 object_id: &ObjectID,
527 version: VersionNumber,
528 ) -> Result<Option<ObjectRef>, SuiError> {
529 let Some(prior_version) = version.one_before() else {
530 return Ok(None);
531 };
532 let mut iterator = self
533 .perpetual_tables
534 .objects
535 .reversed_safe_iter_with_bounds(
536 Some(ObjectKey::min_for_id(object_id)),
537 Some(ObjectKey(*object_id, prior_version)),
538 )?;
539
540 if let Some((object_key, value)) = iterator.next().transpose()?
541 && object_key.0 == *object_id
542 {
543 return Ok(Some(
544 self.perpetual_tables.object_reference(&object_key, value)?,
545 ));
546 }
547 Ok(None)
548 }
549
550 pub fn multi_get_objects_by_key(
551 &self,
552 object_keys: &[ObjectKey],
553 ) -> Result<Vec<Option<Object>>, SuiError> {
554 let wrappers = self
555 .perpetual_tables
556 .objects
557 .multi_get(object_keys.to_vec())?;
558 let mut ret = vec![];
559
560 for (idx, w) in wrappers.into_iter().enumerate() {
561 ret.push(
562 w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
563 .transpose()?
564 .flatten(),
565 );
566 }
567 Ok(ret)
568 }
569
570 pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, SuiError> {
572 let mut result = Vec::new();
573 for id in objects {
574 result.push(self.get_object(id));
575 }
576 Ok(result)
577 }
578
579 pub(crate) fn insert_genesis_object(&self, object: Object) -> SuiResult {
584 debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
586 let object_ref = object.compute_object_reference();
587 self.insert_object_direct(object_ref, &object)
588 }
589
590 fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> SuiResult {
594 let mut write_batch = self.perpetual_tables.objects.batch();
595
596 let store_object = get_store_object(object.clone());
598 write_batch.insert_batch(
599 &self.perpetual_tables.objects,
600 std::iter::once((ObjectKey::from(object_ref), store_object)),
601 )?;
602
603 if object.get_single_owner().is_some() {
605 if !object.is_child_object() {
607 self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref], false)?;
608 }
609 }
610
611 write_batch.write()?;
612
613 Ok(())
614 }
615
616 #[instrument(level = "debug", skip_all)]
618 pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> SuiResult<()> {
619 let mut batch = self.perpetual_tables.objects.batch();
620 let ref_and_objects: Vec<_> = objects
621 .iter()
622 .map(|o| (o.compute_object_reference(), o))
623 .collect();
624
625 batch.insert_batch(
626 &self.perpetual_tables.objects,
627 ref_and_objects
628 .iter()
629 .map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))),
630 )?;
631
632 let non_child_object_refs: Vec<_> = ref_and_objects
633 .iter()
634 .filter(|(_, object)| !object.is_child_object())
635 .map(|(oref, _)| *oref)
636 .collect();
637
638 self.initialize_live_object_markers_impl(
639 &mut batch,
640 &non_child_object_refs,
641 false, )?;
643
644 batch.write()?;
645
646 Ok(())
647 }
648
649 pub async fn bulk_insert_live_objects(
650 perpetual_db: Arc<AuthorityPerpetualTables>,
651 objects: Vec<LiveObject>,
652 expected_sha3_digest: &[u8; 32],
653 num_parallel_chunks: usize,
654 ) -> SuiResult<()> {
655 let mut hasher = Sha3_256::default();
657 for object in &objects {
658 hasher.update(object.object_reference().2.inner());
659 }
660 let sha3_digest = hasher.finalize().digest;
661 if *expected_sha3_digest != sha3_digest {
662 error!(
663 "Sha does not match! expected: {:?}, actual: {:?}",
664 expected_sha3_digest, sha3_digest
665 );
666 return Err(SuiError::from("Sha does not match"));
667 }
668
669 let chunk_size = objects.len().div_ceil(num_parallel_chunks).max(1);
670 let mut remaining = objects;
671 let mut handles = Vec::new();
672 while !remaining.is_empty() {
673 let take = chunk_size.min(remaining.len());
674 let chunk: Vec<LiveObject> = remaining.drain(..take).collect();
675 let db = perpetual_db.clone();
676 handles.push(tokio::task::spawn_blocking(move || {
677 Self::insert_objects_chunk(db, chunk)
678 }));
679 }
680 for handle in handles {
681 handle.await.expect("insert task panicked")?;
682 }
683 Ok(())
684 }
685
686 fn insert_objects_chunk(
687 perpetual_db: Arc<AuthorityPerpetualTables>,
688 objects: Vec<LiveObject>,
689 ) -> SuiResult<()> {
690 let mut batch = perpetual_db.objects.batch();
691 let mut written = 0usize;
692 const MAX_BATCH_SIZE: usize = 100_000;
693 for object in objects {
694 match object {
695 LiveObject::Normal(object) => {
696 let store_object_wrapper = get_store_object(object.clone());
697 batch.insert_batch(
698 &perpetual_db.objects,
699 std::iter::once((
700 ObjectKey::from(object.compute_object_reference()),
701 store_object_wrapper,
702 )),
703 )?;
704 if !object.is_child_object() {
705 Self::initialize_live_object_markers(
706 &perpetual_db.live_owned_object_markers,
707 &mut batch,
708 &[object.compute_object_reference()],
709 true, )?;
711 }
712 }
713 LiveObject::Wrapped(object_key) => {
714 batch.insert_batch(
715 &perpetual_db.objects,
716 std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
717 object_key,
718 StoreObject::Wrapped.into(),
719 )),
720 )?;
721 }
722 }
723 written += 1;
724 if written > MAX_BATCH_SIZE {
725 batch.write()?;
726 batch = perpetual_db.objects.batch();
727 written = 0;
728 }
729 }
730 batch.write()?;
731 Ok(())
732 }
733
734 pub fn set_epoch_start_configuration(
735 &self,
736 epoch_start_configuration: &EpochStartConfiguration,
737 ) -> SuiResult {
738 self.perpetual_tables
739 .set_epoch_start_configuration(epoch_start_configuration)?;
740 Ok(())
741 }
742
743 pub fn get_epoch_start_configuration(&self) -> SuiResult<Option<EpochStartConfiguration>> {
744 Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
745 }
746
747 #[instrument(level = "debug", skip_all)]
752 pub fn build_db_batch(
753 &self,
754 epoch_id: EpochId,
755 tx_outputs: &[Arc<TransactionOutputs>],
756 ) -> SuiResult<DBBatch> {
757 let mut written = Vec::with_capacity(tx_outputs.len());
758 for outputs in tx_outputs {
759 written.extend(outputs.written.values().cloned());
760 }
761
762 let mut write_batch = self.perpetual_tables.transactions.batch();
763 for outputs in tx_outputs {
764 self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
765 }
766 fail_point!("crash");
768
769 trace!(
770 "built batch for committed transactions: {:?}",
771 tx_outputs
772 .iter()
773 .map(|tx| tx.transaction.digest())
774 .collect::<Vec<_>>()
775 );
776
777 fail_point!("crash");
779
780 Ok(write_batch)
781 }
782
783 fn write_one_transaction_outputs(
784 &self,
785 write_batch: &mut DBBatch,
786 epoch_id: EpochId,
787 tx_outputs: &TransactionOutputs,
788 ) -> SuiResult {
789 let TransactionOutputs {
790 transaction,
791 effects,
792 markers,
793 wrapped,
794 deleted,
795 written,
796 events,
797 unchanged_loaded_runtime_objects,
798 locks_to_delete,
799 new_locks_to_init,
800 ..
801 } = tx_outputs;
802
803 let effects_digest = effects.digest();
804 let transaction_digest = transaction.digest();
805 write_batch
808 .insert_batch(
809 &self.perpetual_tables.effects,
810 [(effects_digest, effects.clone())],
811 )?
812 .insert_batch(
813 &self.perpetual_tables.executed_effects,
814 [(transaction_digest, effects_digest)],
815 )?;
816
817 write_batch.insert_batch(
819 &self.perpetual_tables.transactions,
820 iter::once((transaction_digest, transaction.serializable_ref())),
821 )?;
822
823 write_batch.insert_batch(
824 &self.perpetual_tables.executed_transaction_digests,
825 [((epoch_id, *transaction_digest), ())],
826 )?;
827
828 write_batch.insert_batch(
830 &self.perpetual_tables.object_per_epoch_marker_table_v2,
831 markers
832 .iter()
833 .map(|(key, marker_value)| (EpochMarkerKey(epoch_id, *key), *marker_value)),
834 )?;
835 write_batch.insert_batch(
836 &self.perpetual_tables.objects,
837 deleted
838 .iter()
839 .map(|key| (key, StoreObject::Deleted))
840 .chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
841 .map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
842 )?;
843
844 let new_objects = written.iter().map(|(id, new_object)| {
846 let version = new_object.version();
847 trace!(?id, ?version, "writing object");
848 let store_object = get_store_object(new_object.clone());
849 (ObjectKey(*id, version), store_object)
850 });
851
852 write_batch.insert_batch(&self.perpetual_tables.objects, new_objects)?;
853
854 if effects.events_digest().is_some() {
856 write_batch.insert_batch(
857 &self.perpetual_tables.events_2,
858 [(transaction_digest, events)],
859 )?;
860 }
861
862 if !unchanged_loaded_runtime_objects.is_empty() {
864 write_batch.insert_batch(
865 &self.perpetual_tables.unchanged_loaded_runtime_objects,
866 [(transaction_digest, unchanged_loaded_runtime_objects)],
867 )?;
868 }
869
870 self.initialize_live_object_markers_impl(write_batch, new_locks_to_init, false)?;
871
872 self.delete_live_object_markers(write_batch, locks_to_delete)?;
875
876 debug!(effects_digest = ?effects.digest(), "commit_certificate finished");
877
878 Ok(())
879 }
880
881 pub(crate) fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> SuiResult {
884 let mut batch = self.perpetual_tables.transactions.batch();
885 batch.insert_batch(
886 &self.perpetual_tables.transactions,
887 [(tx.digest(), tx.clone().into_unsigned().serializable_ref())],
888 )?;
889 batch.write()?;
890 Ok(())
891 }
892
893 pub(crate) fn get_lock(
896 &self,
897 obj_ref: ObjectRef,
898 epoch_store: &AuthorityPerEpochStore,
899 ) -> SuiLockResult {
900 if self
901 .perpetual_tables
902 .live_owned_object_markers
903 .get(&obj_ref)?
904 .is_none()
905 {
906 return Ok(ObjectLockStatus::LockedAtDifferentVersion {
907 locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
908 });
909 }
910
911 let tables = epoch_store.tables()?;
912 let epoch_id = epoch_store.epoch();
913
914 if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
915 Ok(ObjectLockStatus::LockedToTx {
916 locked_by_tx: LockDetailsDeprecated {
917 epoch: epoch_id,
918 tx_digest,
919 },
920 })
921 } else {
922 Ok(ObjectLockStatus::Initialized)
923 }
924 }
925
926 pub(crate) fn get_latest_live_version_for_object_id(
928 &self,
929 object_id: ObjectID,
930 ) -> SuiResult<ObjectRef> {
931 let mut iterator = self
932 .perpetual_tables
933 .live_owned_object_markers
934 .reversed_safe_iter_with_bounds(
935 None,
936 Some((object_id, SequenceNumber::MAX, ObjectDigest::MAX)),
937 )?;
938 Ok(iterator
939 .next()
940 .transpose()?
941 .and_then(|value| {
942 if value.0.0 == object_id {
943 Some(value)
944 } else {
945 None
946 }
947 })
948 .ok_or_else(|| {
949 SuiError::from(UserInputError::ObjectNotFound {
950 object_id,
951 version: None,
952 })
953 })?
954 .0)
955 }
956
957 pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> SuiResult {
962 let locks = self
963 .perpetual_tables
964 .live_owned_object_markers
965 .multi_get(objects)?;
966 for (lock, obj_ref) in locks.into_iter().zip(objects) {
967 if lock.is_none() {
968 let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
969 fp_bail!(
970 UserInputError::ObjectVersionUnavailableForConsumption {
971 provided_obj_ref: *obj_ref,
972 current_version: latest_lock.1
973 }
974 .into()
975 );
976 }
977 }
978 Ok(())
979 }
980
981 fn initialize_live_object_markers_impl(
984 &self,
985 write_batch: &mut DBBatch,
986 objects: &[ObjectRef],
987 is_force_reset: bool,
988 ) -> SuiResult {
989 AuthorityStore::initialize_live_object_markers(
990 &self.perpetual_tables.live_owned_object_markers,
991 write_batch,
992 objects,
993 is_force_reset,
994 )
995 }
996
997 pub fn initialize_live_object_markers(
998 live_object_marker_table: &DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,
999 write_batch: &mut DBBatch,
1000 objects: &[ObjectRef],
1001 is_force_reset: bool,
1002 ) -> SuiResult {
1003 trace!(?objects, "initialize_locks");
1004
1005 if !is_force_reset {
1006 let live_object_markers = live_object_marker_table.multi_get(objects)?;
1007 let existing_live_object_markers: Vec<ObjectRef> = live_object_markers
1011 .iter()
1012 .zip(objects)
1013 .filter_map(|(lock_opt, objref)| {
1014 lock_opt.clone().flatten().map(|_tx_digest| *objref)
1015 })
1016 .collect();
1017 if !existing_live_object_markers.is_empty() {
1018 info!(
1019 ?existing_live_object_markers,
1020 "Cannot initialize live_object_markers because some exist already"
1021 );
1022 return Err(SuiErrorKind::ObjectLockAlreadyInitialized {
1023 refs: existing_live_object_markers,
1024 }
1025 .into());
1026 }
1027 }
1028
1029 write_batch.insert_batch(
1030 live_object_marker_table,
1031 objects.iter().map(|obj_ref| (obj_ref, None)),
1032 )?;
1033 Ok(())
1034 }
1035
1036 fn delete_live_object_markers(
1038 &self,
1039 write_batch: &mut DBBatch,
1040 objects: &[ObjectRef],
1041 ) -> SuiResult {
1042 trace!(?objects, "delete_locks");
1043 write_batch.delete_batch(
1044 &self.perpetual_tables.live_owned_object_markers,
1045 objects.iter(),
1046 )?;
1047 Ok(())
1048 }
1049
1050 pub fn find_object_lt_or_eq_version(
1055 &self,
1056 object_id: ObjectID,
1057 version: SequenceNumber,
1058 ) -> SuiResult<Option<Object>> {
1059 self.perpetual_tables
1060 .find_object_lt_or_eq_version(object_id, version)
1061 }
1062
1063 pub fn get_latest_object_ref_or_tombstone(
1073 &self,
1074 object_id: ObjectID,
1075 ) -> Result<Option<ObjectRef>, SuiError> {
1076 self.perpetual_tables
1077 .get_latest_object_ref_or_tombstone(object_id)
1078 }
1079
1080 pub fn get_latest_object_ref_if_alive(
1083 &self,
1084 object_id: ObjectID,
1085 ) -> Result<Option<ObjectRef>, SuiError> {
1086 match self.get_latest_object_ref_or_tombstone(object_id)? {
1087 Some(objref) if objref.2.is_alive() => Ok(Some(objref)),
1088 _ => Ok(None),
1089 }
1090 }
1091
1092 pub fn get_latest_object_or_tombstone(
1096 &self,
1097 object_id: ObjectID,
1098 ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, SuiError> {
1099 let Some((object_key, store_object)) = self
1100 .perpetual_tables
1101 .get_latest_object_or_tombstone(object_id)?
1102 else {
1103 return Ok(None);
1104 };
1105
1106 if let Some(object_ref) = self
1107 .perpetual_tables
1108 .tombstone_reference(&object_key, &store_object)?
1109 {
1110 return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
1111 }
1112
1113 let object = self
1114 .perpetual_tables
1115 .object(&object_key, store_object)?
1116 .expect("Non tombstone store object could not be converted to object");
1117
1118 Ok(Some((object_key, ObjectOrTombstone::Object(object))))
1119 }
1120
1121 pub fn insert_transaction_and_effects(
1122 &self,
1123 transaction: &VerifiedTransaction,
1124 transaction_effects: &TransactionEffects,
1125 ) -> Result<(), TypedStoreError> {
1126 let mut write_batch = self.perpetual_tables.transactions.batch();
1127 write_batch
1130 .insert_batch(
1131 &self.perpetual_tables.effects,
1132 [(transaction_effects.digest(), transaction_effects)],
1133 )?
1134 .insert_batch(
1135 &self.perpetual_tables.transactions,
1136 [(transaction.digest(), transaction.serializable_ref())],
1137 )?;
1138
1139 write_batch.write()?;
1140 Ok(())
1141 }
1142
1143 pub fn multi_insert_transaction_and_effects<'a>(
1144 &self,
1145 transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
1146 ) -> Result<(), TypedStoreError> {
1147 let mut write_batch = self.perpetual_tables.transactions.batch();
1148 for tx in transactions {
1149 write_batch
1150 .insert_batch(
1151 &self.perpetual_tables.effects,
1152 [(tx.effects.digest(), &tx.effects)],
1153 )?
1154 .insert_batch(
1155 &self.perpetual_tables.transactions,
1156 [(tx.transaction.digest(), tx.transaction.serializable_ref())],
1157 )?;
1158 }
1159
1160 write_batch.write()?;
1161 Ok(())
1162 }
1163
1164 pub fn multi_get_transaction_blocks(
1165 &self,
1166 tx_digests: &[TransactionDigest],
1167 ) -> Result<Vec<Option<VerifiedTransaction>>, TypedStoreError> {
1168 self.perpetual_tables
1169 .transactions
1170 .multi_get(tx_digests)
1171 .map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())
1172 }
1173
1174 pub fn get_transaction_block(
1175 &self,
1176 tx_digest: &TransactionDigest,
1177 ) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
1178 self.perpetual_tables
1179 .transactions
1180 .get(tx_digest)
1181 .map(|v| v.map(|v| v.into()))
1182 }
1183
1184 pub fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
1191 get_sui_system_state(self.perpetual_tables.as_ref())
1192 }
1193
1194 pub fn expensive_check_sui_conservation<T>(
1195 self: &Arc<Self>,
1196 type_layout_store: T,
1197 old_epoch_store: &AuthorityPerEpochStore,
1198 ) -> SuiResult
1199 where
1200 T: TypeLayoutStore + Send + Copy,
1201 {
1202 if !self.enable_epoch_sui_conservation_check {
1203 return Ok(());
1204 }
1205
1206 let executor = old_epoch_store.executor();
1207 info!("Starting SUI conservation check. This may take a while..");
1208 let cur_time = Instant::now();
1209 let mut pending_objects = vec![];
1210 let mut count = 0;
1211 let mut size = 0;
1212 let (mut total_sui, mut total_storage_rebate) = thread::scope(|s| {
1213 let pending_tasks = FuturesUnordered::new();
1214 for o in self.iter_live_object_set(false) {
1215 match o {
1216 LiveObject::Normal(object) => {
1217 size += object.object_size_for_gas_metering();
1218 count += 1;
1219 pending_objects.push(object);
1220 if count % 1_000_000 == 0 {
1221 let mut task_objects = vec![];
1222 mem::swap(&mut pending_objects, &mut task_objects);
1223 pending_tasks.push(s.spawn(move || {
1224 let mut layout_resolver =
1225 executor.type_layout_resolver(Box::new(type_layout_store));
1226 let mut total_storage_rebate = 0;
1227 let mut total_sui = 0;
1228 for object in task_objects {
1229 total_storage_rebate += object.storage_rebate;
1230 let object_contained_sui = match object
1233 .get_total_sui(layout_resolver.as_mut())
1234 {
1235 Ok(sui) => sui,
1236 Err(e)
1237 if old_epoch_store.get_chain()
1238 == sui_protocol_config::Chain::Testnet =>
1239 {
1240 error!(
1241 "Error calculating total SUI for object {:?}: {:?}",
1242 object.compute_object_reference(),
1243 e
1244 );
1245 0
1246 }
1247 Err(e) => panic!(
1248 "Error calculating total SUI for object {:?}: {:?}",
1249 object.compute_object_reference(),
1250 e
1251 ),
1252 };
1253 total_sui += object_contained_sui - object.storage_rebate;
1254 }
1255 if count % 50_000_000 == 0 {
1256 info!("Processed {} objects", count);
1257 }
1258 (total_sui, total_storage_rebate)
1259 }));
1260 }
1261 }
1262 LiveObject::Wrapped(_) => {
1263 unreachable!("Explicitly asked to not include wrapped tombstones")
1264 }
1265 }
1266 }
1267 pending_tasks.into_iter().fold((0, 0), |init, result| {
1268 let result = result.join().unwrap();
1269 (init.0 + result.0, init.1 + result.1)
1270 })
1271 });
1272 let mut layout_resolver = executor.type_layout_resolver(Box::new(type_layout_store));
1273 for object in pending_objects {
1274 total_storage_rebate += object.storage_rebate;
1275 total_sui +=
1276 object.get_total_sui(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
1277 }
1278 info!(
1279 "Scanned {} live objects, took {:?}",
1280 count,
1281 cur_time.elapsed()
1282 );
1283 self.metrics
1284 .sui_conservation_live_object_count
1285 .set(count as i64);
1286 self.metrics
1287 .sui_conservation_live_object_size
1288 .set(size as i64);
1289 self.metrics
1290 .sui_conservation_check_latency
1291 .set(cur_time.elapsed().as_secs() as i64);
1292
1293 let system_state = self
1295 .get_sui_system_state_object_unsafe()
1296 .expect("Reading sui system state object cannot fail")
1297 .into_sui_system_state_summary();
1298 let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
1299 info!(
1300 "Total SUI amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
1301 total_sui, storage_fund_balance, total_storage_rebate, system_state.epoch
1302 );
1303
1304 let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
1305 self.metrics
1306 .sui_conservation_storage_fund
1307 .set(storage_fund_balance as i64);
1308 self.metrics
1309 .sui_conservation_storage_fund_imbalance
1310 .set(imbalance);
1311 self.metrics
1312 .sui_conservation_imbalance
1313 .set((total_sui as i128 - TOTAL_SUPPLY_MIST as i128) as i64);
1314
1315 if let Some(expected_imbalance) = self
1316 .perpetual_tables
1317 .expected_storage_fund_imbalance
1318 .get(&())
1319 .expect("DB read cannot fail")
1320 {
1321 fp_ensure!(
1322 imbalance == expected_imbalance,
1323 SuiError::from(
1324 format!(
1325 "Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
1326 system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
1327 ).as_str()
1328 )
1329 );
1330 } else {
1331 self.perpetual_tables
1332 .expected_storage_fund_imbalance
1333 .insert(&(), &imbalance)
1334 .expect("DB write cannot fail");
1335 }
1336
1337 if let Some(expected_sui) = self
1338 .perpetual_tables
1339 .expected_network_sui_amount
1340 .get(&())
1341 .expect("DB read cannot fail")
1342 {
1343 fp_ensure!(
1344 total_sui == expected_sui,
1345 SuiError::from(
1346 format!(
1347 "Inconsistent state detected at epoch {}: total sui: {}, expecting {}",
1348 system_state.epoch, total_sui, expected_sui
1349 )
1350 .as_str()
1351 )
1352 );
1353 } else {
1354 self.perpetual_tables
1355 .expected_network_sui_amount
1356 .insert(&(), &total_sui)
1357 .expect("DB write cannot fail");
1358 }
1359
1360 Ok(())
1361 }
1362
1363 #[instrument(level = "error", skip_all)]
1366 pub fn maybe_reaccumulate_state_hash(
1367 &self,
1368 cur_epoch_store: &AuthorityPerEpochStore,
1369 new_protocol_version: ProtocolVersion,
1370 ) {
1371 let old_simplified_unwrap_then_delete = cur_epoch_store
1372 .protocol_config()
1373 .simplified_unwrap_then_delete();
1374 let new_simplified_unwrap_then_delete =
1375 ProtocolConfig::get_for_version(new_protocol_version, cur_epoch_store.get_chain())
1376 .simplified_unwrap_then_delete();
1377 let should_reaccumulate =
1380 !old_simplified_unwrap_then_delete && new_simplified_unwrap_then_delete;
1381 if !should_reaccumulate {
1382 return;
1383 }
1384 info!(
1385 "[Re-accumulate] simplified_unwrap_then_delete is enabled in the new protocol version, re-accumulating state hash"
1386 );
1387 let cur_time = Instant::now();
1388 std::thread::scope(|s| {
1389 let pending_tasks = FuturesUnordered::new();
1390 const BITS: u8 = 5;
1399 for index in 0u8..(1 << BITS) {
1400 pending_tasks.push(s.spawn(move || {
1401 let mut id_bytes = [0; ObjectID::LENGTH];
1402 id_bytes[0] = index << (8 - BITS);
1403 let start_id = ObjectID::new(id_bytes);
1404
1405 id_bytes[0] |= (1 << (8 - BITS)) - 1;
1406 for element in id_bytes.iter_mut().skip(1) {
1407 *element = u8::MAX;
1408 }
1409 let end_id = ObjectID::new(id_bytes);
1410
1411 info!(
1412 "[Re-accumulate] Scanning object ID range {:?}..{:?}",
1413 start_id, end_id
1414 );
1415 let mut prev = (
1416 ObjectKey::min_for_id(&ObjectID::ZERO),
1417 StoreObjectWrapper::V1(StoreObject::Deleted),
1418 );
1419 let mut object_scanned: u64 = 0;
1420 let mut wrapped_objects_to_remove = vec![];
1421 for db_result in self.perpetual_tables.objects.safe_range_iter(
1422 ObjectKey::min_for_id(&start_id)..=ObjectKey::max_for_id(&end_id),
1423 ) {
1424 match db_result {
1425 Ok((object_key, object)) => {
1426 object_scanned += 1;
1427 if object_scanned.is_multiple_of(100000) {
1428 info!(
1429 "[Re-accumulate] Task {}: object scanned: {}",
1430 index, object_scanned,
1431 );
1432 }
1433 if matches!(prev.1.inner(), StoreObject::Wrapped)
1434 && object_key.0 != prev.0.0
1435 {
1436 wrapped_objects_to_remove
1437 .push(WrappedObject::new(prev.0.0, prev.0.1));
1438 }
1439
1440 prev = (object_key, object);
1441 }
1442 Err(err) => {
1443 warn!("Object iterator encounter RocksDB error {:?}", err);
1444 return Err(err);
1445 }
1446 }
1447 }
1448 if matches!(prev.1.inner(), StoreObject::Wrapped) {
1449 wrapped_objects_to_remove.push(WrappedObject::new(prev.0.0, prev.0.1));
1450 }
1451 info!(
1452 "[Re-accumulate] Task {}: object scanned: {}, wrapped objects: {}",
1453 index,
1454 object_scanned,
1455 wrapped_objects_to_remove.len(),
1456 );
1457 Ok((wrapped_objects_to_remove, object_scanned))
1458 }));
1459 }
1460 let (last_checkpoint_of_epoch, cur_accumulator) = self
1461 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
1462 .expect("read cannot fail")
1463 .expect("accumulator must exist");
1464 let (accumulator, total_objects_scanned, total_wrapped_objects) =
1465 pending_tasks.into_iter().fold(
1466 (cur_accumulator, 0u64, 0usize),
1467 |(mut accumulator, total_objects_scanned, total_wrapped_objects), task| {
1468 let (wrapped_objects_to_remove, object_scanned) =
1469 task.join().unwrap().unwrap();
1470 accumulator.remove_all(
1471 wrapped_objects_to_remove
1472 .iter()
1473 .map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
1474 .collect::<Vec<Vec<u8>>>(),
1475 );
1476 (
1477 accumulator,
1478 total_objects_scanned + object_scanned,
1479 total_wrapped_objects + wrapped_objects_to_remove.len(),
1480 )
1481 },
1482 );
1483 info!(
1484 "[Re-accumulate] Total objects scanned: {}, total wrapped objects: {}",
1485 total_objects_scanned, total_wrapped_objects,
1486 );
1487 info!(
1488 "[Re-accumulate] New accumulator value: {:?}",
1489 accumulator.digest()
1490 );
1491 self.insert_state_hash_for_epoch(
1492 cur_epoch_store.epoch(),
1493 &last_checkpoint_of_epoch,
1494 &accumulator,
1495 )
1496 .unwrap();
1497 });
1498 info!(
1499 "[Re-accumulate] Re-accumulating took {}seconds",
1500 cur_time.elapsed().as_secs()
1501 );
1502 }
1503
1504 pub async fn prune_objects_and_compact_for_testing(
1505 &self,
1506 checkpoint_store: &Arc<CheckpointStore>,
1507 rpc_index: Option<&RpcIndexStore>,
1508 ) {
1509 let pruning_config = AuthorityStorePruningConfig {
1510 num_epochs_to_retain: 0,
1511 ..Default::default()
1512 };
1513 let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
1514 &self.perpetual_tables,
1515 checkpoint_store,
1516 rpc_index,
1517 pruning_config,
1518 AuthorityStorePruningMetrics::new_for_test(),
1519 EPOCH_DURATION_MS_FOR_TESTING,
1520 )
1521 .await;
1522 let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
1523 }
1524
1525 pub fn remove_executed_effects_for_testing(
1526 &self,
1527 tx_digest: &TransactionDigest,
1528 ) -> anyhow::Result<()> {
1529 let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
1530 if let Some(effects_digest) = effects_digest {
1531 self.perpetual_tables.executed_effects.remove(tx_digest)?;
1532 self.perpetual_tables.effects.remove(&effects_digest)?;
1533 }
1534 Ok(())
1535 }
1536
1537 #[cfg(test)]
1538 pub async fn prune_objects_immediately_for_testing(
1539 &self,
1540 transaction_effects: Vec<TransactionEffects>,
1541 ) -> anyhow::Result<()> {
1542 let mut wb = self.perpetual_tables.objects.batch();
1543
1544 let mut object_keys_to_prune = vec![];
1545 for effects in &transaction_effects {
1546 for (object_id, seq_number) in effects.modified_at_versions() {
1547 info!("Pruning object {:?} version {:?}", object_id, seq_number);
1548 object_keys_to_prune.push(ObjectKey(object_id, seq_number));
1549 }
1550 }
1551
1552 wb.delete_batch(
1553 &self.perpetual_tables.objects,
1554 object_keys_to_prune.into_iter(),
1555 )?;
1556 wb.write()?;
1557 Ok(())
1558 }
1559
1560 #[cfg(msim)]
1562 pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
1563 self.perpetual_tables
1564 .objects
1565 .safe_iter_with_bounds(
1566 Some(ObjectKey(object_id, VersionNumber::MIN)),
1567 Some(ObjectKey(object_id, VersionNumber::MAX)),
1568 )
1569 .collect::<Result<Vec<_>, _>>()
1570 .unwrap()
1571 .len()
1572 }
1573}
1574
1575impl GlobalStateHashStore for AuthorityStore {
1576 fn get_object_ref_prior_to_key_deprecated(
1577 &self,
1578 object_id: &ObjectID,
1579 version: VersionNumber,
1580 ) -> SuiResult<Option<ObjectRef>> {
1581 self.get_object_ref_prior_to_key(object_id, version)
1582 }
1583
1584 fn get_root_state_hash_for_epoch(
1585 &self,
1586 epoch: EpochId,
1587 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
1588 self.perpetual_tables
1589 .root_state_hash_by_epoch
1590 .get(&epoch)
1591 .map_err(Into::into)
1592 }
1593
1594 fn get_root_state_hash_for_highest_epoch(
1595 &self,
1596 ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
1597 Ok(self
1598 .perpetual_tables
1599 .root_state_hash_by_epoch
1600 .reversed_safe_iter_with_bounds(None, None)?
1601 .next()
1602 .transpose()?)
1603 }
1604
1605 fn insert_state_hash_for_epoch(
1606 &self,
1607 epoch: EpochId,
1608 last_checkpoint_of_epoch: &CheckpointSequenceNumber,
1609 acc: &GlobalStateHash,
1610 ) -> SuiResult {
1611 self.perpetual_tables
1612 .root_state_hash_by_epoch
1613 .insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
1614 self.root_state_notify_read
1615 .notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
1616
1617 Ok(())
1618 }
1619
1620 fn iter_live_object_set(
1621 &self,
1622 include_wrapped_object: bool,
1623 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1624 Box::new(
1625 self.perpetual_tables
1626 .iter_live_object_set(include_wrapped_object),
1627 )
1628 }
1629}
1630
1631impl ObjectStore for AuthorityStore {
1632 fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
1634 self.perpetual_tables.as_ref().get_object(object_id)
1635 }
1636
1637 fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
1638 self.perpetual_tables.get_object_by_key(object_id, version)
1639 }
1640}
1641
1642pub struct ResolverWrapper {
1644 pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1645 pub metrics: Arc<ResolverMetrics>,
1646}
1647
1648impl ResolverWrapper {
1649 pub fn new(
1650 resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1651 metrics: Arc<ResolverMetrics>,
1652 ) -> Self {
1653 metrics.module_cache_size.set(0);
1654 ResolverWrapper { resolver, metrics }
1655 }
1656
1657 fn inc_cache_size_gauge(&self) {
1658 let current = self.metrics.module_cache_size.get();
1660 self.metrics.module_cache_size.set(current + 1);
1661 }
1662}
1663
1664impl ModuleResolver for ResolverWrapper {
1665 type Error = SuiError;
1666 fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
1667 self.inc_cache_size_gauge();
1668 get_module(&*self.resolver, module_id)
1669 }
1670
1671 fn get_packages_static<const N: usize>(
1672 &self,
1673 ids: [AccountAddress; N],
1674 ) -> Result<[Option<SerializedPackage>; N], Self::Error> {
1675 let mut packages = [const { None }; N];
1676 for (i, id) in ids.iter().enumerate() {
1677 packages[i] = get_package(&*self.resolver, &ObjectID::from(*id))?;
1678 }
1679 Ok(packages)
1680 }
1681
1682 fn get_packages<'a>(
1683 &self,
1684 ids: impl ExactSizeIterator<Item = &'a AccountAddress>,
1685 ) -> Result<Vec<Option<SerializedPackage>>, Self::Error> {
1686 ids.map(|id| get_package(&*self.resolver, &ObjectID::from(*id)))
1687 .collect()
1688 }
1689}
1690
1691pub enum UpdateType {
1692 Transaction(TransactionEffectsDigest),
1693 Genesis,
1694}
1695
1696pub type SuiLockResult = SuiResult<ObjectLockStatus>;
1697
1698#[derive(Debug, PartialEq, Eq)]
1699pub enum ObjectLockStatus {
1700 Initialized,
1701 LockedToTx { locked_by_tx: LockDetailsDeprecated },
1702 LockedAtDifferentVersion { locked_ref: ObjectRef },
1703}
1704
1705#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1706pub enum LockDetailsWrapperDeprecated {
1707 V1(LockDetailsV1Deprecated),
1708}
1709
1710impl LockDetailsWrapperDeprecated {
1711 pub fn migrate(self) -> Self {
1712 self
1715 }
1716
1717 pub fn inner(&self) -> &LockDetailsDeprecated {
1720 match self {
1721 Self::V1(v1) => v1,
1722
1723 #[allow(unreachable_patterns)]
1725 _ => panic!("lock details should have been migrated to latest version at read time"),
1726 }
1727 }
1728 pub fn into_inner(self) -> LockDetailsDeprecated {
1729 match self {
1730 Self::V1(v1) => v1,
1731
1732 #[allow(unreachable_patterns)]
1734 _ => panic!("lock details should have been migrated to latest version at read time"),
1735 }
1736 }
1737}
1738
1739#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1740pub struct LockDetailsV1Deprecated {
1741 pub epoch: EpochId,
1742 pub tx_digest: TransactionDigest,
1743}
1744
1745pub type LockDetailsDeprecated = LockDetailsV1Deprecated;
1746
1747impl From<LockDetailsDeprecated> for LockDetailsWrapperDeprecated {
1748 fn from(details: LockDetailsDeprecated) -> Self {
1749 LockDetailsWrapperDeprecated::V1(details)
1751 }
1752}