1use crate::accumulators::funds_read::AccountFundsRead;
41use crate::authority::AuthorityStore;
42use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
43use crate::authority::authority_store::{
44 ExecutionLockWriteGuard, LockDetailsDeprecated, ObjectLockStatus, SuiLockResult,
45};
46use crate::authority::authority_store_tables::LiveObject;
47use crate::authority::backpressure::BackpressureManager;
48use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
49use crate::fallback_fetch::{do_fallback_lookup, do_fallback_lookup_fallible};
50use crate::global_state_hasher::GlobalStateHashStore;
51use crate::transaction_outputs::TransactionOutputs;
52
53use dashmap::DashMap;
54use dashmap::mapref::entry::Entry as DashMapEntry;
55use futures::{FutureExt, future::BoxFuture};
56use moka::sync::SegmentedCache as MokaCache;
57use mysten_common::debug_fatal;
58use mysten_common::random_util::randomize_cache_capacity_in_tests;
59use mysten_common::sync::notify_read::NotifyRead;
60use parking_lot::Mutex;
61use rayon::prelude::*;
62use std::collections::{BTreeMap, HashSet};
63use std::hash::Hash;
64use std::sync::Arc;
65use std::sync::atomic::AtomicU64;
66use sui_config::ExecutionCacheConfig;
67use sui_macros::fail_point;
68use sui_protocol_config::ProtocolVersion;
69use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
70use sui_types::accumulator_event::AccumulatorEvent;
71use sui_types::accumulator_root::{AccumulatorObjId, AccumulatorValue};
72use sui_types::base_types::{
73 EpochId, FullObjectID, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData,
74};
75use sui_types::bridge::{Bridge, get_bridge};
76use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest};
77use sui_types::effects::{TransactionEffects, TransactionEvents};
78use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
79use sui_types::executable_transaction::VerifiedExecutableTransaction;
80use sui_types::global_state_hash::GlobalStateHash;
81use sui_types::message_envelope::Message;
82use sui_types::messages_checkpoint::CheckpointSequenceNumber;
83use sui_types::object::Object;
84use sui_types::storage::{
85 FullObjectKey, InputKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject,
86};
87use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
88use sui_types::transaction::{TransactionDataAPI, VerifiedTransaction};
89use tap::TapOptional;
90use tracing::{debug, info, instrument, trace, warn};
91
92use super::ExecutionCacheAPI;
93use super::cache_types::Ticket;
94use super::{
95 Batch, CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
96 ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
97 cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache},
98 implement_passthrough_traits,
99 object_locks::ObjectLocks,
100};
101
102#[cfg(test)]
103#[path = "unit_tests/writeback_cache_tests.rs"]
104pub mod writeback_cache_tests;
105
106#[cfg(test)]
107#[path = "unit_tests/notify_read_input_objects_tests.rs"]
108mod notify_read_input_objects_tests;
109
110#[derive(Clone, PartialEq, Eq)]
111enum ObjectEntry {
112 Object(Object),
113 Deleted,
114 Wrapped,
115}
116
117impl ObjectEntry {
118 #[cfg(test)]
119 fn unwrap_object(&self) -> &Object {
120 match self {
121 ObjectEntry::Object(o) => o,
122 _ => panic!("unwrap_object called on non-Object"),
123 }
124 }
125
126 fn is_tombstone(&self) -> bool {
127 match self {
128 ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
129 ObjectEntry::Object(_) => false,
130 }
131 }
132}
133
134impl std::fmt::Debug for ObjectEntry {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 match self {
137 ObjectEntry::Object(o) => {
138 write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
139 }
140 ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
141 ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
142 }
143 }
144}
145
146impl From<Object> for ObjectEntry {
147 fn from(object: Object) -> Self {
148 ObjectEntry::Object(object)
149 }
150}
151
152impl From<ObjectOrTombstone> for ObjectEntry {
153 fn from(object: ObjectOrTombstone) -> Self {
154 match object {
155 ObjectOrTombstone::Object(o) => o.into(),
156 ObjectOrTombstone::Tombstone(obj_ref) => {
157 if obj_ref.2.is_deleted() {
158 ObjectEntry::Deleted
159 } else if obj_ref.2.is_wrapped() {
160 ObjectEntry::Wrapped
161 } else {
162 panic!("tombstone digest must either be deleted or wrapped");
163 }
164 }
165 }
166 }
167}
168
169#[derive(Debug, Clone, PartialEq, Eq)]
170enum LatestObjectCacheEntry {
171 Object(SequenceNumber, ObjectEntry),
172 NonExistent,
173}
174
175impl LatestObjectCacheEntry {
176 #[cfg(test)]
177 fn version(&self) -> Option<SequenceNumber> {
178 match self {
179 LatestObjectCacheEntry::Object(version, _) => Some(*version),
180 LatestObjectCacheEntry::NonExistent => None,
181 }
182 }
183
184 fn is_alive(&self) -> bool {
185 match self {
186 LatestObjectCacheEntry::Object(_, entry) => !entry.is_tombstone(),
187 LatestObjectCacheEntry::NonExistent => false,
188 }
189 }
190}
191
192impl IsNewer for LatestObjectCacheEntry {
193 fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
194 match (self, other) {
195 (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
196 v1 > v2
197 }
198 (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
199 _ => false,
200 }
201 }
202}
203
204type MarkerKey = (EpochId, FullObjectID);
205
206struct UncommittedData {
209 objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,
222
223 markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,
228
229 transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,
230
231 transaction_events: DashMap<TransactionDigest, TransactionEvents>,
232
233 unchanged_loaded_runtime_objects: DashMap<TransactionDigest, Vec<ObjectKey>>,
234
235 executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,
236
237 pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,
240
241 total_transaction_inserts: AtomicU64,
242 total_transaction_commits: AtomicU64,
243}
244
245impl UncommittedData {
246 fn new() -> Self {
247 Self {
248 objects: DashMap::with_shard_amount(2048),
249 markers: DashMap::with_shard_amount(2048),
250 transaction_effects: DashMap::with_shard_amount(2048),
251 executed_effects_digests: DashMap::with_shard_amount(2048),
252 pending_transaction_writes: DashMap::with_shard_amount(2048),
253 transaction_events: DashMap::with_shard_amount(2048),
254 unchanged_loaded_runtime_objects: DashMap::with_shard_amount(2048),
255 total_transaction_inserts: AtomicU64::new(0),
256 total_transaction_commits: AtomicU64::new(0),
257 }
258 }
259
260 fn clear(&self) {
261 self.objects.clear();
262 self.markers.clear();
263 self.transaction_effects.clear();
264 self.executed_effects_digests.clear();
265 self.pending_transaction_writes.clear();
266 self.transaction_events.clear();
267 self.unchanged_loaded_runtime_objects.clear();
268 self.total_transaction_inserts
269 .store(0, std::sync::atomic::Ordering::Relaxed);
270 self.total_transaction_commits
271 .store(0, std::sync::atomic::Ordering::Relaxed);
272 }
273
274 fn is_empty(&self) -> bool {
275 let empty = self.pending_transaction_writes.is_empty();
276 if empty && cfg!(debug_assertions) {
277 assert!(
278 self.objects.is_empty()
279 && self.markers.is_empty()
280 && self.transaction_effects.is_empty()
281 && self.executed_effects_digests.is_empty()
282 && self.transaction_events.is_empty()
283 && self.unchanged_loaded_runtime_objects.is_empty()
284 && self
285 .total_transaction_inserts
286 .load(std::sync::atomic::Ordering::Relaxed)
287 == self
288 .total_transaction_commits
289 .load(std::sync::atomic::Ordering::Relaxed),
290 );
291 }
292 empty
293 }
294}
295
296type PointCacheItem<T> = Option<T>;
298
299impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
302 fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
303 match (self, other) {
304 (Some(_), None) => true,
305
306 (Some(a), Some(b)) => {
307 debug_assert_eq!(a, b);
309 false
310 }
311
312 _ => false,
313 }
314 }
315}
316
317struct CachedCommittedData {
319 object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,
321
322 marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,
324
325 transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,
326
327 transaction_effects:
328 MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,
329
330 transaction_events: MonotonicCache<TransactionDigest, PointCacheItem<Arc<TransactionEvents>>>,
331
332 executed_effects_digests:
333 MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,
334
335 transaction_executed_in_last_epoch:
336 MonotonicCache<(EpochId, TransactionDigest), PointCacheItem<()>>,
337
338 _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
341}
342
343impl CachedCommittedData {
344 fn new(config: &ExecutionCacheConfig) -> Self {
345 let object_cache = MokaCache::builder(8)
346 .max_capacity(randomize_cache_capacity_in_tests(
347 config.object_cache_size(),
348 ))
349 .build();
350 let marker_cache = MokaCache::builder(8)
351 .max_capacity(randomize_cache_capacity_in_tests(
352 config.marker_cache_size(),
353 ))
354 .build();
355
356 let transactions = MonotonicCache::new(randomize_cache_capacity_in_tests(
357 config.transaction_cache_size(),
358 ));
359 let transaction_effects = MonotonicCache::new(randomize_cache_capacity_in_tests(
360 config.effect_cache_size(),
361 ));
362 let transaction_events = MonotonicCache::new(randomize_cache_capacity_in_tests(
363 config.events_cache_size(),
364 ));
365 let executed_effects_digests = MonotonicCache::new(randomize_cache_capacity_in_tests(
366 config.executed_effect_cache_size(),
367 ));
368
369 let transaction_objects = MokaCache::builder(8)
370 .max_capacity(randomize_cache_capacity_in_tests(
371 config.transaction_objects_cache_size(),
372 ))
373 .build();
374
375 let transaction_executed_in_last_epoch = MonotonicCache::new(
376 randomize_cache_capacity_in_tests(config.executed_effect_cache_size()),
377 );
378
379 Self {
380 object_cache,
381 marker_cache,
382 transactions,
383 transaction_effects,
384 transaction_events,
385 executed_effects_digests,
386 transaction_executed_in_last_epoch,
387 _transaction_objects: transaction_objects,
388 }
389 }
390
391 fn clear_and_assert_empty(&self) {
392 self.object_cache.invalidate_all();
393 self.marker_cache.invalidate_all();
394 self.transactions.invalidate_all();
395 self.transaction_effects.invalidate_all();
396 self.transaction_events.invalidate_all();
397 self.executed_effects_digests.invalidate_all();
398 self.transaction_executed_in_last_epoch.invalidate_all();
399 self._transaction_objects.invalidate_all();
400
401 assert_empty(&self.object_cache);
402 assert_empty(&self.marker_cache);
403 assert!(self.transactions.is_empty());
404 assert!(self.transaction_effects.is_empty());
405 assert!(self.transaction_events.is_empty());
406 assert!(self.executed_effects_digests.is_empty());
407 assert!(self.transaction_executed_in_last_epoch.is_empty());
408 assert_empty(&self._transaction_objects);
409 }
410}
411
412fn assert_empty<K, V>(cache: &MokaCache<K, V>)
413where
414 K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
415 V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
416{
417 if cache.iter().next().is_some() {
418 panic!("cache should be empty");
419 }
420}
421
422pub struct WritebackCache {
423 dirty: UncommittedData,
424 cached: CachedCommittedData,
425
426 object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,
433
434 packages: MokaCache<ObjectID, PackageObject>,
445
446 object_locks: ObjectLocks,
447
448 executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
449 object_notify_read: NotifyRead<InputKey, ()>,
450
451 store: Arc<AuthorityStore>,
452 backpressure_threshold: u64,
453 backpressure_manager: Arc<BackpressureManager>,
454 metrics: Arc<ExecutionCacheMetrics>,
455}
456
457macro_rules! check_cache_entry_by_version {
458 ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
459 $self.metrics.record_cache_request($table, $level);
460 if let Some(cache) = $cache {
461 if let Some(entry) = cache.get(&$version) {
462 $self.metrics.record_cache_hit($table, $level);
463 return CacheResult::Hit(entry.clone());
464 }
465
466 if let Some(least_version) = cache.get_least() {
467 if least_version.0 < $version {
468 $self.metrics.record_cache_negative_hit($table, $level);
471 return CacheResult::NegativeHit;
472 }
473 }
474 }
475 $self.metrics.record_cache_miss($table, $level);
476 };
477}
478
479macro_rules! check_cache_entry_by_latest {
480 ($self: ident, $table: expr, $level: expr, $cache: expr) => {
481 $self.metrics.record_cache_request($table, $level);
482 if let Some(cache) = $cache {
483 if let Some((version, entry)) = cache.get_highest() {
484 $self.metrics.record_cache_hit($table, $level);
485 return CacheResult::Hit((*version, entry.clone()));
486 } else {
487 panic!("empty CachedVersionMap should have been removed");
488 }
489 }
490 $self.metrics.record_cache_miss($table, $level);
491 };
492}
493
494impl WritebackCache {
495 pub fn new(
496 config: &ExecutionCacheConfig,
497 store: Arc<AuthorityStore>,
498 metrics: Arc<ExecutionCacheMetrics>,
499 backpressure_manager: Arc<BackpressureManager>,
500 ) -> Self {
501 let packages = MokaCache::builder(8)
502 .max_capacity(randomize_cache_capacity_in_tests(
503 config.package_cache_size(),
504 ))
505 .build();
506 Self {
507 dirty: UncommittedData::new(),
508 cached: CachedCommittedData::new(config),
509 object_by_id_cache: MonotonicCache::new(randomize_cache_capacity_in_tests(
510 config.object_by_id_cache_size(),
511 )),
512 packages,
513 object_locks: ObjectLocks::new(),
514 executed_effects_digests_notify_read: NotifyRead::new(),
515 object_notify_read: NotifyRead::new(),
516 store,
517 backpressure_manager,
518 backpressure_threshold: config.backpressure_threshold(),
519 metrics,
520 }
521 }
522
523 pub fn new_for_tests(store: Arc<AuthorityStore>) -> Self {
524 Self::new(
525 &Default::default(),
526 store,
527 ExecutionCacheMetrics::new(&prometheus::Registry::new()).into(),
528 BackpressureManager::new_for_tests(),
529 )
530 }
531
532 #[cfg(test)]
533 pub fn reset_for_test(&mut self) {
534 let mut new = Self::new(
535 &Default::default(),
536 self.store.clone(),
537 self.metrics.clone(),
538 self.backpressure_manager.clone(),
539 );
540 std::mem::swap(self, &mut new);
541 }
542
543 pub fn evict_executed_effects_from_cache_for_testing(&self, tx_digest: &TransactionDigest) {
544 self.cached.executed_effects_digests.invalidate(tx_digest);
545 self.cached.transaction_events.invalidate(tx_digest);
546 self.cached.transactions.invalidate(tx_digest);
547 }
548
549 fn write_object_entry(
550 &self,
551 object_id: &ObjectID,
552 version: SequenceNumber,
553 object: ObjectEntry,
554 ) {
555 trace!(?object_id, ?version, ?object, "inserting object entry");
556 self.metrics.record_cache_write("object");
557
558 let mut entry = self.dirty.objects.entry(*object_id).or_default();
578
579 self.object_by_id_cache
580 .insert(
581 object_id,
582 LatestObjectCacheEntry::Object(version, object.clone()),
583 Ticket::Write,
584 )
585 .ok();
588
589 entry.insert(version, object.clone());
590
591 if let ObjectEntry::Object(object) = &object {
592 if object.is_package() {
593 self.object_notify_read
594 .notify(&InputKey::Package { id: *object_id }, &());
595 } else if !object.is_child_object() {
596 self.object_notify_read.notify(
597 &InputKey::VersionedObject {
598 id: object.full_id(),
599 version: object.version(),
600 },
601 &(),
602 );
603 }
604 }
605 }
606
607 fn write_marker_value(
608 &self,
609 epoch_id: EpochId,
610 object_key: FullObjectKey,
611 marker_value: MarkerValue,
612 ) {
613 tracing::trace!("inserting marker value {object_key:?}: {marker_value:?}",);
614 self.metrics.record_cache_write("marker");
615 self.dirty
616 .markers
617 .entry((epoch_id, object_key.id()))
618 .or_default()
619 .value_mut()
620 .insert(object_key.version(), marker_value);
621 if matches!(marker_value, MarkerValue::ConsensusStreamEnded(_)) {
626 self.object_notify_read.notify(
627 &InputKey::VersionedObject {
628 id: object_key.id(),
629 version: object_key.version(),
630 },
631 &(),
632 );
633 }
634 }
635
636 fn with_locked_cache_entries<K, V, R>(
640 dirty_map: &DashMap<K, CachedVersionMap<V>>,
641 cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
642 key: &K,
643 cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
644 ) -> R
645 where
646 K: Copy + Eq + Hash + Send + Sync + 'static,
647 V: Send + Sync + 'static,
648 {
649 let dirty_entry = dirty_map.entry(*key);
650 let dirty_entry = match &dirty_entry {
651 DashMapEntry::Occupied(occupied) => Some(occupied.get()),
652 DashMapEntry::Vacant(_) => None,
653 };
654
655 let cached_entry = cached_map.get(key);
656 let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
657 let cached_entry = cached_lock.as_deref();
658
659 cb(dirty_entry, cached_entry)
660 }
661
662 fn get_object_entry_by_key_cache_only(
665 &self,
666 object_id: &ObjectID,
667 version: SequenceNumber,
668 ) -> CacheResult<ObjectEntry> {
669 Self::with_locked_cache_entries(
670 &self.dirty.objects,
671 &self.cached.object_cache,
672 object_id,
673 |dirty_entry, cached_entry| {
674 check_cache_entry_by_version!(
675 self,
676 "object_by_version",
677 "uncommitted",
678 dirty_entry,
679 version
680 );
681 check_cache_entry_by_version!(
682 self,
683 "object_by_version",
684 "committed",
685 cached_entry,
686 version
687 );
688 CacheResult::Miss
689 },
690 )
691 }
692
693 fn get_object_by_key_cache_only(
694 &self,
695 object_id: &ObjectID,
696 version: SequenceNumber,
697 ) -> CacheResult<Object> {
698 match self.get_object_entry_by_key_cache_only(object_id, version) {
699 CacheResult::Hit(entry) => match entry {
700 ObjectEntry::Object(object) => CacheResult::Hit(object),
701 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
702 },
703 CacheResult::Miss => CacheResult::Miss,
704 CacheResult::NegativeHit => CacheResult::NegativeHit,
705 }
706 }
707
708 fn get_object_entry_by_id_cache_only(
709 &self,
710 request_type: &'static str,
711 object_id: &ObjectID,
712 ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
713 self.metrics
714 .record_cache_request(request_type, "object_by_id");
715 let entry = self.object_by_id_cache.get(object_id);
716
717 if cfg!(debug_assertions)
718 && let Some(entry) = &entry
719 {
720 let highest: Option<ObjectEntry> = self
722 .dirty
723 .objects
724 .get(object_id)
725 .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
726 .or_else(|| {
727 let obj: Option<ObjectEntry> = self
728 .store
729 .get_latest_object_or_tombstone(*object_id)
730 .unwrap()
731 .map(|(_, o)| o.into());
732 obj
733 });
734
735 let cache_entry = match &*entry.lock() {
736 LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
737 LatestObjectCacheEntry::NonExistent => None,
738 };
739
740 let tombstone_possibly_pruned = highest.is_none()
742 && cache_entry
743 .as_ref()
744 .map(|e| e.is_tombstone())
745 .unwrap_or(false);
746
747 if highest != cache_entry && !tombstone_possibly_pruned {
748 tracing::error!(
749 ?highest,
750 ?cache_entry,
751 ?tombstone_possibly_pruned,
752 "object_by_id cache is incoherent for {:?}",
753 object_id
754 );
755 panic!("object_by_id cache is incoherent for {:?}", object_id);
756 }
757 }
758
759 if let Some(entry) = entry {
760 let entry = entry.lock();
761 match &*entry {
762 LatestObjectCacheEntry::Object(latest_version, latest_object) => {
763 self.metrics.record_cache_hit(request_type, "object_by_id");
764 return CacheResult::Hit((*latest_version, latest_object.clone()));
765 }
766 LatestObjectCacheEntry::NonExistent => {
767 self.metrics
768 .record_cache_negative_hit(request_type, "object_by_id");
769 return CacheResult::NegativeHit;
770 }
771 }
772 } else {
773 self.metrics.record_cache_miss(request_type, "object_by_id");
774 }
775
776 Self::with_locked_cache_entries(
777 &self.dirty.objects,
778 &self.cached.object_cache,
779 object_id,
780 |dirty_entry, cached_entry| {
781 check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
782 check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
783 CacheResult::Miss
784 },
785 )
786 }
787
788 fn get_object_by_id_cache_only(
789 &self,
790 request_type: &'static str,
791 object_id: &ObjectID,
792 ) -> CacheResult<(SequenceNumber, Object)> {
793 match self.get_object_entry_by_id_cache_only(request_type, object_id) {
794 CacheResult::Hit((version, entry)) => match entry {
795 ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
796 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
797 },
798 CacheResult::NegativeHit => CacheResult::NegativeHit,
799 CacheResult::Miss => CacheResult::Miss,
800 }
801 }
802
803 fn get_marker_value_cache_only(
804 &self,
805 object_key: FullObjectKey,
806 epoch_id: EpochId,
807 ) -> CacheResult<MarkerValue> {
808 Self::with_locked_cache_entries(
809 &self.dirty.markers,
810 &self.cached.marker_cache,
811 &(epoch_id, object_key.id()),
812 |dirty_entry, cached_entry| {
813 check_cache_entry_by_version!(
814 self,
815 "marker_by_version",
816 "uncommitted",
817 dirty_entry,
818 object_key.version()
819 );
820 check_cache_entry_by_version!(
821 self,
822 "marker_by_version",
823 "committed",
824 cached_entry,
825 object_key.version()
826 );
827 CacheResult::Miss
828 },
829 )
830 }
831
832 fn get_latest_marker_value_cache_only(
833 &self,
834 object_id: FullObjectID,
835 epoch_id: EpochId,
836 ) -> CacheResult<(SequenceNumber, MarkerValue)> {
837 Self::with_locked_cache_entries(
838 &self.dirty.markers,
839 &self.cached.marker_cache,
840 &(epoch_id, object_id),
841 |dirty_entry, cached_entry| {
842 check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
843 check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
844 CacheResult::Miss
845 },
846 )
847 }
848
849 fn get_object_impl(&self, request_type: &'static str, id: &ObjectID) -> Option<Object> {
850 let ticket = self.object_by_id_cache.get_ticket_for_read(id);
851 match self.get_object_entry_by_id_cache_only(request_type, id) {
852 CacheResult::Hit((_, entry)) => match entry {
853 ObjectEntry::Object(object) => Some(object),
854 ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
855 },
856 CacheResult::NegativeHit => None,
857 CacheResult::Miss => {
858 let obj = self
859 .store
860 .get_latest_object_or_tombstone(*id)
861 .expect("db error");
862 match obj {
863 Some((key, obj)) => {
864 self.cache_latest_object_by_id(
865 id,
866 LatestObjectCacheEntry::Object(key.1, obj.clone().into()),
867 ticket,
868 );
869 match obj {
870 ObjectOrTombstone::Object(object) => Some(object),
871 ObjectOrTombstone::Tombstone(_) => None,
872 }
873 }
874 None => {
875 self.cache_object_not_found(id, ticket);
876 None
877 }
878 }
879 }
880 }
881 }
882
883 fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
884 self.metrics.record_cache_request(request_type, "db");
885 &self.store
886 }
887
888 fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
889 self.metrics
890 .record_cache_multi_request(request_type, "db", count);
891 &self.store
892 }
893
894 #[instrument(level = "debug", skip_all)]
895 fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
896 let tx_digest = *tx_outputs.transaction.digest();
897 trace!(?tx_digest, "writing transaction outputs to cache");
898
899 assert!(
900 !self.transaction_executed_in_last_epoch(&tx_digest, epoch_id),
901 "Transaction {:?} was already executed in epoch {}",
902 tx_digest,
903 epoch_id.saturating_sub(1)
904 );
905
906 let TransactionOutputs {
907 transaction,
908 effects,
909 markers,
910 written,
911 deleted,
912 wrapped,
913 events,
914 unchanged_loaded_runtime_objects,
915 ..
916 } = &*tx_outputs;
917
918 for ObjectKey(id, version) in deleted.iter() {
923 self.write_object_entry(id, *version, ObjectEntry::Deleted);
924 }
925
926 for ObjectKey(id, version) in wrapped.iter() {
927 self.write_object_entry(id, *version, ObjectEntry::Wrapped);
928 }
929
930 for (object_key, marker_value) in markers.iter() {
932 self.write_marker_value(epoch_id, *object_key, *marker_value);
933 }
934
935 for (object_id, object) in written.iter() {
938 if object.is_child_object() {
939 self.write_object_entry(object_id, object.version(), object.clone().into());
940 }
941 }
942 for (object_id, object) in written.iter() {
943 if !object.is_child_object() {
944 self.write_object_entry(object_id, object.version(), object.clone().into());
945 if object.is_package() {
946 debug!("caching package: {:?}", object.compute_object_reference());
947 self.packages
948 .insert(*object_id, PackageObject::new(object.clone()));
949 }
950 }
951 }
952
953 let tx_digest = *transaction.digest();
954 debug!(
955 ?tx_digest,
956 "Writing transaction output objects to cache: {:?}",
957 written
958 .values()
959 .map(|o| (o.id(), o.version()))
960 .collect::<Vec<_>>(),
961 );
962 let effects_digest = effects.digest();
963
964 self.metrics.record_cache_write("transaction_block");
965 self.dirty
966 .pending_transaction_writes
967 .insert(tx_digest, tx_outputs.clone());
968
969 self.metrics.record_cache_write("transaction_effects");
972 self.dirty
973 .transaction_effects
974 .insert(effects_digest, effects.clone());
975
976 self.metrics.record_cache_write("transaction_events");
980 self.dirty
981 .transaction_events
982 .insert(tx_digest, events.clone());
983
984 self.metrics
985 .record_cache_write("unchanged_loaded_runtime_objects");
986 self.dirty
987 .unchanged_loaded_runtime_objects
988 .insert(tx_digest, unchanged_loaded_runtime_objects.clone());
989
990 self.metrics.record_cache_write("executed_effects_digests");
991 self.dirty
992 .executed_effects_digests
993 .insert(tx_digest, effects_digest);
994
995 self.executed_effects_digests_notify_read
996 .notify(&tx_digest, &effects_digest);
997
998 self.metrics
999 .pending_notify_read
1000 .set(self.executed_effects_digests_notify_read.num_pending() as i64);
1001
1002 let prev = self
1003 .dirty
1004 .total_transaction_inserts
1005 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1006
1007 let pending_count = (prev + 1).saturating_sub(
1008 self.dirty
1009 .total_transaction_commits
1010 .load(std::sync::atomic::Ordering::Relaxed),
1011 );
1012
1013 self.set_backpressure(pending_count);
1014 }
1015
1016 fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1017 let _metrics_guard = mysten_metrics::monitored_scope("WritebackCache::build_db_batch");
1018 let mut all_outputs = Vec::with_capacity(digests.len());
1019 for tx in digests {
1020 let Some(outputs) = self
1021 .dirty
1022 .pending_transaction_writes
1023 .get(tx)
1024 .map(|o| o.clone())
1025 else {
1026 warn!("Attempt to commit unknown transaction {:?}", tx);
1032 continue;
1033 };
1034 all_outputs.push(outputs);
1035 }
1036
1037 let batch = self
1038 .store
1039 .build_db_batch(epoch, &all_outputs)
1040 .expect("db error");
1041 (all_outputs, batch)
1042 }
1043
1044 #[instrument(level = "debug", skip_all)]
1046 fn commit_transaction_outputs(
1047 &self,
1048 epoch: EpochId,
1049 (all_outputs, db_batch): Batch,
1050 digests: &[TransactionDigest],
1051 ) {
1052 let _metrics_guard =
1053 mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs");
1054 fail_point!("writeback-cache-commit");
1055 trace!(?digests);
1056
1057 db_batch.write().expect("db error");
1061
1062 let _metrics_guard =
1063 mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs::flush");
1064 all_outputs.par_iter().with_min_len(16).for_each(|outputs| {
1067 let tx_digest = outputs.transaction.digest();
1068 assert!(
1069 self.dirty
1070 .pending_transaction_writes
1071 .remove(tx_digest)
1072 .is_some()
1073 );
1074 self.flush_tx_metadata_from_dirty_to_cached(*tx_digest, outputs);
1075 });
1076
1077 for outputs in all_outputs.iter() {
1082 self.flush_objects_from_dirty_to_cached(epoch, outputs);
1083 }
1084
1085 let num_outputs = all_outputs.len() as u64;
1086 let num_commits = self
1087 .dirty
1088 .total_transaction_commits
1089 .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
1090 + num_outputs;
1091
1092 let pending_count = self
1093 .dirty
1094 .total_transaction_inserts
1095 .load(std::sync::atomic::Ordering::Relaxed)
1096 .saturating_sub(num_commits);
1097
1098 self.set_backpressure(pending_count);
1099 }
1100
1101 fn approximate_pending_transaction_count(&self) -> u64 {
1102 let num_commits = self
1103 .dirty
1104 .total_transaction_commits
1105 .load(std::sync::atomic::Ordering::Relaxed);
1106
1107 self.dirty
1108 .total_transaction_inserts
1109 .load(std::sync::atomic::Ordering::Relaxed)
1110 .saturating_sub(num_commits)
1111 }
1112
1113 fn set_backpressure(&self, pending_count: u64) {
1114 let backpressure = pending_count > self.backpressure_threshold;
1115 let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
1116 if backpressure_changed {
1117 self.metrics.backpressure_toggles.inc();
1118 }
1119 self.metrics
1120 .backpressure_status
1121 .set(if backpressure { 1 } else { 0 });
1122 }
1123
1124 fn flush_tx_metadata_from_dirty_to_cached(
1128 &self,
1129 tx_digest: TransactionDigest,
1130 outputs: &TransactionOutputs,
1131 ) {
1132 let TransactionOutputs {
1134 transaction,
1135 effects,
1136 events,
1137 ..
1138 } = outputs;
1139
1140 let effects_digest = effects.digest();
1141
1142 self.cached
1145 .transactions
1146 .insert(
1147 &tx_digest,
1148 PointCacheItem::Some(transaction.clone()),
1149 Ticket::Write,
1150 )
1151 .ok();
1152 self.cached
1153 .transaction_effects
1154 .insert(
1155 &effects_digest,
1156 PointCacheItem::Some(effects.clone().into()),
1157 Ticket::Write,
1158 )
1159 .ok();
1160 self.cached
1161 .executed_effects_digests
1162 .insert(
1163 &tx_digest,
1164 PointCacheItem::Some(effects_digest),
1165 Ticket::Write,
1166 )
1167 .ok();
1168 self.cached
1169 .transaction_events
1170 .insert(
1171 &tx_digest,
1172 PointCacheItem::Some(events.clone().into()),
1173 Ticket::Write,
1174 )
1175 .ok();
1176
1177 self.dirty
1178 .transaction_effects
1179 .remove(&effects_digest)
1180 .expect("effects must exist");
1181
1182 self.dirty
1183 .transaction_events
1184 .remove(&tx_digest)
1185 .expect("events must exist");
1186
1187 self.dirty
1188 .unchanged_loaded_runtime_objects
1189 .remove(&tx_digest)
1190 .expect("unchanged_loaded_runtime_objects must exist");
1191
1192 self.dirty
1193 .executed_effects_digests
1194 .remove(&tx_digest)
1195 .expect("executed effects must exist");
1196 }
1197
1198 fn flush_objects_from_dirty_to_cached(&self, epoch: EpochId, outputs: &TransactionOutputs) {
1202 let TransactionOutputs {
1203 markers,
1204 written,
1205 deleted,
1206 wrapped,
1207 ..
1208 } = outputs;
1209
1210 for (object_key, marker_value) in markers.iter() {
1211 Self::move_version_from_dirty_to_cache(
1212 &self.dirty.markers,
1213 &self.cached.marker_cache,
1214 (epoch, object_key.id()),
1215 object_key.version(),
1216 marker_value,
1217 );
1218 }
1219
1220 for (object_id, object) in written.iter() {
1221 Self::move_version_from_dirty_to_cache(
1222 &self.dirty.objects,
1223 &self.cached.object_cache,
1224 *object_id,
1225 object.version(),
1226 &ObjectEntry::Object(object.clone()),
1227 );
1228 }
1229
1230 for ObjectKey(object_id, version) in deleted.iter() {
1231 Self::move_version_from_dirty_to_cache(
1232 &self.dirty.objects,
1233 &self.cached.object_cache,
1234 *object_id,
1235 *version,
1236 &ObjectEntry::Deleted,
1237 );
1238 }
1239
1240 for ObjectKey(object_id, version) in wrapped.iter() {
1241 Self::move_version_from_dirty_to_cache(
1242 &self.dirty.objects,
1243 &self.cached.object_cache,
1244 *object_id,
1245 *version,
1246 &ObjectEntry::Wrapped,
1247 );
1248 }
1249 }
1250
1251 fn move_version_from_dirty_to_cache<K, V>(
1254 dirty: &DashMap<K, CachedVersionMap<V>>,
1255 cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
1256 key: K,
1257 version: SequenceNumber,
1258 value: &V,
1259 ) where
1260 K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
1261 V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
1262 {
1263 static MAX_VERSIONS: usize = 3;
1264
1265 let dirty_entry = dirty.entry(key);
1268 let cache_entry = cache.entry(key).or_default();
1269 let mut cache_map = cache_entry.value().lock();
1270
1271 cache_map.insert(version, value.clone());
1273 cache_map.truncate_to(MAX_VERSIONS);
1275
1276 let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
1277 panic!("dirty map must exist");
1278 };
1279
1280 let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);
1281
1282 assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");
1283
1284 if occupied_dirty_entry.get().is_empty() {
1286 occupied_dirty_entry.remove();
1287 }
1288 }
1289
1290 fn cache_latest_object_by_id(
1292 &self,
1293 object_id: &ObjectID,
1294 object: LatestObjectCacheEntry,
1295 ticket: Ticket,
1296 ) {
1297 trace!("caching object by id: {:?} {:?}", object_id, object);
1298 if self
1299 .object_by_id_cache
1300 .insert(object_id, object, ticket)
1301 .is_ok()
1302 {
1303 self.metrics.record_cache_write("object_by_id");
1304 } else {
1305 trace!("discarded cache write due to expired ticket");
1306 self.metrics.record_ticket_expiry();
1307 }
1308 }
1309
1310 fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
1311 self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
1312 }
1313
1314 fn clear_state_end_of_epoch_impl(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
1315 info!("clearing state at end of epoch");
1316
1317 for r in self.dirty.pending_transaction_writes.iter() {
1320 let outputs = r.value();
1321 if !outputs
1322 .transaction
1323 .transaction_data()
1324 .shared_input_objects()
1325 .is_empty()
1326 {
1327 debug_fatal!("transaction must be single writer");
1328 }
1329 info!(
1330 "clearing state for transaction {:?}",
1331 outputs.transaction.digest()
1332 );
1333 for (object_id, object) in outputs.written.iter() {
1334 if object.is_package() {
1335 info!("removing non-finalized package from cache: {:?}", object_id);
1336 self.packages.invalidate(object_id);
1337 }
1338 self.object_by_id_cache.invalidate(object_id);
1339 self.cached.object_cache.invalidate(object_id);
1340 }
1341
1342 for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
1343 self.object_by_id_cache.invalidate(object_id);
1344 self.cached.object_cache.invalidate(object_id);
1345 }
1346 }
1347
1348 self.dirty.clear();
1349
1350 info!("clearing old transaction locks");
1351 self.object_locks.clear();
1352 info!("clearing object per epoch marker table");
1353 self.store
1354 .clear_object_per_epoch_marker_table(execution_guard)
1355 .expect("db error");
1356 }
1357
1358 fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) {
1359 self.store
1360 .bulk_insert_genesis_objects(objects)
1361 .expect("db error");
1362 for obj in objects {
1363 self.cached.object_cache.invalidate(&obj.id());
1364 self.object_by_id_cache.invalidate(&obj.id());
1365 }
1366 }
1367
1368 fn insert_genesis_object_impl(&self, object: Object) {
1369 self.object_by_id_cache.invalidate(&object.id());
1370 self.cached.object_cache.invalidate(&object.id());
1371 self.store.insert_genesis_object(object).expect("db error");
1372 }
1373
1374 pub fn clear_caches_and_assert_empty(&self) {
1375 info!("clearing caches");
1376 self.cached.clear_and_assert_empty();
1377 self.object_by_id_cache.invalidate_all();
1378 assert!(&self.object_by_id_cache.is_empty());
1379 self.packages.invalidate_all();
1380 assert_empty(&self.packages);
1381 }
1382}
1383
1384impl AccountFundsRead for WritebackCache {
1385 fn get_latest_account_amount(&self, account_id: &AccumulatorObjId) -> (u128, SequenceNumber) {
1386 let mut pre_root_version =
1387 ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
1388 .unwrap()
1389 .version();
1390 let mut loop_iter = 0;
1391 loop {
1392 let account_obj = ObjectCacheRead::get_object(self, account_id.inner());
1393 if let Some(account_obj) = account_obj {
1394 let (_, AccumulatorValue::U128(value)) =
1395 account_obj.data.try_as_move().unwrap().try_into().unwrap();
1396 return (value.value, account_obj.version());
1397 }
1398 let post_root_version =
1399 ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
1400 .unwrap()
1401 .version();
1402 if pre_root_version == post_root_version {
1403 return (0, pre_root_version);
1404 }
1405 debug!(
1406 "Root version changed from {} to {} while reading account amount, retrying",
1407 pre_root_version, post_root_version
1408 );
1409 pre_root_version = post_root_version;
1410 loop_iter += 1;
1411 if loop_iter >= 3 {
1412 debug_fatal!("Unable to get a stable version after 3 iterations");
1413 }
1414 }
1415 }
1416
1417 fn get_account_amount_at_version(
1418 &self,
1419 account_id: &AccumulatorObjId,
1420 version: SequenceNumber,
1421 ) -> u128 {
1422 let account_obj = self.find_object_lt_or_eq_version(*account_id.inner(), version);
1423 if let Some(account_obj) = account_obj {
1424 let (_, AccumulatorValue::U128(value)) =
1425 account_obj.data.try_as_move().unwrap().try_into().unwrap();
1426 value.value
1427 } else {
1428 0
1429 }
1430 }
1431}
1432
1433impl ExecutionCacheAPI for WritebackCache {}
1434
1435impl ExecutionCacheCommit for WritebackCache {
1436 fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1437 self.build_db_batch(epoch, digests)
1438 }
1439
1440 fn commit_transaction_outputs(
1441 &self,
1442 epoch: EpochId,
1443 batch: Batch,
1444 digests: &[TransactionDigest],
1445 ) {
1446 WritebackCache::commit_transaction_outputs(self, epoch, batch, digests)
1447 }
1448
1449 fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) {
1450 self.store.persist_transaction(tx).expect("db error");
1451 }
1452
1453 fn approximate_pending_transaction_count(&self) -> u64 {
1454 WritebackCache::approximate_pending_transaction_count(self)
1455 }
1456}
1457
1458impl ObjectCacheRead for WritebackCache {
1459 fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
1460 self.metrics
1461 .record_cache_request("package", "package_cache");
1462 if let Some(p) = self.packages.get(package_id) {
1463 if cfg!(debug_assertions) {
1464 let canonical_package = self
1465 .dirty
1466 .objects
1467 .get(package_id)
1468 .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
1469 Some(ObjectEntry::Object(object)) => Some(object),
1470 _ => None,
1471 })
1472 .or_else(|| self.store.get_object(package_id));
1473
1474 if let Some(canonical_package) = canonical_package {
1475 assert_eq!(
1476 canonical_package.digest(),
1477 p.object().digest(),
1478 "Package object cache is inconsistent for package {:?}",
1479 package_id
1480 );
1481 }
1482 }
1483 self.metrics.record_cache_hit("package", "package_cache");
1484 return Ok(Some(p));
1485 } else {
1486 self.metrics.record_cache_miss("package", "package_cache");
1487 }
1488
1489 if let Some(p) = self.get_object_impl("package", package_id) {
1493 if p.is_package() {
1494 let p = PackageObject::new(p);
1495 tracing::trace!(
1496 "caching package: {:?}",
1497 p.object().compute_object_reference()
1498 );
1499 self.metrics.record_cache_write("package");
1500 self.packages.insert(*package_id, p.clone());
1501 Ok(Some(p))
1502 } else {
1503 Err(SuiErrorKind::UserInputError {
1504 error: UserInputError::MoveObjectAsPackage {
1505 object_id: *package_id,
1506 },
1507 }
1508 .into())
1509 }
1510 } else {
1511 Ok(None)
1512 }
1513 }
1514
1515 fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
1516 }
1519
1520 fn get_object(&self, id: &ObjectID) -> Option<Object> {
1523 self.get_object_impl("object_latest", id)
1524 }
1525
1526 fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object> {
1527 match self.get_object_by_key_cache_only(object_id, version) {
1528 CacheResult::Hit(object) => Some(object),
1529 CacheResult::NegativeHit => None,
1530 CacheResult::Miss => self
1531 .record_db_get("object_by_version")
1532 .get_object_by_key(object_id, version),
1533 }
1534 }
1535
1536 fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
1537 do_fallback_lookup(
1538 object_keys,
1539 |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1540 CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1541 CacheResult::NegativeHit => CacheResult::NegativeHit,
1542 CacheResult::Miss => CacheResult::Miss,
1543 },
1544 |remaining| {
1545 self.record_db_multi_get("object_by_version", remaining.len())
1546 .multi_get_objects_by_key(remaining)
1547 .expect("db error")
1548 },
1549 )
1550 }
1551
1552 fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool {
1553 match self.get_object_by_key_cache_only(object_id, version) {
1554 CacheResult::Hit(_) => true,
1555 CacheResult::NegativeHit => false,
1556 CacheResult::Miss => self
1557 .record_db_get("object_by_version")
1558 .object_exists_by_key(object_id, version)
1559 .expect("db error"),
1560 }
1561 }
1562
1563 fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool> {
1564 do_fallback_lookup(
1565 object_keys,
1566 |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1567 CacheResult::Hit(_) => CacheResult::Hit(true),
1568 CacheResult::NegativeHit => CacheResult::Hit(false),
1569 CacheResult::Miss => CacheResult::Miss,
1570 },
1571 |remaining| {
1572 self.record_db_multi_get("object_by_version", remaining.len())
1573 .multi_object_exists_by_key(remaining)
1574 .expect("db error")
1575 },
1576 )
1577 }
1578
1579 fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
1580 match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1581 CacheResult::Hit((version, entry)) => Some(match entry {
1582 ObjectEntry::Object(object) => object.compute_object_reference(),
1583 ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1584 ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1585 }),
1586 CacheResult::NegativeHit => None,
1587 CacheResult::Miss => self
1588 .record_db_get("latest_objref_or_tombstone")
1589 .get_latest_object_ref_or_tombstone(object_id)
1590 .expect("db error"),
1591 }
1592 }
1593
1594 fn get_latest_object_or_tombstone(
1595 &self,
1596 object_id: ObjectID,
1597 ) -> Option<(ObjectKey, ObjectOrTombstone)> {
1598 match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1599 CacheResult::Hit((version, entry)) => {
1600 let key = ObjectKey(object_id, version);
1601 Some(match entry {
1602 ObjectEntry::Object(object) => (key, object.into()),
1603 ObjectEntry::Deleted => (
1604 key,
1605 ObjectOrTombstone::Tombstone((
1606 object_id,
1607 version,
1608 ObjectDigest::OBJECT_DIGEST_DELETED,
1609 )),
1610 ),
1611 ObjectEntry::Wrapped => (
1612 key,
1613 ObjectOrTombstone::Tombstone((
1614 object_id,
1615 version,
1616 ObjectDigest::OBJECT_DIGEST_WRAPPED,
1617 )),
1618 ),
1619 })
1620 }
1621 CacheResult::NegativeHit => None,
1622 CacheResult::Miss => self
1623 .record_db_get("latest_object_or_tombstone")
1624 .get_latest_object_or_tombstone(object_id)
1625 .expect("db error"),
1626 }
1627 }
1628
1629 fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool> {
1630 keys.iter()
1631 .map(|key| {
1632 if key.is_cancelled() {
1633 true
1634 } else {
1635 match key {
1636 InputKey::VersionedObject { id, version } => {
1637 matches!(
1638 self.get_object_by_key_cache_only(&id.id(), *version),
1639 CacheResult::Hit(_)
1640 )
1641 }
1642 InputKey::Package { id } => self.packages.contains_key(id),
1643 }
1644 }
1645 })
1646 .collect()
1647 }
1648
1649 #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1650 fn find_object_lt_or_eq_version(
1651 &self,
1652 object_id: ObjectID,
1653 version_bound: SequenceNumber,
1654 ) -> Option<Object> {
1655 macro_rules! check_cache_entry {
1656 ($level: expr, $objects: expr) => {
1657 self.metrics
1658 .record_cache_request("object_lt_or_eq_version", $level);
1659 if let Some(objects) = $objects {
1660 if let Some((_, object)) = objects
1661 .all_versions_lt_or_eq_descending(&version_bound)
1662 .next()
1663 {
1664 if let ObjectEntry::Object(object) = object {
1665 self.metrics
1666 .record_cache_hit("object_lt_or_eq_version", $level);
1667 return Some(object.clone());
1668 } else {
1669 self.metrics
1671 .record_cache_negative_hit("object_lt_or_eq_version", $level);
1672 return None;
1673 }
1674 } else {
1675 self.metrics
1676 .record_cache_miss("object_lt_or_eq_version", $level);
1677 }
1678 }
1679 };
1680 }
1681
1682 self.metrics
1684 .record_cache_request("object_lt_or_eq_version", "object_by_id");
1685 let latest_cache_entry = self.object_by_id_cache.get(&object_id);
1686 if let Some(latest) = &latest_cache_entry {
1687 let latest = latest.lock();
1688 match &*latest {
1689 LatestObjectCacheEntry::Object(latest_version, object) => {
1690 if *latest_version <= version_bound {
1691 if let ObjectEntry::Object(object) = object {
1692 self.metrics
1693 .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1694 return Some(object.clone());
1695 } else {
1696 self.metrics.record_cache_negative_hit(
1698 "object_lt_or_eq_version",
1699 "object_by_id",
1700 );
1701 return None;
1702 }
1703 }
1704 }
1706 LatestObjectCacheEntry::NonExistent => {
1708 self.metrics
1709 .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1710 return None;
1711 }
1712 }
1713 }
1714 self.metrics
1715 .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1716
1717 Self::with_locked_cache_entries(
1718 &self.dirty.objects,
1719 &self.cached.object_cache,
1720 &object_id,
1721 |dirty_entry, cached_entry| {
1722 check_cache_entry!("committed", dirty_entry);
1723 check_cache_entry!("uncommitted", cached_entry);
1724
1725 let latest: Option<(SequenceNumber, ObjectEntry)> =
1746 if let Some(dirty_set) = dirty_entry {
1747 dirty_set
1748 .get_highest()
1749 .cloned()
1750 .tap_none(|| panic!("dirty set cannot be empty"))
1751 } else {
1752 self.record_db_get("object_lt_or_eq_version_latest")
1754 .get_latest_object_or_tombstone(object_id)
1755 .expect("db error")
1756 .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1757 (version, ObjectEntry::from(obj_or_tombstone))
1758 })
1759 };
1760
1761 if let Some((obj_version, obj_entry)) = latest {
1762 self.cache_latest_object_by_id(
1770 &object_id,
1771 LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1772 self.object_by_id_cache.get_ticket_for_read(&object_id),
1775 );
1776
1777 if obj_version <= version_bound {
1778 match obj_entry {
1779 ObjectEntry::Object(object) => Some(object),
1780 ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
1781 }
1782 } else {
1783 self.record_db_get("object_lt_or_eq_version_scan")
1787 .find_object_lt_or_eq_version(object_id, version_bound)
1788 .expect("db error")
1789 }
1790
1791 } else if let Some(latest_cache_entry) = latest_cache_entry {
1797 assert!(!latest_cache_entry.lock().is_alive());
1799 None
1800 } else {
1801 let highest = cached_entry.and_then(|c| c.get_highest());
1803 assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1804 self.cache_object_not_found(
1805 &object_id,
1806 self.object_by_id_cache.get_ticket_for_read(&object_id),
1808 );
1809 None
1810 }
1811 },
1812 )
1813 }
1814
1815 fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
1816 get_sui_system_state(self)
1817 }
1818
1819 fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge> {
1820 get_bridge(self)
1821 }
1822
1823 fn get_marker_value(
1824 &self,
1825 object_key: FullObjectKey,
1826 epoch_id: EpochId,
1827 ) -> Option<MarkerValue> {
1828 match self.get_marker_value_cache_only(object_key, epoch_id) {
1829 CacheResult::Hit(marker) => Some(marker),
1830 CacheResult::NegativeHit => None,
1831 CacheResult::Miss => self
1832 .record_db_get("marker_by_version")
1833 .get_marker_value(object_key, epoch_id)
1834 .expect("db error"),
1835 }
1836 }
1837
1838 fn get_latest_marker(
1839 &self,
1840 object_id: FullObjectID,
1841 epoch_id: EpochId,
1842 ) -> Option<(SequenceNumber, MarkerValue)> {
1843 match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1844 CacheResult::Hit((v, marker)) => Some((v, marker)),
1845 CacheResult::NegativeHit => {
1846 panic!("cannot have negative hit when getting latest marker")
1847 }
1848 CacheResult::Miss => self
1849 .record_db_get("marker_latest")
1850 .get_latest_marker(object_id, epoch_id)
1851 .expect("db error"),
1852 }
1853 }
1854
1855 fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult {
1856 let cur_epoch = epoch_store.epoch();
1857 match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
1858 CacheResult::Hit((_, obj)) => {
1859 let actual_objref = obj.compute_object_reference();
1860 if obj_ref != actual_objref {
1861 Ok(ObjectLockStatus::LockedAtDifferentVersion {
1862 locked_ref: actual_objref,
1863 })
1864 } else {
1865 Ok(
1867 match self
1868 .object_locks
1869 .get_transaction_lock(&obj_ref, epoch_store)?
1870 {
1871 Some(tx_digest) => ObjectLockStatus::LockedToTx {
1872 locked_by_tx: LockDetailsDeprecated {
1873 epoch: cur_epoch,
1874 tx_digest,
1875 },
1876 },
1877 None => ObjectLockStatus::Initialized,
1878 },
1879 )
1880 }
1881 }
1882 CacheResult::NegativeHit => {
1883 Err(SuiError::from(UserInputError::ObjectNotFound {
1884 object_id: obj_ref.0,
1885 version: None,
1888 }))
1889 }
1890 CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1891 }
1892 }
1893
1894 fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef> {
1895 let obj = self.get_object_impl("live_objref", &object_id).ok_or(
1896 UserInputError::ObjectNotFound {
1897 object_id,
1898 version: None,
1899 },
1900 )?;
1901 Ok(obj.compute_object_reference())
1902 }
1903
1904 fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1905 do_fallback_lookup_fallible(
1906 owned_object_refs,
1907 |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
1908 CacheResult::Hit((version, obj)) => {
1909 if obj.compute_object_reference() != *obj_ref {
1910 Err(UserInputError::ObjectVersionUnavailableForConsumption {
1911 provided_obj_ref: *obj_ref,
1912 current_version: version,
1913 }
1914 .into())
1915 } else {
1916 Ok(CacheResult::Hit(()))
1917 }
1918 }
1919 CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
1920 object_id: obj_ref.0,
1921 version: None,
1922 }
1923 .into()),
1924 CacheResult::Miss => Ok(CacheResult::Miss),
1925 },
1926 |remaining| {
1927 self.record_db_multi_get("object_is_live", remaining.len())
1928 .check_owned_objects_are_live(remaining)?;
1929 Ok(vec![(); remaining.len()])
1930 },
1931 )?;
1932 Ok(())
1933 }
1934
1935 fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber> {
1936 self.store
1937 .perpetual_tables
1938 .get_highest_pruned_checkpoint()
1939 .expect("db error")
1940 }
1941
1942 fn notify_read_input_objects<'a>(
1943 &'a self,
1944 input_and_receiving_keys: &'a [InputKey],
1945 receiving_keys: &'a HashSet<InputKey>,
1946 epoch: EpochId,
1947 ) -> BoxFuture<'a, ()> {
1948 self.object_notify_read
1949 .read(
1950 "notify_read_input_objects",
1951 input_and_receiving_keys,
1952 move |keys| {
1953 self.multi_input_objects_available(keys, receiving_keys, epoch)
1954 .into_iter()
1955 .map(|available| if available { Some(()) } else { None })
1956 .collect::<Vec<_>>()
1957 },
1958 )
1959 .map(|_| ())
1960 .boxed()
1961 }
1962}
1963
1964impl TransactionCacheRead for WritebackCache {
1965 fn multi_get_transaction_blocks(
1966 &self,
1967 digests: &[TransactionDigest],
1968 ) -> Vec<Option<Arc<VerifiedTransaction>>> {
1969 let digests_and_tickets: Vec<_> = digests
1970 .iter()
1971 .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
1972 .collect();
1973 do_fallback_lookup(
1974 &digests_and_tickets,
1975 |(digest, _)| {
1976 self.metrics
1977 .record_cache_request("transaction_block", "uncommitted");
1978 if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
1979 self.metrics
1980 .record_cache_hit("transaction_block", "uncommitted");
1981 return CacheResult::Hit(Some(tx.transaction.clone()));
1982 }
1983 self.metrics
1984 .record_cache_miss("transaction_block", "uncommitted");
1985
1986 self.metrics
1987 .record_cache_request("transaction_block", "committed");
1988
1989 match self
1990 .cached
1991 .transactions
1992 .get(digest)
1993 .map(|l| l.lock().clone())
1994 {
1995 Some(PointCacheItem::Some(tx)) => {
1996 self.metrics
1997 .record_cache_hit("transaction_block", "committed");
1998 CacheResult::Hit(Some(tx))
1999 }
2000 Some(PointCacheItem::None) => CacheResult::NegativeHit,
2001 None => {
2002 self.metrics
2003 .record_cache_miss("transaction_block", "committed");
2004
2005 CacheResult::Miss
2006 }
2007 }
2008 },
2009 |remaining| {
2010 let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2011 let results: Vec<_> = self
2012 .record_db_multi_get("transaction_block", remaining.len())
2013 .multi_get_transaction_blocks(&remaining_digests)
2014 .expect("db error")
2015 .into_iter()
2016 .map(|o| o.map(Arc::new))
2017 .collect();
2018 for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2019 if result.is_none() {
2020 self.cached.transactions.insert(digest, None, *ticket).ok();
2021 }
2022 }
2023 results
2024 },
2025 )
2026 }
2027
2028 fn multi_get_executed_effects_digests(
2029 &self,
2030 digests: &[TransactionDigest],
2031 ) -> Vec<Option<TransactionEffectsDigest>> {
2032 let digests_and_tickets: Vec<_> = digests
2033 .iter()
2034 .map(|d| {
2035 (
2036 *d,
2037 self.cached.executed_effects_digests.get_ticket_for_read(d),
2038 )
2039 })
2040 .collect();
2041 do_fallback_lookup(
2042 &digests_and_tickets,
2043 |(digest, _)| {
2044 self.metrics
2045 .record_cache_request("executed_effects_digests", "uncommitted");
2046 if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
2047 self.metrics
2048 .record_cache_hit("executed_effects_digests", "uncommitted");
2049 return CacheResult::Hit(Some(*digest));
2050 }
2051 self.metrics
2052 .record_cache_miss("executed_effects_digests", "uncommitted");
2053
2054 self.metrics
2055 .record_cache_request("executed_effects_digests", "committed");
2056 match self
2057 .cached
2058 .executed_effects_digests
2059 .get(digest)
2060 .map(|l| *l.lock())
2061 {
2062 Some(PointCacheItem::Some(digest)) => {
2063 self.metrics
2064 .record_cache_hit("executed_effects_digests", "committed");
2065 CacheResult::Hit(Some(digest))
2066 }
2067 Some(PointCacheItem::None) => CacheResult::NegativeHit,
2068 None => {
2069 self.metrics
2070 .record_cache_miss("executed_effects_digests", "committed");
2071 CacheResult::Miss
2072 }
2073 }
2074 },
2075 |remaining| {
2076 let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2077 let results = self
2078 .record_db_multi_get("executed_effects_digests", remaining.len())
2079 .multi_get_executed_effects_digests(&remaining_digests)
2080 .expect("db error");
2081 for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2082 if result.is_none() {
2083 self.cached
2084 .executed_effects_digests
2085 .insert(digest, None, *ticket)
2086 .ok();
2087 }
2088 }
2089 results
2090 },
2091 )
2092 }
2093
2094 fn multi_get_effects(
2095 &self,
2096 digests: &[TransactionEffectsDigest],
2097 ) -> Vec<Option<TransactionEffects>> {
2098 let digests_and_tickets: Vec<_> = digests
2099 .iter()
2100 .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
2101 .collect();
2102 do_fallback_lookup(
2103 &digests_and_tickets,
2104 |(digest, _)| {
2105 self.metrics
2106 .record_cache_request("transaction_effects", "uncommitted");
2107 if let Some(effects) = self.dirty.transaction_effects.get(digest) {
2108 self.metrics
2109 .record_cache_hit("transaction_effects", "uncommitted");
2110 return CacheResult::Hit(Some(effects.clone()));
2111 }
2112 self.metrics
2113 .record_cache_miss("transaction_effects", "uncommitted");
2114
2115 self.metrics
2116 .record_cache_request("transaction_effects", "committed");
2117 match self
2118 .cached
2119 .transaction_effects
2120 .get(digest)
2121 .map(|l| l.lock().clone())
2122 {
2123 Some(PointCacheItem::Some(effects)) => {
2124 self.metrics
2125 .record_cache_hit("transaction_effects", "committed");
2126 CacheResult::Hit(Some((*effects).clone()))
2127 }
2128 Some(PointCacheItem::None) => CacheResult::NegativeHit,
2129 None => {
2130 self.metrics
2131 .record_cache_miss("transaction_effects", "committed");
2132 CacheResult::Miss
2133 }
2134 }
2135 },
2136 |remaining| {
2137 let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2138 let results = self
2139 .record_db_multi_get("transaction_effects", remaining.len())
2140 .multi_get_effects(remaining_digests.iter())
2141 .expect("db error");
2142 for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2143 if result.is_none() {
2144 self.cached
2145 .transaction_effects
2146 .insert(digest, None, *ticket)
2147 .ok();
2148 }
2149 }
2150 results
2151 },
2152 )
2153 }
2154
2155 fn transaction_executed_in_last_epoch(
2156 &self,
2157 digest: &TransactionDigest,
2158 current_epoch: EpochId,
2159 ) -> bool {
2160 if current_epoch == 0 {
2161 return false;
2162 }
2163 let last_epoch = current_epoch - 1;
2164 let cache_key = (last_epoch, *digest);
2165
2166 let ticket = self
2167 .cached
2168 .transaction_executed_in_last_epoch
2169 .get_ticket_for_read(&cache_key);
2170
2171 if let Some(cached) = self
2172 .cached
2173 .transaction_executed_in_last_epoch
2174 .get(&cache_key)
2175 {
2176 return cached.lock().is_some();
2177 }
2178
2179 let was_executed = self
2180 .store
2181 .perpetual_tables
2182 .was_transaction_executed_in_last_epoch(digest, current_epoch);
2183
2184 let value = if was_executed { Some(()) } else { None };
2185 self.cached
2186 .transaction_executed_in_last_epoch
2187 .insert(&cache_key, value, ticket)
2188 .ok();
2189
2190 was_executed
2191 }
2192
2193 fn notify_read_executed_effects_digests<'a>(
2194 &'a self,
2195 task_name: &'static str,
2196 digests: &'a [TransactionDigest],
2197 ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>> {
2198 self.executed_effects_digests_notify_read
2199 .read(task_name, digests, |digests| {
2200 self.multi_get_executed_effects_digests(digests)
2201 })
2202 .boxed()
2203 }
2204
2205 fn multi_get_events(
2206 &self,
2207 event_digests: &[TransactionDigest],
2208 ) -> Vec<Option<TransactionEvents>> {
2209 fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
2210 if events.data.is_empty() {
2211 None
2212 } else {
2213 Some(events)
2214 }
2215 }
2216
2217 let digests_and_tickets: Vec<_> = event_digests
2218 .iter()
2219 .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
2220 .collect();
2221 do_fallback_lookup(
2222 &digests_and_tickets,
2223 |(digest, _)| {
2224 self.metrics
2225 .record_cache_request("transaction_events", "uncommitted");
2226 if let Some(events) = self.dirty.transaction_events.get(digest).map(|e| e.clone()) {
2227 self.metrics
2228 .record_cache_hit("transaction_events", "uncommitted");
2229
2230 return CacheResult::Hit(map_events(events));
2231 }
2232 self.metrics
2233 .record_cache_miss("transaction_events", "uncommitted");
2234
2235 self.metrics
2236 .record_cache_request("transaction_events", "committed");
2237 match self
2238 .cached
2239 .transaction_events
2240 .get(digest)
2241 .map(|l| l.lock().clone())
2242 {
2243 Some(PointCacheItem::Some(events)) => {
2244 self.metrics
2245 .record_cache_hit("transaction_events", "committed");
2246 CacheResult::Hit(map_events((*events).clone()))
2247 }
2248 Some(PointCacheItem::None) => CacheResult::NegativeHit,
2249 None => {
2250 self.metrics
2251 .record_cache_miss("transaction_events", "committed");
2252
2253 CacheResult::Miss
2254 }
2255 }
2256 },
2257 |remaining| {
2258 let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2259 let results = self
2260 .store
2261 .multi_get_events(&remaining_digests)
2262 .expect("db error");
2263 for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2264 if result.is_none() {
2265 self.cached
2266 .transaction_events
2267 .insert(digest, None, *ticket)
2268 .ok();
2269 }
2270 }
2271 results
2272 },
2273 )
2274 }
2275
2276 fn get_unchanged_loaded_runtime_objects(
2277 &self,
2278 digest: &TransactionDigest,
2279 ) -> Option<Vec<ObjectKey>> {
2280 self.dirty
2281 .unchanged_loaded_runtime_objects
2282 .get(digest)
2283 .map(|b| b.clone())
2284 .or_else(|| {
2285 self.store
2286 .get_unchanged_loaded_runtime_objects(digest)
2287 .expect("db error")
2288 })
2289 }
2290
2291 fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
2292 self.dirty
2293 .pending_transaction_writes
2294 .get(digest)
2295 .map(|transaction_output| transaction_output.take_accumulator_events())
2296 }
2297}
2298
2299impl ExecutionCacheWrite for WritebackCache {
2300 fn validate_owned_object_versions(&self, owned_input_objects: &[ObjectRef]) -> SuiResult {
2301 ObjectLocks::validate_owned_object_versions(self, owned_input_objects)
2302 }
2303
2304 fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
2305 WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs);
2306 }
2307
2308 #[cfg(test)]
2309 fn write_object_entry_for_test(&self, object: Object) {
2310 self.write_object_entry(&object.id(), object.version(), object.into());
2311 }
2312}
2313
2314implement_passthrough_traits!(WritebackCache);
2315
2316impl GlobalStateHashStore for WritebackCache {
2317 fn get_object_ref_prior_to_key_deprecated(
2318 &self,
2319 object_id: &ObjectID,
2320 version: SequenceNumber,
2321 ) -> SuiResult<Option<ObjectRef>> {
2322 let mut candidates = Vec::new();
2326
2327 let check_versions =
2328 |versions: &CachedVersionMap<ObjectEntry>| match versions.get_prior_to(&version) {
2329 Some((version, object_entry)) => match object_entry {
2330 ObjectEntry::Object(object) => {
2331 assert_eq!(object.version(), version);
2332 Some(object.compute_object_reference())
2333 }
2334 ObjectEntry::Deleted => {
2335 Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED))
2336 }
2337 ObjectEntry::Wrapped => {
2338 Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED))
2339 }
2340 },
2341 None => None,
2342 };
2343
2344 if let Some(objects) = self.dirty.objects.get(object_id)
2346 && let Some(prior) = check_versions(&objects)
2347 {
2348 candidates.push(prior);
2349 }
2350
2351 if let Some(objects) = self.cached.object_cache.get(object_id)
2352 && let Some(prior) = check_versions(&objects.lock())
2353 {
2354 candidates.push(prior);
2355 }
2356
2357 if let Some(prior) = self
2358 .store
2359 .get_object_ref_prior_to_key_deprecated(object_id, version)?
2360 {
2361 candidates.push(prior);
2362 }
2363
2364 candidates.sort_by_key(|(_, version, _)| *version);
2366 Ok(candidates.pop())
2367 }
2368
2369 fn get_root_state_hash_for_epoch(
2370 &self,
2371 epoch: EpochId,
2372 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
2373 self.store.get_root_state_hash_for_epoch(epoch)
2374 }
2375
2376 fn get_root_state_hash_for_highest_epoch(
2377 &self,
2378 ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
2379 self.store.get_root_state_hash_for_highest_epoch()
2380 }
2381
2382 fn insert_state_hash_for_epoch(
2383 &self,
2384 epoch: EpochId,
2385 checkpoint_seq_num: &CheckpointSequenceNumber,
2386 acc: &GlobalStateHash,
2387 ) -> SuiResult {
2388 self.store
2389 .insert_state_hash_for_epoch(epoch, checkpoint_seq_num, acc)
2390 }
2391
2392 fn iter_live_object_set(
2393 &self,
2394 include_wrapped_tombstone: bool,
2395 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2396 assert!(
2400 self.dirty.is_empty(),
2401 "cannot iterate live object set with dirty data"
2402 );
2403 self.store.iter_live_object_set(include_wrapped_tombstone)
2404 }
2405
2406 fn iter_cached_live_object_set_for_testing(
2410 &self,
2411 include_wrapped_tombstone: bool,
2412 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2413 let iter = self.dirty.objects.iter();
2415 let mut dirty_objects = BTreeMap::new();
2416
2417 for obj in self.store.iter_live_object_set(include_wrapped_tombstone) {
2419 dirty_objects.insert(obj.object_id(), obj);
2420 }
2421
2422 for entry in iter {
2424 let id = *entry.key();
2425 let value = entry.value();
2426 match value.get_highest().unwrap() {
2427 (_, ObjectEntry::Object(object)) => {
2428 dirty_objects.insert(id, LiveObject::Normal(object.clone()));
2429 }
2430 (version, ObjectEntry::Wrapped) => {
2431 if include_wrapped_tombstone {
2432 dirty_objects.insert(id, LiveObject::Wrapped(ObjectKey(id, *version)));
2433 } else {
2434 dirty_objects.remove(&id);
2435 }
2436 }
2437 (_, ObjectEntry::Deleted) => {
2438 dirty_objects.remove(&id);
2439 }
2440 }
2441 }
2442
2443 Box::new(dirty_objects.into_values())
2444 }
2445}
2446
2447impl StateSyncAPI for WritebackCache {
2453 fn insert_transaction_and_effects(
2454 &self,
2455 transaction: &VerifiedTransaction,
2456 transaction_effects: &TransactionEffects,
2457 ) {
2458 self.store
2459 .insert_transaction_and_effects(transaction, transaction_effects)
2460 .expect("db error");
2461 self.cached
2462 .transactions
2463 .insert(
2464 transaction.digest(),
2465 PointCacheItem::Some(Arc::new(transaction.clone())),
2466 Ticket::Write,
2467 )
2468 .ok();
2469 self.cached
2470 .transaction_effects
2471 .insert(
2472 &transaction_effects.digest(),
2473 PointCacheItem::Some(Arc::new(transaction_effects.clone())),
2474 Ticket::Write,
2475 )
2476 .ok();
2477 }
2478
2479 fn multi_insert_transaction_and_effects(
2480 &self,
2481 transactions_and_effects: &[VerifiedExecutionData],
2482 ) {
2483 self.store
2484 .multi_insert_transaction_and_effects(transactions_and_effects.iter())
2485 .expect("db error");
2486 for VerifiedExecutionData {
2487 transaction,
2488 effects,
2489 } in transactions_and_effects
2490 {
2491 self.cached
2492 .transactions
2493 .insert(
2494 transaction.digest(),
2495 PointCacheItem::Some(Arc::new(transaction.clone())),
2496 Ticket::Write,
2497 )
2498 .ok();
2499 self.cached
2500 .transaction_effects
2501 .insert(
2502 &effects.digest(),
2503 PointCacheItem::Some(Arc::new(effects.clone())),
2504 Ticket::Write,
2505 )
2506 .ok();
2507 }
2508 }
2509}