1use super::*;
5use crate::authority::authority_store::LockDetailsWrapperDeprecated;
6use serde::{Deserialize, Serialize};
7use std::path::Path;
8use std::sync::atomic::AtomicU64;
9use sui_types::base_types::SequenceNumber;
10use sui_types::digests::TransactionEventsDigest;
11use sui_types::effects::{TransactionEffects, TransactionEvents};
12use sui_types::global_state_hash::GlobalStateHash;
13use sui_types::storage::{FullObjectKey, MarkerValue};
14use tracing::error;
15use typed_store::metrics::SamplingInterval;
16use typed_store::rocks::{
17 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
18 read_size_from_env,
19};
20use typed_store::traits::Map;
21
22use crate::authority::authority_store_pruner::ObjectsCompactionFilter;
23use crate::authority::authority_store_types::{
24 StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
25};
26use crate::authority::epoch_start_configuration::EpochStartConfiguration;
27use typed_store::rocksdb::compaction_filter::Decision;
28use typed_store::{DBMapUtils, DbIterator};
29
// Names of the environment variables consulted (via `read_size_from_env`) when tuning the
// read path of the corresponding tables. If a variable is unset, each table config function
// falls back to its own hard-coded default.
// NOTE(review): the `_MB` suffix suggests the values are megabytes — confirm against
// `read_size_from_env`.
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
34
/// Optional overrides applied when opening [`AuthorityPerpetualTables`].
///
/// The `Default` value leaves write stalling disabled and installs no compaction filter.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// When `false`, `apply_to` calls `disable_write_throttling()` on the DB options.
    pub enable_write_stall: bool,
    /// Optional compaction filter; the RocksDB `open` path hands it to the `objects`
    /// table configuration.
    pub compaction_filter: Option<ObjectsCompactionFilter>,
}
42
43impl AuthorityPerpetualTablesOptions {
44 fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
45 if !self.enable_write_stall {
46 db_options = db_options.disable_write_throttling();
47 }
48 db_options
49 }
50}
51
/// Column families ("tables") of the authority's `perpetual` database.
///
/// NOTE(review): the name suggests this data outlives individual epochs — confirm with the
/// store's owners.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct AuthorityPerpetualTables {
    /// Stored object versions keyed by `ObjectKey` (object id, version). After migration a
    /// value is either a live object (`StoreObject::Value`) or a `Deleted` / `Wrapped`
    /// tombstone.
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// Keyed by object reference and persisted under the column name
    /// `owned_object_transaction_locks` (see `#[rename]`).
    /// NOTE(review): the value type is a deprecated lock-details wrapper, presumably kept
    /// for on-disk compatibility — confirm.
    #[rename = "owned_object_transaction_locks"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,

    /// Transactions keyed by transaction digest; read by `get_transaction`.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// Transaction effects keyed by effects digest; `get_effects` reaches it through
    /// `executed_effects`.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Maps a transaction digest to the digest of its effects.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Deprecated events table keyed by (events digest, index); superseded by `events_2`
    /// (presumably — confirm).
    #[allow(dead_code)]
    #[deprecated]
    events: DBMap<(TransactionEventsDigest, usize), Event>,

    /// Transaction events keyed by transaction digest.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    /// Per-transaction list of object keys.
    /// NOTE(review): presumably runtime-loaded objects the transaction did not modify —
    /// confirm.
    pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,

    /// Maps a transaction digest to an (epoch, checkpoint sequence number) pair; read by
    /// `get_checkpoint_sequence_number`.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Per-epoch (last checkpoint of the epoch, state hash) pair; written by
    /// `insert_root_state_hash`.
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Single-entry table (unit key) holding the epoch start configuration.
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// Single-entry table (unit key) holding the highest pruned checkpoint.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Single-entry table (unit key) holding a `u64`.
    /// NOTE(review): presumably the expected total SUI supply — confirm.
    pub(crate) expected_network_sui_amount: DBMap<(), u64>,

    /// Single-entry table (unit key) holding an `i64`.
    /// NOTE(review): presumably the expected storage fund imbalance — confirm.
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Marker values keyed by (epoch, object key).
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
    /// Marker values keyed by (epoch, full object key).
    pub(crate) object_per_epoch_marker_table_v2: DBMap<(EpochId, FullObjectKey), MarkerValue>,

    /// Set of (epoch, transaction digest) pairs; written by
    /// `insert_executed_transaction_digests_batch`.
    pub(crate) executed_transaction_digests: DBMap<(EpochId, TransactionDigest), ()>,
}
150
/// Tables of the `pruner` database, opened separately from the perpetual tables.
#[derive(DBMapUtils)]
pub struct AuthorityPrunerTables {
    /// Maps an object id to a sequence number.
    /// NOTE(review): presumably the version at which the object's tombstone was recorded —
    /// confirm against the pruner.
    pub(crate) object_tombstones: DBMap<ObjectID, SequenceNumber>,
}
155
156impl AuthorityPrunerTables {
157 pub fn path(parent_path: &Path) -> PathBuf {
158 parent_path.join("pruner")
159 }
160
161 pub fn open(parent_path: &Path) -> Self {
162 Self::open_tables_read_write(
163 Self::path(parent_path),
164 MetricConf::new("pruner")
165 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
166 None,
167 None,
168 )
169 }
170}
171
172impl AuthorityPerpetualTables {
173 pub fn path(parent_path: &Path) -> PathBuf {
174 parent_path.join("perpetual")
175 }
176
177 #[cfg(not(tidehunter))]
178 pub fn open(
179 parent_path: &Path,
180 db_options_override: Option<AuthorityPerpetualTablesOptions>,
181 _pruner_watermark: Option<Arc<AtomicU64>>,
182 ) -> Self {
183 let db_options_override = db_options_override.unwrap_or_default();
184 let db_options =
185 db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
186 let table_options = DBMapTableConfigMap::new(BTreeMap::from([
187 (
188 "objects".to_string(),
189 objects_table_config(db_options.clone(), db_options_override.compaction_filter),
190 ),
191 (
192 "owned_object_transaction_locks".to_string(),
193 owned_object_transaction_locks_table_config(db_options.clone()),
194 ),
195 (
196 "transactions".to_string(),
197 transactions_table_config(db_options.clone()),
198 ),
199 (
200 "effects".to_string(),
201 effects_table_config(db_options.clone()),
202 ),
203 ]));
204
205 Self::open_tables_read_write(
206 Self::path(parent_path),
207 MetricConf::new("perpetual")
208 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
209 Some(db_options.options),
210 Some(table_options),
211 )
212 }
213
    /// Opens the perpetual tables read-write on the tidehunter backend.
    ///
    /// The options argument is ignored on this backend. `pruner_watermark` (a fresh counter
    /// starting at 0 when `None`) is shared with the relocation filters built by
    /// `apply_relocation_filter`. Every table gets an explicit `ThConfig` entry below.
    #[cfg(tidehunter)]
    pub fn open(
        parent_path: &Path,
        _: Option<AuthorityPerpetualTablesOptions>,
        pruner_watermark: Option<Arc<AtomicU64>>,
    ) -> Self {
        use crate::authority::authority_store_pruner::apply_relocation_filter;
        tracing::warn!("AuthorityPerpetualTables using tidehunter");
        use typed_store::tidehunter_util::{
            Bytes, Decision, IndexWalPosition, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
            default_cells_per_mutex, default_mutex_count, default_value_cache_size,
        };
        // Twice the backend's default mutex count.
        let mutexes = default_mutex_count() * 2;
        let value_cache_size = default_value_cache_size();
        let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));

        let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
        // Compactor for the `objects` index: walks the index from the greatest key down and
        // keeps only the first key met for each distinct `OID_SIZE`-byte prefix, i.e. the
        // greatest key of every prefix group; all other entries are dropped from the index.
        // NOTE(review): presumably this keeps only the latest version per (reduced) object
        // id — confirm. Slicing `key[..OID_SIZE]` panics if a key is shorter than 16 bytes.
        let objects_compactor = |index: &mut BTreeMap<Bytes, IndexWalPosition>| {
            let mut retain = HashSet::new();
            let mut previous: Option<&[u8]> = None;
            const OID_SIZE: usize = 16;
            for (key, _) in index.iter().rev() {
                if let Some(prev) = previous {
                    if prev == &key[..OID_SIZE] {
                        continue;
                    }
                }
                previous = Some(&key[..OID_SIZE]);
                retain.insert(key.clone());
            }
            index.retain(|k, _| retain.contains(k));
        };
        // Eight bytes, all zero except the last which is 32.
        // NOTE(review): presumably the serialized length prefix of a 32-byte digest that the
        // `rm_prefix` configs strip from keys — confirm.
        let mut digest_prefix = vec![0; 8];
        digest_prefix[7] = 32;
        let uniform_key = KeyType::uniform(default_cells_per_mutex());
        // 9 * 8 + 4 = 76 prefix bits.
        let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
        // (8 + 8) * 8 + 12 = 140 prefix bits.
        let epoch_tx_digest_prefix_key =
            KeyType::from_prefix_bits((8+ 8) * 8 + 12);
        // 40-byte keys reduced to the byte range 16..40.
        let object_indexing = KeyIndexing::key_reduction(32 + 8, 16..(32 + 8));
        // 80 bytes. NOTE(review): presumably the serialized size of an `ObjectRef` — confirm.
        let obj_ref_size = 32 + 8 + 32 + 8;
        // 80-byte keys reduced to the byte range 16..64.
        let owned_object_transaction_locks_indexing =
            KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));

        // One (table name, config) pair per column of `AuthorityPerpetualTables`.
        let configs = vec![
            (
                "objects".to_string(),
                ThConfig::new_with_config_indexing(
                    object_indexing,
                    mutexes,
                    KeyType::uniform(default_cells_per_mutex() * 4),
                    KeySpaceConfig::new()
                        .with_unloaded_iterator(true)
                        .with_max_dirty_keys(4048)
                        .with_compactor(Box::new(objects_compactor)),
                ),
            ),
            (
                "owned_object_transaction_locks".to_string(),
                ThConfig::new_with_config_indexing(
                    owned_object_transaction_locks_indexing,
                    mutexes,
                    KeyType::uniform(default_cells_per_mutex() * 4),
                    bloom_config.clone().with_max_dirty_keys(4048),
                ),
            ),
            (
                "transactions".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::new()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    // Relocation filter driven by the epoch the effects were executed in.
                    apply_relocation_filter(
                        bloom_config.clone().with_value_cache_size(value_cache_size),
                        pruner_watermark.clone(),
                        |effects: TransactionEffects| effects.executed_epoch(),
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_effects".to_string(),
                ThConfig::new_with_rm_prefix_indexing(
                    KeyIndexing::key_reduction(32, 0..16),
                    mutexes,
                    uniform_key,
                    bloom_config
                        .clone()
                        .with_value_cache_size(value_cache_size)
                        .with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events".to_string(),
                ThConfig::new_with_rm_prefix(
                    32 + 8,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "events_2".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "unchanged_loaded_runtime_objects".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                    digest_prefix.clone(),
                ),
            ),
            (
                "executed_transactions_to_checkpoint".to_string(),
                ThConfig::new_with_rm_prefix(
                    32,
                    mutexes,
                    uniform_key,
                    // Relocation filter driven by the epoch stored in the value.
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
                        false,
                    ),
                    digest_prefix.clone(),
                ),
            ),
            (
                "root_state_hash_by_epoch".to_string(),
                ThConfig::new(8, 1, KeyType::uniform(1)),
            ),
            // The next four tables are keyed by `()`, hence the key size of 0.
            (
                "epoch_start_configuration".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "pruned_checkpoint".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_network_sui_amount".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "expected_storage_fund_imbalance".to_string(),
                ThConfig::new(0, 1, KeyType::uniform(1)),
            ),
            (
                "object_per_epoch_marker_table".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    // Relocation filter driven by the epoch component of the tuple; note the
                    // final flag is `true` here, unlike the digest-keyed tables above.
                    apply_relocation_filter(
                        KeySpaceConfig::default(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "object_per_epoch_marker_table_v2".to_string(),
                ThConfig::new_with_config_indexing(
                    KeyIndexing::VariableLength,
                    mutexes,
                    epoch_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, FullObjectKey)| epoch_id,
                        true,
                    ),
                ),
            ),
            (
                "executed_transaction_digests".to_string(),
                ThConfig::new_with_config_indexing(
                    // 8 + (32 + 8) = 48-byte fixed-size keys.
                    KeyIndexing::fixed(8 + (32 + 8)),
                    mutexes,
                    epoch_tx_digest_prefix_key,
                    apply_relocation_filter(
                        bloom_config.clone(),
                        pruner_watermark.clone(),
                        |(epoch_id, _): (EpochId, TransactionDigest)| epoch_id,
                        true,
                    ),
                ),
            ),
        ];
        Self::open_tables_read_write(
            Self::path(parent_path),
            MetricConf::new("perpetual")
                .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
            configs.into_iter().collect(),
        )
    }
438
439 pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
440 Self::get_read_only_handle(
441 Self::path(parent_path),
442 None,
443 None,
444 MetricConf::new("perpetual_readonly"),
445 )
446 }
447
448 pub fn find_object_lt_or_eq_version(
452 &self,
453 object_id: ObjectID,
454 version: SequenceNumber,
455 ) -> SuiResult<Option<Object>> {
456 let mut iter = self.objects.reversed_safe_iter_with_bounds(
457 Some(ObjectKey::min_for_id(&object_id)),
458 Some(ObjectKey(object_id, version)),
459 )?;
460 match iter.next() {
461 Some(Ok((key, o))) => self.object(&key, o),
462 Some(Err(e)) => Err(e.into()),
463 None => Ok(None),
464 }
465 }
466
    /// Builds an `Object` from a stored value and its key by delegating to
    /// `try_construct_object`; any error from that call is returned unchanged.
    fn construct_object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectValue,
    ) -> Result<Object, SuiError> {
        try_construct_object(object_key, store_object)
    }
474
475 pub fn object(
478 &self,
479 object_key: &ObjectKey,
480 store_object: StoreObjectWrapper,
481 ) -> Result<Option<Object>, SuiError> {
482 let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
483 return Ok(None);
484 };
485 Ok(Some(self.construct_object(object_key, *store_object)?))
486 }
487
488 pub fn object_reference(
489 &self,
490 object_key: &ObjectKey,
491 store_object: StoreObjectWrapper,
492 ) -> Result<ObjectRef, SuiError> {
493 let obj_ref = match store_object.migrate().into_inner() {
494 StoreObject::Value(object) => self
495 .construct_object(object_key, *object)?
496 .compute_object_reference(),
497 StoreObject::Deleted => (
498 object_key.0,
499 object_key.1,
500 ObjectDigest::OBJECT_DIGEST_DELETED,
501 ),
502 StoreObject::Wrapped => (
503 object_key.0,
504 object_key.1,
505 ObjectDigest::OBJECT_DIGEST_WRAPPED,
506 ),
507 };
508 Ok(obj_ref)
509 }
510
511 pub fn tombstone_reference(
512 &self,
513 object_key: &ObjectKey,
514 store_object: &StoreObjectWrapper,
515 ) -> Result<Option<ObjectRef>, SuiError> {
516 let obj_ref = match store_object.inner() {
517 StoreObject::Deleted => Some((
518 object_key.0,
519 object_key.1,
520 ObjectDigest::OBJECT_DIGEST_DELETED,
521 )),
522 StoreObject::Wrapped => Some((
523 object_key.0,
524 object_key.1,
525 ObjectDigest::OBJECT_DIGEST_WRAPPED,
526 )),
527 _ => None,
528 };
529 Ok(obj_ref)
530 }
531
532 pub fn get_latest_object_ref_or_tombstone(
533 &self,
534 object_id: ObjectID,
535 ) -> Result<Option<ObjectRef>, SuiError> {
536 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
537 Some(ObjectKey::min_for_id(&object_id)),
538 Some(ObjectKey::max_for_id(&object_id)),
539 )?;
540
541 if let Some(Ok((object_key, value))) = iterator.next()
542 && object_key.0 == object_id
543 {
544 return Ok(Some(self.object_reference(&object_key, value)?));
545 }
546 Ok(None)
547 }
548
549 pub fn get_latest_object_or_tombstone(
550 &self,
551 object_id: ObjectID,
552 ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
553 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
554 Some(ObjectKey::min_for_id(&object_id)),
555 Some(ObjectKey::max_for_id(&object_id)),
556 )?;
557
558 if let Some(Ok((object_key, value))) = iterator.next()
559 && object_key.0 == object_id
560 {
561 return Ok(Some((object_key, value)));
562 }
563 Ok(None)
564 }
565
566 pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
567 Ok(self
568 .epoch_start_configuration
569 .get(&())?
570 .expect("Must have current epoch.")
571 .epoch_start_state()
572 .epoch())
573 }
574
575 pub fn set_epoch_start_configuration(
576 &self,
577 epoch_start_configuration: &EpochStartConfiguration,
578 ) -> SuiResult {
579 let mut wb = self.epoch_start_configuration.batch();
580 wb.insert_batch(
581 &self.epoch_start_configuration,
582 std::iter::once(((), epoch_start_configuration)),
583 )?;
584 wb.write()?;
585 Ok(())
586 }
587
    /// Reads the single entry of the `pruned_checkpoint` table.
    ///
    /// Returns `Ok(None)` when nothing has been stored yet. Unlike most accessors here, the
    /// error type is the raw `TypedStoreError`.
    pub fn get_highest_pruned_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.pruned_checkpoint.get(&())
    }
593
594 pub fn set_highest_pruned_checkpoint(
595 &self,
596 wb: &mut DBBatch,
597 checkpoint_number: CheckpointSequenceNumber,
598 ) -> SuiResult {
599 wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
600 Ok(())
601 }
602
603 pub fn get_transaction(
604 &self,
605 digest: &TransactionDigest,
606 ) -> SuiResult<Option<TrustedTransaction>> {
607 let Some(transaction) = self.transactions.get(digest)? else {
608 return Ok(None);
609 };
610 Ok(Some(transaction))
611 }
612
613 pub fn insert_executed_transaction_digests_batch(
616 &self,
617 epoch: EpochId,
618 digests: impl Iterator<Item = TransactionDigest>,
619 ) -> SuiResult {
620 let mut batch = self.executed_transaction_digests.batch();
621 batch.insert_batch(
622 &self.executed_transaction_digests,
623 digests.map(|digest| ((epoch, digest), ())),
624 )?;
625 batch.write()?;
626 Ok(())
627 }
628
629 pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
630 let Some(effect_digest) = self.executed_effects.get(digest)? else {
631 return Ok(None);
632 };
633 Ok(self.effects.get(&effect_digest)?)
634 }
635
    /// Returns the `(epoch, checkpoint sequence number)` pair stored for the transaction
    /// `digest` in `executed_transactions_to_checkpoint`, or `Ok(None)` if there is no
    /// entry.
    pub fn get_checkpoint_sequence_number(
        &self,
        digest: &TransactionDigest,
    ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
        Ok(self.executed_transactions_to_checkpoint.get(digest)?)
    }
644
645 pub fn get_newer_object_keys(
646 &self,
647 object: &(ObjectID, SequenceNumber),
648 ) -> SuiResult<Vec<ObjectKey>> {
649 let mut objects = vec![];
650 for result in self.objects.safe_iter_with_bounds(
651 Some(ObjectKey(object.0, object.1.next())),
652 Some(ObjectKey(object.0, VersionNumber::MAX)),
653 ) {
654 let (key, _) = result?;
655 objects.push(key);
656 }
657 Ok(objects)
658 }
659
660 pub fn set_highest_pruned_checkpoint_without_wb(
661 &self,
662 checkpoint_number: CheckpointSequenceNumber,
663 ) -> SuiResult {
664 let mut wb = self.pruned_checkpoint.batch();
665 self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
666 wb.write()?;
667 Ok(())
668 }
669
670 pub fn database_is_empty(&self) -> SuiResult<bool> {
671 Ok(self.objects.safe_iter().next().is_none())
672 }
673
    /// Creates a [`LiveSetIter`] over the whole `objects` table.
    ///
    /// `include_wrapped_object` controls whether entries whose latest state is `Wrapped`
    /// are yielded (as `LiveObject::Wrapped`) or skipped.
    pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
        LiveSetIter {
            iter: Box::new(self.objects.safe_iter()),
            tables: self,
            // No look-behind entry until the first item has been read.
            prev: None,
            include_wrapped_object,
        }
    }
682
683 pub fn range_iter_live_object_set(
684 &self,
685 lower_bound: Option<ObjectID>,
686 upper_bound: Option<ObjectID>,
687 include_wrapped_object: bool,
688 ) -> LiveSetIter<'_> {
689 let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
690 let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
691
692 LiveSetIter {
693 iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
694 tables: self,
695 prev: None,
696 include_wrapped_object,
697 }
698 }
699
700 pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
701 self.objects.checkpoint_db(path).map_err(Into::into)
703 }
704
    /// Returns the `(checkpoint sequence number, state hash)` pair stored for `epoch` in
    /// `root_state_hash_by_epoch`, or `Ok(None)` if there is no entry.
    pub fn get_root_state_hash(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
        Ok(self.root_state_hash_by_epoch.get(&epoch)?)
    }
711
712 pub fn insert_root_state_hash(
713 &self,
714 epoch: EpochId,
715 last_checkpoint_of_epoch: CheckpointSequenceNumber,
716 hash: GlobalStateHash,
717 ) -> SuiResult {
718 self.root_state_hash_by_epoch
719 .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
720 Ok(())
721 }
722
723 pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
724 let object_reference = object.compute_object_reference();
725 let wrapper = get_store_object(object);
726 let mut wb = self.objects.batch();
727 wb.insert_batch(
728 &self.objects,
729 std::iter::once((ObjectKey::from(object_reference), wrapper)),
730 )?;
731 wb.write()?;
732 Ok(())
733 }
734
735 pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
737 let obj_entry = self
738 .objects
739 .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
740 .next();
741
742 match obj_entry.transpose()? {
743 Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
744 Ok(self.object(&ObjectKey(obj_id, version), obj)?)
745 }
746 _ => Ok(None),
747 }
748 }
749
750 pub fn get_object_by_key_fallible(
751 &self,
752 object_id: &ObjectID,
753 version: VersionNumber,
754 ) -> SuiResult<Option<Object>> {
755 Ok(self
756 .objects
757 .get(&ObjectKey(*object_id, version))?
758 .and_then(|object| {
759 self.object(&ObjectKey(*object_id, version), object)
760 .expect("object construction error")
761 }))
762 }
763}
764
/// Infallible `ObjectStore` view over the perpetual tables: both methods delegate to
/// their `_fallible` counterparts and panic with "db error" if those return `Err`.
impl ObjectStore for AuthorityPerpetualTables {
    /// Returns the latest live version of `object_id`, if any.
    ///
    /// # Panics
    /// Panics if `get_object_fallible` returns an error.
    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        self.get_object_fallible(object_id).expect("db error")
    }

    /// Returns the object stored at exactly (`object_id`, `version`), if any.
    ///
    /// # Panics
    /// Panics if `get_object_by_key_fallible` returns an error.
    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
        self.get_object_by_key_fallible(object_id, version)
            .expect("db error")
    }
}
776
/// Iterator over the live object set: it yields, for each object id met in the underlying
/// table iterator, the last entry seen for that id, provided that entry is live.
pub struct LiveSetIter<'a> {
    /// Underlying iterator over `(key, stored wrapper)` entries of the `objects` table.
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    /// Tables handle, used to reconstruct `Object`s from stored values.
    tables: &'a AuthorityPerpetualTables,
    /// Most recently read entry, held back until the next entry shows whether it was the
    /// last one for its object id.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
    /// Whether entries whose last state is `Wrapped` are yielded or skipped.
    include_wrapped_object: bool,
}
784
/// An item of the live object set as produced by [`LiveSetIter`].
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    /// A live object reconstructed from its stored value.
    Normal(Object),
    /// An object whose latest stored entry is a `Wrapped` tombstone; only its key is
    /// carried.
    Wrapped(ObjectKey),
}
790
791impl LiveObject {
792 pub fn object_id(&self) -> ObjectID {
793 match self {
794 LiveObject::Normal(obj) => obj.id(),
795 LiveObject::Wrapped(key) => key.0,
796 }
797 }
798
799 pub fn version(&self) -> SequenceNumber {
800 match self {
801 LiveObject::Normal(obj) => obj.version(),
802 LiveObject::Wrapped(key) => key.1,
803 }
804 }
805
806 pub fn object_reference(&self) -> ObjectRef {
807 match self {
808 LiveObject::Normal(obj) => obj.compute_object_reference(),
809 LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
810 }
811 }
812
813 pub fn to_normal(self) -> Option<Object> {
814 match self {
815 LiveObject::Normal(object) => Some(object),
816 LiveObject::Wrapped(_) => None,
817 }
818 }
819}
820
821impl LiveSetIter<'_> {
822 fn store_object_wrapper_to_live_object(
823 &self,
824 object_key: ObjectKey,
825 store_object: StoreObjectWrapper,
826 ) -> Option<LiveObject> {
827 match store_object.migrate().into_inner() {
828 StoreObject::Value(object) => {
829 let object = self
830 .tables
831 .construct_object(&object_key, *object)
832 .expect("Constructing object from store cannot fail");
833 Some(LiveObject::Normal(object))
834 }
835 StoreObject::Wrapped => {
836 if self.include_wrapped_object {
837 Some(LiveObject::Wrapped(object_key))
838 } else {
839 None
840 }
841 }
842 StoreObject::Deleted => None,
843 }
844 }
845}
846
/// Yields one [`LiveObject`] per object id: the last entry read for that id, if it is live.
///
/// The iterator works with a one-entry look-behind (`prev`): an entry is only emitted once
/// the following entry turns out to belong to a different object id, or once the
/// underlying iterator stops.
/// NOTE(review): this relies on entries of the same id being adjacent with the latest
/// version last — presumably guaranteed by the key ordering; confirm.
impl Iterator for LiveSetIter<'_> {
    type Item = LiveObject;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // NOTE(review): a `Some(Err(_))` from the underlying iterator does not match
            // this pattern, so a read error is handled exactly like the end of the table:
            // the held-back entry is flushed and iteration ends. Confirm that silently
            // truncating the live set on error is intended.
            if let Some(Ok((next_key, next_value))) = self.iter.next() {
                // Hold back the new entry and examine the one held back before it.
                let prev = self.prev.take();
                self.prev = Some((next_key, next_value));

                // The previous entry was the last one for its id only if the id changed.
                if let Some((prev_key, prev_value)) = prev
                    && prev_key.0 != next_key.0
                {
                    let live_object =
                        self.store_object_wrapper_to_live_object(prev_key, prev_value);
                    if live_object.is_some() {
                        return live_object;
                    }
                }
                // Same id, or the previous entry was not live: keep scanning.
                continue;
            }
            // Underlying iterator stopped: flush the held-back entry, if any.
            if let Some((key, value)) = self.prev.take() {
                let live_object = self.store_object_wrapper_to_live_object(key, value);
                if live_object.is_some() {
                    return live_object;
                }
            }
            return None;
        }
    }
}
877
878fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
880 DBOptions {
881 options: db_options
882 .clone()
883 .optimize_for_write_throughput()
884 .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
885 .options,
886 rw_options: db_options.rw_options.set_ignore_range_deletions(false),
887 }
888}
889
890fn objects_table_config(
891 mut db_options: DBOptions,
892 compaction_filter: Option<ObjectsCompactionFilter>,
893) -> DBOptions {
894 if let Some(mut compaction_filter) = compaction_filter {
895 db_options
896 .options
897 .set_compaction_filter("objects", move |_, key, value| {
898 match compaction_filter.filter(key, value) {
899 Ok(decision) => decision,
900 Err(err) => {
901 error!("Compaction error: {:?}", err);
902 Decision::Keep
903 }
904 }
905 });
906 }
907 db_options
908 .optimize_for_write_throughput()
909 .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
910}
911
912fn transactions_table_config(db_options: DBOptions) -> DBOptions {
913 db_options
914 .optimize_for_write_throughput()
915 .optimize_for_point_lookup(
916 read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
917 )
918}
919
920fn effects_table_config(db_options: DBOptions) -> DBOptions {
921 db_options
922 .optimize_for_write_throughput()
923 .optimize_for_point_lookup(
924 read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
925 )
926}