use super::*;
use crate::authority::authority_store::LockDetailsWrapperDeprecated;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::atomic::AtomicU64;
use sui_types::base_types::SequenceNumber;
use sui_types::digests::TransactionEventsDigest;
use sui_types::effects::{TransactionEffects, TransactionEvents};
use sui_types::global_state_hash::GlobalStateHash;
use sui_types::storage::{FullObjectKey, MarkerValue};
use tracing::error;
use typed_store::metrics::SamplingInterval;
use typed_store::rocks::{
    DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
    read_size_from_env,
};
use typed_store::traits::Map;

use crate::authority::authority_store_pruner::ObjectsCompactionFilter;
use crate::authority::authority_store_types::{
    StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
};
use crate::authority::epoch_start_configuration::EpochStartConfiguration;
use typed_store::rocksdb::compaction_filter::Decision;
use typed_store::{DBMapUtils, DbIterator};

const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";

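/// Options applied to the perpetual tables' underlying database when it is opened.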
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    pub enable_write_stall: bool,
    pub compaction_filter: Option<ObjectsCompactionFilter>,
}

impl AuthorityPerpetualTablesOptions {
    fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
        if !self.enable_write_stall {
            db_options = db_options.disable_write_throttling();
        }
        db_options
    }
}

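/// AuthorityPerpetualTables contains data that is preserved across epochs: objects,
/// transactions, effects, and the indexes built on top of them.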
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct AuthorityPerpetualTables {
    /// Object contents and tombstones, keyed by (ObjectID, SequenceNumber).
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// Markers for live owned objects, stored under the legacy
    /// `owned_object_transaction_locks` column family; the lock details payload is deprecated.
    #[rename = "owned_object_transaction_locks"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,

    /// Transactions keyed by their digest.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// Transaction effects keyed by their effects digest.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Maps executed transaction digests to the digest of their effects.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    #[allow(dead_code)]
    #[deprecated]
    events: DBMap<(TransactionEventsDigest, usize), Event>,

    /// Transaction events keyed by transaction digest.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    /// Keys of runtime-loaded objects that a transaction read but did not change.
    pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,

    /// Maps executed transaction digests to the (epoch, checkpoint) that included them.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Root global state hash per epoch, together with the last checkpoint of that epoch.
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Configuration recorded at the start of the current epoch (singleton key).
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// Highest checkpoint sequence number that has been pruned (singleton key).
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Expected total amount of SUI in the network (singleton key).
    pub(crate) expected_network_sui_amount: DBMap<(), u64>,

    /// Expected imbalance of the storage fund (singleton key).
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Per-epoch object marker tables; the v2 table is keyed by FullObjectKey.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
    pub(crate) object_per_epoch_marker_table_v2: DBMap<(EpochId, FullObjectKey), MarkerValue>,
}

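/// Tables owned by the pruner; kept in a separate database under the `pruner` directory.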
#[derive(DBMapUtils)]
pub struct AuthorityPrunerTables {
    pub(crate) object_tombstones: DBMap<ObjectID, SequenceNumber>,
}

impl AuthorityPrunerTables {
    pub fn path(parent_path: &Path) -> PathBuf {
        parent_path.join("pruner")
    }

    pub fn open(parent_path: &Path) -> Self {
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("pruner")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            None,
            None,
        )
    }
}

impl AuthorityPerpetualTables {
    pub fn path(parent_path: &Path) -> PathBuf {
        parent_path.join("perpetual")
    }

    #[cfg(not(tidehunter))]
    pub fn open(
        parent_path: &Path,
        db_options_override: Option<AuthorityPerpetualTablesOptions>,
        _pruner_watermark: Option<Arc<AtomicU64>>,
    ) -> Self {
        let db_options_override = db_options_override.unwrap_or_default();
        let db_options =
            db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
        let table_options = DBMapTableConfigMap::new(BTreeMap::from([
            (
                "objects".to_string(),
                objects_table_config(db_options.clone(), db_options_override.compaction_filter),
            ),
            (
                "owned_object_transaction_locks".to_string(),
                owned_object_transaction_locks_table_config(db_options.clone()),
            ),
            (
                "transactions".to_string(),
                transactions_table_config(db_options.clone()),
            ),
            (
                "effects".to_string(),
                effects_table_config(db_options.clone()),
            ),
        ]));

        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            Some(db_options.options),
            Some(table_options),
        )
    }

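    // Tidehunter variant of open(): each key space is configured explicitly (key sizing,
    // mutex striping, caching, and relocation/compaction behavior) instead of RocksDB
    // column-family options.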
    #[cfg(tidehunter)]
    pub fn open(
        parent_path: &Path,
        _: Option<AuthorityPerpetualTablesOptions>,
        pruner_watermark: Option<Arc<AtomicU64>>,
    ) -> Self {
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        tracing::warn!("AuthorityPerpetualTables using tidehunter");
        use typed_store::tidehunter_util::{
            Bytes, Decision, IndexWalPosition, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
            default_cells_per_mutex, default_mutex_count, default_value_cache_size,
        };
        let mutexes = default_mutex_count() * 2;
        let value_cache_size = default_value_cache_size();
        let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));

        let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
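        // Compactor for the `objects` key space: iterate index entries in descending key
        // order and retain only the first entry seen for each distinct 16-byte prefix,
        // i.e. keep the newest indexed version per object and drop older ones.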
        let objects_compactor = |index: &mut BTreeMap<Bytes, IndexWalPosition>| {
            let mut retain = HashSet::new();
            let mut previous: Option<&[u8]> = None;
            const OID_SIZE: usize = 16;
            for (key, _) in index.iter().rev() {
                if let Some(prev) = previous {
                    if prev == &key[..OID_SIZE] {
                        continue;
                    }
                }
                previous = Some(&key[..OID_SIZE]);
                retain.insert(key.clone());
            }
            index.retain(|k, _| retain.contains(k));
        };
        let mut digest_prefix = vec![0; 8];
        digest_prefix[7] = 32;
        let uniform_key = KeyType::uniform(default_cells_per_mutex());
        let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
        let object_indexing = KeyIndexing::key_reduction(32 + 8, 16..(32 + 8));
        let obj_ref_size = 32 + 8 + 32 + 8;
        let owned_object_transaction_locks_indexing =
            KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));

        let configs = vec![
            (
                "objects".to_string(),
                ThConfig::new_with_config_indexing(
                    object_indexing,
                    mutexes,
                    KeyType::uniform(default_cells_per_mutex() * 4),
                    KeySpaceConfig::new()
                        .with_unloaded_iterator(true)
                        .with_max_dirty_keys(4048)
                        .with_compactor(Box::new(objects_compactor)),
                ),
            ),
            (
                "owned_object_transaction_locks".to_string(),
                ThConfig::new_with_config_indexing(
                    owned_object_transaction_locks_indexing,
                    mutexes,
                    KeyType::uniform(default_cells_per_mutex() * 4),
                    bloom_config.clone().with_max_dirty_keys(4048),
                ),
            ),
            (
                "transactions".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::new()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    apply_relocation_filter(
                        bloom_config.clone().with_value_cache_size(value_cache_size),
                        pruner_watermark.clone(),
                        |effects: TransactionEffects| effects.executed_epoch(),
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    bloom_config
                        .clone()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events".to_string(),
                ThConfig::new_with_rm_prefix(
                    32 + 8,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events_2".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "unchanged_loaded_runtime_objects".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_transactions_to_checkpoint".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "root_state_hash_by_epoch".to_string(),
                ThConfig::new(8, 1, KeyType::uniform(1)),
            ),
            (
                "epoch_start_configuration".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "pruned_checkpoint".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_network_sui_amount".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_storage_fund_imbalance".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "object_per_epoch_marker_table".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "object_per_epoch_marker_table_v2".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, FullObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
        ];
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }

    pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
        Self::get_read_only_handle(
            Self::path(parent_path),
            None,
            None,
            MetricConf::new("perpetual_readonly"),
        )
    }

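    /// Returns the live object with the highest version at or below `version`, if any;
    /// tombstone entries yield `None`.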
    pub fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<Option<Object>> {
        let mut iter = self.objects.reversed_safe_iter_with_bounds(
            Some(ObjectKey::min_for_id(&object_id)),
            Some(ObjectKey(object_id, version)),
        )?;
        match iter.next() {
            Some(Ok((key, o))) => self.object(&key, o),
            Some(Err(e)) => Err(e.into()),
            None => Ok(None),
        }
    }

    fn construct_object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectValue,
    ) -> Result<Object, SuiError> {
        try_construct_object(object_key, store_object)
    }

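    /// Decodes a StoreObjectWrapper into an Object. Returns None for deleted or wrapped
    /// tombstone entries.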
    pub fn object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Result<Option<Object>, SuiError> {
        let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
            return Ok(None);
        };
        Ok(Some(self.construct_object(object_key, *store_object)?))
    }

    pub fn object_reference(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Result<ObjectRef, SuiError> {
        let obj_ref = match store_object.migrate().into_inner() {
            StoreObject::Value(object) => self
                .construct_object(object_key, *object)?
                .compute_object_reference(),
            StoreObject::Deleted => (
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_DELETED,
            ),
            StoreObject::Wrapped => (
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_WRAPPED,
            ),
        };
        Ok(obj_ref)
    }

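    /// Returns the object reference for a tombstone (deleted or wrapped) entry, or None if
    /// the entry holds a live object.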
    pub fn tombstone_reference(
        &self,
        object_key: &ObjectKey,
        store_object: &StoreObjectWrapper,
    ) -> Result<Option<ObjectRef>, SuiError> {
        let obj_ref = match store_object.inner() {
            StoreObject::Deleted => Some((
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_DELETED,
            )),
            StoreObject::Wrapped => Some((
                object_key.0,
                object_key.1,
                ObjectDigest::OBJECT_DIGEST_WRAPPED,
            )),
            _ => None,
        };
        Ok(obj_ref)
    }

    pub fn get_latest_object_ref_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<ObjectRef>, SuiError> {
        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
            Some(ObjectKey::min_for_id(&object_id)),
            Some(ObjectKey::max_for_id(&object_id)),
        )?;

        if let Some(Ok((object_key, value))) = iterator.next()
            && object_key.0 == object_id
        {
            return Ok(Some(self.object_reference(&object_key, value)?));
        }
        Ok(None)
    }

    pub fn get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
        let mut iterator = self.objects.reversed_safe_iter_with_bounds(
            Some(ObjectKey::min_for_id(&object_id)),
            Some(ObjectKey::max_for_id(&object_id)),
        )?;

        if let Some(Ok((object_key, value))) = iterator.next()
            && object_key.0 == object_id
        {
            return Ok(Some((object_key, value)));
        }
        Ok(None)
    }

    pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
        Ok(self
            .epoch_start_configuration
            .get(&())?
            .expect("Must have current epoch.")
            .epoch_start_state()
            .epoch())
    }

    pub fn set_epoch_start_configuration(
        &self,
        epoch_start_configuration: &EpochStartConfiguration,
    ) -> SuiResult {
        let mut wb = self.epoch_start_configuration.batch();
        wb.insert_batch(
            &self.epoch_start_configuration,
            std::iter::once(((), epoch_start_configuration)),
        )?;
        wb.write()?;
        Ok(())
    }

    pub fn get_highest_pruned_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.pruned_checkpoint.get(&())
    }

    pub fn set_highest_pruned_checkpoint(
        &self,
        wb: &mut DBBatch,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> SuiResult {
        wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
        Ok(())
    }

    pub fn get_transaction(
        &self,
        digest: &TransactionDigest,
    ) -> SuiResult<Option<TrustedTransaction>> {
        let Some(transaction) = self.transactions.get(digest)? else {
            return Ok(None);
        };
        Ok(Some(transaction))
    }

    pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
        let Some(effect_digest) = self.executed_effects.get(digest)? else {
            return Ok(None);
        };
        Ok(self.effects.get(&effect_digest)?)
    }

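    /// Returns the epoch and checkpoint sequence number in which the given transaction was
    /// executed, if it has been included in a checkpoint.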
    pub fn get_checkpoint_sequence_number(
        &self,
        digest: &TransactionDigest,
    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
        Ok(self.executed_transactions_to_checkpoint.get(digest)?)
    }

    pub fn get_newer_object_keys(
        &self,
        object: &(ObjectID, SequenceNumber),
    ) -> SuiResult<Vec<ObjectKey>> {
        let mut objects = vec![];
        for result in self.objects.safe_iter_with_bounds(
            Some(ObjectKey(object.0, object.1.next())),
            Some(ObjectKey(object.0, VersionNumber::MAX)),
        ) {
            let (key, _) = result?;
            objects.push(key);
        }
        Ok(objects)
    }

    pub fn set_highest_pruned_checkpoint_without_wb(
        &self,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> SuiResult {
        let mut wb = self.pruned_checkpoint.batch();
        self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        wb.write()?;
        Ok(())
    }

    pub fn database_is_empty(&self) -> SuiResult<bool> {
        Ok(self.objects.safe_iter().next().is_none())
    }

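    /// Iterates over the live object set: the latest version of every object, skipping
    /// deleted tombstones and (unless `include_wrapped_object` is set) wrapped objects.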
    pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
        LiveSetIter {
            iter: Box::new(self.objects.safe_iter()),
            tables: self,
            prev: None,
            include_wrapped_object,
        }
    }

    pub fn range_iter_live_object_set(
        &self,
        lower_bound: Option<ObjectID>,
        upper_bound: Option<ObjectID>,
        include_wrapped_object: bool,
    ) -> LiveSetIter<'_> {
        let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
        let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);

        LiveSetIter {
            iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
            tables: self,
            prev: None,
            include_wrapped_object,
        }
    }

    pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
        // Invoked on the objects table, but this checkpoints the entire database.
        self.objects.checkpoint_db(path).map_err(Into::into)
    }

    pub fn get_root_state_hash(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
        Ok(self.root_state_hash_by_epoch.get(&epoch)?)
    }

    pub fn insert_root_state_hash(
        &self,
        epoch: EpochId,
        last_checkpoint_of_epoch: CheckpointSequenceNumber,
        hash: GlobalStateHash,
    ) -> SuiResult {
        self.root_state_hash_by_epoch
            .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
        Ok(())
    }

    pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
        let object_reference = object.compute_object_reference();
        let wrapper = get_store_object(object);
        let mut wb = self.objects.batch();
        wb.insert_batch(
            &self.objects,
            std::iter::once((ObjectKey::from(object_reference), wrapper)),
        )?;
        wb.write()?;
        Ok(())
    }

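    /// Returns the latest live version of the object with the given id, propagating any
    /// database error instead of panicking.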
    pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
        let obj_entry = self
            .objects
            .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
            .next();

        match obj_entry.transpose()? {
            Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
                Ok(self.object(&ObjectKey(obj_id, version), obj)?)
            }
            _ => Ok(None),
        }
    }

    pub fn get_object_by_key_fallible(
        &self,
        object_id: &ObjectID,
        version: VersionNumber,
    ) -> SuiResult<Option<Object>> {
        Ok(self
            .objects
            .get(&ObjectKey(*object_id, version))?
            .and_then(|object| {
                self.object(&ObjectKey(*object_id, version), object)
                    .expect("object construction error")
            }))
    }
}

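// Infallible ObjectStore accessors; a database error here is treated as fatal.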
impl ObjectStore for AuthorityPerpetualTables {
    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        self.get_object_fallible(object_id).expect("db error")
    }

    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
        self.get_object_by_key_fallible(object_id, version)
            .expect("db error")
    }
}

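/// Iterator over the live object set. Scans the objects table and, for each object id,
/// yields only the entry with the highest version (the `prev` buffer holds the candidate
/// until the scan moves past that id).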
pub struct LiveSetIter<'a> {
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    tables: &'a AuthorityPerpetualTables,
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
    include_wrapped_object: bool,
}

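/// An entry of the live object set: either a normal object or, when wrapped objects are
/// requested, the key of a wrapped object.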
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    Normal(Object),
    Wrapped(ObjectKey),
}

impl LiveObject {
    pub fn object_id(&self) -> ObjectID {
        match self {
            LiveObject::Normal(obj) => obj.id(),
            LiveObject::Wrapped(key) => key.0,
        }
    }

    pub fn version(&self) -> SequenceNumber {
        match self {
            LiveObject::Normal(obj) => obj.version(),
            LiveObject::Wrapped(key) => key.1,
        }
    }

    pub fn object_reference(&self) -> ObjectRef {
        match self {
            LiveObject::Normal(obj) => obj.compute_object_reference(),
            LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
        }
    }

    pub fn to_normal(self) -> Option<Object> {
        match self {
            LiveObject::Normal(object) => Some(object),
            LiveObject::Wrapped(_) => None,
        }
    }
}

impl LiveSetIter<'_> {
    fn store_object_wrapper_to_live_object(
        &self,
        object_key: ObjectKey,
        store_object: StoreObjectWrapper,
    ) -> Option<LiveObject> {
        match store_object.migrate().into_inner() {
            StoreObject::Value(object) => {
                let object = self
                    .tables
                    .construct_object(&object_key, *object)
                    .expect("Constructing object from store cannot fail");
                Some(LiveObject::Normal(object))
            }
            StoreObject::Wrapped => {
                if self.include_wrapped_object {
                    Some(LiveObject::Wrapped(object_key))
                } else {
                    None
                }
            }
            StoreObject::Deleted => None,
        }
    }
}

impl Iterator for LiveSetIter<'_> {
    type Item = LiveObject;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(Ok((next_key, next_value))) = self.iter.next() {
                let prev = self.prev.take();
                self.prev = Some((next_key, next_value));

                if let Some((prev_key, prev_value)) = prev
                    && prev_key.0 != next_key.0
                {
                    let live_object =
                        self.store_object_wrapper_to_live_object(prev_key, prev_value);
                    if live_object.is_some() {
                        return live_object;
                    }
                }
                continue;
            }
            if let Some((key, value)) = self.prev.take() {
                let live_object = self.store_object_wrapper_to_live_object(key, value);
                if live_object.is_some() {
                    return live_object;
                }
            }
            return None;
        }
    }
}

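// Per-table RocksDB option builders used by the non-tidehunter open() path.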
fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
    DBOptions {
        options: db_options
            .clone()
            .optimize_for_write_throughput()
            .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
            .options,
        rw_options: db_options.rw_options.set_ignore_range_deletions(false),
    }
}

fn objects_table_config(
    mut db_options: DBOptions,
    compaction_filter: Option<ObjectsCompactionFilter>,
) -> DBOptions {
    if let Some(mut compaction_filter) = compaction_filter {
        db_options
            .options
            .set_compaction_filter("objects", move |_, key, value| {
                match compaction_filter.filter(key, value) {
                    Ok(decision) => decision,
                    Err(err) => {
                        error!("Compaction error: {:?}", err);
                        Decision::Keep
                    }
                }
            });
    }
    db_options
        .optimize_for_write_throughput()
        .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
}

fn transactions_table_config(db_options: DBOptions) -> DBOptions {
    db_options
        .optimize_for_write_throughput()
        .optimize_for_point_lookup(
            read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
        )
}

fn effects_table_config(db_options: DBOptions) -> DBOptions {
    db_options
        .optimize_for_write_throughput()
        .optimize_for_point_lookup(
            read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
        )
}