1use super::*;
5use crate::authority::authority_store::LockDetailsWrapperDeprecated;
6use serde::{Deserialize, Serialize};
7use std::path::Path;
8use std::sync::atomic::AtomicU64;
9use sui_types::base_types::SequenceNumber;
10use sui_types::digests::TransactionEventsDigest;
11use sui_types::effects::{TransactionEffects, TransactionEvents};
12use sui_types::global_state_hash::GlobalStateHash;
13use sui_types::storage::{FullObjectKey, MarkerValue};
14use typed_store::metrics::SamplingInterval;
15use typed_store::rocks::{
16 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
17 read_size_from_env,
18};
19use typed_store::traits::Map;
20
21use crate::authority::authority_store_types::{
22 StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
23};
24use crate::authority::epoch_start_configuration::EpochStartConfiguration;
25use typed_store::{DBMapUtils, DbIterator};
26
27const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
28pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
29const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
30const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
31
32#[derive(Default)]
34pub struct AuthorityPerpetualTablesOptions {
35 pub enable_write_stall: bool,
37 pub is_validator: bool,
38}
39
40impl AuthorityPerpetualTablesOptions {
41 fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
42 if !self.enable_write_stall {
43 db_options = db_options.disable_write_throttling();
44 }
45 db_options
46 }
47}
48
49#[derive(DBMapUtils)]
51#[cfg_attr(tidehunter, tidehunter)]
52pub struct AuthorityPerpetualTables {
53 pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,
66
67 #[rename = "owned_object_transaction_locks"]
72 pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,
73
74 pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,
78
79 pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,
88
89 pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,
94
95 #[allow(dead_code)]
96 #[deprecated]
97 events: DBMap<(TransactionEventsDigest, usize), Event>,
98
99 pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,
101
102 pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,
104
105 pub(crate) executed_transactions_to_checkpoint:
109 DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,
110
111 pub(crate) root_state_hash_by_epoch:
115 DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,
116
117 pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,
119
120 pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,
122
123 pub(crate) expected_network_sui_amount: DBMap<(), u64>,
128
129 pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,
133
134 pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
140 pub(crate) object_per_epoch_marker_table_v2: DBMap<(EpochId, FullObjectKey), MarkerValue>,
141
142 pub(crate) executed_transaction_digests: DBMap<(EpochId, TransactionDigest), ()>,
146}
147
148impl AuthorityPerpetualTables {
149 pub fn path(parent_path: &Path) -> PathBuf {
150 parent_path.join("perpetual")
151 }
152
153 #[cfg(not(tidehunter))]
154 pub fn open(
155 parent_path: &Path,
156 db_options_override: Option<AuthorityPerpetualTablesOptions>,
157 _pruner_watermark: Option<Arc<AtomicU64>>,
158 ) -> Self {
159 let db_options_override = db_options_override.unwrap_or_default();
160 let db_options =
161 db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
162 let table_options = DBMapTableConfigMap::new(BTreeMap::from([
163 (
164 "objects".to_string(),
165 objects_table_config(db_options.clone()),
166 ),
167 (
168 "owned_object_transaction_locks".to_string(),
169 owned_object_transaction_locks_table_config(db_options.clone()),
170 ),
171 (
172 "transactions".to_string(),
173 transactions_table_config(db_options.clone()),
174 ),
175 (
176 "effects".to_string(),
177 effects_table_config(db_options.clone()),
178 ),
179 ]));
180
181 Self::open_tables_read_write(
182 Self::path(parent_path),
183 MetricConf::new("perpetual")
184 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
185 Some(db_options.options),
186 Some(table_options),
187 )
188 }
189
190 #[cfg(tidehunter)]
191 pub fn open(
192 parent_path: &Path,
193 db_options_override: Option<AuthorityPerpetualTablesOptions>,
194 pruner_watermark: Option<Arc<AtomicU64>>,
195 ) -> Self {
196 use crate::authority::authority_store_pruner::apply_relocation_filter;
197 tracing::warn!("AuthorityPerpetualTables using tidehunter");
198 use typed_store::tidehunter_util::{
199 Bytes, Decision, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
200 default_cells_per_mutex, default_mutex_count, default_value_cache_size,
201 };
202 let mutexes = default_mutex_count() * 2;
203 let value_cache_size = default_value_cache_size();
204 let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));
206
207 let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
208 let objects_compactor = |iter: &mut dyn DoubleEndedIterator<Item = &Bytes>| {
209 let mut retain = HashSet::new();
210 let mut previous: Option<&[u8]> = None;
211 const OID_SIZE: usize = 16;
212 for key in iter.rev() {
213 if let Some(prev) = previous {
214 if prev == &key[..OID_SIZE] {
215 continue;
216 }
217 }
218 previous = Some(&key[..OID_SIZE]);
219 retain.insert(key.clone());
220 }
221 retain
222 };
223 let mut digest_prefix = vec![0; 8];
224 digest_prefix[7] = 32;
225 let uniform_key = KeyType::uniform(default_cells_per_mutex());
226 let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
227 let epoch_tx_digest_prefix_key =
229 KeyType::from_prefix_bits((8+ 8) * 8 + 12);
230 let object_indexing = KeyIndexing::key_reduction(32 + 8, 16..(32 + 8));
231 let obj_ref_size = 32 + 8 + 32 + 8;
233 let owned_object_transaction_locks_indexing =
234 KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));
235
236 let mut objects_config = KeySpaceConfig::new()
237 .with_unloaded_iterator(true)
238 .with_max_dirty_keys(4048);
239 if matches!(db_options_override, Some(options) if options.is_validator) {
240 objects_config = objects_config.with_compactor(Box::new(objects_compactor));
241 }
242
243 let configs = vec![
244 (
245 "objects".to_string(),
246 ThConfig::new_with_config_indexing(
247 object_indexing,
248 mutexes,
249 KeyType::uniform(default_cells_per_mutex() * 4),
250 objects_config,
251 ),
252 ),
253 (
254 "owned_object_transaction_locks".to_string(),
255 ThConfig::new_with_config_indexing(
256 owned_object_transaction_locks_indexing,
257 mutexes,
258 KeyType::uniform(default_cells_per_mutex() * 4),
259 bloom_config.clone().with_max_dirty_keys(4048),
260 ),
261 ),
262 (
263 "transactions".to_string(),
264 ThConfig::new_with_rm_prefix_indexing(
265 KeyIndexing::key_reduction(32, 0..16),
266 mutexes,
267 uniform_key,
268 KeySpaceConfig::new()
269 .with_value_cache_size(value_cache_size)
270 .with_relocation_filter(|_, _| Decision::Remove),
271 digest_prefix.clone(),
272 ),
273 ),
274 (
275 "effects".to_string(),
276 ThConfig::new_with_rm_prefix_indexing(
277 KeyIndexing::key_reduction(32, 0..16),
278 mutexes,
279 uniform_key,
280 apply_relocation_filter(
281 bloom_config.clone().with_value_cache_size(value_cache_size),
282 pruner_watermark.clone(),
283 |effects: TransactionEffects| effects.executed_epoch(),
284 false,
285 ),
286 digest_prefix.clone(),
287 ),
288 ),
289 (
290 "executed_effects".to_string(),
291 ThConfig::new_with_rm_prefix_indexing(
292 KeyIndexing::key_reduction(32, 0..16),
293 mutexes,
294 uniform_key,
295 bloom_config
296 .clone()
297 .with_value_cache_size(value_cache_size)
298 .with_relocation_filter(|_, _| Decision::Remove),
299 digest_prefix.clone(),
300 ),
301 ),
302 (
303 "events".to_string(),
304 ThConfig::new_with_rm_prefix(
305 32 + 8,
306 mutexes,
307 uniform_key,
308 KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
309 digest_prefix.clone(),
310 ),
311 ),
312 (
313 "events_2".to_string(),
314 ThConfig::new_with_rm_prefix(
315 32,
316 mutexes,
317 uniform_key,
318 KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
319 digest_prefix.clone(),
320 ),
321 ),
322 (
323 "unchanged_loaded_runtime_objects".to_string(),
324 ThConfig::new_with_rm_prefix(
325 32,
326 mutexes,
327 uniform_key,
328 KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
329 digest_prefix.clone(),
330 ),
331 ),
332 (
333 "executed_transactions_to_checkpoint".to_string(),
334 ThConfig::new_with_rm_prefix(
335 32,
336 mutexes,
337 uniform_key,
338 apply_relocation_filter(
339 KeySpaceConfig::default(),
340 pruner_watermark.clone(),
341 |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
342 false,
343 ),
344 digest_prefix.clone(),
345 ),
346 ),
347 (
348 "root_state_hash_by_epoch".to_string(),
349 ThConfig::new(8, 1, KeyType::uniform(1)),
350 ),
351 (
352 "epoch_start_configuration".to_string(),
353 ThConfig::new(0, 1, KeyType::uniform(1)),
354 ),
355 (
356 "pruned_checkpoint".to_string(),
357 ThConfig::new(0, 1, KeyType::uniform(1)),
358 ),
359 (
360 "expected_network_sui_amount".to_string(),
361 ThConfig::new(0, 1, KeyType::uniform(1)),
362 ),
363 (
364 "expected_storage_fund_imbalance".to_string(),
365 ThConfig::new(0, 1, KeyType::uniform(1)),
366 ),
367 (
368 "object_per_epoch_marker_table".to_string(),
369 ThConfig::new_with_config_indexing(
370 KeyIndexing::VariableLength,
371 mutexes,
372 epoch_prefix_key,
373 apply_relocation_filter(
374 KeySpaceConfig::default(),
375 pruner_watermark.clone(),
376 |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
377 true,
378 ),
379 ),
380 ),
381 (
382 "object_per_epoch_marker_table_v2".to_string(),
383 ThConfig::new_with_config_indexing(
384 KeyIndexing::VariableLength,
385 mutexes,
386 epoch_prefix_key,
387 apply_relocation_filter(
388 bloom_config.clone(),
389 pruner_watermark.clone(),
390 |(epoch_id, _): (EpochId, FullObjectKey)| epoch_id,
391 true,
392 ),
393 ),
394 ),
395 (
396 "executed_transaction_digests".to_string(),
397 ThConfig::new_with_config_indexing(
398 KeyIndexing::fixed(8 + (32 + 8)),
400 mutexes,
401 epoch_tx_digest_prefix_key,
402 apply_relocation_filter(
403 bloom_config.clone(),
404 pruner_watermark.clone(),
405 |(epoch_id, _): (EpochId, TransactionDigest)| epoch_id,
406 true,
407 ),
408 ),
409 ),
410 ];
411 Self::open_tables_read_write(
412 Self::path(parent_path),
413 MetricConf::new("perpetual")
414 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
415 configs.into_iter().collect(),
416 )
417 }
418
419 pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
420 Self::get_read_only_handle(
421 Self::path(parent_path),
422 None,
423 None,
424 MetricConf::new("perpetual_readonly"),
425 )
426 }
427
428 pub fn find_object_lt_or_eq_version(
432 &self,
433 object_id: ObjectID,
434 version: SequenceNumber,
435 ) -> SuiResult<Option<Object>> {
436 let mut iter = self.objects.reversed_safe_iter_with_bounds(
437 Some(ObjectKey::min_for_id(&object_id)),
438 Some(ObjectKey(object_id, version)),
439 )?;
440 match iter.next() {
441 Some(Ok((key, o))) => self.object(&key, o),
442 Some(Err(e)) => Err(e.into()),
443 None => Ok(None),
444 }
445 }
446
447 fn construct_object(
448 &self,
449 object_key: &ObjectKey,
450 store_object: StoreObjectValue,
451 ) -> Result<Object, SuiError> {
452 try_construct_object(object_key, store_object)
453 }
454
455 pub fn object(
458 &self,
459 object_key: &ObjectKey,
460 store_object: StoreObjectWrapper,
461 ) -> Result<Option<Object>, SuiError> {
462 let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
463 return Ok(None);
464 };
465 Ok(Some(self.construct_object(object_key, *store_object)?))
466 }
467
468 pub fn object_reference(
469 &self,
470 object_key: &ObjectKey,
471 store_object: StoreObjectWrapper,
472 ) -> Result<ObjectRef, SuiError> {
473 let obj_ref = match store_object.migrate().into_inner() {
474 StoreObject::Value(object) => self
475 .construct_object(object_key, *object)?
476 .compute_object_reference(),
477 StoreObject::Deleted => (
478 object_key.0,
479 object_key.1,
480 ObjectDigest::OBJECT_DIGEST_DELETED,
481 ),
482 StoreObject::Wrapped => (
483 object_key.0,
484 object_key.1,
485 ObjectDigest::OBJECT_DIGEST_WRAPPED,
486 ),
487 };
488 Ok(obj_ref)
489 }
490
491 pub fn tombstone_reference(
492 &self,
493 object_key: &ObjectKey,
494 store_object: &StoreObjectWrapper,
495 ) -> Result<Option<ObjectRef>, SuiError> {
496 let obj_ref = match store_object.inner() {
497 StoreObject::Deleted => Some((
498 object_key.0,
499 object_key.1,
500 ObjectDigest::OBJECT_DIGEST_DELETED,
501 )),
502 StoreObject::Wrapped => Some((
503 object_key.0,
504 object_key.1,
505 ObjectDigest::OBJECT_DIGEST_WRAPPED,
506 )),
507 _ => None,
508 };
509 Ok(obj_ref)
510 }
511
512 pub fn get_latest_object_ref_or_tombstone(
513 &self,
514 object_id: ObjectID,
515 ) -> Result<Option<ObjectRef>, SuiError> {
516 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
517 Some(ObjectKey::min_for_id(&object_id)),
518 Some(ObjectKey::max_for_id(&object_id)),
519 )?;
520
521 if let Some(Ok((object_key, value))) = iterator.next()
522 && object_key.0 == object_id
523 {
524 return Ok(Some(self.object_reference(&object_key, value)?));
525 }
526 Ok(None)
527 }
528
529 pub fn get_latest_object_or_tombstone(
530 &self,
531 object_id: ObjectID,
532 ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
533 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
534 Some(ObjectKey::min_for_id(&object_id)),
535 Some(ObjectKey::max_for_id(&object_id)),
536 )?;
537
538 if let Some(Ok((object_key, value))) = iterator.next()
539 && object_key.0 == object_id
540 {
541 return Ok(Some((object_key, value)));
542 }
543 Ok(None)
544 }
545
546 pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
547 Ok(self
548 .epoch_start_configuration
549 .get(&())?
550 .expect("Must have current epoch.")
551 .epoch_start_state()
552 .epoch())
553 }
554
555 pub fn set_epoch_start_configuration(
556 &self,
557 epoch_start_configuration: &EpochStartConfiguration,
558 ) -> SuiResult {
559 let mut wb = self.epoch_start_configuration.batch();
560 wb.insert_batch(
561 &self.epoch_start_configuration,
562 std::iter::once(((), epoch_start_configuration)),
563 )?;
564 wb.write()?;
565 Ok(())
566 }
567
568 pub fn get_highest_pruned_checkpoint(
569 &self,
570 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
571 self.pruned_checkpoint.get(&())
572 }
573
574 pub fn set_highest_pruned_checkpoint(
575 &self,
576 wb: &mut DBBatch,
577 checkpoint_number: CheckpointSequenceNumber,
578 ) -> SuiResult {
579 wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
580 Ok(())
581 }
582
583 pub fn get_transaction(
584 &self,
585 digest: &TransactionDigest,
586 ) -> SuiResult<Option<TrustedTransaction>> {
587 let Some(transaction) = self.transactions.get(digest)? else {
588 return Ok(None);
589 };
590 Ok(Some(transaction))
591 }
592
593 pub fn insert_executed_transaction_digests_batch(
596 &self,
597 epoch: EpochId,
598 digests: impl Iterator<Item = TransactionDigest>,
599 ) -> SuiResult {
600 let mut batch = self.executed_transaction_digests.batch();
601 batch.insert_batch(
602 &self.executed_transaction_digests,
603 digests.map(|digest| ((epoch, digest), ())),
604 )?;
605 batch.write()?;
606 Ok(())
607 }
608
609 pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
610 let Some(effect_digest) = self.executed_effects.get(digest)? else {
611 return Ok(None);
612 };
613 Ok(self.effects.get(&effect_digest)?)
614 }
615
616 pub(crate) fn was_transaction_executed_in_last_epoch(
617 &self,
618 digest: &TransactionDigest,
619 current_epoch: EpochId,
620 ) -> bool {
621 if current_epoch == 0 {
622 return false;
623 }
624 self.executed_transaction_digests
625 .contains_key(&(current_epoch - 1, *digest))
626 .expect("db error")
627 }
628
629 pub fn get_checkpoint_sequence_number(
632 &self,
633 digest: &TransactionDigest,
634 ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
635 Ok(self.executed_transactions_to_checkpoint.get(digest)?)
636 }
637
638 pub fn get_newer_object_keys(
639 &self,
640 object: &(ObjectID, SequenceNumber),
641 ) -> SuiResult<Vec<ObjectKey>> {
642 let mut objects = vec![];
643 for result in self.objects.safe_iter_with_bounds(
644 Some(ObjectKey(object.0, object.1.next())),
645 Some(ObjectKey(object.0, VersionNumber::MAX)),
646 ) {
647 let (key, _) = result?;
648 objects.push(key);
649 }
650 Ok(objects)
651 }
652
653 pub fn set_highest_pruned_checkpoint_without_wb(
654 &self,
655 checkpoint_number: CheckpointSequenceNumber,
656 ) -> SuiResult {
657 let mut wb = self.pruned_checkpoint.batch();
658 self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
659 wb.write()?;
660 Ok(())
661 }
662
663 pub fn database_is_empty(&self) -> SuiResult<bool> {
664 Ok(self.objects.safe_iter().next().is_none())
665 }
666
667 pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
668 LiveSetIter {
669 iter: Box::new(self.objects.safe_iter()),
670 tables: self,
671 prev: None,
672 include_wrapped_object,
673 }
674 }
675
676 pub fn range_iter_live_object_set(
677 &self,
678 lower_bound: Option<ObjectID>,
679 upper_bound: Option<ObjectID>,
680 include_wrapped_object: bool,
681 ) -> LiveSetIter<'_> {
682 let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
683 let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
684
685 LiveSetIter {
686 iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
687 tables: self,
688 prev: None,
689 include_wrapped_object,
690 }
691 }
692
693 pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
694 self.objects.checkpoint_db(path).map_err(Into::into)
696 }
697
698 pub fn get_root_state_hash(
699 &self,
700 epoch: EpochId,
701 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
702 Ok(self.root_state_hash_by_epoch.get(&epoch)?)
703 }
704
705 pub fn insert_root_state_hash(
706 &self,
707 epoch: EpochId,
708 last_checkpoint_of_epoch: CheckpointSequenceNumber,
709 hash: GlobalStateHash,
710 ) -> SuiResult {
711 self.root_state_hash_by_epoch
712 .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
713 Ok(())
714 }
715
716 pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
717 let object_reference = object.compute_object_reference();
718 let wrapper = get_store_object(object);
719 let mut wb = self.objects.batch();
720 wb.insert_batch(
721 &self.objects,
722 std::iter::once((ObjectKey::from(object_reference), wrapper)),
723 )?;
724 wb.write()?;
725 Ok(())
726 }
727
728 pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
730 let obj_entry = self
731 .objects
732 .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
733 .next();
734
735 match obj_entry.transpose()? {
736 Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
737 Ok(self.object(&ObjectKey(obj_id, version), obj)?)
738 }
739 _ => Ok(None),
740 }
741 }
742
743 pub fn get_object_by_key_fallible(
744 &self,
745 object_id: &ObjectID,
746 version: VersionNumber,
747 ) -> SuiResult<Option<Object>> {
748 Ok(self
749 .objects
750 .get(&ObjectKey(*object_id, version))?
751 .and_then(|object| {
752 self.object(&ObjectKey(*object_id, version), object)
753 .expect("object construction error")
754 }))
755 }
756}
757
758impl ObjectStore for AuthorityPerpetualTables {
759 fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
761 self.get_object_fallible(object_id).expect("db error")
762 }
763
764 fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
765 self.get_object_by_key_fallible(object_id, version)
766 .expect("db error")
767 }
768}
769
770pub struct LiveSetIter<'a> {
771 iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
772 tables: &'a AuthorityPerpetualTables,
773 prev: Option<(ObjectKey, StoreObjectWrapper)>,
774 include_wrapped_object: bool,
776}
777
778#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
779pub enum LiveObject {
780 Normal(Object),
781 Wrapped(ObjectKey),
782}
783
784impl LiveObject {
785 pub fn object_id(&self) -> ObjectID {
786 match self {
787 LiveObject::Normal(obj) => obj.id(),
788 LiveObject::Wrapped(key) => key.0,
789 }
790 }
791
792 pub fn version(&self) -> SequenceNumber {
793 match self {
794 LiveObject::Normal(obj) => obj.version(),
795 LiveObject::Wrapped(key) => key.1,
796 }
797 }
798
799 pub fn object_reference(&self) -> ObjectRef {
800 match self {
801 LiveObject::Normal(obj) => obj.compute_object_reference(),
802 LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
803 }
804 }
805
806 pub fn to_normal(self) -> Option<Object> {
807 match self {
808 LiveObject::Normal(object) => Some(object),
809 LiveObject::Wrapped(_) => None,
810 }
811 }
812}
813
814impl LiveSetIter<'_> {
815 fn store_object_wrapper_to_live_object(
816 &self,
817 object_key: ObjectKey,
818 store_object: StoreObjectWrapper,
819 ) -> Option<LiveObject> {
820 match store_object.migrate().into_inner() {
821 StoreObject::Value(object) => {
822 let object = self
823 .tables
824 .construct_object(&object_key, *object)
825 .expect("Constructing object from store cannot fail");
826 Some(LiveObject::Normal(object))
827 }
828 StoreObject::Wrapped => {
829 if self.include_wrapped_object {
830 Some(LiveObject::Wrapped(object_key))
831 } else {
832 None
833 }
834 }
835 StoreObject::Deleted => None,
836 }
837 }
838}
839
840impl Iterator for LiveSetIter<'_> {
841 type Item = LiveObject;
842
843 fn next(&mut self) -> Option<Self::Item> {
844 loop {
845 if let Some(Ok((next_key, next_value))) = self.iter.next() {
846 let prev = self.prev.take();
847 self.prev = Some((next_key, next_value));
848
849 if let Some((prev_key, prev_value)) = prev
850 && prev_key.0 != next_key.0
851 {
852 let live_object =
853 self.store_object_wrapper_to_live_object(prev_key, prev_value);
854 if live_object.is_some() {
855 return live_object;
856 }
857 }
858 continue;
859 }
860 if let Some((key, value)) = self.prev.take() {
861 let live_object = self.store_object_wrapper_to_live_object(key, value);
862 if live_object.is_some() {
863 return live_object;
864 }
865 }
866 return None;
867 }
868 }
869}
870
871fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
873 DBOptions {
874 options: db_options
875 .clone()
876 .optimize_for_write_throughput()
877 .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
878 .options,
879 rw_options: db_options.rw_options.set_ignore_range_deletions(false),
880 }
881}
882
883fn objects_table_config(db_options: DBOptions) -> DBOptions {
884 db_options
885 .optimize_for_write_throughput()
886 .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
887}
888
889fn transactions_table_config(db_options: DBOptions) -> DBOptions {
890 db_options
891 .optimize_for_write_throughput()
892 .optimize_for_point_lookup(
893 read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
894 )
895}
896
897fn effects_table_config(db_options: DBOptions) -> DBOptions {
898 db_options
899 .optimize_for_write_throughput()
900 .optimize_for_point_lookup(
901 read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
902 )
903}