1use std::sync::Arc;
5use std::{iter, mem, thread};
6
7use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
8use crate::authority::authority_store_pruner::{
9 AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
10};
11use crate::authority::authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object};
12use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
13use crate::global_state_hasher::GlobalStateHashStore;
14use crate::rpc_index::RpcIndexStore;
15use crate::transaction_outputs::TransactionOutputs;
16use either::Either;
17use fastcrypto::hash::{HashFunction, MultisetHash, Sha3_256};
18use futures::stream::FuturesUnordered;
19use itertools::izip;
20use move_core_types::resolver::ModuleResolver;
21use serde::{Deserialize, Serialize};
22use sui_config::node::AuthorityStorePruningConfig;
23use sui_macros::fail_point_arg;
24use sui_storage::mutex_table::{MutexGuard, MutexTable};
25use sui_types::error::{SuiErrorKind, UserInputError};
26use sui_types::execution::TypeLayoutStore;
27use sui_types::global_state_hash::GlobalStateHash;
28use sui_types::message_envelope::Message;
29use sui_types::storage::{
30 BackingPackageStore, FullObjectKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore,
31 get_module,
32};
33use sui_types::sui_system_state::get_sui_system_state;
34use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure};
35use tokio::time::Instant;
36use tracing::{debug, info, trace};
37use typed_store::traits::Map;
38use typed_store::{
39 TypedStoreError,
40 rocks::{DBBatch, DBMap},
41};
42
43use super::authority_store_tables::LiveObject;
44use super::{authority_store_tables::AuthorityPerpetualTables, *};
45use mysten_common::sync::notify_read::NotifyRead;
46use sui_types::effects::{TransactionEffects, TransactionEvents};
47use sui_types::gas_coin::TOTAL_SUPPLY_MIST;
48
49const NUM_SHARDS: usize = 4096;
50
51struct AuthorityStoreMetrics {
52 sui_conservation_check_latency: IntGauge,
53 sui_conservation_live_object_count: IntGauge,
54 sui_conservation_live_object_size: IntGauge,
55 sui_conservation_imbalance: IntGauge,
56 sui_conservation_storage_fund: IntGauge,
57 sui_conservation_storage_fund_imbalance: IntGauge,
58 epoch_flags: IntGaugeVec,
59}
60
61impl AuthorityStoreMetrics {
62 pub fn new(registry: &Registry) -> Self {
63 Self {
64 sui_conservation_check_latency: register_int_gauge_with_registry!(
65 "sui_conservation_check_latency",
66 "Number of seconds took to scan all live objects in the store for SUI conservation check",
67 registry,
68 ).unwrap(),
69 sui_conservation_live_object_count: register_int_gauge_with_registry!(
70 "sui_conservation_live_object_count",
71 "Number of live objects in the store",
72 registry,
73 ).unwrap(),
74 sui_conservation_live_object_size: register_int_gauge_with_registry!(
75 "sui_conservation_live_object_size",
76 "Size in bytes of live objects in the store",
77 registry,
78 ).unwrap(),
79 sui_conservation_imbalance: register_int_gauge_with_registry!(
80 "sui_conservation_imbalance",
81 "Total amount of SUI in the network - 10B * 10^9. This delta shows the amount of imbalance",
82 registry,
83 ).unwrap(),
84 sui_conservation_storage_fund: register_int_gauge_with_registry!(
85 "sui_conservation_storage_fund",
86 "Storage Fund pool balance (only includes the storage fund proper that represents object storage)",
87 registry,
88 ).unwrap(),
89 sui_conservation_storage_fund_imbalance: register_int_gauge_with_registry!(
90 "sui_conservation_storage_fund_imbalance",
91 "Imbalance of storage fund, computed with storage_fund_balance - total_object_storage_rebates",
92 registry,
93 ).unwrap(),
94 epoch_flags: register_int_gauge_vec_with_registry!(
95 "epoch_flags",
96 "Local flags of the currently running epoch",
97 &["flag"],
98 registry,
99 ).unwrap(),
100 }
101 }
102}
103
104pub struct AuthorityStore {
111 mutex_table: MutexTable<ObjectDigest>,
113
114 pub(crate) perpetual_tables: Arc<AuthorityPerpetualTables>,
115
116 pub(crate) root_state_notify_read:
117 NotifyRead<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,
118
119 enable_epoch_sui_conservation_check: bool,
121
122 metrics: AuthorityStoreMetrics,
123}
124
125pub type ExecutionLockReadGuard<'a> = tokio::sync::RwLockReadGuard<'a, EpochId>;
126pub type ExecutionLockWriteGuard<'a> = tokio::sync::RwLockWriteGuard<'a, EpochId>;
127
128impl AuthorityStore {
129 pub async fn open(
132 perpetual_tables: Arc<AuthorityPerpetualTables>,
133 genesis: &Genesis,
134 config: &NodeConfig,
135 registry: &Registry,
136 ) -> SuiResult<Arc<Self>> {
137 let enable_epoch_sui_conservation_check = config
138 .expensive_safety_check_config
139 .enable_epoch_sui_conservation_check();
140
141 let epoch_start_configuration = if perpetual_tables.database_is_empty()? {
142 info!("Creating new epoch start config from genesis");
143
144 #[allow(unused_mut)]
145 let mut initial_epoch_flags = EpochFlag::default_flags_for_new_epoch(config);
146 fail_point_arg!("initial_epoch_flags", |flags: Vec<EpochFlag>| {
147 info!("Setting initial epoch flags to {:?}", flags);
148 initial_epoch_flags = flags;
149 });
150
151 let epoch_start_configuration = EpochStartConfiguration::new(
152 genesis.sui_system_object().into_epoch_start_state(),
153 *genesis.checkpoint().digest(),
154 &genesis.objects(),
155 initial_epoch_flags,
156 )?;
157 perpetual_tables.set_epoch_start_configuration(&epoch_start_configuration)?;
158 epoch_start_configuration
159 } else {
160 info!("Loading epoch start config from DB");
161 perpetual_tables
162 .epoch_start_configuration
163 .get(&())?
164 .expect("Epoch start configuration must be set in non-empty DB")
165 };
166 let cur_epoch = perpetual_tables.get_recovery_epoch_at_restart()?;
167 info!("Epoch start config: {:?}", epoch_start_configuration);
168 info!("Cur epoch: {:?}", cur_epoch);
169 let this = Self::open_inner(
170 genesis,
171 perpetual_tables,
172 enable_epoch_sui_conservation_check,
173 registry,
174 )
175 .await?;
176 this.update_epoch_flags_metrics(&[], epoch_start_configuration.flags());
177 Ok(this)
178 }
179
180 pub fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
181 for flag in old {
182 self.metrics
183 .epoch_flags
184 .with_label_values(&[&flag.to_string()])
185 .set(0);
186 }
187 for flag in new {
188 self.metrics
189 .epoch_flags
190 .with_label_values(&[&flag.to_string()])
191 .set(1);
192 }
193 }
194
195 pub fn clear_object_per_epoch_marker_table(
198 &self,
199 _execution_guard: &ExecutionLockWriteGuard<'_>,
200 ) -> SuiResult<()> {
201 self.perpetual_tables
205 .object_per_epoch_marker_table
206 .schedule_delete_all()?;
207 Ok(self
208 .perpetual_tables
209 .object_per_epoch_marker_table_v2
210 .schedule_delete_all()?)
211 }
212
213 pub async fn open_with_committee_for_testing(
214 perpetual_tables: Arc<AuthorityPerpetualTables>,
215 committee: &Committee,
216 genesis: &Genesis,
217 ) -> SuiResult<Arc<Self>> {
218 assert_eq!(committee.epoch, 0);
221 Self::open_inner(genesis, perpetual_tables, true, &Registry::new()).await
222 }
223
224 async fn open_inner(
225 genesis: &Genesis,
226 perpetual_tables: Arc<AuthorityPerpetualTables>,
227 enable_epoch_sui_conservation_check: bool,
228 registry: &Registry,
229 ) -> SuiResult<Arc<Self>> {
230 let store = Arc::new(Self {
231 mutex_table: MutexTable::new(NUM_SHARDS),
232 perpetual_tables,
233 root_state_notify_read: NotifyRead::<
234 EpochId,
235 (CheckpointSequenceNumber, GlobalStateHash),
236 >::new(),
237 enable_epoch_sui_conservation_check,
238 metrics: AuthorityStoreMetrics::new(registry),
239 });
240 if store
242 .database_is_empty()
243 .expect("Database read should not fail at init.")
244 {
245 store
246 .bulk_insert_genesis_objects(genesis.objects())
247 .expect("Cannot bulk insert genesis objects");
248
249 let transaction = VerifiedTransaction::new_unchecked(genesis.transaction().clone());
251
252 store
253 .perpetual_tables
254 .transactions
255 .insert(transaction.digest(), transaction.serializable_ref())
256 .unwrap();
257
258 store
259 .perpetual_tables
260 .effects
261 .insert(&genesis.effects().digest(), genesis.effects())
262 .unwrap();
263 if genesis.effects().events_digest().is_some() {
267 store
268 .perpetual_tables
269 .events_2
270 .insert(transaction.digest(), genesis.events())
271 .unwrap();
272 }
273 }
274
275 Ok(store)
276 }
277
278 pub fn open_no_genesis(
282 perpetual_tables: Arc<AuthorityPerpetualTables>,
283 enable_epoch_sui_conservation_check: bool,
284 registry: &Registry,
285 ) -> SuiResult<Arc<Self>> {
286 let store = Arc::new(Self {
287 mutex_table: MutexTable::new(NUM_SHARDS),
288 perpetual_tables,
289 root_state_notify_read: NotifyRead::<
290 EpochId,
291 (CheckpointSequenceNumber, GlobalStateHash),
292 >::new(),
293 enable_epoch_sui_conservation_check,
294 metrics: AuthorityStoreMetrics::new(registry),
295 });
296 Ok(store)
297 }
298
299 pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
300 self.perpetual_tables.get_recovery_epoch_at_restart()
301 }
302
303 pub fn get_effects(
304 &self,
305 effects_digest: &TransactionEffectsDigest,
306 ) -> SuiResult<Option<TransactionEffects>> {
307 Ok(self.perpetual_tables.effects.get(effects_digest)?)
308 }
309
310 pub fn effects_exists(&self, effects_digest: &TransactionEffectsDigest) -> SuiResult<bool> {
312 self.perpetual_tables
313 .effects
314 .contains_key(effects_digest)
315 .map_err(|e| e.into())
316 }
317
318 pub fn get_events(
319 &self,
320 digest: &TransactionDigest,
321 ) -> Result<Option<TransactionEvents>, TypedStoreError> {
322 self.perpetual_tables.events_2.get(digest)
323 }
324
325 pub fn multi_get_events(
326 &self,
327 event_digests: &[TransactionDigest],
328 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
329 Ok(event_digests
330 .iter()
331 .map(|digest| self.get_events(digest))
332 .collect::<Result<Vec<_>, _>>()?)
333 }
334
335 pub fn get_unchanged_loaded_runtime_objects(
336 &self,
337 digest: &TransactionDigest,
338 ) -> Result<Option<Vec<ObjectKey>>, TypedStoreError> {
339 self.perpetual_tables
340 .unchanged_loaded_runtime_objects
341 .get(digest)
342 }
343
344 pub fn multi_get_effects<'a>(
345 &self,
346 effects_digests: impl Iterator<Item = &'a TransactionEffectsDigest>,
347 ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
348 self.perpetual_tables.effects.multi_get(effects_digests)
349 }
350
351 pub fn get_executed_effects(
352 &self,
353 tx_digest: &TransactionDigest,
354 ) -> Result<Option<TransactionEffects>, TypedStoreError> {
355 let effects_digest = self.perpetual_tables.executed_effects.get(tx_digest)?;
356 match effects_digest {
357 Some(digest) => Ok(self.perpetual_tables.effects.get(&digest)?),
358 None => Ok(None),
359 }
360 }
361
362 pub fn multi_get_executed_effects_digests(
365 &self,
366 digests: &[TransactionDigest],
367 ) -> Result<Vec<Option<TransactionEffectsDigest>>, TypedStoreError> {
368 self.perpetual_tables.executed_effects.multi_get(digests)
369 }
370
371 pub fn multi_get_executed_effects(
374 &self,
375 digests: &[TransactionDigest],
376 ) -> Result<Vec<Option<TransactionEffects>>, TypedStoreError> {
377 let executed_effects_digests = self.perpetual_tables.executed_effects.multi_get(digests)?;
378 let effects = self.multi_get_effects(executed_effects_digests.iter().flatten())?;
379 let mut tx_to_effects_map = effects
380 .into_iter()
381 .flatten()
382 .map(|effects| (*effects.transaction_digest(), effects))
383 .collect::<HashMap<_, _>>();
384 Ok(digests
385 .iter()
386 .map(|digest| tx_to_effects_map.remove(digest))
387 .collect())
388 }
389
390 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> SuiResult<bool> {
391 Ok(self
392 .perpetual_tables
393 .executed_effects
394 .contains_key(digest)?)
395 }
396
397 pub fn get_marker_value(
398 &self,
399 object_key: FullObjectKey,
400 epoch_id: EpochId,
401 ) -> SuiResult<Option<MarkerValue>> {
402 Ok(self
403 .perpetual_tables
404 .object_per_epoch_marker_table_v2
405 .get(&(epoch_id, object_key))?)
406 }
407
408 pub fn get_latest_marker(
409 &self,
410 object_id: FullObjectID,
411 epoch_id: EpochId,
412 ) -> SuiResult<Option<(SequenceNumber, MarkerValue)>> {
413 let min_key = (epoch_id, FullObjectKey::min_for_id(&object_id));
414 let max_key = (epoch_id, FullObjectKey::max_for_id(&object_id));
415
416 let marker_entry = self
417 .perpetual_tables
418 .object_per_epoch_marker_table_v2
419 .reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
420 .next();
421 match marker_entry {
422 Some(Ok(((epoch, key), marker))) => {
423 assert_eq!(epoch, epoch_id);
425 assert_eq!(key.id(), object_id);
426 Ok(Some((key.version(), marker)))
427 }
428 Some(Err(e)) => Err(e.into()),
429 None => Ok(None),
430 }
431 }
432
433 pub async fn notify_read_root_state_hash(
436 &self,
437 epoch: EpochId,
438 ) -> SuiResult<(CheckpointSequenceNumber, GlobalStateHash)> {
439 let registration = self.root_state_notify_read.register_one(&epoch);
441 let hash = self.perpetual_tables.root_state_hash_by_epoch.get(&epoch)?;
442
443 let result = match hash {
444 Some(ready) => Either::Left(futures::future::ready(ready)),
446 None => Either::Right(registration),
447 }
448 .await;
449
450 Ok(result)
451 }
452
453 pub fn deprecated_insert_finalized_transactions(
455 &self,
456 digests: &[TransactionDigest],
457 epoch: EpochId,
458 sequence: CheckpointSequenceNumber,
459 ) -> SuiResult {
460 let mut batch = self
461 .perpetual_tables
462 .executed_transactions_to_checkpoint
463 .batch();
464 batch.insert_batch(
465 &self.perpetual_tables.executed_transactions_to_checkpoint,
466 digests.iter().map(|d| (*d, (epoch, sequence))),
467 )?;
468 batch.write()?;
469 trace!("Transactions {digests:?} finalized at checkpoint {sequence} epoch {epoch}");
470 Ok(())
471 }
472
473 pub fn deprecated_get_transaction_checkpoint(
475 &self,
476 digest: &TransactionDigest,
477 ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
478 Ok(self
479 .perpetual_tables
480 .executed_transactions_to_checkpoint
481 .get(digest)?)
482 }
483
484 pub fn deprecated_multi_get_transaction_checkpoint(
486 &self,
487 digests: &[TransactionDigest],
488 ) -> SuiResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
489 Ok(self
490 .perpetual_tables
491 .executed_transactions_to_checkpoint
492 .multi_get(digests)?
493 .into_iter()
494 .collect())
495 }
496
497 pub fn database_is_empty(&self) -> SuiResult<bool> {
499 self.perpetual_tables.database_is_empty()
500 }
501
502 fn acquire_locks(&self, input_objects: &[ObjectRef]) -> Vec<MutexGuard> {
504 self.mutex_table
505 .acquire_locks(input_objects.iter().map(|(_, _, digest)| *digest))
506 }
507
508 pub fn object_exists_by_key(
509 &self,
510 object_id: &ObjectID,
511 version: VersionNumber,
512 ) -> SuiResult<bool> {
513 Ok(self
514 .perpetual_tables
515 .objects
516 .contains_key(&ObjectKey(*object_id, version))?)
517 }
518
519 pub fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<bool>> {
520 Ok(self
521 .perpetual_tables
522 .objects
523 .multi_contains_keys(object_keys.to_vec())?
524 .into_iter()
525 .collect())
526 }
527
528 fn get_object_ref_prior_to_key(
529 &self,
530 object_id: &ObjectID,
531 version: VersionNumber,
532 ) -> Result<Option<ObjectRef>, SuiError> {
533 let Some(prior_version) = version.one_before() else {
534 return Ok(None);
535 };
536 let mut iterator = self
537 .perpetual_tables
538 .objects
539 .reversed_safe_iter_with_bounds(
540 Some(ObjectKey::min_for_id(object_id)),
541 Some(ObjectKey(*object_id, prior_version)),
542 )?;
543
544 if let Some((object_key, value)) = iterator.next().transpose()?
545 && object_key.0 == *object_id
546 {
547 return Ok(Some(
548 self.perpetual_tables.object_reference(&object_key, value)?,
549 ));
550 }
551 Ok(None)
552 }
553
554 pub fn multi_get_objects_by_key(
555 &self,
556 object_keys: &[ObjectKey],
557 ) -> Result<Vec<Option<Object>>, SuiError> {
558 let wrappers = self
559 .perpetual_tables
560 .objects
561 .multi_get(object_keys.to_vec())?;
562 let mut ret = vec![];
563
564 for (idx, w) in wrappers.into_iter().enumerate() {
565 ret.push(
566 w.map(|object| self.perpetual_tables.object(&object_keys[idx], object))
567 .transpose()?
568 .flatten(),
569 );
570 }
571 Ok(ret)
572 }
573
574 pub fn get_objects(&self, objects: &[ObjectID]) -> Result<Vec<Option<Object>>, SuiError> {
576 let mut result = Vec::new();
577 for id in objects {
578 result.push(self.get_object(id));
579 }
580 Ok(result)
581 }
582
583 pub(crate) fn insert_genesis_object(&self, object: Object) -> SuiResult {
588 debug_assert!(object.previous_transaction == TransactionDigest::genesis_marker());
590 let object_ref = object.compute_object_reference();
591 self.insert_object_direct(object_ref, &object)
592 }
593
594 fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> SuiResult {
598 let mut write_batch = self.perpetual_tables.objects.batch();
599
600 let store_object = get_store_object(object.clone());
602 write_batch.insert_batch(
603 &self.perpetual_tables.objects,
604 std::iter::once((ObjectKey::from(object_ref), store_object)),
605 )?;
606
607 if object.get_single_owner().is_some() {
609 if !object.is_child_object() {
611 self.initialize_live_object_markers_impl(&mut write_batch, &[object_ref], false)?;
612 }
613 }
614
615 write_batch.write()?;
616
617 Ok(())
618 }
619
620 #[instrument(level = "debug", skip_all)]
622 pub(crate) fn bulk_insert_genesis_objects(&self, objects: &[Object]) -> SuiResult<()> {
623 let mut batch = self.perpetual_tables.objects.batch();
624 let ref_and_objects: Vec<_> = objects
625 .iter()
626 .map(|o| (o.compute_object_reference(), o))
627 .collect();
628
629 batch.insert_batch(
630 &self.perpetual_tables.objects,
631 ref_and_objects
632 .iter()
633 .map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))),
634 )?;
635
636 let non_child_object_refs: Vec<_> = ref_and_objects
637 .iter()
638 .filter(|(_, object)| !object.is_child_object())
639 .map(|(oref, _)| *oref)
640 .collect();
641
642 self.initialize_live_object_markers_impl(
643 &mut batch,
644 &non_child_object_refs,
645 false, )?;
647
648 batch.write()?;
649
650 Ok(())
651 }
652
653 pub fn bulk_insert_live_objects(
654 perpetual_db: &AuthorityPerpetualTables,
655 live_objects: impl Iterator<Item = LiveObject>,
656 expected_sha3_digest: &[u8; 32],
657 ) -> SuiResult<()> {
658 let mut hasher = Sha3_256::default();
659 let mut batch = perpetual_db.objects.batch();
660 let mut written = 0usize;
661 const MAX_BATCH_SIZE: usize = 100_000;
662 for object in live_objects {
663 hasher.update(object.object_reference().2.inner());
664 match object {
665 LiveObject::Normal(object) => {
666 let store_object_wrapper = get_store_object(object.clone());
667 batch.insert_batch(
668 &perpetual_db.objects,
669 std::iter::once((
670 ObjectKey::from(object.compute_object_reference()),
671 store_object_wrapper,
672 )),
673 )?;
674 if !object.is_child_object() {
675 Self::initialize_live_object_markers(
676 &perpetual_db.live_owned_object_markers,
677 &mut batch,
678 &[object.compute_object_reference()],
679 false, )?;
681 }
682 }
683 LiveObject::Wrapped(object_key) => {
684 batch.insert_batch(
685 &perpetual_db.objects,
686 std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
687 object_key,
688 StoreObject::Wrapped.into(),
689 )),
690 )?;
691 }
692 }
693 written += 1;
694 if written > MAX_BATCH_SIZE {
695 batch.write()?;
696 batch = perpetual_db.objects.batch();
697 written = 0;
698 }
699 }
700 let sha3_digest = hasher.finalize().digest;
701 if *expected_sha3_digest != sha3_digest {
702 error!(
703 "Sha does not match! expected: {:?}, actual: {:?}",
704 expected_sha3_digest, sha3_digest
705 );
706 return Err(SuiError::from("Sha does not match"));
707 }
708 batch.write()?;
709 Ok(())
710 }
711
712 pub fn set_epoch_start_configuration(
713 &self,
714 epoch_start_configuration: &EpochStartConfiguration,
715 ) -> SuiResult {
716 self.perpetual_tables
717 .set_epoch_start_configuration(epoch_start_configuration)?;
718 Ok(())
719 }
720
721 pub fn get_epoch_start_configuration(&self) -> SuiResult<Option<EpochStartConfiguration>> {
722 Ok(self.perpetual_tables.epoch_start_configuration.get(&())?)
723 }
724
725 #[instrument(level = "debug", skip_all)]
730 pub fn build_db_batch(
731 &self,
732 epoch_id: EpochId,
733 tx_outputs: &[Arc<TransactionOutputs>],
734 ) -> SuiResult<DBBatch> {
735 let mut written = Vec::with_capacity(tx_outputs.len());
736 for outputs in tx_outputs {
737 written.extend(outputs.written.values().cloned());
738 }
739
740 let mut write_batch = self.perpetual_tables.transactions.batch();
741 for outputs in tx_outputs {
742 self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
743 }
744 fail_point!("crash");
746
747 trace!(
748 "built batch for committed transactions: {:?}",
749 tx_outputs
750 .iter()
751 .map(|tx| tx.transaction.digest())
752 .collect::<Vec<_>>()
753 );
754
755 fail_point!("crash");
757
758 Ok(write_batch)
759 }
760
761 fn write_one_transaction_outputs(
762 &self,
763 write_batch: &mut DBBatch,
764 epoch_id: EpochId,
765 tx_outputs: &TransactionOutputs,
766 ) -> SuiResult {
767 let TransactionOutputs {
768 transaction,
769 effects,
770 markers,
771 wrapped,
772 deleted,
773 written,
774 events,
775 unchanged_loaded_runtime_objects,
776 locks_to_delete,
777 new_locks_to_init,
778 ..
779 } = tx_outputs;
780
781 let effects_digest = effects.digest();
782 let transaction_digest = transaction.digest();
783 write_batch
786 .insert_batch(
787 &self.perpetual_tables.effects,
788 [(effects_digest, effects.clone())],
789 )?
790 .insert_batch(
791 &self.perpetual_tables.executed_effects,
792 [(transaction_digest, effects_digest)],
793 )?;
794
795 write_batch.insert_batch(
797 &self.perpetual_tables.transactions,
798 iter::once((transaction_digest, transaction.serializable_ref())),
799 )?;
800
801 write_batch.insert_batch(
802 &self.perpetual_tables.executed_transaction_digests,
803 [((epoch_id, *transaction_digest), ())],
804 )?;
805
806 write_batch.insert_batch(
808 &self.perpetual_tables.object_per_epoch_marker_table_v2,
809 markers
810 .iter()
811 .map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
812 )?;
813 write_batch.insert_batch(
814 &self.perpetual_tables.objects,
815 deleted
816 .iter()
817 .map(|key| (key, StoreObject::Deleted))
818 .chain(wrapped.iter().map(|key| (key, StoreObject::Wrapped)))
819 .map(|(key, store_object)| (key, StoreObjectWrapper::from(store_object))),
820 )?;
821
822 let new_objects = written.iter().map(|(id, new_object)| {
824 let version = new_object.version();
825 trace!(?id, ?version, "writing object");
826 let store_object = get_store_object(new_object.clone());
827 (ObjectKey(*id, version), store_object)
828 });
829
830 write_batch.insert_batch(&self.perpetual_tables.objects, new_objects)?;
831
832 if effects.events_digest().is_some() {
834 write_batch.insert_batch(
835 &self.perpetual_tables.events_2,
836 [(transaction_digest, events)],
837 )?;
838 }
839
840 if !unchanged_loaded_runtime_objects.is_empty() {
842 write_batch.insert_batch(
843 &self.perpetual_tables.unchanged_loaded_runtime_objects,
844 [(transaction_digest, unchanged_loaded_runtime_objects)],
845 )?;
846 }
847
848 self.initialize_live_object_markers_impl(write_batch, new_locks_to_init, false)?;
849
850 self.delete_live_object_markers(write_batch, locks_to_delete)?;
853
854 debug!(effects_digest = ?effects.digest(), "commit_certificate finished");
855
856 Ok(())
857 }
858
859 pub(crate) fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> SuiResult {
862 let mut batch = self.perpetual_tables.transactions.batch();
863 batch.insert_batch(
864 &self.perpetual_tables.transactions,
865 [(tx.digest(), tx.clone().into_unsigned().serializable_ref())],
866 )?;
867 batch.write()?;
868 Ok(())
869 }
870
871 pub fn acquire_transaction_locks(
872 &self,
873 epoch_store: &AuthorityPerEpochStore,
874 owned_input_objects: &[ObjectRef],
875 tx_digest: TransactionDigest,
876 signed_transaction: Option<VerifiedSignedTransaction>,
877 ) -> SuiResult {
878 let epoch = epoch_store.epoch();
879 let _mutexes = self.acquire_locks(owned_input_objects);
883
884 trace!(?owned_input_objects, "acquire_locks");
885 let mut locks_to_write = Vec::new();
886
887 let live_object_markers = self
888 .perpetual_tables
889 .live_owned_object_markers
890 .multi_get(owned_input_objects)?;
891
892 let epoch_tables = epoch_store.tables()?;
893
894 let locks = epoch_tables.multi_get_locked_transactions(owned_input_objects)?;
895
896 assert_eq!(locks.len(), live_object_markers.len());
897
898 for (live_marker, lock, obj_ref) in izip!(
899 live_object_markers.into_iter(),
900 locks.into_iter(),
901 owned_input_objects
902 ) {
903 let Some(live_marker) = live_marker else {
904 let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
905 fp_bail!(
906 UserInputError::ObjectVersionUnavailableForConsumption {
907 provided_obj_ref: *obj_ref,
908 current_version: latest_lock.1
909 }
910 .into()
911 );
912 };
913
914 let live_marker = live_marker.map(|l| l.migrate().into_inner());
915
916 if let Some(LockDetailsDeprecated {
917 epoch: previous_epoch,
918 ..
919 }) = &live_marker
920 {
921 assert!(
924 previous_epoch < &epoch,
925 "lock for {:?} should be from a prior epoch",
926 obj_ref
927 );
928 }
929
930 if let Some(previous_tx_digest) = &lock {
931 if previous_tx_digest == &tx_digest {
932 continue;
934 } else {
935 info!(prev_tx_digest = ?previous_tx_digest,
937 cur_tx_digest = ?tx_digest,
938 "Cannot acquire lock: conflicting transaction!");
939 return Err(SuiErrorKind::ObjectLockConflict {
940 obj_ref: *obj_ref,
941 pending_transaction: *previous_tx_digest,
942 }
943 .into());
944 }
945 }
946
947 locks_to_write.push((*obj_ref, tx_digest));
948 }
949
950 if !locks_to_write.is_empty() {
951 trace!(?locks_to_write, "Writing locks");
952 epoch_tables.write_transaction_locks(signed_transaction, locks_to_write.into_iter())?;
953 }
954
955 Ok(())
956 }
957
958 pub(crate) fn get_lock(
961 &self,
962 obj_ref: ObjectRef,
963 epoch_store: &AuthorityPerEpochStore,
964 ) -> SuiLockResult {
965 if self
966 .perpetual_tables
967 .live_owned_object_markers
968 .get(&obj_ref)?
969 .is_none()
970 {
971 return Ok(ObjectLockStatus::LockedAtDifferentVersion {
972 locked_ref: self.get_latest_live_version_for_object_id(obj_ref.0)?,
973 });
974 }
975
976 let tables = epoch_store.tables()?;
977 let epoch_id = epoch_store.epoch();
978
979 if let Some(tx_digest) = tables.get_locked_transaction(&obj_ref)? {
980 Ok(ObjectLockStatus::LockedToTx {
981 locked_by_tx: LockDetailsDeprecated {
982 epoch: epoch_id,
983 tx_digest,
984 },
985 })
986 } else {
987 Ok(ObjectLockStatus::Initialized)
988 }
989 }
990
991 pub(crate) fn get_latest_live_version_for_object_id(
993 &self,
994 object_id: ObjectID,
995 ) -> SuiResult<ObjectRef> {
996 let mut iterator = self
997 .perpetual_tables
998 .live_owned_object_markers
999 .reversed_safe_iter_with_bounds(
1000 None,
1001 Some((object_id, SequenceNumber::MAX, ObjectDigest::MAX)),
1002 )?;
1003 Ok(iterator
1004 .next()
1005 .transpose()?
1006 .and_then(|value| {
1007 if value.0.0 == object_id {
1008 Some(value)
1009 } else {
1010 None
1011 }
1012 })
1013 .ok_or_else(|| {
1014 SuiError::from(UserInputError::ObjectNotFound {
1015 object_id,
1016 version: None,
1017 })
1018 })?
1019 .0)
1020 }
1021
1022 pub fn check_owned_objects_are_live(&self, objects: &[ObjectRef]) -> SuiResult {
1027 let locks = self
1028 .perpetual_tables
1029 .live_owned_object_markers
1030 .multi_get(objects)?;
1031 for (lock, obj_ref) in locks.into_iter().zip(objects) {
1032 if lock.is_none() {
1033 let latest_lock = self.get_latest_live_version_for_object_id(obj_ref.0)?;
1034 fp_bail!(
1035 UserInputError::ObjectVersionUnavailableForConsumption {
1036 provided_obj_ref: *obj_ref,
1037 current_version: latest_lock.1
1038 }
1039 .into()
1040 );
1041 }
1042 }
1043 Ok(())
1044 }
1045
1046 fn initialize_live_object_markers_impl(
1049 &self,
1050 write_batch: &mut DBBatch,
1051 objects: &[ObjectRef],
1052 is_force_reset: bool,
1053 ) -> SuiResult {
1054 AuthorityStore::initialize_live_object_markers(
1055 &self.perpetual_tables.live_owned_object_markers,
1056 write_batch,
1057 objects,
1058 is_force_reset,
1059 )
1060 }
1061
1062 pub fn initialize_live_object_markers(
1063 live_object_marker_table: &DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,
1064 write_batch: &mut DBBatch,
1065 objects: &[ObjectRef],
1066 is_force_reset: bool,
1067 ) -> SuiResult {
1068 trace!(?objects, "initialize_locks");
1069
1070 let live_object_markers = live_object_marker_table.multi_get(objects)?;
1071
1072 if !is_force_reset {
1073 let existing_live_object_markers: Vec<ObjectRef> = live_object_markers
1077 .iter()
1078 .zip(objects)
1079 .filter_map(|(lock_opt, objref)| {
1080 lock_opt.clone().flatten().map(|_tx_digest| *objref)
1081 })
1082 .collect();
1083 if !existing_live_object_markers.is_empty() {
1084 info!(
1085 ?existing_live_object_markers,
1086 "Cannot initialize live_object_markers because some exist already"
1087 );
1088 return Err(SuiErrorKind::ObjectLockAlreadyInitialized {
1089 refs: existing_live_object_markers,
1090 }
1091 .into());
1092 }
1093 }
1094
1095 write_batch.insert_batch(
1096 live_object_marker_table,
1097 objects.iter().map(|obj_ref| (obj_ref, None)),
1098 )?;
1099 Ok(())
1100 }
1101
1102 fn delete_live_object_markers(
1104 &self,
1105 write_batch: &mut DBBatch,
1106 objects: &[ObjectRef],
1107 ) -> SuiResult {
1108 trace!(?objects, "delete_locks");
1109 write_batch.delete_batch(
1110 &self.perpetual_tables.live_owned_object_markers,
1111 objects.iter(),
1112 )?;
1113 Ok(())
1114 }
1115
1116 #[cfg(test)]
1117 pub(crate) fn reset_locks_for_test(
1118 &self,
1119 transactions: &[TransactionDigest],
1120 objects: &[ObjectRef],
1121 epoch_store: &AuthorityPerEpochStore,
1122 ) {
1123 for tx in transactions {
1124 epoch_store.delete_signed_transaction_for_test(tx);
1125 epoch_store.delete_object_locks_for_test(objects);
1126 }
1127
1128 let mut batch = self.perpetual_tables.live_owned_object_markers.batch();
1129 batch
1130 .delete_batch(
1131 &self.perpetual_tables.live_owned_object_markers,
1132 objects.iter(),
1133 )
1134 .unwrap();
1135 batch.write().unwrap();
1136
1137 let mut batch = self.perpetual_tables.live_owned_object_markers.batch();
1138 self.initialize_live_object_markers_impl(&mut batch, objects, false)
1139 .unwrap();
1140 batch.write().unwrap();
1141 }
1142
1143 pub fn find_object_lt_or_eq_version(
1148 &self,
1149 object_id: ObjectID,
1150 version: SequenceNumber,
1151 ) -> SuiResult<Option<Object>> {
1152 self.perpetual_tables
1153 .find_object_lt_or_eq_version(object_id, version)
1154 }
1155
1156 pub fn get_latest_object_ref_or_tombstone(
1166 &self,
1167 object_id: ObjectID,
1168 ) -> Result<Option<ObjectRef>, SuiError> {
1169 self.perpetual_tables
1170 .get_latest_object_ref_or_tombstone(object_id)
1171 }
1172
1173 pub fn get_latest_object_ref_if_alive(
1176 &self,
1177 object_id: ObjectID,
1178 ) -> Result<Option<ObjectRef>, SuiError> {
1179 match self.get_latest_object_ref_or_tombstone(object_id)? {
1180 Some(objref) if objref.2.is_alive() => Ok(Some(objref)),
1181 _ => Ok(None),
1182 }
1183 }
1184
1185 pub fn get_latest_object_or_tombstone(
1189 &self,
1190 object_id: ObjectID,
1191 ) -> Result<Option<(ObjectKey, ObjectOrTombstone)>, SuiError> {
1192 let Some((object_key, store_object)) = self
1193 .perpetual_tables
1194 .get_latest_object_or_tombstone(object_id)?
1195 else {
1196 return Ok(None);
1197 };
1198
1199 if let Some(object_ref) = self
1200 .perpetual_tables
1201 .tombstone_reference(&object_key, &store_object)?
1202 {
1203 return Ok(Some((object_key, ObjectOrTombstone::Tombstone(object_ref))));
1204 }
1205
1206 let object = self
1207 .perpetual_tables
1208 .object(&object_key, store_object)?
1209 .expect("Non tombstone store object could not be converted to object");
1210
1211 Ok(Some((object_key, ObjectOrTombstone::Object(object))))
1212 }
1213
1214 pub fn insert_transaction_and_effects(
1215 &self,
1216 transaction: &VerifiedTransaction,
1217 transaction_effects: &TransactionEffects,
1218 ) -> Result<(), TypedStoreError> {
1219 let mut write_batch = self.perpetual_tables.transactions.batch();
1220 write_batch
1223 .insert_batch(
1224 &self.perpetual_tables.effects,
1225 [(transaction_effects.digest(), transaction_effects)],
1226 )?
1227 .insert_batch(
1228 &self.perpetual_tables.transactions,
1229 [(transaction.digest(), transaction.serializable_ref())],
1230 )?;
1231
1232 write_batch.write()?;
1233 Ok(())
1234 }
1235
1236 pub fn multi_insert_transaction_and_effects<'a>(
1237 &self,
1238 transactions: impl Iterator<Item = &'a VerifiedExecutionData>,
1239 ) -> Result<(), TypedStoreError> {
1240 let mut write_batch = self.perpetual_tables.transactions.batch();
1241 for tx in transactions {
1242 write_batch
1243 .insert_batch(
1244 &self.perpetual_tables.effects,
1245 [(tx.effects.digest(), &tx.effects)],
1246 )?
1247 .insert_batch(
1248 &self.perpetual_tables.transactions,
1249 [(tx.transaction.digest(), tx.transaction.serializable_ref())],
1250 )?;
1251 }
1252
1253 write_batch.write()?;
1254 Ok(())
1255 }
1256
1257 pub fn multi_get_transaction_blocks(
1258 &self,
1259 tx_digests: &[TransactionDigest],
1260 ) -> Result<Vec<Option<VerifiedTransaction>>, TypedStoreError> {
1261 self.perpetual_tables
1262 .transactions
1263 .multi_get(tx_digests)
1264 .map(|v| v.into_iter().map(|v| v.map(|v| v.into())).collect())
1265 }
1266
1267 pub fn get_transaction_block(
1268 &self,
1269 tx_digest: &TransactionDigest,
1270 ) -> Result<Option<VerifiedTransaction>, TypedStoreError> {
1271 self.perpetual_tables
1272 .transactions
1273 .get(tx_digest)
1274 .map(|v| v.map(|v| v.into()))
1275 }
1276
1277 pub fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
1284 get_sui_system_state(self.perpetual_tables.as_ref())
1285 }
1286
1287 pub fn expensive_check_sui_conservation<T>(
1288 self: &Arc<Self>,
1289 type_layout_store: T,
1290 old_epoch_store: &AuthorityPerEpochStore,
1291 ) -> SuiResult
1292 where
1293 T: TypeLayoutStore + Send + Copy,
1294 {
1295 if !self.enable_epoch_sui_conservation_check {
1296 return Ok(());
1297 }
1298
1299 let executor = old_epoch_store.executor();
1300 info!("Starting SUI conservation check. This may take a while..");
1301 let cur_time = Instant::now();
1302 let mut pending_objects = vec![];
1303 let mut count = 0;
1304 let mut size = 0;
1305 let (mut total_sui, mut total_storage_rebate) = thread::scope(|s| {
1306 let pending_tasks = FuturesUnordered::new();
1307 for o in self.iter_live_object_set(false) {
1308 match o {
1309 LiveObject::Normal(object) => {
1310 size += object.object_size_for_gas_metering();
1311 count += 1;
1312 pending_objects.push(object);
1313 if count % 1_000_000 == 0 {
1314 let mut task_objects = vec![];
1315 mem::swap(&mut pending_objects, &mut task_objects);
1316 pending_tasks.push(s.spawn(move || {
1317 let mut layout_resolver =
1318 executor.type_layout_resolver(Box::new(type_layout_store));
1319 let mut total_storage_rebate = 0;
1320 let mut total_sui = 0;
1321 for object in task_objects {
1322 total_storage_rebate += object.storage_rebate;
1323 total_sui +=
1326 object.get_total_sui(layout_resolver.as_mut()).unwrap()
1327 - object.storage_rebate;
1328 }
1329 if count % 50_000_000 == 0 {
1330 info!("Processed {} objects", count);
1331 }
1332 (total_sui, total_storage_rebate)
1333 }));
1334 }
1335 }
1336 LiveObject::Wrapped(_) => {
1337 unreachable!("Explicitly asked to not include wrapped tombstones")
1338 }
1339 }
1340 }
1341 pending_tasks.into_iter().fold((0, 0), |init, result| {
1342 let result = result.join().unwrap();
1343 (init.0 + result.0, init.1 + result.1)
1344 })
1345 });
1346 let mut layout_resolver = executor.type_layout_resolver(Box::new(type_layout_store));
1347 for object in pending_objects {
1348 total_storage_rebate += object.storage_rebate;
1349 total_sui +=
1350 object.get_total_sui(layout_resolver.as_mut()).unwrap() - object.storage_rebate;
1351 }
1352 info!(
1353 "Scanned {} live objects, took {:?}",
1354 count,
1355 cur_time.elapsed()
1356 );
1357 self.metrics
1358 .sui_conservation_live_object_count
1359 .set(count as i64);
1360 self.metrics
1361 .sui_conservation_live_object_size
1362 .set(size as i64);
1363 self.metrics
1364 .sui_conservation_check_latency
1365 .set(cur_time.elapsed().as_secs() as i64);
1366
1367 let system_state = self
1369 .get_sui_system_state_object_unsafe()
1370 .expect("Reading sui system state object cannot fail")
1371 .into_sui_system_state_summary();
1372 let storage_fund_balance = system_state.storage_fund_total_object_storage_rebates;
1373 info!(
1374 "Total SUI amount in the network: {}, storage fund balance: {}, total storage rebate: {} at beginning of epoch {}",
1375 total_sui, storage_fund_balance, total_storage_rebate, system_state.epoch
1376 );
1377
1378 let imbalance = (storage_fund_balance as i64) - (total_storage_rebate as i64);
1379 self.metrics
1380 .sui_conservation_storage_fund
1381 .set(storage_fund_balance as i64);
1382 self.metrics
1383 .sui_conservation_storage_fund_imbalance
1384 .set(imbalance);
1385 self.metrics
1386 .sui_conservation_imbalance
1387 .set((total_sui as i128 - TOTAL_SUPPLY_MIST as i128) as i64);
1388
1389 if let Some(expected_imbalance) = self
1390 .perpetual_tables
1391 .expected_storage_fund_imbalance
1392 .get(&())
1393 .expect("DB read cannot fail")
1394 {
1395 fp_ensure!(
1396 imbalance == expected_imbalance,
1397 SuiError::from(
1398 format!(
1399 "Inconsistent state detected at epoch {}: total storage rebate: {}, storage fund balance: {}, expected imbalance: {}",
1400 system_state.epoch, total_storage_rebate, storage_fund_balance, expected_imbalance
1401 ).as_str()
1402 )
1403 );
1404 } else {
1405 self.perpetual_tables
1406 .expected_storage_fund_imbalance
1407 .insert(&(), &imbalance)
1408 .expect("DB write cannot fail");
1409 }
1410
1411 if let Some(expected_sui) = self
1412 .perpetual_tables
1413 .expected_network_sui_amount
1414 .get(&())
1415 .expect("DB read cannot fail")
1416 {
1417 fp_ensure!(
1418 total_sui == expected_sui,
1419 SuiError::from(
1420 format!(
1421 "Inconsistent state detected at epoch {}: total sui: {}, expecting {}",
1422 system_state.epoch, total_sui, expected_sui
1423 )
1424 .as_str()
1425 )
1426 );
1427 } else {
1428 self.perpetual_tables
1429 .expected_network_sui_amount
1430 .insert(&(), &total_sui)
1431 .expect("DB write cannot fail");
1432 }
1433
1434 Ok(())
1435 }
1436
1437 #[instrument(level = "error", skip_all)]
1440 pub fn maybe_reaccumulate_state_hash(
1441 &self,
1442 cur_epoch_store: &AuthorityPerEpochStore,
1443 new_protocol_version: ProtocolVersion,
1444 ) {
1445 let old_simplified_unwrap_then_delete = cur_epoch_store
1446 .protocol_config()
1447 .simplified_unwrap_then_delete();
1448 let new_simplified_unwrap_then_delete =
1449 ProtocolConfig::get_for_version(new_protocol_version, cur_epoch_store.get_chain())
1450 .simplified_unwrap_then_delete();
1451 let should_reaccumulate =
1454 !old_simplified_unwrap_then_delete && new_simplified_unwrap_then_delete;
1455 if !should_reaccumulate {
1456 return;
1457 }
1458 info!(
1459 "[Re-accumulate] simplified_unwrap_then_delete is enabled in the new protocol version, re-accumulating state hash"
1460 );
1461 let cur_time = Instant::now();
1462 std::thread::scope(|s| {
1463 let pending_tasks = FuturesUnordered::new();
1464 const BITS: u8 = 5;
1473 for index in 0u8..(1 << BITS) {
1474 pending_tasks.push(s.spawn(move || {
1475 let mut id_bytes = [0; ObjectID::LENGTH];
1476 id_bytes[0] = index << (8 - BITS);
1477 let start_id = ObjectID::new(id_bytes);
1478
1479 id_bytes[0] |= (1 << (8 - BITS)) - 1;
1480 for element in id_bytes.iter_mut().skip(1) {
1481 *element = u8::MAX;
1482 }
1483 let end_id = ObjectID::new(id_bytes);
1484
1485 info!(
1486 "[Re-accumulate] Scanning object ID range {:?}..{:?}",
1487 start_id, end_id
1488 );
1489 let mut prev = (
1490 ObjectKey::min_for_id(&ObjectID::ZERO),
1491 StoreObjectWrapper::V1(StoreObject::Deleted),
1492 );
1493 let mut object_scanned: u64 = 0;
1494 let mut wrapped_objects_to_remove = vec![];
1495 for db_result in self.perpetual_tables.objects.safe_range_iter(
1496 ObjectKey::min_for_id(&start_id)..=ObjectKey::max_for_id(&end_id),
1497 ) {
1498 match db_result {
1499 Ok((object_key, object)) => {
1500 object_scanned += 1;
1501 if object_scanned.is_multiple_of(100000) {
1502 info!(
1503 "[Re-accumulate] Task {}: object scanned: {}",
1504 index, object_scanned,
1505 );
1506 }
1507 if matches!(prev.1.inner(), StoreObject::Wrapped)
1508 && object_key.0 != prev.0.0
1509 {
1510 wrapped_objects_to_remove
1511 .push(WrappedObject::new(prev.0.0, prev.0.1));
1512 }
1513
1514 prev = (object_key, object);
1515 }
1516 Err(err) => {
1517 warn!("Object iterator encounter RocksDB error {:?}", err);
1518 return Err(err);
1519 }
1520 }
1521 }
1522 if matches!(prev.1.inner(), StoreObject::Wrapped) {
1523 wrapped_objects_to_remove.push(WrappedObject::new(prev.0.0, prev.0.1));
1524 }
1525 info!(
1526 "[Re-accumulate] Task {}: object scanned: {}, wrapped objects: {}",
1527 index,
1528 object_scanned,
1529 wrapped_objects_to_remove.len(),
1530 );
1531 Ok((wrapped_objects_to_remove, object_scanned))
1532 }));
1533 }
1534 let (last_checkpoint_of_epoch, cur_accumulator) = self
1535 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
1536 .expect("read cannot fail")
1537 .expect("accumulator must exist");
1538 let (accumulator, total_objects_scanned, total_wrapped_objects) =
1539 pending_tasks.into_iter().fold(
1540 (cur_accumulator, 0u64, 0usize),
1541 |(mut accumulator, total_objects_scanned, total_wrapped_objects), task| {
1542 let (wrapped_objects_to_remove, object_scanned) =
1543 task.join().unwrap().unwrap();
1544 accumulator.remove_all(
1545 wrapped_objects_to_remove
1546 .iter()
1547 .map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
1548 .collect::<Vec<Vec<u8>>>(),
1549 );
1550 (
1551 accumulator,
1552 total_objects_scanned + object_scanned,
1553 total_wrapped_objects + wrapped_objects_to_remove.len(),
1554 )
1555 },
1556 );
1557 info!(
1558 "[Re-accumulate] Total objects scanned: {}, total wrapped objects: {}",
1559 total_objects_scanned, total_wrapped_objects,
1560 );
1561 info!(
1562 "[Re-accumulate] New accumulator value: {:?}",
1563 accumulator.digest()
1564 );
1565 self.insert_state_hash_for_epoch(
1566 cur_epoch_store.epoch(),
1567 &last_checkpoint_of_epoch,
1568 &accumulator,
1569 )
1570 .unwrap();
1571 });
1572 info!(
1573 "[Re-accumulate] Re-accumulating took {}seconds",
1574 cur_time.elapsed().as_secs()
1575 );
1576 }
1577
1578 pub async fn prune_objects_and_compact_for_testing(
1579 &self,
1580 checkpoint_store: &Arc<CheckpointStore>,
1581 rpc_index: Option<&RpcIndexStore>,
1582 ) {
1583 let pruning_config = AuthorityStorePruningConfig {
1584 num_epochs_to_retain: 0,
1585 ..Default::default()
1586 };
1587 let _ = AuthorityStorePruner::prune_objects_for_eligible_epochs(
1588 &self.perpetual_tables,
1589 checkpoint_store,
1590 rpc_index,
1591 None,
1592 pruning_config,
1593 AuthorityStorePruningMetrics::new_for_test(),
1594 EPOCH_DURATION_MS_FOR_TESTING,
1595 )
1596 .await;
1597 let _ = AuthorityStorePruner::compact(&self.perpetual_tables);
1598 }
1599
1600 #[cfg(test)]
1601 pub async fn prune_objects_immediately_for_testing(
1602 &self,
1603 transaction_effects: Vec<TransactionEffects>,
1604 ) -> anyhow::Result<()> {
1605 let mut wb = self.perpetual_tables.objects.batch();
1606
1607 let mut object_keys_to_prune = vec![];
1608 for effects in &transaction_effects {
1609 for (object_id, seq_number) in effects.modified_at_versions() {
1610 info!("Pruning object {:?} version {:?}", object_id, seq_number);
1611 object_keys_to_prune.push(ObjectKey(object_id, seq_number));
1612 }
1613 }
1614
1615 wb.delete_batch(
1616 &self.perpetual_tables.objects,
1617 object_keys_to_prune.into_iter(),
1618 )?;
1619 wb.write()?;
1620 Ok(())
1621 }
1622
1623 #[cfg(msim)]
1625 pub fn count_object_versions(&self, object_id: ObjectID) -> usize {
1626 self.perpetual_tables
1627 .objects
1628 .safe_iter_with_bounds(
1629 Some(ObjectKey(object_id, VersionNumber::MIN)),
1630 Some(ObjectKey(object_id, VersionNumber::MAX)),
1631 )
1632 .collect::<Result<Vec<_>, _>>()
1633 .unwrap()
1634 .len()
1635 }
1636}
1637
1638impl GlobalStateHashStore for AuthorityStore {
1639 fn get_object_ref_prior_to_key_deprecated(
1640 &self,
1641 object_id: &ObjectID,
1642 version: VersionNumber,
1643 ) -> SuiResult<Option<ObjectRef>> {
1644 self.get_object_ref_prior_to_key(object_id, version)
1645 }
1646
1647 fn get_root_state_hash_for_epoch(
1648 &self,
1649 epoch: EpochId,
1650 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
1651 self.perpetual_tables
1652 .root_state_hash_by_epoch
1653 .get(&epoch)
1654 .map_err(Into::into)
1655 }
1656
1657 fn get_root_state_hash_for_highest_epoch(
1658 &self,
1659 ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
1660 Ok(self
1661 .perpetual_tables
1662 .root_state_hash_by_epoch
1663 .reversed_safe_iter_with_bounds(None, None)?
1664 .next()
1665 .transpose()?)
1666 }
1667
1668 fn insert_state_hash_for_epoch(
1669 &self,
1670 epoch: EpochId,
1671 last_checkpoint_of_epoch: &CheckpointSequenceNumber,
1672 acc: &GlobalStateHash,
1673 ) -> SuiResult {
1674 self.perpetual_tables
1675 .root_state_hash_by_epoch
1676 .insert(&epoch, &(*last_checkpoint_of_epoch, acc.clone()))?;
1677 self.root_state_notify_read
1678 .notify(&epoch, &(*last_checkpoint_of_epoch, acc.clone()));
1679
1680 Ok(())
1681 }
1682
1683 fn iter_live_object_set(
1684 &self,
1685 include_wrapped_object: bool,
1686 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
1687 Box::new(
1688 self.perpetual_tables
1689 .iter_live_object_set(include_wrapped_object),
1690 )
1691 }
1692}
1693
1694impl ObjectStore for AuthorityStore {
1695 fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
1697 self.perpetual_tables.as_ref().get_object(object_id)
1698 }
1699
1700 fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
1701 self.perpetual_tables.get_object_by_key(object_id, version)
1702 }
1703}
1704
1705pub struct ResolverWrapper {
1707 pub resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1708 pub metrics: Arc<ResolverMetrics>,
1709}
1710
1711impl ResolverWrapper {
1712 pub fn new(
1713 resolver: Arc<dyn BackingPackageStore + Send + Sync>,
1714 metrics: Arc<ResolverMetrics>,
1715 ) -> Self {
1716 metrics.module_cache_size.set(0);
1717 ResolverWrapper { resolver, metrics }
1718 }
1719
1720 fn inc_cache_size_gauge(&self) {
1721 let current = self.metrics.module_cache_size.get();
1723 self.metrics.module_cache_size.set(current + 1);
1724 }
1725}
1726
1727impl ModuleResolver for ResolverWrapper {
1728 type Error = SuiError;
1729 fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
1730 self.inc_cache_size_gauge();
1731 get_module(&*self.resolver, module_id)
1732 }
1733}
1734
1735pub enum UpdateType {
1736 Transaction(TransactionEffectsDigest),
1737 Genesis,
1738}
1739
1740pub type SuiLockResult = SuiResult<ObjectLockStatus>;
1741
1742#[derive(Debug, PartialEq, Eq)]
1743pub enum ObjectLockStatus {
1744 Initialized,
1745 LockedToTx { locked_by_tx: LockDetailsDeprecated },
1746 LockedAtDifferentVersion { locked_ref: ObjectRef },
1747}
1748
1749#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1750pub enum LockDetailsWrapperDeprecated {
1751 V1(LockDetailsV1Deprecated),
1752}
1753
1754impl LockDetailsWrapperDeprecated {
1755 pub fn migrate(self) -> Self {
1756 self
1759 }
1760
1761 pub fn inner(&self) -> &LockDetailsDeprecated {
1764 match self {
1765 Self::V1(v1) => v1,
1766
1767 #[allow(unreachable_patterns)]
1769 _ => panic!("lock details should have been migrated to latest version at read time"),
1770 }
1771 }
1772 pub fn into_inner(self) -> LockDetailsDeprecated {
1773 match self {
1774 Self::V1(v1) => v1,
1775
1776 #[allow(unreachable_patterns)]
1778 _ => panic!("lock details should have been migrated to latest version at read time"),
1779 }
1780 }
1781}
1782
1783#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1784pub struct LockDetailsV1Deprecated {
1785 pub epoch: EpochId,
1786 pub tx_digest: TransactionDigest,
1787}
1788
1789pub type LockDetailsDeprecated = LockDetailsV1Deprecated;
1790
1791impl From<LockDetailsDeprecated> for LockDetailsWrapperDeprecated {
1792 fn from(details: LockDetailsDeprecated) -> Self {
1793 LockDetailsWrapperDeprecated::V1(details)
1795 }
1796}