1use super::*;
5use crate::authority::authority_store::LockDetailsWrapperDeprecated;
6#[cfg(tidehunter)]
7use crate::authority::epoch_marker_key::EPOCH_MARKER_KEY_SIZE;
8use crate::authority::epoch_marker_key::EpochMarkerKey;
9use serde::{Deserialize, Serialize};
10use std::path::Path;
11use std::sync::atomic::AtomicU64;
12use sui_types::base_types::SequenceNumber;
13use sui_types::effects::{TransactionEffects, TransactionEvents};
14use sui_types::global_state_hash::GlobalStateHash;
15use sui_types::storage::MarkerValue;
16use typed_store::metrics::SamplingInterval;
17use typed_store::rocks::{
18 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
19 read_size_from_env,
20};
21use typed_store::traits::Map;
22
23use crate::authority::authority_store_types::{
24 StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
25};
26use crate::authority::epoch_start_configuration::EpochStartConfiguration;
27use typed_store::{DBMapUtils, DbIterator};
28
/// Environment variable read through `read_size_from_env` to size the read cache of the
/// `objects` table (the value is presumably in MB, going by the variable name — TODO confirm).
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
/// Environment variable read through `read_size_from_env` to size the read cache of the
/// `owned_object_transaction_locks` table. Exposed crate-wide.
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
/// Environment variable read through `read_size_from_env` to size the point-lookup cache of
/// the `transactions` table.
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
/// Environment variable read through `read_size_from_env` to size the point-lookup cache of
/// the `effects` table.
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
33
/// Optional knobs a caller may pass when opening [`AuthorityPerpetualTables`].
///
/// The derived `Default` leaves both flags `false`.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// When `false`, `apply_to` disables write throttling on the supplied DB options.
    pub enable_write_stall: bool,
    /// When `true`, the tidehunter flavour of `open` attaches a compactor to the `objects`
    /// key space.
    pub enable_objects_compactor: bool,
}
44
45impl AuthorityPerpetualTablesOptions {
46 fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
47 if !self.enable_write_stall {
48 db_options = db_options.disable_write_throttling();
49 }
50 db_options
51 }
52}
53
/// Typed tables that make up the "perpetual" store located at `<parent>/perpetual`
/// (see `AuthorityPerpetualTables::path`).
///
/// `DBMapUtils` derives the open/read-only helpers used by the `impl` block below.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct AuthorityPerpetualTables {
    /// Object store keyed by `ObjectKey` (object id + version). A stored entry is either a
    /// concrete value or a `Deleted` / `Wrapped` tombstone.
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// Keyed by full object reference. Persisted under the table name
    /// `owned_object_transaction_locks` via the `rename` attribute.
    #[rename = "owned_object_transaction_locks"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,

    /// Transactions keyed by transaction digest.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// Transaction effects keyed by the digest of the effects themselves.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Maps a transaction digest to the digest of its effects; `get_effects` resolves
    /// effects through this table and then `effects`.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Events keyed by transaction digest.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    /// Object keys recorded per transaction digest (presumably runtime-loaded objects the
    /// transaction did not modify — TODO confirm against the writer).
    pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,

    /// Maps a transaction digest to the `(epoch, checkpoint sequence number)` pair returned
    /// by `get_checkpoint_sequence_number`.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Per-epoch root state hash together with the last checkpoint of that epoch
    /// (written by `insert_root_state_hash`).
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Single-entry table (unit key) holding the epoch start configuration.
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// Single-entry table (unit key) holding the highest pruned checkpoint.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Single-entry table (unit key) holding a `u64` amount.
    pub(crate) expected_network_sui_amount: DBMap<(), u64>,

    /// Single-entry table (unit key) holding a signed imbalance value.
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Marker values keyed by `(epoch, object key)`.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
    /// Marker values keyed by `EpochMarkerKey` (the `_v2` layout of the table above).
    pub(crate) object_per_epoch_marker_table_v2: DBMap<EpochMarkerKey, MarkerValue>,

    /// Set of `(epoch, transaction digest)` pairs; consulted by
    /// `was_transaction_executed_in_last_epoch`.
    pub(crate) executed_transaction_digests: DBMap<(EpochId, TransactionDigest), ()>,

    /// Single-entry table (unit key) holding the highest committed checkpoint.
    pub(crate) highest_committed_checkpoint: DBMap<(), CheckpointSequenceNumber>,
}
165
166impl AuthorityPerpetualTables {
/// Returns the directory of the perpetual store: `parent_path` with `perpetual` appended.
pub fn path(parent_path: &Path) -> PathBuf {
    let mut store_dir = parent_path.to_path_buf();
    store_dir.push("perpetual");
    store_dir
}
170
171 #[cfg(not(tidehunter))]
172 pub fn open(
173 parent_path: &Path,
174 db_options_override: Option<AuthorityPerpetualTablesOptions>,
175 _pruner_watermark: Option<Arc<AtomicU64>>,
176 ) -> Self {
177 let db_options_override = db_options_override.unwrap_or_default();
178 let db_options = db_options_override
179 .apply_to(default_db_options().optimize_db_for_write_throughput(4, false));
180 let table_options = DBMapTableConfigMap::new(BTreeMap::from([
181 (
182 "objects".to_string(),
183 objects_table_config(db_options.clone()),
184 ),
185 (
186 "owned_object_transaction_locks".to_string(),
187 owned_object_transaction_locks_table_config(db_options.clone()),
188 ),
189 (
190 "transactions".to_string(),
191 transactions_table_config(db_options.clone()),
192 ),
193 (
194 "effects".to_string(),
195 effects_table_config(db_options.clone()),
196 ),
197 ]));
198
199 Self::open_tables_read_write(
200 Self::path(parent_path),
201 MetricConf::new("perpetual")
202 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
203 Some(db_options.options),
204 Some(table_options),
205 )
206 }
207
/// Opens the perpetual tables read-write on the tidehunter backend.
///
/// Builds one `ThConfig` per table name and hands the collection to
/// `open_tables_read_write`. `db_options_override` is only consulted for
/// `enable_objects_compactor`; `pruner_watermark` is shared with every table that is
/// configured through `apply_relocation_filter`.
#[cfg(tidehunter)]
pub fn open(
    parent_path: &Path,
    db_options_override: Option<AuthorityPerpetualTablesOptions>,
    pruner_watermark: Option<Arc<AtomicU64>>,
) -> Self {
    use crate::authority::authority_store_pruner::apply_relocation_filter;
    tracing::warn!("AuthorityPerpetualTables using tidehunter");
    use typed_store::tidehunter_util::{
        Bytes, Decision, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
        default_cells_per_mutex, default_max_dirty_keys, default_mutex_count,
        default_value_cache_size,
    };
    // Mutex counts are multiples of the library default; transaction-keyed tables get 4x
    // the base count used here.
    let mutexes = default_mutex_count() * 2;
    let transaction_mutexes = mutexes * 4;
    let value_cache_size = default_value_cache_size();
    // Fall back to a fresh watermark starting at 0 when the caller supplies none.
    let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));

    // Shared base config for tables that use a bloom filter. The two arguments are
    // presumably the false-positive rate and an expected key count — TODO confirm.
    let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
    // Compactor for the `objects` key space: walks the keys back to front and retains the
    // first key met for each run of equal 32-byte prefixes. Only adjacent keys are
    // compared, so this assumes keys with the same prefix are contiguous (presumably the
    // prefix is the object id and the retained key is its highest version — TODO confirm).
    let objects_compactor = |iter: &mut dyn DoubleEndedIterator<Item = &Bytes>| {
        let mut retain = HashSet::new();
        let mut previous: Option<&[u8]> = None;
        const OID_SIZE: usize = 32;
        for key in iter.rev() {
            if let Some(prev) = previous {
                if prev == &key[..OID_SIZE] {
                    continue;
                }
            }
            previous = Some(&key[..OID_SIZE]);
            retain.insert(key.clone());
        }
        retain
    };
    // Eight bytes, all zero except the last which is 32. Passed as the removable key
    // prefix of the digest-keyed tables (presumably the serialized length header that
    // precedes a 32-byte digest — TODO confirm).
    let mut digest_prefix = vec![0; 8];
    digest_prefix[7] = 32;
    let uniform_key = KeyType::uniform(default_cells_per_mutex());
    // Prefix-based key types: 76 bits for epoch-prefixed keys, 140 bits for
    // (epoch, digest) keys.
    let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
    let epoch_tx_digest_prefix_key =
        KeyType::from_prefix_bits((8 + 8) * 8 + 12);
    // Object keys are indexed as fixed 40-byte keys.
    let object_indexing = KeyIndexing::fixed(32 + 8);
    // Object-reference keys are 80 bytes; the index keeps bytes 16..64 of them.
    let obj_ref_size = 32 + 8 + 32 + 8;
    let owned_object_transaction_locks_indexing =
        KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));

    let mut objects_config = KeySpaceConfig::new()
        .with_max_dirty_keys(16 * default_max_dirty_keys())
        .with_value_cache_size(value_cache_size);
    // The compactor is opt-in through the caller-supplied options.
    if matches!(db_options_override, Some(options) if options.enable_objects_compactor) {
        objects_config = objects_config.with_compactor(Box::new(objects_compactor));
    }

    // One (table name, config) pair per table.
    let configs = vec![
        (
            "objects".to_string(),
            ThConfig::new_with_config_indexing(
                object_indexing,
                mutexes * 4,
                KeyType::uniform(1),
                objects_config,
            ),
        ),
        (
            "owned_object_transaction_locks".to_string(),
            ThConfig::new_with_config_indexing(
                owned_object_transaction_locks_indexing,
                mutexes * 16,
                KeyType::uniform(default_cells_per_mutex()),
                bloom_config
                    .clone()
                    .with_max_dirty_keys(16 * default_max_dirty_keys()),
            ),
        ),
        (
            "transactions".to_string(),
            ThConfig::new_with_rm_prefix_indexing(
                KeyIndexing::key_reduction(32, 0..16),
                transaction_mutexes,
                uniform_key,
                KeySpaceConfig::new()
                    .with_value_cache_size(value_cache_size)
                    .with_relocation_filter(|_, _| Decision::Remove),
                digest_prefix.clone(),
            ),
        ),
        (
            "effects".to_string(),
            ThConfig::new_with_rm_prefix_indexing(
                KeyIndexing::key_reduction(32, 0..16),
                transaction_mutexes,
                uniform_key,
                // The relocation filter is given the epoch in which the effects executed.
                apply_relocation_filter(
                    bloom_config.clone().with_value_cache_size(value_cache_size),
                    pruner_watermark.clone(),
                    |effects: TransactionEffects| effects.executed_epoch(),
                    false,
                ),
                digest_prefix.clone(),
            ),
        ),
        (
            "executed_effects".to_string(),
            ThConfig::new_with_rm_prefix_indexing(
                KeyIndexing::key_reduction(32, 0..16),
                transaction_mutexes,
                uniform_key,
                bloom_config
                    .clone()
                    .with_value_cache_size(value_cache_size)
                    .with_relocation_filter(|_, _| Decision::Remove),
                digest_prefix.clone(),
            ),
        ),
        // NOTE(review): no `events` field is visible on the struct in this file (only
        // `events_2`); confirm this entry is still required.
        (
            "events".to_string(),
            ThConfig::new_with_rm_prefix(
                32 + 8,
                mutexes,
                uniform_key,
                KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                digest_prefix.clone(),
            ),
        ),
        (
            "events_2".to_string(),
            ThConfig::new_with_rm_prefix(
                32,
                mutexes,
                uniform_key,
                KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                digest_prefix.clone(),
            ),
        ),
        (
            "unchanged_loaded_runtime_objects".to_string(),
            ThConfig::new_with_rm_prefix(
                32,
                mutexes,
                uniform_key,
                KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                digest_prefix.clone(),
            ),
        ),
        (
            "executed_transactions_to_checkpoint".to_string(),
            ThConfig::new_with_rm_prefix(
                32,
                mutexes,
                uniform_key,
                // The relocation filter is given the epoch stored in the value.
                apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermark.clone(),
                    |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
                    false,
                ),
                digest_prefix.clone(),
            ),
        ),
        (
            "root_state_hash_by_epoch".to_string(),
            ThConfig::new(8, 1, KeyType::uniform(1)),
        ),
        // The unit-keyed, single-entry tables below use a zero-length key and one mutex.
        (
            "epoch_start_configuration".to_string(),
            ThConfig::new(0, 1, KeyType::uniform(1)),
        ),
        (
            "pruned_checkpoint".to_string(),
            ThConfig::new(0, 1, KeyType::uniform(1)),
        ),
        (
            "expected_network_sui_amount".to_string(),
            ThConfig::new(0, 1, KeyType::uniform(1)),
        ),
        (
            "expected_storage_fund_imbalance".to_string(),
            ThConfig::new(0, 1, KeyType::uniform(1)),
        ),
        (
            "object_per_epoch_marker_table".to_string(),
            ThConfig::new_with_config_indexing(
                KeyIndexing::VariableLength,
                mutexes,
                epoch_prefix_key,
                // Here the epoch is extracted from the key tuple; the final flag is `true`
                // (unlike the value-based filters above).
                apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermark.clone(),
                    |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
                    true,
                ),
            ),
        ),
        (
            "object_per_epoch_marker_table_v2".to_string(),
            ThConfig::new_with_config_indexing(
                KeyIndexing::fixed(EPOCH_MARKER_KEY_SIZE),
                mutexes,
                epoch_prefix_key,
                apply_relocation_filter(
                    bloom_config.clone(),
                    pruner_watermark.clone(),
                    |k: EpochMarkerKey| k.0,
                    true,
                ),
            ),
        ),
        (
            "executed_transaction_digests".to_string(),
            ThConfig::new_with_config_indexing(
                KeyIndexing::fixed(8 + (32 + 8)),
                transaction_mutexes,
                epoch_tx_digest_prefix_key,
                apply_relocation_filter(
                    bloom_config.clone(),
                    pruner_watermark.clone(),
                    |(epoch_id, _): (EpochId, TransactionDigest)| epoch_id,
                    true,
                ),
            ),
        ),
        (
            "highest_committed_checkpoint".to_string(),
            ThConfig::new(0, 1, KeyType::uniform(1)),
        ),
    ];
    Self::open_tables_read_write(
        Self::path(parent_path),
        MetricConf::new("perpetual")
            .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
        configs.into_iter().collect(),
    )
}
444
445 #[cfg(not(tidehunter))]
446 pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
447 Self::get_read_only_handle(
448 Self::path(parent_path),
449 None,
450 None,
451 MetricConf::new("perpetual_readonly"),
452 )
453 }
454
/// Tidehunter build of `open_readonly`.
///
/// Note that this simply delegates to [`Self::open`] with no option overrides and no
/// pruner watermark, so it returns the regular table handle rather than a dedicated
/// read-only type.
#[cfg(tidehunter)]
pub fn open_readonly(parent_path: &Path) -> Self {
    Self::open(parent_path, None, None)
}
459
/// Asks the database behind the `objects` table to rebuild its control region
/// (tidehunter only) and returns that call's result.
#[cfg(tidehunter)]
pub fn force_rebuild_control_region(&self) -> anyhow::Result<()> {
    // The database handle is reached through the `objects` table.
    self.objects.db.force_rebuild_control_region()
}
464
/// Consumes this handle and waits for the tidehunter background threads of the
/// underlying database.
///
/// Prints a warning to stdout when the `Arc` is not uniquely owned at call time.
#[cfg(tidehunter)]
pub fn wait_for_tidehunter_background_threads(self: Arc<Self>) {
    match Arc::strong_count(&self) {
        1 => {}
        holders => {
            println!(
                "WARNING: wait_for_tidehunter_background_threads called with Arc<AuthorityPerpetualTables> strong_count={} (expected 1); other clones will keep DBMap.db Arc<Database> alive past drop(self) and the inner Database wait will warn/timeout",
                holders,
            );
        }
    }
    // Keep a handle on the database, release the tables, then wait on the database.
    let database = self.objects.db.clone();
    drop(self);
    database.wait_for_tidehunter_background_threads();
}
482
483 pub fn find_object_lt_or_eq_version(
487 &self,
488 object_id: ObjectID,
489 version: SequenceNumber,
490 ) -> SuiResult<Option<Object>> {
491 let mut iter = self.objects.reversed_safe_iter_with_bounds(
492 Some(ObjectKey::min_for_id(&object_id)),
493 Some(ObjectKey(object_id, version)),
494 )?;
495 match iter.next() {
496 Some(Ok((key, o))) => self.object(&key, o),
497 Some(Err(e)) => Err(e.into()),
498 None => Ok(None),
499 }
500 }
501
/// Rebuilds an `Object` from its stored value and the key it was stored under.
///
/// Thin wrapper over `try_construct_object`; its error is returned unchanged.
fn construct_object(
    &self,
    object_key: &ObjectKey,
    store_object: StoreObjectValue,
) -> Result<Object, SuiError> {
    try_construct_object(object_key, store_object)
}
509
510 pub fn object(
513 &self,
514 object_key: &ObjectKey,
515 store_object: StoreObjectWrapper,
516 ) -> Result<Option<Object>, SuiError> {
517 let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
518 return Ok(None);
519 };
520 Ok(Some(self.construct_object(object_key, *store_object)?))
521 }
522
523 pub fn object_reference(
524 &self,
525 object_key: &ObjectKey,
526 store_object: StoreObjectWrapper,
527 ) -> Result<ObjectRef, SuiError> {
528 let obj_ref = match store_object.migrate().into_inner() {
529 StoreObject::Value(object) => self
530 .construct_object(object_key, *object)?
531 .compute_object_reference(),
532 StoreObject::Deleted => (
533 object_key.0,
534 object_key.1,
535 ObjectDigest::OBJECT_DIGEST_DELETED,
536 ),
537 StoreObject::Wrapped => (
538 object_key.0,
539 object_key.1,
540 ObjectDigest::OBJECT_DIGEST_WRAPPED,
541 ),
542 };
543 Ok(obj_ref)
544 }
545
546 pub fn tombstone_reference(
547 &self,
548 object_key: &ObjectKey,
549 store_object: &StoreObjectWrapper,
550 ) -> Result<Option<ObjectRef>, SuiError> {
551 let obj_ref = match store_object.inner() {
552 StoreObject::Deleted => Some((
553 object_key.0,
554 object_key.1,
555 ObjectDigest::OBJECT_DIGEST_DELETED,
556 )),
557 StoreObject::Wrapped => Some((
558 object_key.0,
559 object_key.1,
560 ObjectDigest::OBJECT_DIGEST_WRAPPED,
561 )),
562 _ => None,
563 };
564 Ok(obj_ref)
565 }
566
567 pub fn get_latest_object_ref_or_tombstone(
568 &self,
569 object_id: ObjectID,
570 ) -> Result<Option<ObjectRef>, SuiError> {
571 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
572 Some(ObjectKey::min_for_id(&object_id)),
573 Some(ObjectKey::max_for_id(&object_id)),
574 )?;
575
576 if let Some(Ok((object_key, value))) = iterator.next()
577 && object_key.0 == object_id
578 {
579 return Ok(Some(self.object_reference(&object_key, value)?));
580 }
581 Ok(None)
582 }
583
584 pub fn get_latest_object_or_tombstone(
585 &self,
586 object_id: ObjectID,
587 ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
588 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
589 Some(ObjectKey::min_for_id(&object_id)),
590 Some(ObjectKey::max_for_id(&object_id)),
591 )?;
592
593 if let Some(Ok((object_key, value))) = iterator.next()
594 && object_key.0 == object_id
595 {
596 return Ok(Some((object_key, value)));
597 }
598 Ok(None)
599 }
600
601 pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
602 Ok(self
603 .epoch_start_configuration
604 .get(&())?
605 .expect("Must have current epoch.")
606 .epoch_start_state()
607 .epoch())
608 }
609
610 pub fn set_epoch_start_configuration(
611 &self,
612 epoch_start_configuration: &EpochStartConfiguration,
613 ) -> SuiResult {
614 let mut wb = self.epoch_start_configuration.batch();
615 wb.insert_batch(
616 &self.epoch_start_configuration,
617 std::iter::once(((), epoch_start_configuration)),
618 )?;
619 wb.write()?;
620 Ok(())
621 }
622
/// Reads the single entry of the `pruned_checkpoint` table.
///
/// Returns `Ok(None)` when nothing has been stored yet.
pub fn get_highest_pruned_checkpoint(
    &self,
) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
    self.pruned_checkpoint.get(&())
}
628
629 pub fn set_highest_pruned_checkpoint(
630 &self,
631 wb: &mut DBBatch,
632 checkpoint_number: CheckpointSequenceNumber,
633 ) -> SuiResult {
634 wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
635 Ok(())
636 }
637
/// Reads the single entry of the `highest_committed_checkpoint` table.
///
/// Returns `Ok(None)` when nothing has been stored yet.
pub fn get_highest_committed_checkpoint(
    &self,
) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
    self.highest_committed_checkpoint.get(&())
}
643
644 pub fn set_highest_committed_checkpoint(
648 &self,
649 wb: &mut DBBatch,
650 checkpoint_number: CheckpointSequenceNumber,
651 ) -> SuiResult {
652 wb.insert_batch(
653 &self.highest_committed_checkpoint,
654 [((), checkpoint_number)],
655 )?;
656 Ok(())
657 }
658
659 pub fn get_transaction(
660 &self,
661 digest: &TransactionDigest,
662 ) -> SuiResult<Option<TrustedTransaction>> {
663 let Some(transaction) = self.transactions.get(digest)? else {
664 return Ok(None);
665 };
666 Ok(Some(transaction))
667 }
668
669 pub fn list_transactions_from(
670 &self,
671 start: Option<TransactionDigest>,
672 limit: usize,
673 ) -> Result<Vec<TransactionDigest>, typed_store::TypedStoreError> {
674 let iter = self.transactions.safe_iter_with_bounds(start, None);
675 let mut result = Vec::with_capacity(limit);
676 for item in iter.take(limit) {
677 let (digest, _) = item?;
678 result.push(digest);
679 }
680 Ok(result)
681 }
682
/// Looks up the effects digest recorded for `tx_digest` in `executed_effects`.
///
/// Returns `Ok(None)` when the transaction has no entry.
pub fn get_executed_effects_digest(
    &self,
    tx_digest: &TransactionDigest,
) -> Result<Option<TransactionEffectsDigest>, typed_store::TypedStoreError> {
    self.executed_effects.get(tx_digest)
}
689
/// Looks up transaction effects directly by their own digest in `effects`.
///
/// Returns `Ok(None)` when no effects are stored under `effects_digest`.
pub fn get_effects_by_digest(
    &self,
    effects_digest: &TransactionEffectsDigest,
) -> Result<Option<TransactionEffects>, typed_store::TypedStoreError> {
    self.effects.get(effects_digest)
}
696
697 pub fn insert_executed_transaction_digests_batch(
700 &self,
701 epoch: EpochId,
702 digests: impl Iterator<Item = TransactionDigest>,
703 ) -> SuiResult {
704 let mut batch = self.executed_transaction_digests.batch();
705 batch.insert_batch(
706 &self.executed_transaction_digests,
707 digests.map(|digest| ((epoch, digest), ())),
708 )?;
709 batch.write()?;
710 Ok(())
711 }
712
713 pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
714 let Some(effect_digest) = self.executed_effects.get(digest)? else {
715 return Ok(None);
716 };
717 Ok(self.effects.get(&effect_digest)?)
718 }
719
720 pub(crate) fn was_transaction_executed_in_last_epoch(
721 &self,
722 digest: &TransactionDigest,
723 current_epoch: EpochId,
724 ) -> bool {
725 if current_epoch == 0 {
726 return false;
727 }
728 self.executed_transaction_digests
729 .contains_key(&(current_epoch - 1, *digest))
730 .expect("db error")
731 }
732
733 pub fn get_checkpoint_sequence_number(
736 &self,
737 digest: &TransactionDigest,
738 ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
739 Ok(self.executed_transactions_to_checkpoint.get(digest)?)
740 }
741
742 pub fn set_highest_pruned_checkpoint_without_wb(
743 &self,
744 checkpoint_number: CheckpointSequenceNumber,
745 ) -> SuiResult {
746 let mut wb = self.pruned_checkpoint.batch();
747 self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
748 wb.write()?;
749 Ok(())
750 }
751
752 pub fn database_is_empty(&self) -> SuiResult<bool> {
753 Ok(self.objects.safe_iter().next().is_none())
754 }
755
756 pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
757 LiveSetIter {
758 iter: Box::new(self.objects.safe_iter()),
759 tables: self,
760 prev: None,
761 include_wrapped_object,
762 }
763 }
764
765 pub fn range_iter_live_object_set(
766 &self,
767 lower_bound: Option<ObjectID>,
768 upper_bound: Option<ObjectID>,
769 include_wrapped_object: bool,
770 ) -> LiveSetIter<'_> {
771 let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
772 let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
773
774 LiveSetIter {
775 iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
776 tables: self,
777 prev: None,
778 include_wrapped_object,
779 }
780 }
781
782 pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
783 self.objects.checkpoint_db(path).map_err(Into::into)
785 }
786
787 pub fn insert_root_state_hash(
788 &self,
789 epoch: EpochId,
790 last_checkpoint_of_epoch: CheckpointSequenceNumber,
791 hash: GlobalStateHash,
792 ) -> SuiResult {
793 self.root_state_hash_by_epoch
794 .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
795 Ok(())
796 }
797
798 pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
799 let object_reference = object.compute_object_reference();
800 let wrapper = get_store_object(object);
801 let mut wb = self.objects.batch();
802 wb.insert_batch(
803 &self.objects,
804 std::iter::once((ObjectKey::from(object_reference), wrapper)),
805 )?;
806 wb.write()?;
807 Ok(())
808 }
809
810 pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
812 let obj_entry = self
813 .objects
814 .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
815 .next();
816
817 match obj_entry.transpose()? {
818 Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
819 Ok(self.object(&ObjectKey(obj_id, version), obj)?)
820 }
821 _ => Ok(None),
822 }
823 }
824
825 pub fn get_object_by_key_fallible(
826 &self,
827 object_id: &ObjectID,
828 version: VersionNumber,
829 ) -> SuiResult<Option<Object>> {
830 Ok(self
831 .objects
832 .get(&ObjectKey(*object_id, version))?
833 .and_then(|object| {
834 self.object(&ObjectKey(*object_id, version), object)
835 .expect("object construction error")
836 }))
837 }
838}
839
/// Infallible `ObjectStore` facade over the fallible accessors of the tables.
impl ObjectStore for AuthorityPerpetualTables {
    /// Returns the newest version of `object_id`, if any.
    ///
    /// Panics with `db error` when the underlying fallible lookup returns an error.
    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        self.get_object_fallible(object_id).expect("db error")
    }

    /// Returns the object stored at exactly (`object_id`, `version`), if any.
    ///
    /// Panics with `db error` when the underlying fallible lookup returns an error.
    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
        self.get_object_by_key_fallible(object_id, version)
            .expect("db error")
    }
}
851
/// Iterator over the live object set, built by `iter_live_object_set` and
/// `range_iter_live_object_set`.
///
/// It buffers one entry and emits it once the object id of the next entry differs, so
/// at most one item is produced per run of consecutive entries sharing an object id.
pub struct LiveSetIter<'a> {
    /// Underlying iterator over `(key, stored object)` entries of the `objects` table.
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    /// Tables used to construct objects from stored values.
    tables: &'a AuthorityPerpetualTables,
    /// Most recently read entry, not yet emitted.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
    /// Whether wrapped tombstones are yielded as `LiveObject::Wrapped`.
    include_wrapped_object: bool,
}
859
/// An item yielded by [`LiveSetIter`].
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    /// A concrete object constructed from its stored value.
    Normal(Object),
    /// A wrapped tombstone, identified only by the key it was stored under.
    Wrapped(ObjectKey),
}
865
866impl LiveObject {
867 pub fn object_id(&self) -> ObjectID {
868 match self {
869 LiveObject::Normal(obj) => obj.id(),
870 LiveObject::Wrapped(key) => key.0,
871 }
872 }
873
874 pub fn version(&self) -> SequenceNumber {
875 match self {
876 LiveObject::Normal(obj) => obj.version(),
877 LiveObject::Wrapped(key) => key.1,
878 }
879 }
880
881 pub fn object_reference(&self) -> ObjectRef {
882 match self {
883 LiveObject::Normal(obj) => obj.compute_object_reference(),
884 LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
885 }
886 }
887}
888
889impl LiveSetIter<'_> {
890 fn store_object_wrapper_to_live_object(
891 &self,
892 object_key: ObjectKey,
893 store_object: StoreObjectWrapper,
894 ) -> Option<LiveObject> {
895 match store_object.migrate().into_inner() {
896 StoreObject::Value(object) => {
897 let object = self
898 .tables
899 .construct_object(&object_key, *object)
900 .expect("Constructing object from store cannot fail");
901 Some(LiveObject::Normal(object))
902 }
903 StoreObject::Wrapped => {
904 if self.include_wrapped_object {
905 Some(LiveObject::Wrapped(object_key))
906 } else {
907 None
908 }
909 }
910 StoreObject::Deleted => None,
911 }
912 }
913}
914
915impl Iterator for LiveSetIter<'_> {
916 type Item = LiveObject;
917
918 fn next(&mut self) -> Option<Self::Item> {
919 loop {
920 if let Some(Ok((next_key, next_value))) = self.iter.next() {
921 let prev = self.prev.take();
922 self.prev = Some((next_key, next_value));
923
924 if let Some((prev_key, prev_value)) = prev
925 && prev_key.0 != next_key.0
926 {
927 let live_object =
928 self.store_object_wrapper_to_live_object(prev_key, prev_value);
929 if live_object.is_some() {
930 return live_object;
931 }
932 }
933 continue;
934 }
935 if let Some((key, value)) = self.prev.take() {
936 let live_object = self.store_object_wrapper_to_live_object(key, value);
937 if live_object.is_some() {
938 return live_object;
939 }
940 }
941 return None;
942 }
943 }
944}
945
946fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
948 DBOptions {
949 options: db_options
950 .clone()
951 .optimize_for_write_throughput()
952 .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
953 .options,
954 rw_options: db_options.rw_options.set_ignore_range_deletions(false),
955 }
956}
957
958fn objects_table_config(db_options: DBOptions) -> DBOptions {
959 db_options
960 .optimize_for_write_throughput()
961 .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
962}
963
964fn transactions_table_config(db_options: DBOptions) -> DBOptions {
965 db_options
966 .optimize_for_write_throughput()
967 .optimize_for_point_lookup(
968 read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
969 )
970}
971
972fn effects_table_config(db_options: DBOptions) -> DBOptions {
973 db_options
974 .optimize_for_write_throughput()
975 .optimize_for_point_lookup(
976 read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
977 )
978}