1use super::*;
5use crate::authority::authority_store::LockDetailsWrapperDeprecated;
6#[cfg(tidehunter)]
7use crate::authority::epoch_marker_key::EPOCH_MARKER_KEY_SIZE;
8use crate::authority::epoch_marker_key::EpochMarkerKey;
9use serde::{Deserialize, Serialize};
10use std::path::Path;
11use std::sync::atomic::AtomicU64;
12use sui_types::base_types::SequenceNumber;
13use sui_types::effects::{TransactionEffects, TransactionEvents};
14use sui_types::global_state_hash::GlobalStateHash;
15use sui_types::storage::MarkerValue;
16use typed_store::metrics::SamplingInterval;
17use typed_store::rocks::{
18 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
19 read_size_from_env,
20};
21use typed_store::traits::Map;
22
23use crate::authority::authority_store_types::{
24 StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
25};
26use crate::authority::epoch_start_configuration::EpochStartConfiguration;
27use typed_store::{DBMapUtils, DbIterator};
28
// Environment variables overriding the block-cache size (in MB, per the
// variable names) for the corresponding column families; read via
// `read_size_from_env` in the `*_table_config` helpers below.
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
33
/// Tunables applied when opening `AuthorityPerpetualTables`.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// When false (the default), RocksDB write throttling is disabled via
    /// `DBOptions::disable_write_throttling` in `apply_to`.
    pub enable_write_stall: bool,
    /// Used by the tidehunter backend: validators enable the objects
    /// compactor that keeps only the latest version per object id.
    pub is_validator: bool,
}
41
42impl AuthorityPerpetualTablesOptions {
43 fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
44 if !self.enable_write_stall {
45 db_options = db_options.disable_write_throttling();
46 }
47 db_options
48 }
49}
50
/// Tables holding authority state that must survive across epochs.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct AuthorityPerpetualTables {
    /// Every stored version of every object, keyed by (ObjectID, version).
    /// Values are `StoreObjectWrapper`s: live values or deleted/wrapped
    /// tombstones (see `object_reference` / `tombstone_reference`).
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// Deprecated owned-object lock details. The on-disk column name is kept
    /// via `rename` for backward compatibility with existing databases.
    #[rename = "owned_object_transaction_locks"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,

    /// Transactions by digest, stored in trusted form.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// Transaction effects keyed by the effects' own digest.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Maps an executed transaction digest to the digest of its effects
    /// (joined with `effects` in `get_effects`).
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Events emitted by a transaction, keyed by transaction digest.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    /// Object keys loaded at runtime but left unchanged by the transaction.
    // NOTE(review): exact semantics inferred from the name — confirm.
    pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,

    /// For each executed transaction, the (epoch, checkpoint) it was
    /// included in.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Per-epoch global state hash, together with the last checkpoint of
    /// that epoch (see `insert_root_state_hash`).
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Singleton: configuration captured at the start of the current epoch.
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// Singleton: highest checkpoint sequence number that has been pruned.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Singleton invariant-checking value. NOTE(review): presumably the
    /// expected total SUI supply — confirm against its writer.
    pub(crate) expected_network_sui_amount: DBMap<(), u64>,

    /// Singleton invariant-checking value for the storage fund.
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Per-epoch object markers (legacy keying by (epoch, ObjectKey)).
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
    /// Per-epoch object markers, v2 keying via `EpochMarkerKey`.
    pub(crate) object_per_epoch_marker_table_v2: DBMap<EpochMarkerKey, MarkerValue>,

    /// Set of (epoch, transaction digest) pairs for executed transactions;
    /// values are unit markers (see `was_transaction_executed_in_last_epoch`).
    pub(crate) executed_transaction_digests: DBMap<(EpochId, TransactionDigest), ()>,
}
145
146impl AuthorityPerpetualTables {
147 pub fn path(parent_path: &Path) -> PathBuf {
148 parent_path.join("perpetual")
149 }
150
151 #[cfg(not(tidehunter))]
152 pub fn open(
153 parent_path: &Path,
154 db_options_override: Option<AuthorityPerpetualTablesOptions>,
155 _pruner_watermark: Option<Arc<AtomicU64>>,
156 ) -> Self {
157 let db_options_override = db_options_override.unwrap_or_default();
158 let db_options = db_options_override
159 .apply_to(default_db_options().optimize_db_for_write_throughput(4, false));
160 let table_options = DBMapTableConfigMap::new(BTreeMap::from([
161 (
162 "objects".to_string(),
163 objects_table_config(db_options.clone()),
164 ),
165 (
166 "owned_object_transaction_locks".to_string(),
167 owned_object_transaction_locks_table_config(db_options.clone()),
168 ),
169 (
170 "transactions".to_string(),
171 transactions_table_config(db_options.clone()),
172 ),
173 (
174 "effects".to_string(),
175 effects_table_config(db_options.clone()),
176 ),
177 ]));
178
179 Self::open_tables_read_write(
180 Self::path(parent_path),
181 MetricConf::new("perpetual")
182 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
183 Some(db_options.options),
184 Some(table_options),
185 )
186 }
187
    /// Opens the perpetual tables on the tidehunter backend.
    ///
    /// Mirrors the RocksDB `open` but expresses per-table tuning as
    /// `ThConfig`s: shard (mutex) counts, key indexing, caches, compactors,
    /// and relocation filters driven by the pruner watermark.
    #[cfg(tidehunter)]
    pub fn open(
        parent_path: &Path,
        db_options_override: Option<AuthorityPerpetualTablesOptions>,
        pruner_watermark: Option<Arc<AtomicU64>>,
    ) -> Self {
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        tracing::warn!("AuthorityPerpetualTables using tidehunter");
        use typed_store::tidehunter_util::{
            Bytes, Decision, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
            default_cells_per_mutex, default_mutex_count, default_value_cache_size,
        };
        // Base shard count; transaction-digest-keyed tables get 4x more.
        let mutexes = default_mutex_count() * 2;
        let transaction_mutexes = mutexes * 4;
        let value_cache_size = default_value_cache_size();
        // Relocation filters below compare stored epochs to this watermark;
        // default to 0 ("nothing pruned yet") when no watermark is supplied.
        let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));

        let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
        // `objects` keys start with the 32-byte ObjectID followed by the
        // version. Walking keys in reverse order and keeping only the first
        // key per ObjectID retains exactly the highest version of each object.
        let objects_compactor = |iter: &mut dyn DoubleEndedIterator<Item = &Bytes>| {
            let mut retain = HashSet::new();
            let mut previous: Option<&[u8]> = None;
            const OID_SIZE: usize = 32;
            for key in iter.rev() {
                if let Some(prev) = previous {
                    if prev == &key[..OID_SIZE] {
                        // Same ObjectID as a higher-version key already kept.
                        continue;
                    }
                }
                previous = Some(&key[..OID_SIZE]);
                retain.insert(key.clone());
            }
            retain
        };
        // 8-byte prefix removed from serialized digest keys; the trailing 32
        // is presumably a length prefix for the 32-byte digest — TODO confirm.
        let mut digest_prefix = vec![0; 8];
        digest_prefix[7] = 32;
        let uniform_key = KeyType::uniform(default_cells_per_mutex());
        // 76 prefix bits: 8-byte epoch plus the next 12 bits of the key.
        let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
        // 140 prefix bits: 8-byte epoch + 8-byte digest length prefix + 12
        // bits of the digest itself.
        let epoch_tx_digest_prefix_key = KeyType::from_prefix_bits((8 + 8) * 8 + 12);
        // ObjectKey = ObjectID (32 bytes) + version (8 bytes).
        let object_indexing = KeyIndexing::fixed(32 + 8);
        // Serialized ObjectRef size: id (32) + version (8) + digest (32 + 8
        // prefix). NOTE(review): confirm the extra 8 bytes are a BCS prefix.
        let obj_ref_size = 32 + 8 + 32 + 8;
        // Index lock keys by dropping the middle of the serialized ObjectRef,
        // keeping the first and last 16 bytes.
        let owned_object_transaction_locks_indexing =
            KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));

        let mut objects_config = KeySpaceConfig::new()
            .with_max_dirty_keys(4096)
            .with_value_cache_size(value_cache_size);
        // Only validators run the latest-version-only compactor — presumably
        // non-validators must retain object history. TODO(review): confirm.
        if matches!(db_options_override, Some(options) if options.is_validator) {
            objects_config = objects_config.with_compactor(Box::new(objects_compactor));
        }

        // Per-key-space configuration. For `apply_relocation_filter`, the
        // trailing bool appears to select whether the epoch extractor is fed
        // the key (true) or the value (false) — TODO(review): confirm.
        let configs = vec![
            (
                "objects".to_string(),
                ThConfig::new_with_config_indexing(
                    object_indexing,
                    mutexes * 4,
                    KeyType::uniform(1),
                    objects_config,
                ),
            ),
            (
                "owned_object_transaction_locks".to_string(),
                ThConfig::new_with_config_indexing(
                    owned_object_transaction_locks_indexing,
                    mutexes * 16,
                    KeyType::uniform(default_cells_per_mutex()),
                    bloom_config.clone().with_max_dirty_keys(16192),
                ),
            ),
            (
                // Transactions are kept locally and dropped at relocation.
                "transactions".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    transaction_mutexes,
                    uniform_key,
                    KeySpaceConfig::new()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                // Effects are removed at relocation once their executed epoch
                // falls behind the pruner watermark.
                "effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    transaction_mutexes,
                    uniform_key,
                    apply_relocation_filter(
                        bloom_config.clone().with_value_cache_size(value_cache_size),
                        pruner_watermark.clone(),
                        |effects: TransactionEffects| effects.executed_epoch(),
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    transaction_mutexes,
                    uniform_key,
                    bloom_config
                        .clone()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                // Legacy events key space; no matching field is declared on
                // this struct — presumably kept for on-disk compatibility.
                "events".to_string(),
                ThConfig::new_with_rm_prefix(
                    32 + 8,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events_2".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "unchanged_loaded_runtime_objects".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_transactions_to_checkpoint".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        // Epoch is taken from the (epoch, checkpoint) value.
                        |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            // Singleton / tiny tables: a single mutex with one cell each.
            (
                "root_state_hash_by_epoch".to_string(),
                ThConfig::new(8, 1, KeyType::uniform(1)),
            ),
            (
                "epoch_start_configuration".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "pruned_checkpoint".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_network_sui_amount".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_storage_fund_imbalance".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "object_per_epoch_marker_table".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        // Epoch is the leading component of the key.
                        |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "object_per_epoch_marker_table_v2".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::fixed(EPOCH_MARKER_KEY_SIZE),
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |k: EpochMarkerKey| k.0,
                        true,
                    ),
                ),
            ),
            (
                "executed_transaction_digests".to_string(),
                ThConfig::new_with_config_indexing(
                    // Key = epoch (8) + serialized TransactionDigest (32 + 8).
                    KeyIndexing::fixed(8 + (32 + 8)),
                    transaction_mutexes,
                    epoch_tx_digest_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, TransactionDigest)| epoch_id,
                        true,
                    ),
                ),
            ),
        ];
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }
417
418 #[cfg(not(tidehunter))]
419 pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
420 Self::get_read_only_handle(
421 Self::path(parent_path),
422 None,
423 None,
424 MetricConf::new("perpetual_readonly"),
425 )
426 }
427
428 #[cfg(tidehunter)]
429 pub fn open_readonly(parent_path: &Path) -> Self {
430 Self::open(parent_path, None, None)
431 }
432
    /// Forces a rebuild of the tidehunter control region, delegating to the
    /// underlying database handle shared by all tables.
    #[cfg(tidehunter)]
    pub fn force_rebuild_control_region(&self) -> anyhow::Result<()> {
        self.objects.db.force_rebuild_control_region()
    }
437
438 pub fn find_object_lt_or_eq_version(
442 &self,
443 object_id: ObjectID,
444 version: SequenceNumber,
445 ) -> SuiResult<Option<Object>> {
446 let mut iter = self.objects.reversed_safe_iter_with_bounds(
447 Some(ObjectKey::min_for_id(&object_id)),
448 Some(ObjectKey(object_id, version)),
449 )?;
450 match iter.next() {
451 Some(Ok((key, o))) => self.object(&key, o),
452 Some(Err(e)) => Err(e.into()),
453 None => Ok(None),
454 }
455 }
456
    /// Materializes a full `Object` from its stored representation; thin
    /// wrapper over `try_construct_object`.
    fn construct_object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectValue,
    ) -> Result<Object, SuiError> {
        try_construct_object(object_key, store_object)
    }
464
465 pub fn object(
468 &self,
469 object_key: &ObjectKey,
470 store_object: StoreObjectWrapper,
471 ) -> Result<Option<Object>, SuiError> {
472 let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
473 return Ok(None);
474 };
475 Ok(Some(self.construct_object(object_key, *store_object)?))
476 }
477
478 pub fn object_reference(
479 &self,
480 object_key: &ObjectKey,
481 store_object: StoreObjectWrapper,
482 ) -> Result<ObjectRef, SuiError> {
483 let obj_ref = match store_object.migrate().into_inner() {
484 StoreObject::Value(object) => self
485 .construct_object(object_key, *object)?
486 .compute_object_reference(),
487 StoreObject::Deleted => (
488 object_key.0,
489 object_key.1,
490 ObjectDigest::OBJECT_DIGEST_DELETED,
491 ),
492 StoreObject::Wrapped => (
493 object_key.0,
494 object_key.1,
495 ObjectDigest::OBJECT_DIGEST_WRAPPED,
496 ),
497 };
498 Ok(obj_ref)
499 }
500
501 pub fn tombstone_reference(
502 &self,
503 object_key: &ObjectKey,
504 store_object: &StoreObjectWrapper,
505 ) -> Result<Option<ObjectRef>, SuiError> {
506 let obj_ref = match store_object.inner() {
507 StoreObject::Deleted => Some((
508 object_key.0,
509 object_key.1,
510 ObjectDigest::OBJECT_DIGEST_DELETED,
511 )),
512 StoreObject::Wrapped => Some((
513 object_key.0,
514 object_key.1,
515 ObjectDigest::OBJECT_DIGEST_WRAPPED,
516 )),
517 _ => None,
518 };
519 Ok(obj_ref)
520 }
521
522 pub fn get_latest_object_ref_or_tombstone(
523 &self,
524 object_id: ObjectID,
525 ) -> Result<Option<ObjectRef>, SuiError> {
526 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
527 Some(ObjectKey::min_for_id(&object_id)),
528 Some(ObjectKey::max_for_id(&object_id)),
529 )?;
530
531 if let Some(Ok((object_key, value))) = iterator.next()
532 && object_key.0 == object_id
533 {
534 return Ok(Some(self.object_reference(&object_key, value)?));
535 }
536 Ok(None)
537 }
538
539 pub fn get_latest_object_or_tombstone(
540 &self,
541 object_id: ObjectID,
542 ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
543 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
544 Some(ObjectKey::min_for_id(&object_id)),
545 Some(ObjectKey::max_for_id(&object_id)),
546 )?;
547
548 if let Some(Ok((object_key, value))) = iterator.next()
549 && object_key.0 == object_id
550 {
551 return Ok(Some((object_key, value)));
552 }
553 Ok(None)
554 }
555
556 pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
557 Ok(self
558 .epoch_start_configuration
559 .get(&())?
560 .expect("Must have current epoch.")
561 .epoch_start_state()
562 .epoch())
563 }
564
565 pub fn set_epoch_start_configuration(
566 &self,
567 epoch_start_configuration: &EpochStartConfiguration,
568 ) -> SuiResult {
569 let mut wb = self.epoch_start_configuration.batch();
570 wb.insert_batch(
571 &self.epoch_start_configuration,
572 std::iter::once(((), epoch_start_configuration)),
573 )?;
574 wb.write()?;
575 Ok(())
576 }
577
    /// Reads the highest-pruned-checkpoint singleton; `None` if pruning has
    /// never recorded a watermark.
    pub fn get_highest_pruned_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.pruned_checkpoint.get(&())
    }
583
584 pub fn set_highest_pruned_checkpoint(
585 &self,
586 wb: &mut DBBatch,
587 checkpoint_number: CheckpointSequenceNumber,
588 ) -> SuiResult {
589 wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
590 Ok(())
591 }
592
593 pub fn get_transaction(
594 &self,
595 digest: &TransactionDigest,
596 ) -> SuiResult<Option<TrustedTransaction>> {
597 let Some(transaction) = self.transactions.get(digest)? else {
598 return Ok(None);
599 };
600 Ok(Some(transaction))
601 }
602
603 pub fn insert_executed_transaction_digests_batch(
606 &self,
607 epoch: EpochId,
608 digests: impl Iterator<Item = TransactionDigest>,
609 ) -> SuiResult {
610 let mut batch = self.executed_transaction_digests.batch();
611 batch.insert_batch(
612 &self.executed_transaction_digests,
613 digests.map(|digest| ((epoch, digest), ())),
614 )?;
615 batch.write()?;
616 Ok(())
617 }
618
619 pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
620 let Some(effect_digest) = self.executed_effects.get(digest)? else {
621 return Ok(None);
622 };
623 Ok(self.effects.get(&effect_digest)?)
624 }
625
626 pub(crate) fn was_transaction_executed_in_last_epoch(
627 &self,
628 digest: &TransactionDigest,
629 current_epoch: EpochId,
630 ) -> bool {
631 if current_epoch == 0 {
632 return false;
633 }
634 self.executed_transaction_digests
635 .contains_key(&(current_epoch - 1, *digest))
636 .expect("db error")
637 }
638
639 pub fn get_checkpoint_sequence_number(
642 &self,
643 digest: &TransactionDigest,
644 ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
645 Ok(self.executed_transactions_to_checkpoint.get(digest)?)
646 }
647
648 pub fn get_newer_object_keys(
649 &self,
650 object: &(ObjectID, SequenceNumber),
651 ) -> SuiResult<Vec<ObjectKey>> {
652 let mut objects = vec![];
653 for result in self.objects.safe_iter_with_bounds(
654 Some(ObjectKey(object.0, object.1.next())),
655 Some(ObjectKey(object.0, VersionNumber::MAX)),
656 ) {
657 let (key, _) = result?;
658 objects.push(key);
659 }
660 Ok(objects)
661 }
662
663 pub fn set_highest_pruned_checkpoint_without_wb(
664 &self,
665 checkpoint_number: CheckpointSequenceNumber,
666 ) -> SuiResult {
667 let mut wb = self.pruned_checkpoint.batch();
668 self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
669 wb.write()?;
670 Ok(())
671 }
672
673 pub fn database_is_empty(&self) -> SuiResult<bool> {
674 Ok(self.objects.safe_iter().next().is_none())
675 }
676
677 pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
678 LiveSetIter {
679 iter: Box::new(self.objects.safe_iter()),
680 tables: self,
681 prev: None,
682 include_wrapped_object,
683 }
684 }
685
686 pub fn range_iter_live_object_set(
687 &self,
688 lower_bound: Option<ObjectID>,
689 upper_bound: Option<ObjectID>,
690 include_wrapped_object: bool,
691 ) -> LiveSetIter<'_> {
692 let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
693 let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
694
695 LiveSetIter {
696 iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
697 tables: self,
698 prev: None,
699 include_wrapped_object,
700 }
701 }
702
703 pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
704 self.objects.checkpoint_db(path).map_err(Into::into)
706 }
707
708 pub fn get_root_state_hash(
709 &self,
710 epoch: EpochId,
711 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
712 Ok(self.root_state_hash_by_epoch.get(&epoch)?)
713 }
714
715 pub fn insert_root_state_hash(
716 &self,
717 epoch: EpochId,
718 last_checkpoint_of_epoch: CheckpointSequenceNumber,
719 hash: GlobalStateHash,
720 ) -> SuiResult {
721 self.root_state_hash_by_epoch
722 .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
723 Ok(())
724 }
725
726 pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
727 let object_reference = object.compute_object_reference();
728 let wrapper = get_store_object(object);
729 let mut wb = self.objects.batch();
730 wb.insert_batch(
731 &self.objects,
732 std::iter::once((ObjectKey::from(object_reference), wrapper)),
733 )?;
734 wb.write()?;
735 Ok(())
736 }
737
738 pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
740 let obj_entry = self
741 .objects
742 .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
743 .next();
744
745 match obj_entry.transpose()? {
746 Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
747 Ok(self.object(&ObjectKey(obj_id, version), obj)?)
748 }
749 _ => Ok(None),
750 }
751 }
752
753 pub fn get_object_by_key_fallible(
754 &self,
755 object_id: &ObjectID,
756 version: VersionNumber,
757 ) -> SuiResult<Option<Object>> {
758 Ok(self
759 .objects
760 .get(&ObjectKey(*object_id, version))?
761 .and_then(|object| {
762 self.object(&ObjectKey(*object_id, version), object)
763 .expect("object construction error")
764 }))
765 }
766}
767
768impl ObjectStore for AuthorityPerpetualTables {
769 fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
771 self.get_object_fallible(object_id).expect("db error")
772 }
773
774 fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
775 self.get_object_by_key_fallible(object_id, version)
776 .expect("db error")
777 }
778}
779
/// Iterator over the live object set: the newest version of each object id,
/// minus deleted tombstones. Produced by
/// `AuthorityPerpetualTables::iter_live_object_set` and
/// `range_iter_live_object_set`.
pub struct LiveSetIter<'a> {
    // Underlying scan of the `objects` table, ordered by (ObjectID, version).
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    // Backing tables, used to materialize `Object`s from stored wrappers.
    tables: &'a AuthorityPerpetualTables,
    // Most recently seen entry, buffered until we know whether a higher
    // version of the same object follows.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
    // When true, wrapped objects are yielded as `LiveObject::Wrapped`
    // instead of being skipped.
    include_wrapped_object: bool,
}
787
/// A member of the live object set: either a fully materialized object, or
/// (when wrapped objects are requested) a wrapped-object marker identified
/// only by its storage key.
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    Normal(Object),
    Wrapped(ObjectKey),
}
793
794impl LiveObject {
795 pub fn object_id(&self) -> ObjectID {
796 match self {
797 LiveObject::Normal(obj) => obj.id(),
798 LiveObject::Wrapped(key) => key.0,
799 }
800 }
801
802 pub fn version(&self) -> SequenceNumber {
803 match self {
804 LiveObject::Normal(obj) => obj.version(),
805 LiveObject::Wrapped(key) => key.1,
806 }
807 }
808
809 pub fn object_reference(&self) -> ObjectRef {
810 match self {
811 LiveObject::Normal(obj) => obj.compute_object_reference(),
812 LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
813 }
814 }
815
816 pub fn to_normal(self) -> Option<Object> {
817 match self {
818 LiveObject::Normal(object) => Some(object),
819 LiveObject::Wrapped(_) => None,
820 }
821 }
822}
823
824impl LiveSetIter<'_> {
825 fn store_object_wrapper_to_live_object(
826 &self,
827 object_key: ObjectKey,
828 store_object: StoreObjectWrapper,
829 ) -> Option<LiveObject> {
830 match store_object.migrate().into_inner() {
831 StoreObject::Value(object) => {
832 let object = self
833 .tables
834 .construct_object(&object_key, *object)
835 .expect("Constructing object from store cannot fail");
836 Some(LiveObject::Normal(object))
837 }
838 StoreObject::Wrapped => {
839 if self.include_wrapped_object {
840 Some(LiveObject::Wrapped(object_key))
841 } else {
842 None
843 }
844 }
845 StoreObject::Deleted => None,
846 }
847 }
848}
849
impl Iterator for LiveSetIter<'_> {
    type Item = LiveObject;

    // `self.iter` yields `objects` entries ordered by (ObjectID, version)
    // ascending. Only the highest version per object id belongs to the live
    // set, so the most recent entry is buffered in `self.prev` and emitted
    // only once the object id changes (or the scan ends).
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(Ok((next_key, next_value))) = self.iter.next() {
                // Swap the buffered entry for the current one.
                let prev = self.prev.take();
                self.prev = Some((next_key, next_value));

                if let Some((prev_key, prev_value)) = prev
                    && prev_key.0 != next_key.0
                {
                    // Object id changed: `prev` holds the latest version of
                    // the previous object. Tombstones convert to `None` and
                    // are skipped; wrapped entries depend on the flag.
                    let live_object =
                        self.store_object_wrapper_to_live_object(prev_key, prev_value);
                    if live_object.is_some() {
                        return live_object;
                    }
                }
                continue;
            }
            // Scan exhausted: flush the final buffered entry, if any.
            // NOTE(review): an `Err` from `self.iter.next()` also lands here
            // and ends iteration silently — confirm this is intended.
            if let Some((key, value)) = self.prev.take() {
                let live_object = self.store_object_wrapper_to_live_object(key, value);
                if live_object.is_some() {
                    return live_object;
                }
            }
            return None;
        }
    }
}
880
881fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
883 DBOptions {
884 options: db_options
885 .clone()
886 .optimize_for_write_throughput()
887 .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
888 .options,
889 rw_options: db_options.rw_options.set_ignore_range_deletions(false),
890 }
891}
892
893fn objects_table_config(db_options: DBOptions) -> DBOptions {
894 db_options
895 .optimize_for_write_throughput()
896 .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
897}
898
899fn transactions_table_config(db_options: DBOptions) -> DBOptions {
900 db_options
901 .optimize_for_write_throughput()
902 .optimize_for_point_lookup(
903 read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
904 )
905}
906
907fn effects_table_config(db_options: DBOptions) -> DBOptions {
908 db_options
909 .optimize_for_write_throughput()
910 .optimize_for_point_lookup(
911 read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
912 )
913}