1use super::*;
5use crate::authority::authority_store::LockDetailsWrapperDeprecated;
6use serde::{Deserialize, Serialize};
7use std::path::Path;
8use std::sync::atomic::AtomicU64;
9use sui_types::base_types::SequenceNumber;
10use sui_types::digests::TransactionEventsDigest;
11use sui_types::effects::{TransactionEffects, TransactionEvents};
12use sui_types::global_state_hash::GlobalStateHash;
13use sui_types::storage::{FullObjectKey, MarkerValue};
14use tracing::error;
15use typed_store::metrics::SamplingInterval;
16use typed_store::rocks::{
17 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
18 read_size_from_env,
19};
20use typed_store::traits::Map;
21
22use crate::authority::authority_store_pruner::ObjectsCompactionFilter;
23use crate::authority::authority_store_types::{
24 StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
25};
26use crate::authority::epoch_start_configuration::EpochStartConfiguration;
27use typed_store::rocksdb::compaction_filter::Decision;
28use typed_store::{DBMapUtils, DbIterator};
29
/// Environment variable read by `objects_table_config` to size the `objects` table read cache.
/// NOTE(review): the `_MB` suffix suggests the value is in megabytes — confirm against
/// `read_size_from_env`.
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
/// Environment variable read by `owned_object_transaction_locks_table_config`.
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
/// Environment variable read by `transactions_table_config`.
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
/// Environment variable read by `effects_table_config`.
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
34
/// Optional overrides consumed by `AuthorityPerpetualTables::open`.
///
/// `Default` yields `false` / `None` / `false`, which is what `open` uses when no
/// override is supplied.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// When `false`, `apply_to` calls `disable_write_throttling()` on the database options.
    pub enable_write_stall: bool,
    /// Optional filter that the non-tidehunter `open` hands to `objects_table_config`,
    /// which installs it as the compaction filter of the `objects` table.
    pub compaction_filter: Option<ObjectsCompactionFilter>,
    /// Read by the tidehunter `open`: the objects compactor is only installed when this
    /// is `true`.
    pub is_validator: bool,
}
43
44impl AuthorityPerpetualTablesOptions {
45 fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
46 if !self.enable_write_stall {
47 db_options = db_options.disable_write_throttling();
48 }
49 db_options
50 }
51}
52
/// Tables opened under the `perpetual` sub-directory (see `AuthorityPerpetualTables::path`).
///
/// The field names double as table names unless overridden with `#[rename]`.
#[derive(DBMapUtils)]
#[cfg_attr(tidehunter, tidehunter)]
pub struct AuthorityPerpetualTables {
    /// Stored objects keyed by `ObjectKey`. The lookup helpers in this file scan this
    /// table in reverse between `ObjectKey::min_for_id` and `ObjectKey::max_for_id` to
    /// find the newest entry of an object.
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// Stored under the table name `owned_object_transaction_locks` (see `#[rename]`).
    #[rename = "owned_object_transaction_locks"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, Option<LockDetailsWrapperDeprecated>>,

    /// Transactions keyed by their digest (read by `get_transaction`).
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// Transaction effects keyed by the effects digest.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Maps a transaction digest to its effects digest; `get_effects` resolves effects
    /// through this table and then `effects`.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Deprecated events table; kept as a field but not accessed in this file.
    #[allow(dead_code)]
    #[deprecated]
    events: DBMap<(TransactionEventsDigest, usize), Event>,

    /// Events keyed by transaction digest.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    /// Object keys recorded per transaction digest.
    /// NOTE(review): presumably objects loaded at runtime but left unchanged — confirm with writers.
    pub(crate) unchanged_loaded_runtime_objects: DBMap<TransactionDigest, Vec<ObjectKey>>,

    /// Maps a transaction digest to an `(epoch, checkpoint sequence number)` pair
    /// (read by `get_checkpoint_sequence_number`).
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Per-epoch root state hash, stored together with the checkpoint sequence number
    /// that `insert_root_state_hash` receives as `last_checkpoint_of_epoch`.
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Single-entry table (unit key) holding the epoch start configuration.
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// Single-entry table (unit key) read and written by the
    /// `*_highest_pruned_checkpoint` helpers.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Single-entry table (unit key) holding a `u64` amount.
    pub(crate) expected_network_sui_amount: DBMap<(), u64>,

    /// Single-entry table (unit key) holding a signed `i64` imbalance.
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Marker values keyed by `(epoch, ObjectKey)`.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
    /// Marker values keyed by `(epoch, FullObjectKey)`.
    pub(crate) object_per_epoch_marker_table_v2: DBMap<(EpochId, FullObjectKey), MarkerValue>,

    /// Set of `(epoch, transaction digest)` pairs; queried by
    /// `was_transaction_executed_in_last_epoch`.
    pub(crate) executed_transaction_digests: DBMap<(EpochId, TransactionDigest), ()>,
}
151
/// Tables opened under the `pruner` sub-directory (see `AuthorityPrunerTables::path`).
#[derive(DBMapUtils)]
pub struct AuthorityPrunerTables {
    /// Maps an object id to a sequence number.
    /// NOTE(review): presumably the version at which the object was tombstoned — confirm with the pruner.
    pub(crate) object_tombstones: DBMap<ObjectID, SequenceNumber>,
}
156
157impl AuthorityPrunerTables {
158 pub fn path(parent_path: &Path) -> PathBuf {
159 parent_path.join("pruner")
160 }
161
162 pub fn open(parent_path: &Path) -> Self {
163 Self::open_tables_read_write(
164 Self::path(parent_path),
165 MetricConf::new("pruner")
166 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
167 None,
168 None,
169 )
170 }
171}
172
173impl AuthorityPerpetualTables {
/// Returns the directory of the perpetual tables: `<parent_path>/perpetual`.
pub fn path(parent_path: &Path) -> PathBuf {
    let mut dir = parent_path.to_path_buf();
    dir.push("perpetual");
    dir
}
177
178 #[cfg(not(tidehunter))]
179 pub fn open(
180 parent_path: &Path,
181 db_options_override: Option<AuthorityPerpetualTablesOptions>,
182 _pruner_watermark: Option<Arc<AtomicU64>>,
183 ) -> Self {
184 let db_options_override = db_options_override.unwrap_or_default();
185 let db_options =
186 db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
187 let table_options = DBMapTableConfigMap::new(BTreeMap::from([
188 (
189 "objects".to_string(),
190 objects_table_config(db_options.clone(), db_options_override.compaction_filter),
191 ),
192 (
193 "owned_object_transaction_locks".to_string(),
194 owned_object_transaction_locks_table_config(db_options.clone()),
195 ),
196 (
197 "transactions".to_string(),
198 transactions_table_config(db_options.clone()),
199 ),
200 (
201 "effects".to_string(),
202 effects_table_config(db_options.clone()),
203 ),
204 ]));
205
206 Self::open_tables_read_write(
207 Self::path(parent_path),
208 MetricConf::new("perpetual")
209 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
210 Some(db_options.options),
211 Some(table_options),
212 )
213 }
214
/// Opens the perpetual tables read-write below `parent_path` (tidehunter build).
///
/// Builds one `ThConfig` per table and passes the collected configs to
/// `open_tables_read_write`. `pruner_watermark` defaults to a fresh counter starting at
/// `0`; it is shared with every table configured through `apply_relocation_filter`.
/// The objects compactor is installed only when `db_options_override` is `Some` and its
/// `is_validator` flag is set.
#[cfg(tidehunter)]
pub fn open(
    parent_path: &Path,
    db_options_override: Option<AuthorityPerpetualTablesOptions>,
    pruner_watermark: Option<Arc<AtomicU64>>,
) -> Self {
    use crate::authority::authority_store_pruner::apply_relocation_filter;
    tracing::warn!("AuthorityPerpetualTables using tidehunter");
    use typed_store::tidehunter_util::{
        Bytes, Decision, KeyIndexing, KeySpaceConfig, KeyType, ThConfig,
        default_cells_per_mutex, default_mutex_count, default_value_cache_size,
    };
    let mutexes = default_mutex_count() * 2;
    let value_cache_size = default_value_cache_size();
    let pruner_watermark = pruner_watermark.unwrap_or(Arc::new(AtomicU64::new(0)));

    let bloom_config = KeySpaceConfig::new().with_bloom_filter(0.001, 32_000);
    // Walks the keys in reverse and retains only the first key seen for each distinct
    // `OID_SIZE`-byte prefix, i.e. the last key of every prefix group in forward order.
    // NOTE(review): presumably this keeps just the newest version per object — confirm
    // against the reduced key layout produced by `object_indexing`.
    let objects_compactor = |iter: &mut dyn DoubleEndedIterator<Item = &Bytes>| {
        let mut retain = HashSet::new();
        let mut previous: Option<&[u8]> = None;
        const OID_SIZE: usize = 16;
        for key in iter.rev() {
            if let Some(prev) = previous {
                if prev == &key[..OID_SIZE] {
                    continue;
                }
            }
            previous = Some(&key[..OID_SIZE]);
            retain.insert(key.clone());
        }
        retain
    };
    // Eight bytes `[0, 0, 0, 0, 0, 0, 0, 32]`, passed as the prefix argument of the
    // `new_with_rm_prefix*` constructors below.
    // NOTE(review): looks like the serialized length prefix of a 32-byte digest — confirm.
    let mut digest_prefix = vec![0; 8];
    digest_prefix[7] = 32;
    let uniform_key = KeyType::uniform(default_cells_per_mutex());
    let epoch_prefix_key = KeyType::from_prefix_bits(9 * 8 + 4);
    let epoch_tx_digest_prefix_key =
        KeyType::from_prefix_bits((8+ 8) * 8 + 12);
    let object_indexing = KeyIndexing::key_reduction(32 + 8, 16..(32 + 8));
    let obj_ref_size = 32 + 8 + 32 + 8;
    let owned_object_transaction_locks_indexing =
        KeyIndexing::key_reduction(obj_ref_size, 16..(obj_ref_size - 16));

    let mut objects_config = KeySpaceConfig::new()
        .with_unloaded_iterator(true)
        .with_max_dirty_keys(4048);
    // The compactor is opt-in: only nodes opened with `is_validator == true` get it.
    if matches!(db_options_override, Some(options) if options.is_validator) {
        objects_config = objects_config.with_compactor(Box::new(objects_compactor));
    }

    // One `(table name, config)` pair per table of `AuthorityPerpetualTables`; the names
    // must match the table names (note the renamed `owned_object_transaction_locks`).
    let configs = vec![
        (
            "objects".to_string(),
            ThConfig::new_with_config_indexing(
                object_indexing,
                mutexes,
                KeyType::uniform(default_cells_per_mutex() * 4),
                objects_config,
            ),
        ),
        (
            "owned_object_transaction_locks".to_string(),
            ThConfig::new_with_config_indexing(
                owned_object_transaction_locks_indexing,
                mutexes,
                KeyType::uniform(default_cells_per_mutex() * 4),
                bloom_config.clone().with_max_dirty_keys(4048),
            ),
        ),
        (
            "transactions".to_string(),
            ThConfig::new_with_rm_prefix_indexing(
                KeyIndexing::key_reduction(32, 0..16),
                mutexes,
                uniform_key,
                KeySpaceConfig::new()
                    .with_value_cache_size(value_cache_size)
                    .with_relocation_filter(|_, _| Decision::Remove),
                digest_prefix.clone(),
            ),
        ),
        (
            "effects".to_string(),
            ThConfig::new_with_rm_prefix_indexing(
                KeyIndexing::key_reduction(32, 0..16),
                mutexes,
                uniform_key,
                apply_relocation_filter(
                    bloom_config.clone().with_value_cache_size(value_cache_size),
                    pruner_watermark.clone(),
                    |effects: TransactionEffects| effects.executed_epoch(),
                    false,
                ),
                digest_prefix.clone(),
            ),
        ),
        (
            "executed_effects".to_string(),
            ThConfig::new_with_rm_prefix_indexing(
                KeyIndexing::key_reduction(32, 0..16),
                mutexes,
                uniform_key,
                bloom_config
                    .clone()
                    .with_value_cache_size(value_cache_size)
                    .with_relocation_filter(|_, _| Decision::Remove),
                digest_prefix.clone(),
            ),
        ),
        (
            "events".to_string(),
            ThConfig::new_with_rm_prefix(
                32 + 8,
                mutexes,
                uniform_key,
                KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                digest_prefix.clone(),
            ),
        ),
        (
            "events_2".to_string(),
            ThConfig::new_with_rm_prefix(
                32,
                mutexes,
                uniform_key,
                KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                digest_prefix.clone(),
            ),
        ),
        (
            "unchanged_loaded_runtime_objects".to_string(),
            ThConfig::new_with_rm_prefix(
                32,
                mutexes,
                uniform_key,
                KeySpaceConfig::default().with_relocation_filter(|_, _| Decision::Remove),
                digest_prefix.clone(),
            ),
        ),
        (
            "executed_transactions_to_checkpoint".to_string(),
            ThConfig::new_with_rm_prefix(
                32,
                mutexes,
                uniform_key,
                apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermark.clone(),
                    |(epoch_id, _): (EpochId, CheckpointSequenceNumber)| epoch_id,
                    false,
                ),
                digest_prefix.clone(),
            ),
        ),
        (
            "root_state_hash_by_epoch".to_string(),
            ThConfig::new(8, 1, KeyType::uniform(1)),
        ),
        // The next four tables are keyed by `()`, hence the key size of 0.
        (
            "epoch_start_configuration".to_string(),
            ThConfig::new(0, 1, KeyType::uniform(1)),
        ),
        (
            "pruned_checkpoint".to_string(),
            ThConfig::new(0, 1, KeyType::uniform(1)),
        ),
        (
            "expected_network_sui_amount".to_string(),
            ThConfig::new(0, 1, KeyType::uniform(1)),
        ),
        (
            "expected_storage_fund_imbalance".to_string(),
            ThConfig::new(0, 1, KeyType::uniform(1)),
        ),
        // The remaining tables are keyed by `(EpochId, _)`; their relocation filters
        // extract the epoch from the decoded key and pass `true` as the last argument,
        // whereas the value-based filters above pass `false`.
        (
            "object_per_epoch_marker_table".to_string(),
            ThConfig::new_with_config_indexing(
                KeyIndexing::VariableLength,
                mutexes,
                epoch_prefix_key,
                apply_relocation_filter(
                    KeySpaceConfig::default(),
                    pruner_watermark.clone(),
                    |(epoch_id, _): (EpochId, ObjectKey)| epoch_id,
                    true,
                ),
            ),
        ),
        (
            "object_per_epoch_marker_table_v2".to_string(),
            ThConfig::new_with_config_indexing(
                KeyIndexing::VariableLength,
                mutexes,
                epoch_prefix_key,
                apply_relocation_filter(
                    bloom_config.clone(),
                    pruner_watermark.clone(),
                    |(epoch_id, _): (EpochId, FullObjectKey)| epoch_id,
                    true,
                ),
            ),
        ),
        (
            "executed_transaction_digests".to_string(),
            ThConfig::new_with_config_indexing(
                KeyIndexing::fixed(8 + (32 + 8)),
                mutexes,
                epoch_tx_digest_prefix_key,
                apply_relocation_filter(
                    bloom_config.clone(),
                    pruner_watermark.clone(),
                    |(epoch_id, _): (EpochId, TransactionDigest)| epoch_id,
                    true,
                ),
            ),
        ),
    ];
    Self::open_tables_read_write(
        Self::path(parent_path),
        MetricConf::new("perpetual")
            .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
        configs.into_iter().collect(),
    )
}
443
444 pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
445 Self::get_read_only_handle(
446 Self::path(parent_path),
447 None,
448 None,
449 MetricConf::new("perpetual_readonly"),
450 )
451 }
452
453 pub fn find_object_lt_or_eq_version(
457 &self,
458 object_id: ObjectID,
459 version: SequenceNumber,
460 ) -> SuiResult<Option<Object>> {
461 let mut iter = self.objects.reversed_safe_iter_with_bounds(
462 Some(ObjectKey::min_for_id(&object_id)),
463 Some(ObjectKey(object_id, version)),
464 )?;
465 match iter.next() {
466 Some(Ok((key, o))) => self.object(&key, o),
467 Some(Err(e)) => Err(e.into()),
468 None => Ok(None),
469 }
470 }
471
/// Builds an `Object` from a stored value by delegating to `try_construct_object`.
///
/// `self` is not read; the method exists so callers can go through the table handle.
fn construct_object(
    &self,
    object_key: &ObjectKey,
    store_object: StoreObjectValue,
) -> Result<Object, SuiError> {
    try_construct_object(object_key, store_object)
}
479
480 pub fn object(
483 &self,
484 object_key: &ObjectKey,
485 store_object: StoreObjectWrapper,
486 ) -> Result<Option<Object>, SuiError> {
487 let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
488 return Ok(None);
489 };
490 Ok(Some(self.construct_object(object_key, *store_object)?))
491 }
492
493 pub fn object_reference(
494 &self,
495 object_key: &ObjectKey,
496 store_object: StoreObjectWrapper,
497 ) -> Result<ObjectRef, SuiError> {
498 let obj_ref = match store_object.migrate().into_inner() {
499 StoreObject::Value(object) => self
500 .construct_object(object_key, *object)?
501 .compute_object_reference(),
502 StoreObject::Deleted => (
503 object_key.0,
504 object_key.1,
505 ObjectDigest::OBJECT_DIGEST_DELETED,
506 ),
507 StoreObject::Wrapped => (
508 object_key.0,
509 object_key.1,
510 ObjectDigest::OBJECT_DIGEST_WRAPPED,
511 ),
512 };
513 Ok(obj_ref)
514 }
515
516 pub fn tombstone_reference(
517 &self,
518 object_key: &ObjectKey,
519 store_object: &StoreObjectWrapper,
520 ) -> Result<Option<ObjectRef>, SuiError> {
521 let obj_ref = match store_object.inner() {
522 StoreObject::Deleted => Some((
523 object_key.0,
524 object_key.1,
525 ObjectDigest::OBJECT_DIGEST_DELETED,
526 )),
527 StoreObject::Wrapped => Some((
528 object_key.0,
529 object_key.1,
530 ObjectDigest::OBJECT_DIGEST_WRAPPED,
531 )),
532 _ => None,
533 };
534 Ok(obj_ref)
535 }
536
537 pub fn get_latest_object_ref_or_tombstone(
538 &self,
539 object_id: ObjectID,
540 ) -> Result<Option<ObjectRef>, SuiError> {
541 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
542 Some(ObjectKey::min_for_id(&object_id)),
543 Some(ObjectKey::max_for_id(&object_id)),
544 )?;
545
546 if let Some(Ok((object_key, value))) = iterator.next()
547 && object_key.0 == object_id
548 {
549 return Ok(Some(self.object_reference(&object_key, value)?));
550 }
551 Ok(None)
552 }
553
554 pub fn get_latest_object_or_tombstone(
555 &self,
556 object_id: ObjectID,
557 ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
558 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
559 Some(ObjectKey::min_for_id(&object_id)),
560 Some(ObjectKey::max_for_id(&object_id)),
561 )?;
562
563 if let Some(Ok((object_key, value))) = iterator.next()
564 && object_key.0 == object_id
565 {
566 return Ok(Some((object_key, value)));
567 }
568 Ok(None)
569 }
570
571 pub fn get_recovery_epoch_at_restart(&self) -> SuiResult<EpochId> {
572 Ok(self
573 .epoch_start_configuration
574 .get(&())?
575 .expect("Must have current epoch.")
576 .epoch_start_state()
577 .epoch())
578 }
579
580 pub fn set_epoch_start_configuration(
581 &self,
582 epoch_start_configuration: &EpochStartConfiguration,
583 ) -> SuiResult {
584 let mut wb = self.epoch_start_configuration.batch();
585 wb.insert_batch(
586 &self.epoch_start_configuration,
587 std::iter::once(((), epoch_start_configuration)),
588 )?;
589 wb.write()?;
590 Ok(())
591 }
592
/// Reads the single entry of the `pruned_checkpoint` table.
///
/// Returns `Ok(None)` when nothing has been stored yet. Unlike most accessors here, the
/// error type is the raw `TypedStoreError`.
pub fn get_highest_pruned_checkpoint(
    &self,
) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
    self.pruned_checkpoint.get(&())
}
598
599 pub fn set_highest_pruned_checkpoint(
600 &self,
601 wb: &mut DBBatch,
602 checkpoint_number: CheckpointSequenceNumber,
603 ) -> SuiResult {
604 wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
605 Ok(())
606 }
607
608 pub fn get_transaction(
609 &self,
610 digest: &TransactionDigest,
611 ) -> SuiResult<Option<TrustedTransaction>> {
612 let Some(transaction) = self.transactions.get(digest)? else {
613 return Ok(None);
614 };
615 Ok(Some(transaction))
616 }
617
618 pub fn insert_executed_transaction_digests_batch(
621 &self,
622 epoch: EpochId,
623 digests: impl Iterator<Item = TransactionDigest>,
624 ) -> SuiResult {
625 let mut batch = self.executed_transaction_digests.batch();
626 batch.insert_batch(
627 &self.executed_transaction_digests,
628 digests.map(|digest| ((epoch, digest), ())),
629 )?;
630 batch.write()?;
631 Ok(())
632 }
633
634 pub fn get_effects(&self, digest: &TransactionDigest) -> SuiResult<Option<TransactionEffects>> {
635 let Some(effect_digest) = self.executed_effects.get(digest)? else {
636 return Ok(None);
637 };
638 Ok(self.effects.get(&effect_digest)?)
639 }
640
641 pub(crate) fn was_transaction_executed_in_last_epoch(
642 &self,
643 digest: &TransactionDigest,
644 current_epoch: EpochId,
645 ) -> bool {
646 if current_epoch == 0 {
647 return false;
648 }
649 self.executed_transaction_digests
650 .contains_key(&(current_epoch - 1, *digest))
651 .expect("db error")
652 }
653
654 pub fn get_checkpoint_sequence_number(
657 &self,
658 digest: &TransactionDigest,
659 ) -> SuiResult<Option<(EpochId, CheckpointSequenceNumber)>> {
660 Ok(self.executed_transactions_to_checkpoint.get(digest)?)
661 }
662
663 pub fn get_newer_object_keys(
664 &self,
665 object: &(ObjectID, SequenceNumber),
666 ) -> SuiResult<Vec<ObjectKey>> {
667 let mut objects = vec![];
668 for result in self.objects.safe_iter_with_bounds(
669 Some(ObjectKey(object.0, object.1.next())),
670 Some(ObjectKey(object.0, VersionNumber::MAX)),
671 ) {
672 let (key, _) = result?;
673 objects.push(key);
674 }
675 Ok(objects)
676 }
677
678 pub fn set_highest_pruned_checkpoint_without_wb(
679 &self,
680 checkpoint_number: CheckpointSequenceNumber,
681 ) -> SuiResult {
682 let mut wb = self.pruned_checkpoint.batch();
683 self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
684 wb.write()?;
685 Ok(())
686 }
687
688 pub fn database_is_empty(&self) -> SuiResult<bool> {
689 Ok(self.objects.safe_iter().next().is_none())
690 }
691
692 pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
693 LiveSetIter {
694 iter: Box::new(self.objects.safe_iter()),
695 tables: self,
696 prev: None,
697 include_wrapped_object,
698 }
699 }
700
701 pub fn range_iter_live_object_set(
702 &self,
703 lower_bound: Option<ObjectID>,
704 upper_bound: Option<ObjectID>,
705 include_wrapped_object: bool,
706 ) -> LiveSetIter<'_> {
707 let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
708 let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
709
710 LiveSetIter {
711 iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
712 tables: self,
713 prev: None,
714 include_wrapped_object,
715 }
716 }
717
718 pub fn checkpoint_db(&self, path: &Path) -> SuiResult {
719 self.objects.checkpoint_db(path).map_err(Into::into)
721 }
722
723 pub fn get_root_state_hash(
724 &self,
725 epoch: EpochId,
726 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
727 Ok(self.root_state_hash_by_epoch.get(&epoch)?)
728 }
729
730 pub fn insert_root_state_hash(
731 &self,
732 epoch: EpochId,
733 last_checkpoint_of_epoch: CheckpointSequenceNumber,
734 hash: GlobalStateHash,
735 ) -> SuiResult {
736 self.root_state_hash_by_epoch
737 .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
738 Ok(())
739 }
740
741 pub fn insert_object_test_only(&self, object: Object) -> SuiResult {
742 let object_reference = object.compute_object_reference();
743 let wrapper = get_store_object(object);
744 let mut wb = self.objects.batch();
745 wb.insert_batch(
746 &self.objects,
747 std::iter::once((ObjectKey::from(object_reference), wrapper)),
748 )?;
749 wb.write()?;
750 Ok(())
751 }
752
753 pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
755 let obj_entry = self
756 .objects
757 .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
758 .next();
759
760 match obj_entry.transpose()? {
761 Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
762 Ok(self.object(&ObjectKey(obj_id, version), obj)?)
763 }
764 _ => Ok(None),
765 }
766 }
767
768 pub fn get_object_by_key_fallible(
769 &self,
770 object_id: &ObjectID,
771 version: VersionNumber,
772 ) -> SuiResult<Option<Object>> {
773 Ok(self
774 .objects
775 .get(&ObjectKey(*object_id, version))?
776 .and_then(|object| {
777 self.object(&ObjectKey(*object_id, version), object)
778 .expect("object construction error")
779 }))
780 }
781}
782
// Infallible `ObjectStore` facade over the `*_fallible` lookups: any storage error
// becomes a panic with the message "db error".
impl ObjectStore for AuthorityPerpetualTables {
    /// Returns the newest live version of `object_id`, or `None` when there is none.
    ///
    /// # Panics
    /// Panics with "db error" when `get_object_fallible` returns an error.
    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        self.get_object_fallible(object_id).expect("db error")
    }

    /// Returns the object stored at exactly `(object_id, version)`, or `None`.
    ///
    /// # Panics
    /// Panics with "db error" when `get_object_by_key_fallible` returns an error.
    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
        self.get_object_by_key_fallible(object_id, version)
            .expect("db error")
    }
}
794
/// Iterator over the live object set; see the `Iterator` impl for the exact rules.
pub struct LiveSetIter<'a> {
    /// Underlying iterator over `objects` table entries.
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    /// Table handle used to construct objects from stored values.
    tables: &'a AuthorityPerpetualTables,
    /// Most recently read entry, held back until the next entry shows whether it was
    /// the last one of its object id.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
    /// Whether wrapped entries are yielded (as `LiveObject::Wrapped`) or skipped.
    include_wrapped_object: bool,
}
802
/// Item yielded by `LiveSetIter`.
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    /// A fully constructed object.
    Normal(Object),
    /// A wrapped entry, represented only by its key (id and version).
    Wrapped(ObjectKey),
}
808
809impl LiveObject {
810 pub fn object_id(&self) -> ObjectID {
811 match self {
812 LiveObject::Normal(obj) => obj.id(),
813 LiveObject::Wrapped(key) => key.0,
814 }
815 }
816
817 pub fn version(&self) -> SequenceNumber {
818 match self {
819 LiveObject::Normal(obj) => obj.version(),
820 LiveObject::Wrapped(key) => key.1,
821 }
822 }
823
824 pub fn object_reference(&self) -> ObjectRef {
825 match self {
826 LiveObject::Normal(obj) => obj.compute_object_reference(),
827 LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
828 }
829 }
830
831 pub fn to_normal(self) -> Option<Object> {
832 match self {
833 LiveObject::Normal(object) => Some(object),
834 LiveObject::Wrapped(_) => None,
835 }
836 }
837}
838
839impl LiveSetIter<'_> {
840 fn store_object_wrapper_to_live_object(
841 &self,
842 object_key: ObjectKey,
843 store_object: StoreObjectWrapper,
844 ) -> Option<LiveObject> {
845 match store_object.migrate().into_inner() {
846 StoreObject::Value(object) => {
847 let object = self
848 .tables
849 .construct_object(&object_key, *object)
850 .expect("Constructing object from store cannot fail");
851 Some(LiveObject::Normal(object))
852 }
853 StoreObject::Wrapped => {
854 if self.include_wrapped_object {
855 Some(LiveObject::Wrapped(object_key))
856 } else {
857 None
858 }
859 }
860 StoreObject::Deleted => None,
861 }
862 }
863}
864
// Yields, for every object id, the last entry the underlying iterator produced for that
// id, provided that entry converts to a live object (see
// `store_object_wrapper_to_live_object`).
//
// The iterator keeps a one-entry look-behind in `self.prev`: an entry is only emitted
// once the following entry turns out to belong to a different object id, or once the
// underlying iterator stops producing `Ok` items.
//
// NOTE(review): an `Err` item from the underlying iterator is handled exactly like
// exhaustion (the buffered entry is flushed and `None` follows), so read errors are
// silently dropped here — confirm this best-effort behavior is intended.
impl Iterator for LiveSetIter<'_> {
    type Item = LiveObject;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(Ok((next_key, next_value))) = self.iter.next() {
                // Swap the buffered entry for the one just read.
                let prev = self.prev.take();
                self.prev = Some((next_key, next_value));

                // The buffered entry was the last one of its object id only if the id
                // changed; otherwise a later entry of the same id supersedes it.
                if let Some((prev_key, prev_value)) = prev
                    && prev_key.0 != next_key.0
                {
                    let live_object =
                        self.store_object_wrapper_to_live_object(prev_key, prev_value);
                    if live_object.is_some() {
                        return live_object;
                    }
                }
                continue;
            }
            // No further `Ok` item: flush whatever is still buffered.
            if let Some((key, value)) = self.prev.take() {
                let live_object = self.store_object_wrapper_to_live_object(key, value);
                if live_object.is_some() {
                    return live_object;
                }
            }
            return None;
        }
    }
}
895
896fn owned_object_transaction_locks_table_config(db_options: DBOptions) -> DBOptions {
898 DBOptions {
899 options: db_options
900 .clone()
901 .optimize_for_write_throughput()
902 .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
903 .options,
904 rw_options: db_options.rw_options.set_ignore_range_deletions(false),
905 }
906}
907
908fn objects_table_config(
909 mut db_options: DBOptions,
910 compaction_filter: Option<ObjectsCompactionFilter>,
911) -> DBOptions {
912 if let Some(mut compaction_filter) = compaction_filter {
913 db_options
914 .options
915 .set_compaction_filter("objects", move |_, key, value| {
916 match compaction_filter.filter(key, value) {
917 Ok(decision) => decision,
918 Err(err) => {
919 error!("Compaction error: {:?}", err);
920 Decision::Keep
921 }
922 }
923 });
924 }
925 db_options
926 .optimize_for_write_throughput()
927 .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
928}
929
930fn transactions_table_config(db_options: DBOptions) -> DBOptions {
931 db_options
932 .optimize_for_write_throughput()
933 .optimize_for_point_lookup(
934 read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
935 )
936}
937
938fn effects_table_config(db_options: DBOptions) -> DBOptions {
939 db_options
940 .optimize_for_write_throughput()
941 .optimize_for_point_lookup(
942 read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
943 )
944}