1use super::*;
5use crate::authority::authority_store::LockDetailsWrapperDeprecated;
6#[cfg(tidehunter)]
7use crate::authority::epoch_marker_key::EPOCH_MARKER_KEY_SIZE;
8use crate::authority::epoch_marker_key::EpochMarkerKey;
9use serde::{Deserialize, Serialize};
10use std::path::Path;
11use std::sync::atomic::AtomicU64;
12use sui_types::base_types::SequenceNumber;
13use sui_types::effects::{TransactionEffects, TransactionEvents};
14use sui_types::global_state_hash::GlobalStateHash;
15use sui_types::storage::MarkerValue;
16use typed_store::metrics::SamplingInterval;
17use typed_store::rocks::{
18 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
19 read_size_from_env,
20};
21use typed_store::traits::Map;
22
23use crate::authority::authority_store_types::{
24 StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
25};
26use crate::authority::epoch_start_configuration::EpochStartConfiguration;
27use typed_store::{DBMapUtils, DbIterator};
28
// Names of the environment variables that the per-table RocksDB configs at the bottom of this
// file pass to `read_size_from_env`; each config supplies its own fallback via `unwrap_or`.
// NOTE(review): the `_MB` suffix suggests the values are sizes in megabytes — confirm against
// `read_size_from_env`.

/// Overrides the block cache size used by `objects_table_config`.
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
/// Overrides the block cache size used by `owned_object_transaction_locks_table_config`.
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
/// Overrides the block cache size used by `transactions_table_config`.
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
/// Overrides the block cache size used by `effects_table_config`.
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
33
/// Optional overrides accepted by `AuthorityPerpetualTables::open`.
///
/// The `Default` value has both flags set to `false`.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// When `false`, `apply_to` disables write throttling on the database options.
    pub enable_write_stall: bool,
    /// Read by the tidehunter build of `open`: when `true`, a compactor is installed on the
    /// `objects` key space.
    pub is_validator: bool,
}
41
42impl AuthorityPerpetualTablesOptions {
43 fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
44 if !self.enable_write_stall {
45 db_options = db_options.disable_write_throttling();
46 }
47 db_options
48 }
49}
50
/// The tables stored in the `perpetual` database directory (see `path`).
///
/// Table handles are generated by the `DBMapUtils` derive; the methods on this type are thin,
/// typed accessors over those tables.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct AuthorityPerpetualTables {
    /// Stored objects keyed by `ObjectKey`. A stored entry decodes to a live value, a
    /// `Deleted` tombstone or a `Wrapped` tombstone (see `object_reference`).
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// Keyed by full object reference. The `rename` attribute names the table
    /// `owned_object_transaction_locks`, which is the name both `open` variants configure.
    #[rename = "owned_object_transaction_locks"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,

    /// Transactions keyed by transaction digest (read by `get_transaction`).
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// Transaction effects keyed by effects digest.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Transaction digest to effects digest; `get_effects` chains this table with `effects`.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Transaction events keyed by transaction digest.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    /// Per-transaction list of object keys.
    /// NOTE(review): presumably the runtime-loaded objects the transaction left unchanged,
    /// going by the name — confirm.
    pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,

    /// Transaction digest to `(epoch, checkpoint sequence number)`
    /// (read by `get_checkpoint_sequence_number`).
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Epoch to `(last checkpoint of the epoch, state hash)`
    /// (written by `insert_root_state_hash`).
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Single-entry table (unit key) holding the epoch start configuration.
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// Single-entry table (unit key) holding the highest pruned checkpoint.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Single-entry table (unit key) holding a `u64` amount.
    pub(crate) expected_network_sui_amount: DBMap<(), u64>,

    /// Single-entry table (unit key) holding an `i64` imbalance.
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Marker values keyed by `(epoch, object key)`.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
    /// Marker values keyed by `EpochMarkerKey`.
    /// NOTE(review): presumably the successor of `object_per_epoch_marker_table` — confirm.
    pub(crate) object_per_epoch_marker_table_v2: DBMap<EpochMarkerKey, MarkerValue>,

    /// Set of `(epoch, transaction digest)` pairs
    /// (queried by `was_transaction_executed_in_last_epoch`).
    pub(crate) executed_transaction_digests: DBMap<(EpochId, TransactionDigest), ()>,
}
145
146impl AuthorityPerpetualTables {
/// Returns the location of the perpetual database: the `perpetual` entry under `parent_path`.
pub fn path(parent_path: &Path) -> PathBuf {
    let mut db_dir = parent_path.to_path_buf();
    db_dir.push("perpetual");
    db_dir
}
150
151 #[cfg(not(tidehunter))]
152 pub fn open(
153 parent_path: &Path,
154 db_options_override: Option<AuthorityPerpetualTablesOptions>,
155 _pruner_watermark: Option<Arc<AtomicU64>>,
156 ) -> Self {
157 let db_options_override = db_options_override.unwrap_or_default();
158 let db_options = db_options_override
159 .apply_to(default_db_options().optimize_db_for_write_throughput(4, false));
160 let table_options = DBMapTableConfigMap::new(BTreeMap::from([
161 (
162 "objects".to_string(),
163 objects_table_config(db_options.clone()),
164 ),
165 (
166 "owned_object_transaction_locks".to_string(),
167 owned_object_transaction_locks_table_config(db_options.clone()),
168 ),
169 (
170 "transactions".to_string(),
171 transactions_table_config(db_options.clone()),
172 ),
173 (
174 "effects".to_string(),
175 effects_table_config(db_options.clone()),
176 ),
177 ]));
178
179 Self::open_tables_read_write(
180 Self::path(parent_path),
181 MetricConf::new("perpetual")
182 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
183 Some(db_options.options),
184 Some(table_options),
185 )
186 }
187
/// Tidehunter build of `open`: opens the perpetual tables read-write using an explicit
/// `ThConfig` for every key space.
///
/// * `db_options_override` - only `is_validator` is consulted here; when it is set, a
///   compactor is installed on the `objects` key space.
/// * `pruner_watermark` - shared counter passed to `apply_relocation_filter` for the key
///   spaces that use it; a fresh counter starting at 0 is created when `None` is given.
#[cfg(tidehunter)]
pub fn open(
    parent_path: &Path,
    db_options_override: Option<AuthorityPerpetualTablesOptions>,
    pruner_watermark: Option<Arc<AtomicU64>>,
) -> Self {
    use crate::authority::authority_store_pruner::apply_relocation_filter;
    tracing::warn!("AuthorityPerpetualTables using tidehunter");
    use typed_store::tidehunter_util::{
        Bytes, Decision, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
        default_cells_per_mutex, default_max_dirty_keys, default_mutex_count,
        default_value_cache_size,
    };
    // Mutex counts are scaled from the library default; digest-keyed tables get 4x more.
    let mutexes = default_mutex_count() * 2;
    let transaction_mutexes = mutexes * 4;
    let value_cache_size = default_value_cache_size();
    let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));

    // Base config with a bloom filter, cloned for every key space that wants one.
    let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
    // Compactor for the `objects` key space: walks the keys back to front and retains only
    // the first key encountered for each run of equal leading `OID_SIZE` bytes.
    // NOTE(review): slicing panics on keys shorter than `OID_SIZE`; presumably the leading
    // 32 bytes are the object id and keys arrive in ascending order, so the retained key is
    // the highest version per object — confirm.
    let objects_compactor = |iter: &mut dyn DoubleEndedIterator<Item = &Bytes>| {
        let mut retain = HashSet::new();
        let mut previous: Option<&[u8]> = None;
        const OID_SIZE: usize = 32;
        for key in iter.rev() {
            if let Some(prev) = previous {
                if prev == &key[..OID_SIZE] {
                    continue;
                }
            }
            previous = Some(&key[..OID_SIZE]);
            retain.insert(key.clone());
        }
        retain
    };
    // Eight bytes `[0, 0, 0, 0, 0, 0, 0, 32]`, passed as the final argument of the
    // `new_with_rm_prefix*` constructors below.
    // NOTE(review): presumably the serialized length prefix of a 32-byte digest — confirm.
    let mut digest_prefix = vec![0; 8];
    digest_prefix[7] = 32;
    let uniform_key = KeyType::uniform(default_cells_per_mutex());
    let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
    let epoch_tx_digest_prefix_key =
        KeyType::from_prefix_bits((8+ 8) * 8 + 12);
    let object_indexing = KeyIndexing::fixed(32 + 8);
    // NOTE(review): presumably the serialized size of an `ObjectRef` key — confirm.
    let obj_ref_size = 32 + 8 + 32 + 8;
    // Index on the byte range `16..(obj_ref_size - 16)` of the key.
    let owned_object_transaction_locks_indexing =
        KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));

    let mut objects_config = KeySpaceConfig::new()
        .with_max_dirty_keys(4 * default_max_dirty_keys())
        .with_value_cache_size(value_cache_size);
    // The compactor is only installed when the caller explicitly marks this node as a validator.
    if matches!(db_options_override, Some(options) if options.is_validator) {
        objects_config = objects_config.with_compactor(Box::new(objects_compactor));
    }

    // One `(key space name, config)` pair per table.
    let configs = vec![
        (
            "objects".to_string(),
            ThConfig::new_with_config_indexing(
                object_indexing,
                mutexes * 4,
                KeyType::uniform(1),
                objects_config,
            ),
        ),
        (
            "owned_object_transaction_locks".to_string(),
            ThConfig::new_with_config_indexing(
                owned_object_transaction_locks_indexing,
                mutexes * 16,
                KeyType::uniform(default_cells_per_mutex()),
                bloom_config
                    .clone()
                    .with_max_dirty_keys(16 * default_max_dirty_keys()),
            ),
        ),
        (
            "transactions".to_string(),
            ThConfig::new_with_rm_prefix_indexing(
                KeyIndexing::key_reduction(32, 0..16),
                transaction_mutexes,
                uniform_key,
                KeySpaceConfig::new()
                    .with_value_cache_size(value_cache_size)
                    .with_relocation_filter(|_, _| Decision::Remove),
                digest_prefix.clone(),
            ),
        ),
        (
            "effects".to_string(),
            ThConfig::new_with_rm_prefix_indexing(
                KeyIndexing::key_reduction(32, 0..16),
                transaction_mutexes,
                uniform_key,
                apply_relocation_filter(
                    bloom_config.clone().with_value_cache_size(value_cache_size),
                    pruner_watermark.clone(),
                    |effects: TransactionEffects| effects.executed_epoch(),
                    false,
                ),
                digest_prefix.clone(),
            ),
        ),
        (
            "executed_effects".to_string(),
            ThConfig::new_with_rm_prefix_indexing(
                KeyIndexing::key_reduction(32, 0..16),
                transaction_mutexes,
                uniform_key,
                bloom_config
                    .clone()
                    .with_value_cache_size(value_cache_size)
                    .with_relocation_filter(|_, _| Decision::Remove),
                digest_prefix.clone(),
            ),
        ),
        // NOTE(review): `events` has no corresponding field on the struct; presumably a
        // legacy key space that still needs a config — confirm.
        (
            "events".to_string(),
            ThConfig::new_with_rm_prefix(
                32 + 8,
                mutexes,
                uniform_key,
                KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                digest_prefix.clone(),
            ),
        ),
        (
            "events_2".to_string(),
            ThConfig::new_with_rm_prefix(
                32,
                mutexes,
                uniform_key,
                KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                digest_prefix.clone(),
            ),
        ),
        (
            "unchanged_loaded_runtime_objects".to_string(),
            ThConfig::new_with_rm_prefix(
                32,
                mutexes,
                uniform_key,
                KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                digest_prefix.clone(),
            ),
        ),
        (
            "executed_transactions_to_checkpoint".to_string(),
            ThConfig::new_with_rm_prefix(
                32,
                mutexes,
                uniform_key,
                apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermark.clone(),
                    |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
                    false,
                ),
                digest_prefix.clone(),
            ),
        ),
        (
            "root_state_hash_by_epoch".to_string(),
            ThConfig::new(8, 1, KeyType::uniform(1)),
        ),
        // The four tables below use the unit key, hence a key size of 0.
        (
            "epoch_start_configuration".to_string(),
            ThConfig::new(0, 1, KeyType::uniform(1)),
        ),
        (
            "pruned_checkpoint".to_string(),
            ThConfig::new(0, 1, KeyType::uniform(1)),
        ),
        (
            "expected_network_sui_amount".to_string(),
            ThConfig::new(0, 1, KeyType::uniform(1)),
        ),
        (
            "expected_storage_fund_imbalance".to_string(),
            ThConfig::new(0, 1, KeyType::uniform(1)),
        ),
        (
            "object_per_epoch_marker_table".to_string(),
            ThConfig::new_with_config_indexing(
                KeyIndexing::VariableLength,
                mutexes,
                epoch_prefix_key,
                apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermark.clone(),
                    |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
                    true,
                ),
            ),
        ),
        (
            "object_per_epoch_marker_table_v2".to_string(),
            ThConfig::new_with_config_indexing(
                KeyIndexing::fixed(EPOCH_MARKER_KEY_SIZE),
                mutexes,
                epoch_prefix_key,
                apply_relocation_filter(
                    bloom_config.clone(),
                    pruner_watermark.clone(),
                    |k: EpochMarkerKey| k.0,
                    true,
                ),
            ),
        ),
        (
            "executed_transaction_digests".to_string(),
            ThConfig::new_with_config_indexing(
                KeyIndexing::fixed(8 + (32 + 8)),
                transaction_mutexes,
                epoch_tx_digest_prefix_key,
                apply_relocation_filter(
                    bloom_config.clone(),
                    pruner_watermark.clone(),
                    |(epoch_id, _): (EpochId, TransactionDigest)| epoch_id,
                    true,
                ),
            ),
        ),
    ];
    Self::open_tables_read_write(
        Self::path(parent_path),
        MetricConf::new("perpetual")
            .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
        configs.into_iter().collect(),
    )
}
420
421 #[cfg(not(tidehunter))]
422 pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
423 Self::get_read_only_handle(
424 Self::path(parent_path),
425 None,
426 None,
427 MetricConf::new("perpetual_readonly"),
428 )
429 }
430
/// Tidehunter build of `open_readonly`.
///
/// Simply delegates to `open` with no option overrides and no pruner watermark, so the
/// returned value is the same type that `open` produces.
#[cfg(tidehunter)]
pub fn open_readonly(parent_path: &Path) -> Self {
    Self::open(parent_path, None, None)
}
435
/// Forwards to `force_rebuild_control_region` on the database handle held by the `objects`
/// table and returns its result (tidehunter build only).
#[cfg(tidehunter)]
pub fn force_rebuild_control_region(&self) -> anyhow::Result<()> {
    self.objects.db.force_rebuild_control_region()
}
440
/// Drops this handle and then waits on the underlying database's background threads
/// (tidehunter build only).
///
/// The caller is expected to pass the last remaining `Arc`; if other strong references
/// exist, a warning is printed to stdout before proceeding.
#[cfg(tidehunter)]
pub fn wait_for_tidehunter_background_threads(self: Arc<Self>) {
    let outstanding = Arc::strong_count(&self);
    if outstanding != 1 {
        println!(
            "WARNING: wait_for_tidehunter_background_threads called with Arc<AuthorityPerpetualTables> strong_count={} (expected 1); other clones will keep DBMap.db Arc<Database> alive past drop(self) and the inner Database wait will warn/timeout",
            outstanding,
        );
    }
    // Keep our own database handle alive, then release the tables before waiting.
    let database = self.objects.db.clone();
    drop(self);
    database.wait_for_tidehunter_background_threads();
}
458
459 pub fn find_object_lt_or_eq_version(
463 &self,
464 object_id: ObjectID,
465 version: SequenceNumber,
466 ) -> SuiResult<Option<Object>> {
467 let mut iter = self.objects.reversed_safe_iter_with_bounds(
468 Some(ObjectKey::min_for_id(&object_id)),
469 Some(ObjectKey(object_id, version)),
470 )?;
471 match iter.next() {
472 Some(Ok((key, o))) => self.object(&key, o),
473 Some(Err(e)) => Err(e.into()),
474 None => Ok(None),
475 }
476 }
477
/// Rebuilds an `Object` from its key and stored value by delegating to
/// `try_construct_object`; any error from that call is returned unchanged.
fn construct_object(
    &self,
    object_key: &ObjectKey,
    store_object: StoreObjectValue,
) -> Result<Object, SuiError> {
    try_construct_object(object_key, store_object)
}
485
486 pub fn object(
489 &self,
490 object_key: &ObjectKey,
491 store_object: StoreObjectWrapper,
492 ) -> Result<Option<Object>, SuiError> {
493 let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
494 return Ok(None);
495 };
496 Ok(Some(self.construct_object(object_key, *store_object)?))
497 }
498
499 pub fn object_reference(
500 &self,
501 object_key: &ObjectKey,
502 store_object: StoreObjectWrapper,
503 ) -> Result<ObjectRef, SuiError> {
504 let obj_ref = match store_object.migrate().into_inner() {
505 StoreObject::Value(object) => self
506 .construct_object(object_key, *object)?
507 .compute_object_reference(),
508 StoreObject::Deleted => (
509 object_key.0,
510 object_key.1,
511 ObjectDigest::OBJECT_DIGEST_DELETED,
512 ),
513 StoreObject::Wrapped => (
514 object_key.0,
515 object_key.1,
516 ObjectDigest::OBJECT_DIGEST_WRAPPED,
517 ),
518 };
519 Ok(obj_ref)
520 }
521
522 pub fn tombstone_reference(
523 &self,
524 object_key: &ObjectKey,
525 store_object: &StoreObjectWrapper,
526 ) -> Result<Option<ObjectRef>, SuiError> {
527 let obj_ref = match store_object.inner() {
528 StoreObject::Deleted => Some((
529 object_key.0,
530 object_key.1,
531 ObjectDigest::OBJECT_DIGEST_DELETED,
532 )),
533 StoreObject::Wrapped => Some((
534 object_key.0,
535 object_key.1,
536 ObjectDigest::OBJECT_DIGEST_WRAPPED,
537 )),
538 _ => None,
539 };
540 Ok(obj_ref)
541 }
542
543 pub fn get_latest_object_ref_or_tombstone(
544 &self,
545 object_id: ObjectID,
546 ) -> Result<Option<ObjectRef>, SuiError> {
547 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
548 Some(ObjectKey::min_for_id(&object_id)),
549 Some(ObjectKey::max_for_id(&object_id)),
550 )?;
551
552 if let Some(Ok((object_key, value))) = iterator.next()
553 && object_key.0 == object_id
554 {
555 return Ok(Some(self.object_reference(&object_key, value)?));
556 }
557 Ok(None)
558 }
559
560 pub fn get_latest_object_or_tombstone(
561 &self,
562 object_id: ObjectID,
563 ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
564 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
565 Some(ObjectKey::min_for_id(&object_id)),
566 Some(ObjectKey::max_for_id(&object_id)),
567 )?;
568
569 if let Some(Ok((object_key, value))) = iterator.next()
570 && object_key.0 == object_id
571 {
572 return Ok(Some((object_key, value)));
573 }
574 Ok(None)
575 }
576
577 pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
578 Ok(self
579 .epoch_start_configuration
580 .get(&())?
581 .expect("Must have current epoch.")
582 .epoch_start_state()
583 .epoch())
584 }
585
586 pub fn set_epoch_start_configuration(
587 &self,
588 epoch_start_configuration: &EpochStartConfiguration,
589 ) -> SuiResult {
590 let mut wb = self.epoch_start_configuration.batch();
591 wb.insert_batch(
592 &self.epoch_start_configuration,
593 std::iter::once(((), epoch_start_configuration)),
594 )?;
595 wb.write()?;
596 Ok(())
597 }
598
/// Reads the single entry of the `pruned_checkpoint` table.
///
/// Returns `Ok(None)` if nothing has been stored yet. Note that this returns the raw
/// `TypedStoreError` rather than a `SuiError`.
pub fn get_highest_pruned_checkpoint(
    &self,
) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
    self.pruned_checkpoint.get(&())
}
604
605 pub fn set_highest_pruned_checkpoint(
606 &self,
607 wb: &mut DBBatch,
608 checkpoint_number: CheckpointSequenceNumber,
609 ) -> SuiResult {
610 wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
611 Ok(())
612 }
613
614 pub fn get_transaction(
615 &self,
616 digest: &TransactionDigest,
617 ) -> SuiResult<Option<TrustedTransaction>> {
618 let Some(transaction) = self.transactions.get(digest)? else {
619 return Ok(None);
620 };
621 Ok(Some(transaction))
622 }
623
624 pub fn insert_executed_transaction_digests_batch(
627 &self,
628 epoch: EpochId,
629 digests: impl Iterator<Item = TransactionDigest>,
630 ) -> SuiResult {
631 let mut batch = self.executed_transaction_digests.batch();
632 batch.insert_batch(
633 &self.executed_transaction_digests,
634 digests.map(|digest| ((epoch, digest), ())),
635 )?;
636 batch.write()?;
637 Ok(())
638 }
639
640 pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
641 let Some(effect_digest) = self.executed_effects.get(digest)? else {
642 return Ok(None);
643 };
644 Ok(self.effects.get(&effect_digest)?)
645 }
646
647 pub(crate) fn was_transaction_executed_in_last_epoch(
648 &self,
649 digest: &TransactionDigest,
650 current_epoch: EpochId,
651 ) -> bool {
652 if current_epoch == 0 {
653 return false;
654 }
655 self.executed_transaction_digests
656 .contains_key(&(current_epoch - 1, *digest))
657 .expect("db error")
658 }
659
/// Returns the `(epoch, checkpoint sequence number)` stored for `digest` in
/// `executed_transactions_to_checkpoint`, or `Ok(None)` if there is no entry.
pub fn get_checkpoint_sequence_number(
    &self,
    digest: &TransactionDigest,
) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
    Ok(self.executed_transactions_to_checkpoint.get(digest)?)
}
668
669 pub fn get_newer_object_keys(
670 &self,
671 object: &(ObjectID, SequenceNumber),
672 ) -> SuiResult<Vec<ObjectKey>> {
673 let mut objects = vec![];
674 for result in self.objects.safe_iter_with_bounds(
675 Some(ObjectKey(object.0, object.1.next())),
676 Some(ObjectKey(object.0, VersionNumber::MAX)),
677 ) {
678 let (key, _) = result?;
679 objects.push(key);
680 }
681 Ok(objects)
682 }
683
684 pub fn set_highest_pruned_checkpoint_without_wb(
685 &self,
686 checkpoint_number: CheckpointSequenceNumber,
687 ) -> SuiResult {
688 let mut wb = self.pruned_checkpoint.batch();
689 self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
690 wb.write()?;
691 Ok(())
692 }
693
694 pub fn database_is_empty(&self) -> SuiResult<bool> {
695 Ok(self.objects.safe_iter().next().is_none())
696 }
697
698 pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
699 LiveSetIter {
700 iter: Box::new(self.objects.safe_iter()),
701 tables: self,
702 prev: None,
703 include_wrapped_object,
704 }
705 }
706
707 pub fn range_iter_live_object_set(
708 &self,
709 lower_bound: Option<ObjectID>,
710 upper_bound: Option<ObjectID>,
711 include_wrapped_object: bool,
712 ) -> LiveSetIter<'_> {
713 let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
714 let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
715
716 LiveSetIter {
717 iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
718 tables: self,
719 prev: None,
720 include_wrapped_object,
721 }
722 }
723
/// Delegates to `checkpoint_db` on the `objects` table handle with the given `path`,
/// converting any error into the function's error type.
///
/// NOTE(review): presumably this snapshots the whole underlying database rather than only
/// the `objects` table — confirm against `DBMap::checkpoint_db`.
pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
    self.objects.checkpoint_db(path).map_err(Into::into)
}
728
/// Returns the `(checkpoint sequence number, state hash)` stored for `epoch` in
/// `root_state_hash_by_epoch`, or `Ok(None)` if there is no entry for that epoch.
pub fn get_root_state_hash(
    &self,
    epoch: EpochId,
) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
    Ok(self.root_state_hash_by_epoch.get(&epoch)?)
}
735
736 pub fn insert_root_state_hash(
737 &self,
738 epoch: EpochId,
739 last_checkpoint_of_epoch: CheckpointSequenceNumber,
740 hash: GlobalStateHash,
741 ) -> SuiResult {
742 self.root_state_hash_by_epoch
743 .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
744 Ok(())
745 }
746
747 pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
748 let object_reference = object.compute_object_reference();
749 let wrapper = get_store_object(object);
750 let mut wb = self.objects.batch();
751 wb.insert_batch(
752 &self.objects,
753 std::iter::once((ObjectKey::from(object_reference), wrapper)),
754 )?;
755 wb.write()?;
756 Ok(())
757 }
758
759 pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
761 let obj_entry = self
762 .objects
763 .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
764 .next();
765
766 match obj_entry.transpose()? {
767 Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
768 Ok(self.object(&ObjectKey(obj_id, version), obj)?)
769 }
770 _ => Ok(None),
771 }
772 }
773
774 pub fn get_object_by_key_fallible(
775 &self,
776 object_id: &ObjectID,
777 version: VersionNumber,
778 ) -> SuiResult<Option<Object>> {
779 Ok(self
780 .objects
781 .get(&ObjectKey(*object_id, version))?
782 .and_then(|object| {
783 self.object(&ObjectKey(*object_id, version), object)
784 .expect("object construction error")
785 }))
786 }
787}
788
789impl ObjectStore for AuthorityPerpetualTables {
790 fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
792 self.get_object_fallible(object_id).expect("db error")
793 }
794
795 fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
796 self.get_object_by_key_fallible(object_id, version)
797 .expect("db error")
798 }
799}
800
/// Iterator over the `objects` table that yields at most one `LiveObject` per object id.
///
/// Created by `iter_live_object_set` and `range_iter_live_object_set`.
pub struct LiveSetIter<'a> {
    /// Underlying table iterator over `(key, stored wrapper)` entries.
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    /// Tables handle, used to reconstruct objects from stored values.
    tables: &'a AuthorityPerpetualTables,
    /// Most recently read entry, held back until the next entry shows whether it was the
    /// last one for its object id.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
    /// Whether wrapped tombstones are yielded (as `LiveObject::Wrapped`) or skipped.
    include_wrapped_object: bool,
}
808
/// An item yielded by `LiveSetIter`.
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    /// A fully reconstructed object.
    Normal(Object),
    /// A wrapped tombstone, identified only by its key (id and version).
    Wrapped(ObjectKey),
}
814
815impl LiveObject {
816 pub fn object_id(&self) -> ObjectID {
817 match self {
818 LiveObject::Normal(obj) => obj.id(),
819 LiveObject::Wrapped(key) => key.0,
820 }
821 }
822
823 pub fn version(&self) -> SequenceNumber {
824 match self {
825 LiveObject::Normal(obj) => obj.version(),
826 LiveObject::Wrapped(key) => key.1,
827 }
828 }
829
830 pub fn object_reference(&self) -> ObjectRef {
831 match self {
832 LiveObject::Normal(obj) => obj.compute_object_reference(),
833 LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
834 }
835 }
836
837 pub fn to_normal(self) -> Option<Object> {
838 match self {
839 LiveObject::Normal(object) => Some(object),
840 LiveObject::Wrapped(_) => None,
841 }
842 }
843}
844
845impl LiveSetIter<'_> {
846 fn store_object_wrapper_to_live_object(
847 &self,
848 object_key: ObjectKey,
849 store_object: StoreObjectWrapper,
850 ) -> Option<LiveObject> {
851 match store_object.migrate().into_inner() {
852 StoreObject::Value(object) => {
853 let object = self
854 .tables
855 .construct_object(&object_key, *object)
856 .expect("Constructing object from store cannot fail");
857 Some(LiveObject::Normal(object))
858 }
859 StoreObject::Wrapped => {
860 if self.include_wrapped_object {
861 Some(LiveObject::Wrapped(object_key))
862 } else {
863 None
864 }
865 }
866 StoreObject::Deleted => None,
867 }
868 }
869}
870
/// Walks the underlying table iterator and emits, for each run of entries sharing an object
/// id, the last entry of that run, provided it converts to a `LiveObject`.
impl Iterator for LiveSetIter<'_> {
    type Item = LiveObject;

    /// Returns the next live object, or `None` once the underlying iterator is exhausted
    /// and the buffered entry has been handled.
    ///
    /// NOTE(review): the "last entry per id is the newest version" reading assumes the
    /// underlying iterator yields keys in ascending order — confirm.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // NOTE(review): an `Err` item from the underlying iterator does not match this
            // pattern, so it is handled exactly like exhaustion below and the error is not
            // surfaced to the caller — confirm this is intended.
            if let Some(Ok((next_key, next_value))) = self.iter.next() {
                // Buffer the new entry and take out the one buffered before it.
                let prev = self.prev.take();
                self.prev = Some((next_key, next_value));

                // The object id changed, so the previously buffered entry was the last one
                // for its id; emit it if it is live.
                if let Some((prev_key, prev_value)) = prev
                    && prev_key.0 != next_key.0
                {
                    let live_object =
                        self.store_object_wrapper_to_live_object(prev_key, prev_value);
                    if live_object.is_some() {
                        return live_object;
                    }
                }
                continue;
            }
            // No further entries: flush whatever is still buffered.
            if let Some((key, value)) = self.prev.take() {
                let live_object = self.store_object_wrapper_to_live_object(key, value);
                if live_object.is_some() {
                    return live_object;
                }
            }
            return None;
        }
    }
}
901
902fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
904 DBOptions {
905 options: db_options
906 .clone()
907 .optimize_for_write_throughput()
908 .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
909 .options,
910 rw_options: db_options.rw_options.set_ignore_range_deletions(false),
911 }
912}
913
914fn objects_table_config(db_options: DBOptions) -> DBOptions {
915 db_options
916 .optimize_for_write_throughput()
917 .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
918}
919
920fn transactions_table_config(db_options: DBOptions) -> DBOptions {
921 db_options
922 .optimize_for_write_throughput()
923 .optimize_for_point_lookup(
924 read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
925 )
926}
927
928fn effects_table_config(db_options: DBOptions) -> DBOptions {
929 db_options
930 .optimize_for_write_throughput()
931 .optimize_for_point_lookup(
932 read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
933 )
934}