1use super::*;
5use crate::authority::authority_store::LockDetailsWrapperDeprecated;
6use serde::{Deserialize, Serialize};
7use std::path::Path;
8use std::sync::atomic::AtomicU64;
9use sui_types::base_types::SequenceNumber;
10use sui_types::digests::TransactionEventsDigest;
11use sui_types::effects::{TransactionEffects, TransactionEvents};
12use sui_types::global_state_hash::GlobalStateHash;
13use sui_types::storage::{FullObjectKey, MarkerValue};
14use typed_store::metrics::SamplingInterval;
15use typed_store::rocks::{
16 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
17 read_size_from_env,
18};
19use typed_store::traits::Map;
20
21use crate::authority::authority_store_types::{
22 StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
23};
24use crate::authority::epoch_start_configuration::EpochStartConfiguration;
25use typed_store::{DBMapUtils, DbIterator};
26
// Names of the environment variables that override the block-cache size of
// individual tables. Each is read through `read_size_from_env` in the matching
// `*_table_config` function, which falls back to a built-in default when the
// variable is absent. NOTE(review): the `_MB` suffix suggests megabytes —
// confirm against `read_size_from_env`.

/// Overrides the block-cache size used for the `objects` table.
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
/// Overrides the block-cache size used for the `owned_object_transaction_locks` table.
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
/// Overrides the block-cache size used for the `transactions` table.
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
/// Overrides the block-cache size used for the `effects` table.
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
31
/// Caller-supplied knobs applied when opening [`AuthorityPerpetualTables`].
///
/// The `Default` value has both flags set to `false`.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// When `false`, `apply_to` disables write throttling on the database options.
    pub enable_write_stall: bool,
    /// Read by the tidehunter `open` path: the `objects` compactor is only
    /// installed when this is `true`.
    pub is_validator: bool,
}
39
40impl AuthorityPerpetualTablesOptions {
41 fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
42 if !self.enable_write_stall {
43 db_options = db_options.disable_write_throttling();
44 }
45 db_options
46 }
47}
48
/// The set of tables stored under the `perpetual` directory (see
/// `AuthorityPerpetualTables::path`). Each field is one typed key/value table;
/// the table-opening helpers used by `open` are generated by `DBMapUtils`.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct AuthorityPerpetualTables {
    /// Object states keyed by `ObjectKey` (object id + version). A stored entry
    /// is either a full object value or a `Deleted` / `Wrapped` tombstone.
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// Keyed by object reference. The on-disk table name is kept as
    /// `owned_object_transaction_locks` via the `rename` attribute.
    #[rename = "owned_object_transaction_locks"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,

    /// Transactions keyed by transaction digest.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// Transaction effects keyed by effects digest.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Maps a transaction digest to the digest of its effects; `get_effects`
    /// chains this table with `effects`.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Deprecated events table; not accessed by any code in this file.
    #[allow(dead_code)]
    #[deprecated]
    events: DBMap<(TransactionEventsDigest, usize), Event>,

    /// Transaction events keyed by transaction digest.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    /// Per-transaction list of object keys.
    /// NOTE(review): presumably objects loaded at runtime but left unchanged by
    /// the transaction — confirm against the writer.
    pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,

    /// Maps a transaction digest to an (epoch, checkpoint sequence number) pair.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Per-epoch root state hash, stored together with a checkpoint sequence
    /// number (`insert_root_state_hash` names it `last_checkpoint_of_epoch`).
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Single-entry table (unit key) holding the epoch start configuration.
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// Single-entry table (unit key) holding the highest pruned checkpoint.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Single-entry table (unit key) holding a `u64` amount.
    pub(crate) expected_network_sui_amount: DBMap<(), u64>,

    /// Single-entry table (unit key) holding an `i64` imbalance.
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Marker values keyed by (epoch, `ObjectKey`).
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
    /// Marker values keyed by (epoch, `FullObjectKey`).
    pub(crate) object_per_epoch_marker_table_v2: DBMap<(EpochId, FullObjectKey), MarkerValue>,

    /// Set of (epoch, transaction digest) pairs; queried by
    /// `was_transaction_executed_in_last_epoch`.
    pub(crate) executed_transaction_digests: DBMap<(EpochId, TransactionDigest), ()>,
}
147
148impl AuthorityPerpetualTables {
149 pub fn path(parent_path: &Path) -> PathBuf {
150 parent_path.join("perpetual")
151 }
152
153 #[cfg(not(tidehunter))]
154 pub fn open(
155 parent_path: &Path,
156 db_options_override: Option<AuthorityPerpetualTablesOptions>,
157 _pruner_watermark: Option<Arc<AtomicU64>>,
158 ) -> Self {
159 let db_options_override = db_options_override.unwrap_or_default();
160 let db_options =
161 db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
162 let table_options = DBMapTableConfigMap::new(BTreeMap::from([
163 (
164 "objects".to_string(),
165 objects_table_config(db_options.clone()),
166 ),
167 (
168 "owned_object_transaction_locks".to_string(),
169 owned_object_transaction_locks_table_config(db_options.clone()),
170 ),
171 (
172 "transactions".to_string(),
173 transactions_table_config(db_options.clone()),
174 ),
175 (
176 "effects".to_string(),
177 effects_table_config(db_options.clone()),
178 ),
179 ]));
180
181 Self::open_tables_read_write(
182 Self::path(parent_path),
183 MetricConf::new("perpetual")
184 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
185 Some(db_options.options),
186 Some(table_options),
187 )
188 }
189
    /// Opens the perpetual tables read-write under `parent_path` using the
    /// tidehunter backend.
    ///
    /// Builds one `ThConfig` per table name and hands the collection to
    /// `open_tables_read_write`. Of `db_options_override`, only `is_validator`
    /// is consulted here (it gates the `objects` compactor). When
    /// `pruner_watermark` is `None`, a fresh counter starting at 0 is used.
    #[cfg(tidehunter)]
    pub fn open(
        parent_path: &Path,
        db_options_override: Option<AuthorityPerpetualTablesOptions>,
        pruner_watermark: Option<Arc<AtomicU64>>,
    ) -> Self {
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        tracing::warn!("AuthorityPerpetualTables using tidehunter");
        use typed_store::tidehunter_util::{
            Bytes, Decision, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
            default_cells_per_mutex, default_mutex_count, default_value_cache_size,
        };
        // Twice the backend's default mutex count is used for every large table.
        let mutexes = default_mutex_count() * 2;
        let value_cache_size = default_value_cache_size();
        let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));

        let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
        // Walks the keys in reverse and retains only the first key encountered
        // for each distinct leading `OID_SIZE` bytes, i.e. the last key in
        // forward order among consecutive keys sharing that prefix.
        // NOTE(review): presumably the prefix is the object-id portion of the
        // reduced `objects` key, so this keeps the newest version per object —
        // confirm against `object_indexing` below.
        let objects_compactor = |iter: &mut dyn DoubleEndedIterator<Item = &Bytes>| {
            let mut retain = HashSet::new();
            let mut previous: Option<&[u8]> = None;
            const OID_SIZE: usize = 16;
            for key in iter.rev() {
                if let Some(prev) = previous {
                    if prev == &key[..OID_SIZE] {
                        continue;
                    }
                }
                previous = Some(&key[..OID_SIZE]);
                retain.insert(key.clone());
            }
            retain
        };
        // Eight bytes, all zero except the last which is 32.
        // NOTE(review): presumably the serialized length prefix of a 32-byte
        // digest that the `rm_prefix` configs strip — confirm.
        let mut digest_prefix = vec![0; 8];
        digest_prefix[7] = 32;
        let uniform_key = KeyType::uniform(default_cells_per_mutex());
        let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
        let epoch_tx_digest_prefix_key =
            KeyType::from_prefix_bits((8+ 8) * 8 + 12);
        // Keys of 32 + 8 bytes, reduced to the byte range 16..40.
        let object_indexing = KeyIndexing::key_reduction(32 + 8, 16..(32 + 8));
        let obj_ref_size = 32 + 8 + 32 + 8;
        // Keys of `obj_ref_size` bytes, reduced by dropping 16 bytes at each end.
        let owned_object_transaction_locks_indexing =
            KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));

        let mut objects_config = KeySpaceConfig::new()
            .with_unloaded_iterator(true)
            .with_max_dirty_keys(4048);
        // The compactor is installed only when the caller says this node is a validator.
        if matches!(db_options_override, Some(options) if options.is_validator) {
            objects_config = objects_config.with_compactor(Box::new(objects_compactor));
        }

        // One (table name, config) pair per field of `AuthorityPerpetualTables`.
        let configs = vec![
            (
                "objects".to_string(),
                ThConfig::new_with_config_indexing(
                    object_indexing,
                    mutexes,
                    KeyType::uniform(default_cells_per_mutex() * 4),
                    objects_config,
                ),
            ),
            (
                "owned_object_transaction_locks".to_string(),
                ThConfig::new_with_config_indexing(
                    owned_object_transaction_locks_indexing,
                    mutexes,
                    KeyType::uniform(default_cells_per_mutex() * 4),
                    bloom_config.clone().with_max_dirty_keys(4048),
                ),
            ),
            (
                "transactions".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::new()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    // Relocation decisions are derived from the epoch in which
                    // the effects were executed, together with the watermark.
                    apply_relocation_filter(
                        bloom_config.clone().with_value_cache_size(value_cache_size),
                        pruner_watermark.clone(),
                        |effects: TransactionEffects| effects.executed_epoch(),
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    bloom_config
                        .clone()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events".to_string(),
                ThConfig::new_with_rm_prefix(
                    32 + 8,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events_2".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "unchanged_loaded_runtime_objects".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_transactions_to_checkpoint".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    // The epoch is taken from the stored value here.
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            // The remaining small tables use a single mutex and a single cell.
            (
                "root_state_hash_by_epoch".to_string(),
                ThConfig::new(8, 1, KeyType::uniform(1)),
            ),
            (
                "epoch_start_configuration".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "pruned_checkpoint".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_network_sui_amount".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_storage_fund_imbalance".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "object_per_epoch_marker_table".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    // For the epoch-prefixed tables the epoch is taken from the
                    // key and the final flag is `true`.
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "object_per_epoch_marker_table_v2".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, FullObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "executed_transaction_digests".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::fixed(8 + (32 + 8)),
                    mutexes,
                    epoch_tx_digest_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, TransactionDigest)| epoch_id,
                        true,
                    ),
                ),
            ),
        ];
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }
418
419 #[cfg(not(tidehunter))]
420 pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
421 Self::get_read_only_handle(
422 Self::path(parent_path),
423 None,
424 None,
425 MetricConf::new("perpetual_readonly"),
426 )
427 }
428
    /// Tidehunter variant of `open_readonly`: delegates to `open` with no
    /// option overrides and no pruner watermark, so the returned handle is the
    /// same type as the read-write one.
    #[cfg(tidehunter)]
    pub fn open_readonly(parent_path: &Path) -> Self {
        Self::open(parent_path, None, None)
    }
433
434 pub fn find_object_lt_or_eq_version(
438 &self,
439 object_id: ObjectID,
440 version: SequenceNumber,
441 ) -> SuiResult<Option<Object>> {
442 let mut iter = self.objects.reversed_safe_iter_with_bounds(
443 Some(ObjectKey::min_for_id(&object_id)),
444 Some(ObjectKey(object_id, version)),
445 )?;
446 match iter.next() {
447 Some(Ok((key, o))) => self.object(&key, o),
448 Some(Err(e)) => Err(e.into()),
449 None => Ok(None),
450 }
451 }
452
    /// Builds an `Object` from a stored object value and its key by delegating
    /// to `try_construct_object`.
    ///
    /// # Errors
    /// Returns whatever error `try_construct_object` reports.
    fn construct_object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectValue,
    ) -> Result<Object, SuiError> {
        try_construct_object(object_key, store_object)
    }
460
461 pub fn object(
464 &self,
465 object_key: &ObjectKey,
466 store_object: StoreObjectWrapper,
467 ) -> Result<Option<Object>, SuiError> {
468 let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
469 return Ok(None);
470 };
471 Ok(Some(self.construct_object(object_key, *store_object)?))
472 }
473
474 pub fn object_reference(
475 &self,
476 object_key: &ObjectKey,
477 store_object: StoreObjectWrapper,
478 ) -> Result<ObjectRef, SuiError> {
479 let obj_ref = match store_object.migrate().into_inner() {
480 StoreObject::Value(object) => self
481 .construct_object(object_key, *object)?
482 .compute_object_reference(),
483 StoreObject::Deleted => (
484 object_key.0,
485 object_key.1,
486 ObjectDigest::OBJECT_DIGEST_DELETED,
487 ),
488 StoreObject::Wrapped => (
489 object_key.0,
490 object_key.1,
491 ObjectDigest::OBJECT_DIGEST_WRAPPED,
492 ),
493 };
494 Ok(obj_ref)
495 }
496
497 pub fn tombstone_reference(
498 &self,
499 object_key: &ObjectKey,
500 store_object: &StoreObjectWrapper,
501 ) -> Result<Option<ObjectRef>, SuiError> {
502 let obj_ref = match store_object.inner() {
503 StoreObject::Deleted => Some((
504 object_key.0,
505 object_key.1,
506 ObjectDigest::OBJECT_DIGEST_DELETED,
507 )),
508 StoreObject::Wrapped => Some((
509 object_key.0,
510 object_key.1,
511 ObjectDigest::OBJECT_DIGEST_WRAPPED,
512 )),
513 _ => None,
514 };
515 Ok(obj_ref)
516 }
517
518 pub fn get_latest_object_ref_or_tombstone(
519 &self,
520 object_id: ObjectID,
521 ) -> Result<Option<ObjectRef>, SuiError> {
522 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
523 Some(ObjectKey::min_for_id(&object_id)),
524 Some(ObjectKey::max_for_id(&object_id)),
525 )?;
526
527 if let Some(Ok((object_key, value))) = iterator.next()
528 && object_key.0 == object_id
529 {
530 return Ok(Some(self.object_reference(&object_key, value)?));
531 }
532 Ok(None)
533 }
534
535 pub fn get_latest_object_or_tombstone(
536 &self,
537 object_id: ObjectID,
538 ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
539 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
540 Some(ObjectKey::min_for_id(&object_id)),
541 Some(ObjectKey::max_for_id(&object_id)),
542 )?;
543
544 if let Some(Ok((object_key, value))) = iterator.next()
545 && object_key.0 == object_id
546 {
547 return Ok(Some((object_key, value)));
548 }
549 Ok(None)
550 }
551
552 pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
553 Ok(self
554 .epoch_start_configuration
555 .get(&())?
556 .expect("Must have current epoch.")
557 .epoch_start_state()
558 .epoch())
559 }
560
561 pub fn set_epoch_start_configuration(
562 &self,
563 epoch_start_configuration: &EpochStartConfiguration,
564 ) -> SuiResult {
565 let mut wb = self.epoch_start_configuration.batch();
566 wb.insert_batch(
567 &self.epoch_start_configuration,
568 std::iter::once(((), epoch_start_configuration)),
569 )?;
570 wb.write()?;
571 Ok(())
572 }
573
    /// Reads the single entry of the `pruned_checkpoint` table.
    ///
    /// Returns `Ok(None)` when no value has been stored yet.
    ///
    /// # Errors
    /// Returns the underlying `TypedStoreError` unchanged.
    pub fn get_highest_pruned_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.pruned_checkpoint.get(&())
    }
579
580 pub fn set_highest_pruned_checkpoint(
581 &self,
582 wb: &mut DBBatch,
583 checkpoint_number: CheckpointSequenceNumber,
584 ) -> SuiResult {
585 wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
586 Ok(())
587 }
588
589 pub fn get_transaction(
590 &self,
591 digest: &TransactionDigest,
592 ) -> SuiResult<Option<TrustedTransaction>> {
593 let Some(transaction) = self.transactions.get(digest)? else {
594 return Ok(None);
595 };
596 Ok(Some(transaction))
597 }
598
599 pub fn insert_executed_transaction_digests_batch(
602 &self,
603 epoch: EpochId,
604 digests: impl Iterator<Item = TransactionDigest>,
605 ) -> SuiResult {
606 let mut batch = self.executed_transaction_digests.batch();
607 batch.insert_batch(
608 &self.executed_transaction_digests,
609 digests.map(|digest| ((epoch, digest), ())),
610 )?;
611 batch.write()?;
612 Ok(())
613 }
614
615 pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
616 let Some(effect_digest) = self.executed_effects.get(digest)? else {
617 return Ok(None);
618 };
619 Ok(self.effects.get(&effect_digest)?)
620 }
621
622 pub(crate) fn was_transaction_executed_in_last_epoch(
623 &self,
624 digest: &TransactionDigest,
625 current_epoch: EpochId,
626 ) -> bool {
627 if current_epoch == 0 {
628 return false;
629 }
630 self.executed_transaction_digests
631 .contains_key(&(current_epoch - 1, *digest))
632 .expect("db error")
633 }
634
635 pub fn get_checkpoint_sequence_number(
638 &self,
639 digest: &TransactionDigest,
640 ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
641 Ok(self.executed_transactions_to_checkpoint.get(digest)?)
642 }
643
644 pub fn get_newer_object_keys(
645 &self,
646 object: &(ObjectID, SequenceNumber),
647 ) -> SuiResult<Vec<ObjectKey>> {
648 let mut objects = vec![];
649 for result in self.objects.safe_iter_with_bounds(
650 Some(ObjectKey(object.0, object.1.next())),
651 Some(ObjectKey(object.0, VersionNumber::MAX)),
652 ) {
653 let (key, _) = result?;
654 objects.push(key);
655 }
656 Ok(objects)
657 }
658
659 pub fn set_highest_pruned_checkpoint_without_wb(
660 &self,
661 checkpoint_number: CheckpointSequenceNumber,
662 ) -> SuiResult {
663 let mut wb = self.pruned_checkpoint.batch();
664 self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
665 wb.write()?;
666 Ok(())
667 }
668
669 pub fn database_is_empty(&self) -> SuiResult<bool> {
670 Ok(self.objects.safe_iter().next().is_none())
671 }
672
673 pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
674 LiveSetIter {
675 iter: Box::new(self.objects.safe_iter()),
676 tables: self,
677 prev: None,
678 include_wrapped_object,
679 }
680 }
681
682 pub fn range_iter_live_object_set(
683 &self,
684 lower_bound: Option<ObjectID>,
685 upper_bound: Option<ObjectID>,
686 include_wrapped_object: bool,
687 ) -> LiveSetIter<'_> {
688 let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
689 let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
690
691 LiveSetIter {
692 iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
693 tables: self,
694 prev: None,
695 include_wrapped_object,
696 }
697 }
698
699 pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
700 self.objects.checkpoint_db(path).map_err(Into::into)
702 }
703
704 pub fn get_root_state_hash(
705 &self,
706 epoch: EpochId,
707 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
708 Ok(self.root_state_hash_by_epoch.get(&epoch)?)
709 }
710
711 pub fn insert_root_state_hash(
712 &self,
713 epoch: EpochId,
714 last_checkpoint_of_epoch: CheckpointSequenceNumber,
715 hash: GlobalStateHash,
716 ) -> SuiResult {
717 self.root_state_hash_by_epoch
718 .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
719 Ok(())
720 }
721
722 pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
723 let object_reference = object.compute_object_reference();
724 let wrapper = get_store_object(object);
725 let mut wb = self.objects.batch();
726 wb.insert_batch(
727 &self.objects,
728 std::iter::once((ObjectKey::from(object_reference), wrapper)),
729 )?;
730 wb.write()?;
731 Ok(())
732 }
733
734 pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
736 let obj_entry = self
737 .objects
738 .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
739 .next();
740
741 match obj_entry.transpose()? {
742 Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
743 Ok(self.object(&ObjectKey(obj_id, version), obj)?)
744 }
745 _ => Ok(None),
746 }
747 }
748
749 pub fn get_object_by_key_fallible(
750 &self,
751 object_id: &ObjectID,
752 version: VersionNumber,
753 ) -> SuiResult<Option<Object>> {
754 Ok(self
755 .objects
756 .get(&ObjectKey(*object_id, version))?
757 .and_then(|object| {
758 self.object(&ObjectKey(*object_id, version), object)
759 .expect("object construction error")
760 }))
761 }
762}
763
/// Infallible `ObjectStore` view over the perpetual tables. Both methods
/// delegate to the `*_fallible` variants and panic if those return an error.
impl ObjectStore for AuthorityPerpetualTables {
    /// Returns the latest version of `object_id`, if any.
    ///
    /// # Panics
    /// Panics with "db error" if `get_object_fallible` fails.
    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        self.get_object_fallible(object_id).expect("db error")
    }

    /// Returns the object stored at exactly (`object_id`, `version`), if any.
    ///
    /// # Panics
    /// Panics with "db error" if `get_object_by_key_fallible` fails.
    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
        self.get_object_by_key_fallible(object_id, version)
            .expect("db error")
    }
}
775
/// Iterator over the live object set, built by
/// `AuthorityPerpetualTables::iter_live_object_set` and
/// `range_iter_live_object_set`.
pub struct LiveSetIter<'a> {
    /// Underlying iterator over `objects` table entries.
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    /// Tables handle, used to construct objects from stored values.
    tables: &'a AuthorityPerpetualTables,
    /// The most recently read entry, held back until the next entry shows
    /// whether it was the last one for its object id.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
    /// Whether `Wrapped` entries are yielded as `LiveObject::Wrapped`.
    include_wrapped_object: bool,
}
783
/// An item of the live object set as produced by `LiveSetIter`.
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    /// A fully constructed object (from a stored `Value` entry).
    Normal(Object),
    /// A wrapped object, represented only by its key (from a stored `Wrapped`
    /// entry); yielded only when wrapped objects were requested.
    Wrapped(ObjectKey),
}
789
790impl LiveObject {
791 pub fn object_id(&self) -> ObjectID {
792 match self {
793 LiveObject::Normal(obj) => obj.id(),
794 LiveObject::Wrapped(key) => key.0,
795 }
796 }
797
798 pub fn version(&self) -> SequenceNumber {
799 match self {
800 LiveObject::Normal(obj) => obj.version(),
801 LiveObject::Wrapped(key) => key.1,
802 }
803 }
804
805 pub fn object_reference(&self) -> ObjectRef {
806 match self {
807 LiveObject::Normal(obj) => obj.compute_object_reference(),
808 LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
809 }
810 }
811
812 pub fn to_normal(self) -> Option<Object> {
813 match self {
814 LiveObject::Normal(object) => Some(object),
815 LiveObject::Wrapped(_) => None,
816 }
817 }
818}
819
820impl LiveSetIter<'_> {
821 fn store_object_wrapper_to_live_object(
822 &self,
823 object_key: ObjectKey,
824 store_object: StoreObjectWrapper,
825 ) -> Option<LiveObject> {
826 match store_object.migrate().into_inner() {
827 StoreObject::Value(object) => {
828 let object = self
829 .tables
830 .construct_object(&object_key, *object)
831 .expect("Constructing object from store cannot fail");
832 Some(LiveObject::Normal(object))
833 }
834 StoreObject::Wrapped => {
835 if self.include_wrapped_object {
836 Some(LiveObject::Wrapped(object_key))
837 } else {
838 None
839 }
840 }
841 StoreObject::Deleted => None,
842 }
843 }
844}
845
846impl Iterator for LiveSetIter<'_> {
847 type Item = LiveObject;
848
849 fn next(&mut self) -> Option<Self::Item> {
850 loop {
851 if let Some(Ok((next_key, next_value))) = self.iter.next() {
852 let prev = self.prev.take();
853 self.prev = Some((next_key, next_value));
854
855 if let Some((prev_key, prev_value)) = prev
856 && prev_key.0 != next_key.0
857 {
858 let live_object =
859 self.store_object_wrapper_to_live_object(prev_key, prev_value);
860 if live_object.is_some() {
861 return live_object;
862 }
863 }
864 continue;
865 }
866 if let Some((key, value)) = self.prev.take() {
867 let live_object = self.store_object_wrapper_to_live_object(key, value);
868 if live_object.is_some() {
869 return live_object;
870 }
871 }
872 return None;
873 }
874 }
875}
876
877fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
879 DBOptions {
880 options: db_options
881 .clone()
882 .optimize_for_write_throughput()
883 .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
884 .options,
885 rw_options: db_options.rw_options.set_ignore_range_deletions(false),
886 }
887}
888
889fn objects_table_config(db_options: DBOptions) -> DBOptions {
890 db_options
891 .optimize_for_write_throughput()
892 .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
893}
894
895fn transactions_table_config(db_options: DBOptions) -> DBOptions {
896 db_options
897 .optimize_for_write_throughput()
898 .optimize_for_point_lookup(
899 read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
900 )
901}
902
903fn effects_table_config(db_options: DBOptions) -> DBOptions {
904 db_options
905 .optimize_for_write_throughput()
906 .optimize_for_point_lookup(
907 read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
908 )
909}