1use super::*;
5use crate::authority::authority_store::LockDetailsWrapperDeprecated;
6#[cfg(tidehunter)]
7use crate::authority::epoch_marker_key::EPOCH_MARKER_KEY_SIZE;
8use crate::authority::epoch_marker_key::EpochMarkerKey;
9use serde::{Deserialize, Serialize};
10use std::path::Path;
11use std::sync::atomic::AtomicU64;
12use sui_types::base_types::SequenceNumber;
13use sui_types::effects::{TransactionEffects, TransactionEvents};
14use sui_types::global_state_hash::GlobalStateHash;
15use sui_types::storage::MarkerValue;
16use typed_store::metrics::SamplingInterval;
17use typed_store::rocks::{
18 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
19 read_size_from_env,
20};
21use typed_store::traits::Map;
22
23use crate::authority::authority_store_types::{
24 StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
25};
26use crate::authority::epoch_start_configuration::EpochStartConfiguration;
27use typed_store::{DBMapUtils, DbIterator};
28
// Names of the environment variables that override per-table block cache sizes.
// Each one is passed to `read_size_from_env` by the matching `*_table_config` function
// in this file, which uses a built-in default when no value is returned.
// NOTE(review): the `_MB` suffix suggests the values are megabytes — confirm against
// `read_size_from_env`.

/// Block cache override for the `objects` table (default `5 * 1024`).
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
/// Block cache override for the `owned_object_transaction_locks` table (default `1024`).
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
/// Block cache override for the `transactions` table (default `512`).
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
/// Block cache override for the `effects` table (default `1024`).
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
33
/// Optional knobs applied when opening [`AuthorityPerpetualTables`].
///
/// The derived `Default` leaves both flags `false`.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// When `false`, `apply_to` calls `disable_write_throttling()` on the database options;
    /// when `true`, the options are left untouched.
    pub enable_write_stall: bool,
    /// When `true`, the tidehunter `open` attaches a compactor to the `objects` key space.
    /// Not read by the non-tidehunter `open`.
    pub enable_objects_compactor: bool,
}
44
45impl AuthorityPerpetualTablesOptions {
46 fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
47 if !self.enable_write_stall {
48 db_options = db_options.disable_write_throttling();
49 }
50 db_options
51 }
52}
53
/// The set of tables that live in the `perpetual` database directory (see `path`).
///
/// `DBMapUtils` derives the open helpers used by this file (`open_tables_read_write`,
/// `get_read_only_handle`); under the `tidehunter` cfg the `tidehunter` attribute is
/// applied as well.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct AuthorityPerpetualTables {
    /// Object store keyed by `ObjectKey` (object id + version). Values are
    /// `StoreObjectWrapper`s, which may hold an object value or a deleted/wrapped
    /// tombstone (see `object_reference`).
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// Keyed by `ObjectRef`. Stored under the table name `owned_object_transaction_locks`
    /// (see the `rename` attribute), which is also the name used for its table config.
    #[rename = "owned_object_transaction_locks"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,

    /// Transactions keyed by transaction digest.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// Transaction effects keyed by effects digest.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Maps a transaction digest to the digest of its effects; `get_effects` resolves a
    /// transaction to its effects through this table and then `effects`.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Transaction events keyed by transaction digest.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    /// Per-transaction list of object keys.
    /// NOTE(review): presumably the runtime-loaded objects the transaction did not
    /// modify — confirm with the writer of this table.
    pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,

    /// Maps a transaction digest to an `(epoch, checkpoint sequence number)` pair
    /// (read by `get_checkpoint_sequence_number`).
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Per-epoch `(last checkpoint of the epoch, global state hash)`, written by
    /// `insert_root_state_hash`.
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Single-entry table (unit key) holding the epoch start configuration.
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// Single-entry table (unit key) holding the highest pruned checkpoint.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Single-entry table (unit key) holding the highest committed checkpoint.
    pub(crate) highest_committed_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Single-entry table (unit key) holding a `u64` amount.
    /// NOTE(review): presumably the expected total SUI in the network — confirm.
    pub(crate) expected_network_sui_amount: DBMap<(), u64>,

    /// Single-entry table (unit key) holding a signed `i64` imbalance value.
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Marker values keyed by `(epoch, object key)`.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
    /// Marker values keyed by `EpochMarkerKey`.
    /// NOTE(review): presumably the successor of `object_per_epoch_marker_table` — confirm.
    pub(crate) object_per_epoch_marker_table_v2: DBMap<EpochMarkerKey, MarkerValue>,

    /// Set of `(epoch, transaction digest)` pairs with unit values; queried by
    /// `was_transaction_executed_in_last_epoch`.
    pub(crate) executed_transaction_digests: DBMap<(EpochId, TransactionDigest), ()>,
}
162
163impl AuthorityPerpetualTables {
164 pub fn path(parent_path: &Path) -> PathBuf {
165 parent_path.join("perpetual")
166 }
167
168 #[cfg(not(tidehunter))]
169 pub fn open(
170 parent_path: &Path,
171 db_options_override: Option<AuthorityPerpetualTablesOptions>,
172 _pruner_watermark: Option<Arc<AtomicU64>>,
173 ) -> Self {
174 let db_options_override = db_options_override.unwrap_or_default();
175 let db_options = db_options_override
176 .apply_to(default_db_options().optimize_db_for_write_throughput(4, false));
177 let table_options = DBMapTableConfigMap::new(BTreeMap::from([
178 (
179 "objects".to_string(),
180 objects_table_config(db_options.clone()),
181 ),
182 (
183 "owned_object_transaction_locks".to_string(),
184 owned_object_transaction_locks_table_config(db_options.clone()),
185 ),
186 (
187 "transactions".to_string(),
188 transactions_table_config(db_options.clone()),
189 ),
190 (
191 "effects".to_string(),
192 effects_table_config(db_options.clone()),
193 ),
194 ]));
195
196 Self::open_tables_read_write(
197 Self::path(parent_path),
198 MetricConf::new("perpetual")
199 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
200 Some(db_options.options),
201 Some(table_options),
202 )
203 }
204
    /// Opens the perpetual tables read-write on the tidehunter backend.
    ///
    /// Builds one `ThConfig` per key space (keyed by table name) and passes the collection
    /// to `open_tables_read_write` together with the `perpetual` directory under
    /// `parent_path`. A warning is logged to make the backend choice visible.
    ///
    /// * `db_options_override` - only `enable_objects_compactor` is consulted here; when it
    ///   is set, the `objects` key space is given the compactor defined below.
    /// * `pruner_watermark` - shared counter handed to `apply_relocation_filter` for the
    ///   key spaces that use it; a fresh counter starting at 0 is created when `None`.
    #[cfg(tidehunter)]
    pub fn open(
        parent_path: &Path,
        db_options_override: Option<AuthorityPerpetualTablesOptions>,
        pruner_watermark: Option<Arc<AtomicU64>>,
    ) -> Self {
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        tracing::warn!("AuthorityPerpetualTables using tidehunter");
        use typed_store::tidehunter_util::{
            Bytes, Decision, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
            default_cells_per_mutex, default_max_dirty_keys, default_mutex_count,
            default_value_cache_size,
        };
        // Mutex counts are derived from the library default: 2x for most key spaces and a
        // further 4x for the transaction-keyed ones.
        let mutexes = default_mutex_count() * 2;
        let transaction_mutexes = mutexes * 4;
        let value_cache_size = default_value_cache_size();
        let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));

        let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
        // Compactor for the `objects` key space: walks the keys from last to first and, for
        // each run of keys sharing the same first `OID_SIZE` bytes, retains only the first
        // key met in that reverse walk.
        // NOTE(review): presumably the 32-byte prefix is the object id and keys sort by
        // (id, version), so this keeps the highest version per object — confirm.
        let objects_compactor = |iter: &mut dyn DoubleEndedIterator<Item = &Bytes>| {
            let mut retain = HashSet::new();
            let mut previous: Option<&[u8]> = None;
            const OID_SIZE: usize = 32;
            for key in iter.rev() {
                if let Some(prev) = previous {
                    if prev == &key[..OID_SIZE] {
                        continue;
                    }
                }
                previous = Some(&key[..OID_SIZE]);
                retain.insert(key.clone());
            }
            retain
        };
        // Eight bytes, all zero except the last, which is 32.
        // NOTE(review): presumably the serialized length prefix of a 32-byte digest that
        // the `rm_prefix` configs strip from keys — confirm.
        let mut digest_prefix = vec![0; 8];
        digest_prefix[7] = 32;
        let uniform_key = KeyType::uniform(default_cells_per_mutex());
        // 76 prefix bits (9 bytes + 4 bits).
        // NOTE(review): meaning of the byte/bit split is not visible here — confirm.
        let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
        // 140 prefix bits (16 bytes + 12 bits).
        let epoch_tx_digest_prefix_key =
            KeyType::from_prefix_bits((8 + 8) * 8 + 12);
        // Fixed 40-byte keys for the `objects` key space.
        let object_indexing = KeyIndexing::fixed(32 + 8);
        // 80-byte keys for `owned_object_transaction_locks`, reduced to the byte range
        // `16..(obj_ref_size - 16)` for indexing.
        let obj_ref_size = 32 + 8 + 32 + 8;
        let owned_object_transaction_locks_indexing =
            KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));

        let mut objects_config = KeySpaceConfig::new()
            .with_max_dirty_keys(16 * default_max_dirty_keys())
            .with_value_cache_size(value_cache_size);
        // The compactor is opt-in through `enable_objects_compactor`.
        if matches!(db_options_override, Some(options) if options.enable_objects_compactor) {
            objects_config = objects_config.with_compactor(Box::new(objects_compactor));
        }

        // One entry per key space, keyed by table name.
        // NOTE(review): an `events` key space is configured below although the struct
        // only declares `events_2` — presumably kept for existing databases; confirm.
        let configs = vec![
            (
                "objects".to_string(),
                ThConfig::new_with_config_indexing(
                    object_indexing,
                    mutexes * 4,
                    KeyType::uniform(1),
                    objects_config,
                ),
            ),
            (
                "owned_object_transaction_locks".to_string(),
                ThConfig::new_with_config_indexing(
                    owned_object_transaction_locks_indexing,
                    mutexes * 16,
                    KeyType::uniform(default_cells_per_mutex()),
                    bloom_config
                        .clone()
                        .with_max_dirty_keys(16 * default_max_dirty_keys()),
                ),
            ),
            (
                "transactions".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    transaction_mutexes,
                    uniform_key,
                    KeySpaceConfig::new()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    transaction_mutexes,
                    uniform_key,
                    apply_relocation_filter(
                        bloom_config.clone().with_value_cache_size(value_cache_size),
                        pruner_watermark.clone(),
                        |effects: TransactionEffects| effects.executed_epoch(),
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    transaction_mutexes,
                    uniform_key,
                    bloom_config
                        .clone()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events".to_string(),
                ThConfig::new_with_rm_prefix(
                    32 + 8,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events_2".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "unchanged_loaded_runtime_objects".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_transactions_to_checkpoint".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "root_state_hash_by_epoch".to_string(),
                ThConfig::new(8, 1, KeyType::uniform(1)),
            ),
            // The unit-keyed, single-entry tables use zero-length keys and one mutex.
            (
                "epoch_start_configuration".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "pruned_checkpoint".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "highest_committed_checkpoint".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_network_sui_amount".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_storage_fund_imbalance".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "object_per_epoch_marker_table".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "object_per_epoch_marker_table_v2".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::fixed(EPOCH_MARKER_KEY_SIZE),
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |k: EpochMarkerKey| k.0,
                        true,
                    ),
                ),
            ),
            (
                "executed_transaction_digests".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::fixed(8 + (32 + 8)),
                    transaction_mutexes,
                    epoch_tx_digest_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, TransactionDigest)| epoch_id,
                        true,
                    ),
                ),
            ),
        ];
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }
441
442 #[cfg(not(tidehunter))]
443 pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
444 Self::get_read_only_handle(
445 Self::path(parent_path),
446 None,
447 None,
448 MetricConf::new("perpetual_readonly"),
449 )
450 }
451
    /// Tidehunter variant of `open_readonly`: delegates to the regular `open` with no
    /// option overrides and no pruner watermark, so the returned handle is the same
    /// type as a read-write one.
    #[cfg(tidehunter)]
    pub fn open_readonly(parent_path: &Path) -> Self {
        Self::open(parent_path, None, None)
    }
456
    /// Forwards to `force_rebuild_control_region` on the database handle behind the
    /// `objects` table and returns its result (tidehunter only).
    #[cfg(tidehunter)]
    pub fn force_rebuild_control_region(&self) -> anyhow::Result<()> {
        self.objects.db.force_rebuild_control_region()
    }
461
462 #[cfg(tidehunter)]
467 pub fn wait_for_tidehunter_background_threads(self: Arc<Self>) {
468 let strong = Arc::strong_count(&self);
469 if strong != 1 {
470 println!(
471 "WARNING: wait_for_tidehunter_background_threads called with Arc<AuthorityPerpetualTables> strong_count={} (expected 1); other clones will keep DBMap.db Arc<Database> alive past drop(self) and the inner Database wait will warn/timeout",
472 strong,
473 );
474 }
475 let db = self.objects.db.clone();
476 drop(self);
477 db.wait_for_tidehunter_background_threads();
478 }
479
480 pub fn find_object_lt_or_eq_version(
484 &self,
485 object_id: ObjectID,
486 version: SequenceNumber,
487 ) -> SuiResult<Option<Object>> {
488 let mut iter = self.objects.reversed_safe_iter_with_bounds(
489 Some(ObjectKey::min_for_id(&object_id)),
490 Some(ObjectKey(object_id, version)),
491 )?;
492 match iter.next() {
493 Some(Ok((key, o))) => self.object(&key, o),
494 Some(Err(e)) => Err(e.into()),
495 None => Ok(None),
496 }
497 }
498
    /// Builds an `Object` from a stored object value and its key by delegating to
    /// `try_construct_object`; any error from that helper is returned unchanged.
    fn construct_object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectValue,
    ) -> Result<Object, SuiError> {
        try_construct_object(object_key, store_object)
    }
506
507 pub fn object(
510 &self,
511 object_key: &ObjectKey,
512 store_object: StoreObjectWrapper,
513 ) -> Result<Option<Object>, SuiError> {
514 let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
515 return Ok(None);
516 };
517 Ok(Some(self.construct_object(object_key, *store_object)?))
518 }
519
520 pub fn object_reference(
521 &self,
522 object_key: &ObjectKey,
523 store_object: StoreObjectWrapper,
524 ) -> Result<ObjectRef, SuiError> {
525 let obj_ref = match store_object.migrate().into_inner() {
526 StoreObject::Value(object) => self
527 .construct_object(object_key, *object)?
528 .compute_object_reference(),
529 StoreObject::Deleted => (
530 object_key.0,
531 object_key.1,
532 ObjectDigest::OBJECT_DIGEST_DELETED,
533 ),
534 StoreObject::Wrapped => (
535 object_key.0,
536 object_key.1,
537 ObjectDigest::OBJECT_DIGEST_WRAPPED,
538 ),
539 };
540 Ok(obj_ref)
541 }
542
543 pub fn tombstone_reference(
544 &self,
545 object_key: &ObjectKey,
546 store_object: &StoreObjectWrapper,
547 ) -> Result<Option<ObjectRef>, SuiError> {
548 let obj_ref = match store_object.inner() {
549 StoreObject::Deleted => Some((
550 object_key.0,
551 object_key.1,
552 ObjectDigest::OBJECT_DIGEST_DELETED,
553 )),
554 StoreObject::Wrapped => Some((
555 object_key.0,
556 object_key.1,
557 ObjectDigest::OBJECT_DIGEST_WRAPPED,
558 )),
559 _ => None,
560 };
561 Ok(obj_ref)
562 }
563
564 pub fn get_latest_object_ref_or_tombstone(
565 &self,
566 object_id: ObjectID,
567 ) -> Result<Option<ObjectRef>, SuiError> {
568 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
569 Some(ObjectKey::min_for_id(&object_id)),
570 Some(ObjectKey::max_for_id(&object_id)),
571 )?;
572
573 if let Some(Ok((object_key, value))) = iterator.next()
574 && object_key.0 == object_id
575 {
576 return Ok(Some(self.object_reference(&object_key, value)?));
577 }
578 Ok(None)
579 }
580
581 pub fn get_latest_object_or_tombstone(
582 &self,
583 object_id: ObjectID,
584 ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
585 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
586 Some(ObjectKey::min_for_id(&object_id)),
587 Some(ObjectKey::max_for_id(&object_id)),
588 )?;
589
590 if let Some(Ok((object_key, value))) = iterator.next()
591 && object_key.0 == object_id
592 {
593 return Ok(Some((object_key, value)));
594 }
595 Ok(None)
596 }
597
598 pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
599 Ok(self
600 .epoch_start_configuration
601 .get(&())?
602 .expect("Must have current epoch.")
603 .epoch_start_state()
604 .epoch())
605 }
606
607 pub fn set_epoch_start_configuration(
608 &self,
609 epoch_start_configuration: &EpochStartConfiguration,
610 ) -> SuiResult {
611 let mut wb = self.epoch_start_configuration.batch();
612 wb.insert_batch(
613 &self.epoch_start_configuration,
614 std::iter::once(((), epoch_start_configuration)),
615 )?;
616 wb.write()?;
617 Ok(())
618 }
619
    /// Reads the single entry of the `pruned_checkpoint` table; `Ok(None)` when nothing
    /// has been stored yet.
    pub fn get_highest_pruned_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.pruned_checkpoint.get(&())
    }
625
626 pub fn set_highest_pruned_checkpoint(
627 &self,
628 wb: &mut DBBatch,
629 checkpoint_number: CheckpointSequenceNumber,
630 ) -> SuiResult {
631 wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
632 Ok(())
633 }
634
    /// Reads the single entry of the `highest_committed_checkpoint` table; `Ok(None)`
    /// when nothing has been stored yet.
    pub fn get_highest_committed_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.highest_committed_checkpoint.get(&())
    }
640
641 pub fn set_highest_committed_checkpoint(
645 &self,
646 wb: &mut DBBatch,
647 checkpoint_number: CheckpointSequenceNumber,
648 ) -> SuiResult {
649 wb.insert_batch(
650 &self.highest_committed_checkpoint,
651 [((), checkpoint_number)],
652 )?;
653 Ok(())
654 }
655
656 pub fn get_transaction(
657 &self,
658 digest: &TransactionDigest,
659 ) -> SuiResult<Option<TrustedTransaction>> {
660 let Some(transaction) = self.transactions.get(digest)? else {
661 return Ok(None);
662 };
663 Ok(Some(transaction))
664 }
665
666 pub fn list_transactions_from(
667 &self,
668 start: Option<TransactionDigest>,
669 limit: usize,
670 ) -> Result<Vec<TransactionDigest>, typed_store::TypedStoreError> {
671 let iter = self.transactions.safe_iter_with_bounds(start, None);
672 let mut result = Vec::with_capacity(limit);
673 for item in iter.take(limit) {
674 let (digest, _) = item?;
675 result.push(digest);
676 }
677 Ok(result)
678 }
679
    /// Looks up the effects digest recorded for `tx_digest` in `executed_effects`;
    /// `Ok(None)` when there is no entry.
    pub fn get_executed_effects_digest(
        &self,
        tx_digest: &TransactionDigest,
    ) -> Result<Option<TransactionEffectsDigest>, typed_store::TypedStoreError> {
        self.executed_effects.get(tx_digest)
    }
686
    /// Looks up transaction effects by their own digest in `effects`; `Ok(None)` when
    /// there is no entry.
    pub fn get_effects_by_digest(
        &self,
        effects_digest: &TransactionEffectsDigest,
    ) -> Result<Option<TransactionEffects>, typed_store::TypedStoreError> {
        self.effects.get(effects_digest)
    }
693
694 pub fn insert_executed_transaction_digests_batch(
697 &self,
698 epoch: EpochId,
699 digests: impl Iterator<Item = TransactionDigest>,
700 ) -> SuiResult {
701 let mut batch = self.executed_transaction_digests.batch();
702 batch.insert_batch(
703 &self.executed_transaction_digests,
704 digests.map(|digest| ((epoch, digest), ())),
705 )?;
706 batch.write()?;
707 Ok(())
708 }
709
710 pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
711 let Some(effect_digest) = self.executed_effects.get(digest)? else {
712 return Ok(None);
713 };
714 Ok(self.effects.get(&effect_digest)?)
715 }
716
717 pub(crate) fn was_transaction_executed_in_last_epoch(
718 &self,
719 digest: &TransactionDigest,
720 current_epoch: EpochId,
721 ) -> bool {
722 if current_epoch == 0 {
723 return false;
724 }
725 self.executed_transaction_digests
726 .contains_key(&(current_epoch - 1, *digest))
727 .expect("db error")
728 }
729
730 pub fn get_checkpoint_sequence_number(
733 &self,
734 digest: &TransactionDigest,
735 ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
736 Ok(self.executed_transactions_to_checkpoint.get(digest)?)
737 }
738
739 pub fn set_highest_pruned_checkpoint_without_wb(
740 &self,
741 checkpoint_number: CheckpointSequenceNumber,
742 ) -> SuiResult {
743 let mut wb = self.pruned_checkpoint.batch();
744 self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
745 wb.write()?;
746 Ok(())
747 }
748
749 pub fn database_is_empty(&self) -> SuiResult<bool> {
750 Ok(self.objects.safe_iter().next().is_none())
751 }
752
753 pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
754 LiveSetIter {
755 iter: Box::new(self.objects.safe_iter()),
756 tables: self,
757 prev: None,
758 include_wrapped_object,
759 }
760 }
761
762 pub fn range_iter_live_object_set(
763 &self,
764 lower_bound: Option<ObjectID>,
765 upper_bound: Option<ObjectID>,
766 include_wrapped_object: bool,
767 ) -> LiveSetIter<'_> {
768 let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
769 let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
770
771 LiveSetIter {
772 iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
773 tables: self,
774 prev: None,
775 include_wrapped_object,
776 }
777 }
778
779 pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
780 self.objects.checkpoint_db(path).map_err(Into::into)
782 }
783
784 pub fn insert_root_state_hash(
785 &self,
786 epoch: EpochId,
787 last_checkpoint_of_epoch: CheckpointSequenceNumber,
788 hash: GlobalStateHash,
789 ) -> SuiResult {
790 self.root_state_hash_by_epoch
791 .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
792 Ok(())
793 }
794
795 pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
796 let object_reference = object.compute_object_reference();
797 let wrapper = get_store_object(object);
798 let mut wb = self.objects.batch();
799 wb.insert_batch(
800 &self.objects,
801 std::iter::once((ObjectKey::from(object_reference), wrapper)),
802 )?;
803 wb.write()?;
804 Ok(())
805 }
806
807 pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
809 let obj_entry = self
810 .objects
811 .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
812 .next();
813
814 match obj_entry.transpose()? {
815 Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
816 Ok(self.object(&ObjectKey(obj_id, version), obj)?)
817 }
818 _ => Ok(None),
819 }
820 }
821
822 pub fn get_object_by_key_fallible(
823 &self,
824 object_id: &ObjectID,
825 version: VersionNumber,
826 ) -> SuiResult<Option<Object>> {
827 Ok(self
828 .objects
829 .get(&ObjectKey(*object_id, version))?
830 .and_then(|object| {
831 self.object(&ObjectKey(*object_id, version), object)
832 .expect("object construction error")
833 }))
834 }
835}
836
837impl ObjectStore for AuthorityPerpetualTables {
838 fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
840 self.get_object_fallible(object_id).expect("db error")
841 }
842
843 fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
844 self.get_object_by_key_fallible(object_id, version)
845 .expect("db error")
846 }
847}
848
/// Iterator over the live objects of an `objects` table scan, created by
/// `iter_live_object_set` / `range_iter_live_object_set`.
///
/// It yields at most one `LiveObject` per object id: the last entry seen for that id
/// in the underlying scan, unless that entry is filtered out (see the `Iterator` impl).
pub struct LiveSetIter<'a> {
    /// Underlying scan over `(key, wrapper)` entries.
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    /// Tables used to construct objects from stored values.
    tables: &'a AuthorityPerpetualTables,
    /// Most recently read entry, held back until the next entry shows whether the
    /// object id changed.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
    /// Whether wrapped tombstones are yielded as `LiveObject::Wrapped` or skipped.
    include_wrapped_object: bool,
}
856
/// An item of the live object set produced by [`LiveSetIter`].
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    /// A fully constructed object.
    Normal(Object),
    /// A wrapped tombstone, identified only by its key (object id and version).
    Wrapped(ObjectKey),
}
862
863impl LiveObject {
864 pub fn object_id(&self) -> ObjectID {
865 match self {
866 LiveObject::Normal(obj) => obj.id(),
867 LiveObject::Wrapped(key) => key.0,
868 }
869 }
870
871 pub fn version(&self) -> SequenceNumber {
872 match self {
873 LiveObject::Normal(obj) => obj.version(),
874 LiveObject::Wrapped(key) => key.1,
875 }
876 }
877
878 pub fn object_reference(&self) -> ObjectRef {
879 match self {
880 LiveObject::Normal(obj) => obj.compute_object_reference(),
881 LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
882 }
883 }
884}
885
886impl LiveSetIter<'_> {
887 fn store_object_wrapper_to_live_object(
888 &self,
889 object_key: ObjectKey,
890 store_object: StoreObjectWrapper,
891 ) -> Option<LiveObject> {
892 match store_object.migrate().into_inner() {
893 StoreObject::Value(object) => {
894 let object = self
895 .tables
896 .construct_object(&object_key, *object)
897 .expect("Constructing object from store cannot fail");
898 Some(LiveObject::Normal(object))
899 }
900 StoreObject::Wrapped => {
901 if self.include_wrapped_object {
902 Some(LiveObject::Wrapped(object_key))
903 } else {
904 None
905 }
906 }
907 StoreObject::Deleted => None,
908 }
909 }
910}
911
912impl Iterator for LiveSetIter<'_> {
913 type Item = LiveObject;
914
915 fn next(&mut self) -> Option<Self::Item> {
916 loop {
917 if let Some(Ok((next_key, next_value))) = self.iter.next() {
918 let prev = self.prev.take();
919 self.prev = Some((next_key, next_value));
920
921 if let Some((prev_key, prev_value)) = prev
922 && prev_key.0 != next_key.0
923 {
924 let live_object =
925 self.store_object_wrapper_to_live_object(prev_key, prev_value);
926 if live_object.is_some() {
927 return live_object;
928 }
929 }
930 continue;
931 }
932 if let Some((key, value)) = self.prev.take() {
933 let live_object = self.store_object_wrapper_to_live_object(key, value);
934 if live_object.is_some() {
935 return live_object;
936 }
937 }
938 return None;
939 }
940 }
941}
942
943fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
945 DBOptions {
946 options: db_options
947 .clone()
948 .optimize_for_write_throughput()
949 .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
950 .options,
951 rw_options: db_options.rw_options.set_ignore_range_deletions(false),
952 }
953}
954
955fn objects_table_config(db_options: DBOptions) -> DBOptions {
956 db_options
957 .optimize_for_write_throughput()
958 .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
959}
960
961fn transactions_table_config(db_options: DBOptions) -> DBOptions {
962 db_options
963 .optimize_for_write_throughput()
964 .optimize_for_point_lookup(
965 read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
966 )
967}
968
969fn effects_table_config(db_options: DBOptions) -> DBOptions {
970 db_options
971 .optimize_for_write_throughput()
972 .optimize_for_point_lookup(
973 read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
974 )
975}