1use crate::accumulators::funds_read::AccountFundsRead;
41use crate::authority::AuthorityStore;
42use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
43use crate::authority::authority_store::{
44 ExecutionLockWriteGuard, LockDetailsDeprecated, ObjectLockStatus, SuiLockResult,
45};
46use crate::authority::authority_store_tables::LiveObject;
47use crate::authority::backpressure::BackpressureManager;
48use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
49use crate::fallback_fetch::{do_fallback_lookup, do_fallback_lookup_fallible};
50use crate::global_state_hasher::GlobalStateHashStore;
51use crate::transaction_outputs::TransactionOutputs;
52
53use dashmap::DashMap;
54use dashmap::mapref::entry::Entry as DashMapEntry;
55use futures::{FutureExt, future::BoxFuture};
56use moka::sync::SegmentedCache as MokaCache;
57use mysten_common::debug_fatal;
58use mysten_common::random_util::randomize_cache_capacity_in_tests;
59use mysten_common::sync::notify_read::NotifyRead;
60use parking_lot::Mutex;
61use std::collections::{BTreeMap, HashSet};
62use std::hash::Hash;
63use std::sync::Arc;
64use std::sync::atomic::AtomicU64;
65use sui_config::ExecutionCacheConfig;
66use sui_macros::fail_point;
67use sui_protocol_config::ProtocolVersion;
68use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
69use sui_types::accumulator_event::AccumulatorEvent;
70use sui_types::accumulator_root::{AccumulatorObjId, AccumulatorValue};
71use sui_types::base_types::{
72 EpochId, FullObjectID, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData,
73};
74use sui_types::bridge::{Bridge, get_bridge};
75use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest};
76use sui_types::effects::{TransactionEffects, TransactionEvents};
77use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
78use sui_types::executable_transaction::VerifiedExecutableTransaction;
79use sui_types::global_state_hash::GlobalStateHash;
80use sui_types::message_envelope::Message;
81use sui_types::messages_checkpoint::CheckpointSequenceNumber;
82use sui_types::object::Object;
83use sui_types::storage::{
84 FullObjectKey, InputKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject,
85};
86use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
87use sui_types::transaction::{TransactionDataAPI, VerifiedTransaction};
88use tap::TapOptional;
89use tracing::{debug, info, instrument, trace, warn};
90
91use super::ExecutionCacheAPI;
92use super::cache_types::Ticket;
93use super::{
94 Batch, CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
95 ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
96 cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache},
97 implement_passthrough_traits,
98 object_locks::ObjectLocks,
99};
100
101#[cfg(test)]
102#[path = "unit_tests/writeback_cache_tests.rs"]
103pub mod writeback_cache_tests;
104
105#[cfg(test)]
106#[path = "unit_tests/notify_read_input_objects_tests.rs"]
107mod notify_read_input_objects_tests;
108
/// One versioned slot in the object caches: either a live object or a tombstone.
///
/// `Deleted` and `Wrapped` are the two tombstone states (see `is_tombstone`).
/// They are written for a transaction's `deleted` / `wrapped` outputs and are
/// also produced when an `ObjectOrTombstone::Tombstone` is converted.
#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    /// A live object at this version.
    Object(Object),
    /// The object was deleted at this version.
    Deleted,
    /// The object was wrapped at this version.
    Wrapped,
}
115
116impl ObjectEntry {
117 #[cfg(test)]
118 fn unwrap_object(&self) -> &Object {
119 match self {
120 ObjectEntry::Object(o) => o,
121 _ => panic!("unwrap_object called on non-Object"),
122 }
123 }
124
125 fn is_tombstone(&self) -> bool {
126 match self {
127 ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
128 ObjectEntry::Object(_) => false,
129 }
130 }
131}
132
133impl std::fmt::Debug for ObjectEntry {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 match self {
136 ObjectEntry::Object(o) => {
137 write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
138 }
139 ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
140 ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
141 }
142 }
143}
144
145impl From<Object> for ObjectEntry {
146 fn from(object: Object) -> Self {
147 ObjectEntry::Object(object)
148 }
149}
150
151impl From<ObjectOrTombstone> for ObjectEntry {
152 fn from(object: ObjectOrTombstone) -> Self {
153 match object {
154 ObjectOrTombstone::Object(o) => o.into(),
155 ObjectOrTombstone::Tombstone(obj_ref) => {
156 if obj_ref.2.is_deleted() {
157 ObjectEntry::Deleted
158 } else if obj_ref.2.is_wrapped() {
159 ObjectEntry::Wrapped
160 } else {
161 panic!("tombstone digest must either be deleted or wrapped");
162 }
163 }
164 }
165 }
166}
167
/// Value stored in the by-id object cache: the latest known state of an object id.
#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    /// The latest known version together with its entry (live object or tombstone).
    Object(SequenceNumber, ObjectEntry),
    /// No object or tombstone was found for the id when it was last looked up.
    NonExistent,
}
173
174impl LatestObjectCacheEntry {
175 #[cfg(test)]
176 fn version(&self) -> Option<SequenceNumber> {
177 match self {
178 LatestObjectCacheEntry::Object(version, _) => Some(*version),
179 LatestObjectCacheEntry::NonExistent => None,
180 }
181 }
182
183 fn is_alive(&self) -> bool {
184 match self {
185 LatestObjectCacheEntry::Object(_, entry) => !entry.is_tombstone(),
186 LatestObjectCacheEntry::NonExistent => false,
187 }
188 }
189}
190
191impl IsNewer for LatestObjectCacheEntry {
192 fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
193 match (self, other) {
194 (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
195 v1 > v2
196 }
197 (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
198 _ => false,
199 }
200 }
201}
202
/// Key of the marker maps: the epoch paired with the full object id. The object
/// version is tracked inside the per-key `CachedVersionMap`.
type MarkerKey = (EpochId, FullObjectID);
204
/// Outputs of executed transactions that have been written to the cache but not
/// yet flushed to the committed caches ("dirty" data).
struct UncommittedData {
    /// Per-object map of version -> entry. Filled by `write_object_entry`; versions
    /// are removed again by `move_version_from_dirty_to_cache`.
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

    /// Per-(epoch, object) map of version -> marker. Filled by `write_marker_value`.
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    /// Effects of pending transactions, keyed by effects digest.
    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    /// Events of pending transactions, keyed by transaction digest.
    transaction_events: DashMap<TransactionDigest, TransactionEvents>,

    /// The `unchanged_loaded_runtime_objects` list of each pending transaction.
    unchanged_loaded_runtime_objects: DashMap<TransactionDigest, Vec<ObjectKey>>,

    /// Transaction digest -> effects digest for pending transactions.
    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    /// Complete outputs of each pending transaction. `is_empty` treats this map as
    /// the source of truth, and `build_db_batch` reads the outputs to commit from it.
    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,

    /// Number of transactions inserted (incremented in `write_transaction_outputs`).
    total_transaction_inserts: AtomicU64,
    /// Number of transactions committed (incremented in `commit_transaction_outputs`).
    /// The difference between the two counters drives backpressure.
    total_transaction_commits: AtomicU64,
}
243
244impl UncommittedData {
245 fn new() -> Self {
246 Self {
247 objects: DashMap::with_shard_amount(2048),
248 markers: DashMap::with_shard_amount(2048),
249 transaction_effects: DashMap::with_shard_amount(2048),
250 executed_effects_digests: DashMap::with_shard_amount(2048),
251 pending_transaction_writes: DashMap::with_shard_amount(2048),
252 transaction_events: DashMap::with_shard_amount(2048),
253 unchanged_loaded_runtime_objects: DashMap::with_shard_amount(2048),
254 total_transaction_inserts: AtomicU64::new(0),
255 total_transaction_commits: AtomicU64::new(0),
256 }
257 }
258
259 fn clear(&self) {
260 self.objects.clear();
261 self.markers.clear();
262 self.transaction_effects.clear();
263 self.executed_effects_digests.clear();
264 self.pending_transaction_writes.clear();
265 self.transaction_events.clear();
266 self.unchanged_loaded_runtime_objects.clear();
267 self.total_transaction_inserts
268 .store(0, std::sync::atomic::Ordering::Relaxed);
269 self.total_transaction_commits
270 .store(0, std::sync::atomic::Ordering::Relaxed);
271 }
272
273 fn is_empty(&self) -> bool {
274 let empty = self.pending_transaction_writes.is_empty();
275 if empty && cfg!(debug_assertions) {
276 assert!(
277 self.objects.is_empty()
278 && self.markers.is_empty()
279 && self.transaction_effects.is_empty()
280 && self.executed_effects_digests.is_empty()
281 && self.transaction_events.is_empty()
282 && self.unchanged_loaded_runtime_objects.is_empty()
283 && self
284 .total_transaction_inserts
285 .load(std::sync::atomic::Ordering::Relaxed)
286 == self
287 .total_transaction_commits
288 .load(std::sync::atomic::Ordering::Relaxed),
289 );
290 }
291 empty
292 }
293}
294
/// Item stored in the point-lookup `MonotonicCache`s: `Some(value)` for a known
/// value, `None` for an entry without one.
type PointCacheItem<T> = Option<T>;
297
298impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
301 fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
302 match (self, other) {
303 (Some(_), None) => true,
304
305 (Some(a), Some(b)) => {
306 debug_assert_eq!(a, b);
308 false
309 }
310
311 _ => false,
312 }
313 }
314}
315
/// Bounded caches of data that has been flushed out of the dirty maps.
/// Capacities come from `ExecutionCacheConfig` (see `new`).
struct CachedCommittedData {
    /// Recent versions per object; filled by `move_version_from_dirty_to_cache`,
    /// which truncates each per-key map to a fixed number of versions.
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    /// Recent marker versions per (epoch, object); filled the same way as `object_cache`.
    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    /// Transactions by digest.
    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,

    /// Effects by effects digest.
    transaction_effects:
        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,

    /// Events by transaction digest.
    transaction_events: MonotonicCache<TransactionDigest, PointCacheItem<Arc<TransactionEvents>>>,

    /// Transaction digest -> effects digest.
    executed_effects_digests:
        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,

    /// Keyed by (epoch, transaction digest). Sized from `executed_effect_cache_size`.
    transaction_executed_in_last_epoch:
        MonotonicCache<(EpochId, TransactionDigest), PointCacheItem<()>>,

    /// Only constructed and invalidated in the code visible here.
    /// NOTE(review): appears to be unused otherwise (underscore-prefixed) — confirm.
    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}
341
342impl CachedCommittedData {
343 fn new(config: &ExecutionCacheConfig) -> Self {
344 let object_cache = MokaCache::builder(8)
345 .max_capacity(randomize_cache_capacity_in_tests(
346 config.object_cache_size(),
347 ))
348 .build();
349 let marker_cache = MokaCache::builder(8)
350 .max_capacity(randomize_cache_capacity_in_tests(
351 config.marker_cache_size(),
352 ))
353 .build();
354
355 let transactions = MonotonicCache::new(randomize_cache_capacity_in_tests(
356 config.transaction_cache_size(),
357 ));
358 let transaction_effects = MonotonicCache::new(randomize_cache_capacity_in_tests(
359 config.effect_cache_size(),
360 ));
361 let transaction_events = MonotonicCache::new(randomize_cache_capacity_in_tests(
362 config.events_cache_size(),
363 ));
364 let executed_effects_digests = MonotonicCache::new(randomize_cache_capacity_in_tests(
365 config.executed_effect_cache_size(),
366 ));
367
368 let transaction_objects = MokaCache::builder(8)
369 .max_capacity(randomize_cache_capacity_in_tests(
370 config.transaction_objects_cache_size(),
371 ))
372 .build();
373
374 let transaction_executed_in_last_epoch = MonotonicCache::new(
375 randomize_cache_capacity_in_tests(config.executed_effect_cache_size()),
376 );
377
378 Self {
379 object_cache,
380 marker_cache,
381 transactions,
382 transaction_effects,
383 transaction_events,
384 executed_effects_digests,
385 transaction_executed_in_last_epoch,
386 _transaction_objects: transaction_objects,
387 }
388 }
389
390 fn clear_and_assert_empty(&self) {
391 self.object_cache.invalidate_all();
392 self.marker_cache.invalidate_all();
393 self.transactions.invalidate_all();
394 self.transaction_effects.invalidate_all();
395 self.transaction_events.invalidate_all();
396 self.executed_effects_digests.invalidate_all();
397 self.transaction_executed_in_last_epoch.invalidate_all();
398 self._transaction_objects.invalidate_all();
399
400 assert_empty(&self.object_cache);
401 assert_empty(&self.marker_cache);
402 assert!(self.transactions.is_empty());
403 assert!(self.transaction_effects.is_empty());
404 assert!(self.transaction_events.is_empty());
405 assert!(self.executed_effects_digests.is_empty());
406 assert!(self.transaction_executed_in_last_epoch.is_empty());
407 assert_empty(&self._transaction_objects);
408 }
409}
410
411fn assert_empty<K, V>(cache: &MokaCache<K, V>)
412where
413 K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
414 V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
415{
416 if cache.iter().next().is_some() {
417 panic!("cache should be empty");
418 }
419}
420
/// Write-back cache in front of `AuthorityStore`: transaction outputs are first
/// held in `dirty`, later written to the store in a batch, and then moved into
/// the bounded `cached` layer.
pub struct WritebackCache {
    /// Outputs of executed transactions that have not been committed yet.
    dirty: UncommittedData,
    /// Bounded caches of data that has already been committed.
    cached: CachedCommittedData,

    /// Latest known entry per object id (or `NonExistent`). Updated on every
    /// object write and on reads that fall through to the store.
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    /// Package objects by id; populated when a written object is a package.
    packages: MokaCache<ObjectID, PackageObject>,

    /// Object lock table, created with `ObjectLocks::new()`.
    object_locks: ObjectLocks,

    /// Notified with the effects digest when a transaction's outputs are written.
    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    /// Notified when a package or non-child object is written, or when a
    /// `ConsensusStreamEnded` marker is written.
    object_notify_read: NotifyRead<InputKey, ()>,

    /// Backing persistent store.
    store: Arc<AuthorityStore>,
    /// Backpressure is signalled while the pending transaction count exceeds this value.
    backpressure_threshold: u64,
    /// Receives the backpressure on/off signal.
    backpressure_manager: Arc<BackpressureManager>,
    /// Cache metrics (requests, hits, misses, writes, backpressure).
    metrics: Arc<ExecutionCacheMetrics>,
}
455
/// Looks up `$version` in one cache level (`$cache` is an `Option` of a version map).
///
/// Expands to statements that `return` from the enclosing function or closure:
/// `CacheResult::Hit` when the exact version is present, or
/// `CacheResult::NegativeHit` when the map's least version is below `$version`.
/// Otherwise a miss is recorded and execution falls through.
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            // The map holds an older version but not the requested one: report a
            // negative hit instead of a miss.
            // NOTE(review): presumably the map is contiguous from its least
            // version upward, so the requested version cannot exist — confirm.
            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
477
/// Fetches the highest version from one cache level (`$cache` is an `Option` of a
/// version map).
///
/// Expands to statements that `return CacheResult::Hit((version, entry))` from the
/// enclosing function or closure when the map is present. Panics if a present map
/// is empty. When `$cache` is `None`, a miss is recorded and execution falls through.
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some((version, entry)) = cache.get_highest() {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit((*version, entry.clone()));
            } else {
                // A present map is expected to contain at least one version.
                panic!("empty CachedVersionMap should have been removed");
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
492
493impl WritebackCache {
494 pub fn new(
495 config: &ExecutionCacheConfig,
496 store: Arc<AuthorityStore>,
497 metrics: Arc<ExecutionCacheMetrics>,
498 backpressure_manager: Arc<BackpressureManager>,
499 ) -> Self {
500 let packages = MokaCache::builder(8)
501 .max_capacity(randomize_cache_capacity_in_tests(
502 config.package_cache_size(),
503 ))
504 .build();
505 Self {
506 dirty: UncommittedData::new(),
507 cached: CachedCommittedData::new(config),
508 object_by_id_cache: MonotonicCache::new(randomize_cache_capacity_in_tests(
509 config.object_by_id_cache_size(),
510 )),
511 packages,
512 object_locks: ObjectLocks::new(),
513 executed_effects_digests_notify_read: NotifyRead::new(),
514 object_notify_read: NotifyRead::new(),
515 store,
516 backpressure_manager,
517 backpressure_threshold: config.backpressure_threshold(),
518 metrics,
519 }
520 }
521
522 pub fn new_for_tests(store: Arc<AuthorityStore>) -> Self {
523 Self::new(
524 &Default::default(),
525 store,
526 ExecutionCacheMetrics::new(&prometheus::Registry::new()).into(),
527 BackpressureManager::new_for_tests(),
528 )
529 }
530
/// Test helper: replaces `self` with a freshly constructed cache that reuses the
/// same store, metrics and backpressure manager.
#[cfg(test)]
pub fn reset_for_test(&mut self) {
    let store = self.store.clone();
    let metrics = self.metrics.clone();
    let backpressure_manager = self.backpressure_manager.clone();
    *self = Self::new(&Default::default(), store, metrics, backpressure_manager);
}
541
542 pub fn evict_executed_effects_from_cache_for_testing(&self, tx_digest: &TransactionDigest) {
543 self.cached.executed_effects_digests.invalidate(tx_digest);
544 self.cached.transaction_events.invalidate(tx_digest);
545 self.cached.transactions.invalidate(tx_digest);
546 }
547
/// Records `object` as the entry for `object_id` at `version` in the dirty object
/// map, updates the by-id cache with it, and notifies `object_notify_read` waiters.
///
/// Only live objects trigger a notification: packages under `InputKey::Package`,
/// other non-child objects under `InputKey::VersionedObject`. Child objects and
/// tombstones notify nobody.
fn write_object_entry(
    &self,
    object_id: &ObjectID,
    version: SequenceNumber,
    object: ObjectEntry,
) {
    trace!(?object_id, ?version, ?object, "inserting object entry");
    self.metrics.record_cache_write("object");

    // The dirty-map entry is obtained before the by-id cache is touched and stays
    // alive until the end of the function.
    // NOTE(review): presumably this keeps the by-id cache and the dirty map
    // consistent for concurrent readers of this id — confirm.
    let mut entry = self.dirty.objects.entry(*object_id).or_default();

    // The result of the insert is intentionally discarded via `.ok()`.
    self.object_by_id_cache
        .insert(
            object_id,
            LatestObjectCacheEntry::Object(version, object.clone()),
            Ticket::Write,
        )
        .ok();

    entry.insert(version, object.clone());

    // Notify only after the entry has been inserted into the dirty map.
    if let ObjectEntry::Object(object) = &object {
        if object.is_package() {
            self.object_notify_read
                .notify(&InputKey::Package { id: *object_id }, &());
        } else if !object.is_child_object() {
            self.object_notify_read.notify(
                &InputKey::VersionedObject {
                    id: object.full_id(),
                    version: object.version(),
                },
                &(),
            );
        }
    }
}
605
606 fn write_marker_value(
607 &self,
608 epoch_id: EpochId,
609 object_key: FullObjectKey,
610 marker_value: MarkerValue,
611 ) {
612 tracing::trace!("inserting marker value {object_key:?}: {marker_value:?}",);
613 self.metrics.record_cache_write("marker");
614 self.dirty
615 .markers
616 .entry((epoch_id, object_key.id()))
617 .or_default()
618 .value_mut()
619 .insert(object_key.version(), marker_value);
620 if matches!(marker_value, MarkerValue::ConsensusStreamEnded(_)) {
625 self.object_notify_read.notify(
626 &InputKey::VersionedObject {
627 id: object_key.id(),
628 version: object_key.version(),
629 },
630 &(),
631 );
632 }
633 }
634
/// Looks up `key` in both the dirty map and the committed cache and calls `cb`
/// with the two version maps (`None` where the key is absent), returning its result.
///
/// The dirty-map entry (occupied or vacant) and the mutex guard of the committed
/// map are both kept alive for the duration of `cb`. The dirty entry is obtained
/// first, then the committed map's mutex.
fn with_locked_cache_entries<K, V, R>(
    dirty_map: &DashMap<K, CachedVersionMap<V>>,
    cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
    key: &K,
    cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
) -> R
where
    K: Copy + Eq + Hash + Send + Sync + 'static,
    V: Send + Sync + 'static,
{
    // `entry` is used instead of a plain lookup; a vacant entry is never filled here.
    let dirty_entry = dirty_map.entry(*key);
    let dirty_entry = match &dirty_entry {
        DashMapEntry::Occupied(occupied) => Some(occupied.get()),
        DashMapEntry::Vacant(_) => None,
    };

    // Keep the Arc, the guard and the borrowed map in separate bindings so the
    // reference handed to `cb` outlives the call.
    let cached_entry = cached_map.get(key);
    let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
    let cached_entry = cached_lock.as_deref();

    cb(dirty_entry, cached_entry)
}
660
661 fn get_object_entry_by_key_cache_only(
664 &self,
665 object_id: &ObjectID,
666 version: SequenceNumber,
667 ) -> CacheResult<ObjectEntry> {
668 Self::with_locked_cache_entries(
669 &self.dirty.objects,
670 &self.cached.object_cache,
671 object_id,
672 |dirty_entry, cached_entry| {
673 check_cache_entry_by_version!(
674 self,
675 "object_by_version",
676 "uncommitted",
677 dirty_entry,
678 version
679 );
680 check_cache_entry_by_version!(
681 self,
682 "object_by_version",
683 "committed",
684 cached_entry,
685 version
686 );
687 CacheResult::Miss
688 },
689 )
690 }
691
692 fn get_object_by_key_cache_only(
693 &self,
694 object_id: &ObjectID,
695 version: SequenceNumber,
696 ) -> CacheResult<Object> {
697 match self.get_object_entry_by_key_cache_only(object_id, version) {
698 CacheResult::Hit(entry) => match entry {
699 ObjectEntry::Object(object) => CacheResult::Hit(object),
700 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
701 },
702 CacheResult::Miss => CacheResult::Miss,
703 CacheResult::NegativeHit => CacheResult::NegativeHit,
704 }
705 }
706
/// Cache-only lookup of the latest entry for `object_id`.
///
/// The by-id cache is consulted first: an `Object` entry yields `Hit`, a
/// `NonExistent` entry yields `NegativeHit`. If the id is not in the by-id cache,
/// the highest version in the uncommitted map, then in the committed map, is
/// returned. `Miss` means none of the caches know the id.
///
/// `request_type` is only used as the metrics label.
///
/// # Panics
/// In builds with debug assertions, panics if the by-id cache disagrees with the
/// dirty map / store (see the coherence check below).
fn get_object_entry_by_id_cache_only(
    &self,
    request_type: &'static str,
    object_id: &ObjectID,
) -> CacheResult<(SequenceNumber, ObjectEntry)> {
    self.metrics
        .record_cache_request(request_type, "object_by_id");
    let entry = self.object_by_id_cache.get(object_id);

    // Debug-only coherence check of the by-id cache entry.
    if cfg!(debug_assertions)
        && let Some(entry) = &entry
    {
        // Ground truth: highest dirty version if any, otherwise the store's latest
        // object or tombstone.
        let highest: Option<ObjectEntry> = self
            .dirty
            .objects
            .get(object_id)
            .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
            .or_else(|| {
                let obj: Option<ObjectEntry> = self
                    .store
                    .get_latest_object_or_tombstone(*object_id)
                    .unwrap()
                    .map(|(_, o)| o.into());
                obj
            });

        let cache_entry = match &*entry.lock() {
            LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
            LatestObjectCacheEntry::NonExistent => None,
        };

        // A cached tombstone with nothing in the dirty map or store is tolerated.
        // NOTE(review): presumably because tombstones can be pruned from the
        // store — confirm.
        let tombstone_possibly_pruned = highest.is_none()
            && cache_entry
                .as_ref()
                .map(|e| e.is_tombstone())
                .unwrap_or(false);

        if highest != cache_entry && !tombstone_possibly_pruned {
            tracing::error!(
                ?highest,
                ?cache_entry,
                ?tombstone_possibly_pruned,
                "object_by_id cache is incoherent for {:?}",
                object_id
            );
            panic!("object_by_id cache is incoherent for {:?}", object_id);
        }
    }

    if let Some(entry) = entry {
        let entry = entry.lock();
        match &*entry {
            LatestObjectCacheEntry::Object(latest_version, latest_object) => {
                self.metrics.record_cache_hit(request_type, "object_by_id");
                return CacheResult::Hit((*latest_version, latest_object.clone()));
            }
            LatestObjectCacheEntry::NonExistent => {
                self.metrics
                    .record_cache_negative_hit(request_type, "object_by_id");
                return CacheResult::NegativeHit;
            }
        }
    } else {
        self.metrics.record_cache_miss(request_type, "object_by_id");
    }

    // Not in the by-id cache: fall back to the highest version held by the
    // uncommitted level, then the committed level.
    Self::with_locked_cache_entries(
        &self.dirty.objects,
        &self.cached.object_cache,
        object_id,
        |dirty_entry, cached_entry| {
            check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
            check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
            CacheResult::Miss
        },
    )
}
786
787 fn get_object_by_id_cache_only(
788 &self,
789 request_type: &'static str,
790 object_id: &ObjectID,
791 ) -> CacheResult<(SequenceNumber, Object)> {
792 match self.get_object_entry_by_id_cache_only(request_type, object_id) {
793 CacheResult::Hit((version, entry)) => match entry {
794 ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
795 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
796 },
797 CacheResult::NegativeHit => CacheResult::NegativeHit,
798 CacheResult::Miss => CacheResult::Miss,
799 }
800 }
801
802 fn get_marker_value_cache_only(
803 &self,
804 object_key: FullObjectKey,
805 epoch_id: EpochId,
806 ) -> CacheResult<MarkerValue> {
807 Self::with_locked_cache_entries(
808 &self.dirty.markers,
809 &self.cached.marker_cache,
810 &(epoch_id, object_key.id()),
811 |dirty_entry, cached_entry| {
812 check_cache_entry_by_version!(
813 self,
814 "marker_by_version",
815 "uncommitted",
816 dirty_entry,
817 object_key.version()
818 );
819 check_cache_entry_by_version!(
820 self,
821 "marker_by_version",
822 "committed",
823 cached_entry,
824 object_key.version()
825 );
826 CacheResult::Miss
827 },
828 )
829 }
830
831 fn get_latest_marker_value_cache_only(
832 &self,
833 object_id: FullObjectID,
834 epoch_id: EpochId,
835 ) -> CacheResult<(SequenceNumber, MarkerValue)> {
836 Self::with_locked_cache_entries(
837 &self.dirty.markers,
838 &self.cached.marker_cache,
839 &(epoch_id, object_id),
840 |dirty_entry, cached_entry| {
841 check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
842 check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
843 CacheResult::Miss
844 },
845 )
846 }
847
848 fn get_object_impl(&self, request_type: &'static str, id: &ObjectID) -> Option<Object> {
849 let ticket = self.object_by_id_cache.get_ticket_for_read(id);
850 match self.get_object_entry_by_id_cache_only(request_type, id) {
851 CacheResult::Hit((_, entry)) => match entry {
852 ObjectEntry::Object(object) => Some(object),
853 ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
854 },
855 CacheResult::NegativeHit => None,
856 CacheResult::Miss => {
857 let obj = self
858 .store
859 .get_latest_object_or_tombstone(*id)
860 .expect("db error");
861 match obj {
862 Some((key, obj)) => {
863 self.cache_latest_object_by_id(
864 id,
865 LatestObjectCacheEntry::Object(key.1, obj.clone().into()),
866 ticket,
867 );
868 match obj {
869 ObjectOrTombstone::Object(object) => Some(object),
870 ObjectOrTombstone::Tombstone(_) => None,
871 }
872 }
873 None => {
874 self.cache_object_not_found(id, ticket);
875 None
876 }
877 }
878 }
879 }
880 }
881
882 fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
883 self.metrics.record_cache_request(request_type, "db");
884 &self.store
885 }
886
887 fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
888 self.metrics
889 .record_cache_multi_request(request_type, "db", count);
890 &self.store
891 }
892
/// Writes all outputs of an executed transaction into the dirty (uncommitted)
/// maps, notifies waiters, and updates the backpressure signal.
///
/// Order of operations: tombstones (`deleted`, `wrapped`), markers, child objects,
/// then non-child objects (packages are also put into the package cache), then the
/// transaction-level records, and finally the executed-effects notification.
///
/// # Panics
/// Panics if `transaction_executed_in_last_epoch` reports the transaction as
/// already executed.
#[instrument(level = "debug", skip_all)]
fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
    let tx_digest = *tx_outputs.transaction.digest();
    trace!(?tx_digest, "writing transaction outputs to cache");

    assert!(
        !self.transaction_executed_in_last_epoch(&tx_digest, epoch_id),
        "Transaction {:?} was already executed in epoch {}",
        tx_digest,
        epoch_id.saturating_sub(1)
    );

    let TransactionOutputs {
        transaction,
        effects,
        markers,
        written,
        deleted,
        wrapped,
        events,
        unchanged_loaded_runtime_objects,
        ..
    } = &*tx_outputs;

    // Tombstones first.
    for ObjectKey(id, version) in deleted.iter() {
        self.write_object_entry(id, *version, ObjectEntry::Deleted);
    }

    for ObjectKey(id, version) in wrapped.iter() {
        self.write_object_entry(id, *version, ObjectEntry::Wrapped);
    }

    for (object_key, marker_value) in markers.iter() {
        self.write_marker_value(epoch_id, *object_key, *marker_value);
    }

    // Written objects are inserted in two passes: child objects, then everything else.
    // NOTE(review): presumably so that a reader woken by a non-child object's
    // notification already finds its children in the cache — confirm.
    for (object_id, object) in written.iter() {
        if object.is_child_object() {
            self.write_object_entry(object_id, object.version(), object.clone().into());
        }
    }
    for (object_id, object) in written.iter() {
        if !object.is_child_object() {
            self.write_object_entry(object_id, object.version(), object.clone().into());
            if object.is_package() {
                debug!("caching package: {:?}", object.compute_object_reference());
                self.packages
                    .insert(*object_id, PackageObject::new(object.clone()));
            }
        }
    }

    let tx_digest = *transaction.digest();
    debug!(
        ?tx_digest,
        "Writing transaction output objects to cache: {:?}",
        written
            .values()
            .map(|o| (o.id(), o.version()))
            .collect::<Vec<_>>(),
    );
    let effects_digest = effects.digest();

    // Transaction-level records. Each of these is removed again when the
    // transaction is flushed to the committed caches.
    self.metrics.record_cache_write("transaction_block");
    self.dirty
        .pending_transaction_writes
        .insert(tx_digest, tx_outputs.clone());

    self.metrics.record_cache_write("transaction_effects");
    self.dirty
        .transaction_effects
        .insert(effects_digest, effects.clone());

    self.metrics.record_cache_write("transaction_events");
    self.dirty
        .transaction_events
        .insert(tx_digest, events.clone());

    self.metrics
        .record_cache_write("unchanged_loaded_runtime_objects");
    self.dirty
        .unchanged_loaded_runtime_objects
        .insert(tx_digest, unchanged_loaded_runtime_objects.clone());

    self.metrics.record_cache_write("executed_effects_digests");
    self.dirty
        .executed_effects_digests
        .insert(tx_digest, effects_digest);

    // Wake waiters only after every record above has been inserted.
    self.executed_effects_digests_notify_read
        .notify(&tx_digest, &effects_digest);

    self.metrics
        .pending_notify_read
        .set(self.executed_effects_digests_notify_read.num_pending() as i64);

    // `fetch_add` returns the previous value, so `prev + 1` counts this insert.
    let prev = self
        .dirty
        .total_transaction_inserts
        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    let pending_count = (prev + 1).saturating_sub(
        self.dirty
            .total_transaction_commits
            .load(std::sync::atomic::Ordering::Relaxed),
    );

    self.set_backpressure(pending_count);
}
1014
1015 fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1016 let _metrics_guard = mysten_metrics::monitored_scope("WritebackCache::build_db_batch");
1017 let mut all_outputs = Vec::with_capacity(digests.len());
1018 for tx in digests {
1019 let Some(outputs) = self
1020 .dirty
1021 .pending_transaction_writes
1022 .get(tx)
1023 .map(|o| o.clone())
1024 else {
1025 warn!("Attempt to commit unknown transaction {:?}", tx);
1031 continue;
1032 };
1033 all_outputs.push(outputs);
1034 }
1035
1036 let batch = self
1037 .store
1038 .build_db_batch(epoch, &all_outputs)
1039 .expect("db error");
1040 (all_outputs, batch)
1041 }
1042
/// Writes a batch built by `build_db_batch` to the store, then moves the
/// corresponding outputs from the dirty maps into the committed caches and
/// updates the commit counter and backpressure signal.
///
/// `digests` is only used for tracing; the transactions committed are those
/// contained in the batch.
///
/// # Panics
/// Panics with "db error" if the batch write fails, and if any output in the
/// batch is no longer present in `pending_transaction_writes`.
#[instrument(level = "debug", skip_all)]
fn commit_transaction_outputs(
    &self,
    epoch: EpochId,
    (all_outputs, db_batch): Batch,
    digests: &[TransactionDigest],
) {
    let _metrics_guard =
        mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs");
    fail_point!("writeback-cache-commit");
    trace!(?digests);

    // The batch is written to the store before anything is removed from the dirty maps.
    db_batch.write().expect("db error");

    let _metrics_guard =
        mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs::flush");
    for outputs in all_outputs.iter() {
        let tx_digest = outputs.transaction.digest();
        assert!(
            self.dirty
                .pending_transaction_writes
                .remove(tx_digest)
                .is_some()
        );
        self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
    }

    // `fetch_add` returns the previous value; add `num_outputs` to get the new total.
    let num_outputs = all_outputs.len() as u64;
    let num_commits = self
        .dirty
        .total_transaction_commits
        .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
        + num_outputs;

    let pending_count = self
        .dirty
        .total_transaction_inserts
        .load(std::sync::atomic::Ordering::Relaxed)
        .saturating_sub(num_commits);

    self.set_backpressure(pending_count);
}
1089
1090 fn approximate_pending_transaction_count(&self) -> u64 {
1091 let num_commits = self
1092 .dirty
1093 .total_transaction_commits
1094 .load(std::sync::atomic::Ordering::Relaxed);
1095
1096 self.dirty
1097 .total_transaction_inserts
1098 .load(std::sync::atomic::Ordering::Relaxed)
1099 .saturating_sub(num_commits)
1100 }
1101
1102 fn set_backpressure(&self, pending_count: u64) {
1103 let backpressure = pending_count > self.backpressure_threshold;
1104 let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
1105 if backpressure_changed {
1106 self.metrics.backpressure_toggles.inc();
1107 }
1108 self.metrics
1109 .backpressure_status
1110 .set(if backpressure { 1 } else { 0 });
1111 }
1112
/// Moves everything recorded for one transaction from the dirty maps into the
/// committed caches.
///
/// Transaction-level data is inserted into the committed caches first and only
/// then removed from the dirty maps; markers and object versions are moved one by
/// one via `move_version_from_dirty_to_cache`.
///
/// # Panics
/// Panics if any of the expected dirty entries is missing.
fn flush_transactions_from_dirty_to_cached(
    &self,
    epoch: EpochId,
    tx_digest: TransactionDigest,
    outputs: &TransactionOutputs,
) {
    let TransactionOutputs {
        transaction,
        effects,
        markers,
        written,
        deleted,
        wrapped,
        events,
        ..
    } = outputs;

    let effects_digest = effects.digest();

    // Populate the committed caches. Insert results are intentionally ignored via `.ok()`.
    self.cached
        .transactions
        .insert(
            &tx_digest,
            PointCacheItem::Some(transaction.clone()),
            Ticket::Write,
        )
        .ok();
    self.cached
        .transaction_effects
        .insert(
            &effects_digest,
            PointCacheItem::Some(effects.clone().into()),
            Ticket::Write,
        )
        .ok();
    self.cached
        .executed_effects_digests
        .insert(
            &tx_digest,
            PointCacheItem::Some(effects_digest),
            Ticket::Write,
        )
        .ok();
    self.cached
        .transaction_events
        .insert(
            &tx_digest,
            PointCacheItem::Some(events.clone().into()),
            Ticket::Write,
        )
        .ok();

    // Only now drop the dirty copies. Each of them must exist.
    self.dirty
        .transaction_effects
        .remove(&effects_digest)
        .expect("effects must exist");

    self.dirty
        .transaction_events
        .remove(&tx_digest)
        .expect("events must exist");

    self.dirty
        .unchanged_loaded_runtime_objects
        .remove(&tx_digest)
        .expect("unchanged_loaded_runtime_objects must exist");

    self.dirty
        .executed_effects_digests
        .remove(&tx_digest)
        .expect("executed effects must exist");

    // Move the versioned data: markers, written objects, then tombstones.
    for (object_key, marker_value) in markers.iter() {
        Self::move_version_from_dirty_to_cache(
            &self.dirty.markers,
            &self.cached.marker_cache,
            (epoch, object_key.id()),
            object_key.version(),
            marker_value,
        );
    }

    for (object_id, object) in written.iter() {
        Self::move_version_from_dirty_to_cache(
            &self.dirty.objects,
            &self.cached.object_cache,
            *object_id,
            object.version(),
            &ObjectEntry::Object(object.clone()),
        );
    }

    for ObjectKey(object_id, version) in deleted.iter() {
        Self::move_version_from_dirty_to_cache(
            &self.dirty.objects,
            &self.cached.object_cache,
            *object_id,
            *version,
            &ObjectEntry::Deleted,
        );
    }

    for ObjectKey(object_id, version) in wrapped.iter() {
        Self::move_version_from_dirty_to_cache(
            &self.dirty.objects,
            &self.cached.object_cache,
            *object_id,
            *version,
            &ObjectEntry::Wrapped,
        );
    }
}
1230
1231 fn move_version_from_dirty_to_cache<K, V>(
1234 dirty: &DashMap<K, CachedVersionMap<V>>,
1235 cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
1236 key: K,
1237 version: SequenceNumber,
1238 value: &V,
1239 ) where
1240 K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
1241 V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
1242 {
1243 static MAX_VERSIONS: usize = 3;
1244
1245 let dirty_entry = dirty.entry(key);
1248 let cache_entry = cache.entry(key).or_default();
1249 let mut cache_map = cache_entry.value().lock();
1250
1251 cache_map.insert(version, value.clone());
1253 cache_map.truncate_to(MAX_VERSIONS);
1255
1256 let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
1257 panic!("dirty map must exist");
1258 };
1259
1260 let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);
1261
1262 assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");
1263
1264 if occupied_dirty_entry.get().is_empty() {
1266 occupied_dirty_entry.remove();
1267 }
1268 }
1269
1270 fn cache_latest_object_by_id(
1272 &self,
1273 object_id: &ObjectID,
1274 object: LatestObjectCacheEntry,
1275 ticket: Ticket,
1276 ) {
1277 trace!("caching object by id: {:?} {:?}", object_id, object);
1278 if self
1279 .object_by_id_cache
1280 .insert(object_id, object, ticket)
1281 .is_ok()
1282 {
1283 self.metrics.record_cache_write("object_by_id");
1284 } else {
1285 trace!("discarded cache write due to expired ticket");
1286 self.metrics.record_ticket_expiry();
1287 }
1288 }
1289
    /// Stores `LatestObjectCacheEntry::NonExistent` for `object_id` by delegating to
    /// `cache_latest_object_by_id` with the supplied `ticket`.
    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
    }
1293
1294 fn clear_state_end_of_epoch_impl(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
1295 info!("clearing state at end of epoch");
1296
1297 for r in self.dirty.pending_transaction_writes.iter() {
1300 let outputs = r.value();
1301 if !outputs
1302 .transaction
1303 .transaction_data()
1304 .shared_input_objects()
1305 .is_empty()
1306 {
1307 debug_fatal!("transaction must be single writer");
1308 }
1309 info!(
1310 "clearing state for transaction {:?}",
1311 outputs.transaction.digest()
1312 );
1313 for (object_id, object) in outputs.written.iter() {
1314 if object.is_package() {
1315 info!("removing non-finalized package from cache: {:?}", object_id);
1316 self.packages.invalidate(object_id);
1317 }
1318 self.object_by_id_cache.invalidate(object_id);
1319 self.cached.object_cache.invalidate(object_id);
1320 }
1321
1322 for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
1323 self.object_by_id_cache.invalidate(object_id);
1324 self.cached.object_cache.invalidate(object_id);
1325 }
1326 }
1327
1328 self.dirty.clear();
1329
1330 info!("clearing old transaction locks");
1331 self.object_locks.clear();
1332 info!("clearing object per epoch marker table");
1333 self.store
1334 .clear_object_per_epoch_marker_table(execution_guard)
1335 .expect("db error");
1336 }
1337
1338 fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) {
1339 self.store
1340 .bulk_insert_genesis_objects(objects)
1341 .expect("db error");
1342 for obj in objects {
1343 self.cached.object_cache.invalidate(&obj.id());
1344 self.object_by_id_cache.invalidate(&obj.id());
1345 }
1346 }
1347
1348 fn insert_genesis_object_impl(&self, object: Object) {
1349 self.object_by_id_cache.invalidate(&object.id());
1350 self.cached.object_cache.invalidate(&object.id());
1351 self.store.insert_genesis_object(object).expect("db error");
1352 }
1353
1354 pub fn clear_caches_and_assert_empty(&self) {
1355 info!("clearing caches");
1356 self.cached.clear_and_assert_empty();
1357 self.object_by_id_cache.invalidate_all();
1358 assert!(&self.object_by_id_cache.is_empty());
1359 self.packages.invalidate_all();
1360 assert_empty(&self.packages);
1361 }
1362}
1363
1364impl AccountFundsRead for WritebackCache {
1365 fn get_latest_account_amount(&self, account_id: &AccumulatorObjId) -> (u128, SequenceNumber) {
1366 let mut pre_root_version =
1367 ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
1368 .unwrap()
1369 .version();
1370 let mut loop_iter = 0;
1371 loop {
1372 let account_obj = ObjectCacheRead::get_object(self, account_id.inner());
1373 if let Some(account_obj) = account_obj {
1374 let (_, AccumulatorValue::U128(value)) =
1375 account_obj.data.try_as_move().unwrap().try_into().unwrap();
1376 return (value.value, account_obj.version());
1377 }
1378 let post_root_version =
1379 ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
1380 .unwrap()
1381 .version();
1382 if pre_root_version == post_root_version {
1383 return (0, pre_root_version);
1384 }
1385 debug!(
1386 "Root version changed from {} to {} while reading account amount, retrying",
1387 pre_root_version, post_root_version
1388 );
1389 pre_root_version = post_root_version;
1390 loop_iter += 1;
1391 if loop_iter >= 3 {
1392 debug_fatal!("Unable to get a stable version after 3 iterations");
1393 }
1394 }
1395 }
1396
1397 fn get_account_amount_at_version(
1398 &self,
1399 account_id: &AccumulatorObjId,
1400 version: SequenceNumber,
1401 ) -> u128 {
1402 let account_obj = self.find_object_lt_or_eq_version(*account_id.inner(), version);
1403 if let Some(account_obj) = account_obj {
1404 let (_, AccumulatorValue::U128(value)) =
1405 account_obj.data.try_as_move().unwrap().try_into().unwrap();
1406 value.value
1407 } else {
1408 0
1409 }
1410 }
1411}
1412
// Empty impl: `WritebackCache` defines no items of its own for `ExecutionCacheAPI`.
impl ExecutionCacheAPI for WritebackCache {}
1414
1415impl ExecutionCacheCommit for WritebackCache {
1416 fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1417 self.build_db_batch(epoch, digests)
1418 }
1419
1420 fn commit_transaction_outputs(
1421 &self,
1422 epoch: EpochId,
1423 batch: Batch,
1424 digests: &[TransactionDigest],
1425 ) {
1426 WritebackCache::commit_transaction_outputs(self, epoch, batch, digests)
1427 }
1428
1429 fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) {
1430 self.store.persist_transaction(tx).expect("db error");
1431 }
1432
1433 fn approximate_pending_transaction_count(&self) -> u64 {
1434 WritebackCache::approximate_pending_transaction_count(self)
1435 }
1436}
1437
1438impl ObjectCacheRead for WritebackCache {
1439 fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
1440 self.metrics
1441 .record_cache_request("package", "package_cache");
1442 if let Some(p) = self.packages.get(package_id) {
1443 if cfg!(debug_assertions) {
1444 let canonical_package = self
1445 .dirty
1446 .objects
1447 .get(package_id)
1448 .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
1449 Some(ObjectEntry::Object(object)) => Some(object),
1450 _ => None,
1451 })
1452 .or_else(|| self.store.get_object(package_id));
1453
1454 if let Some(canonical_package) = canonical_package {
1455 assert_eq!(
1456 canonical_package.digest(),
1457 p.object().digest(),
1458 "Package object cache is inconsistent for package {:?}",
1459 package_id
1460 );
1461 }
1462 }
1463 self.metrics.record_cache_hit("package", "package_cache");
1464 return Ok(Some(p));
1465 } else {
1466 self.metrics.record_cache_miss("package", "package_cache");
1467 }
1468
1469 if let Some(p) = self.get_object_impl("package", package_id) {
1473 if p.is_package() {
1474 let p = PackageObject::new(p);
1475 tracing::trace!(
1476 "caching package: {:?}",
1477 p.object().compute_object_reference()
1478 );
1479 self.metrics.record_cache_write("package");
1480 self.packages.insert(*package_id, p.clone());
1481 Ok(Some(p))
1482 } else {
1483 Err(SuiErrorKind::UserInputError {
1484 error: UserInputError::MoveObjectAsPackage {
1485 object_id: *package_id,
1486 },
1487 }
1488 .into())
1489 }
1490 } else {
1491 Ok(None)
1492 }
1493 }
1494
    /// No-op in this implementation: the supplied ids are ignored and no state is modified.
    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
    }
1499
    /// Returns the object for `id` by delegating to `get_object_impl` with the request label
    /// `"object_latest"`.
    fn get_object(&self, id: &ObjectID) -> Option<Object> {
        self.get_object_impl("object_latest", id)
    }
1505
    /// Returns the object stored at exactly `(object_id, version)`.
    ///
    /// A cache hit is returned directly and a negative cache hit yields `None`; only a cache
    /// miss falls through to the store (reached via `record_db_get`).
    fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object> {
        match self.get_object_by_key_cache_only(object_id, version) {
            CacheResult::Hit(object) => Some(object),
            CacheResult::NegativeHit => None,
            CacheResult::Miss => self
                .record_db_get("object_by_version")
                .get_object_by_key(object_id, version),
        }
    }
1515
    /// Batch variant of `get_object_by_key`, driven by `do_fallback_lookup`.
    ///
    /// The first closure answers each key from the caches; the second closure fetches the keys
    /// passed to it as `remaining` from the store in one multi-get.
    ///
    /// # Panics
    ///
    /// Panics with "db error" if the store multi-get fails.
    fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
        do_fallback_lookup(
            object_keys,
            // Cache pass: wrap hits in `Some` so the result type matches the store's output.
            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
                CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
                CacheResult::NegativeHit => CacheResult::NegativeHit,
                CacheResult::Miss => CacheResult::Miss,
            },
            // Store pass for the keys handed back by `do_fallback_lookup`.
            |remaining| {
                self.record_db_multi_get("object_by_version", remaining.len())
                    .multi_get_objects_by_key(remaining)
                    .expect("db error")
            },
        )
    }
1531
    /// Reports whether an object exists at exactly `(object_id, version)`.
    ///
    /// A cache hit yields `true`, a negative cache hit yields `false`, and a cache miss is
    /// answered by the store (reached via `record_db_get`).
    ///
    /// # Panics
    ///
    /// Panics with "db error" if the store lookup fails.
    fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool {
        match self.get_object_by_key_cache_only(object_id, version) {
            CacheResult::Hit(_) => true,
            CacheResult::NegativeHit => false,
            CacheResult::Miss => self
                .record_db_get("object_by_version")
                .object_exists_by_key(object_id, version)
                .expect("db error"),
        }
    }
1542
    /// Batch variant of `object_exists_by_key`, driven by `do_fallback_lookup`.
    ///
    /// # Panics
    ///
    /// Panics with "db error" if the store multi-get fails.
    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool> {
        do_fallback_lookup(
            object_keys,
            // Cache pass: a negative hit is reported as a definite `Hit(false)` here.
            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
                CacheResult::Hit(_) => CacheResult::Hit(true),
                CacheResult::NegativeHit => CacheResult::Hit(false),
                CacheResult::Miss => CacheResult::Miss,
            },
            // Store pass for the keys handed back by `do_fallback_lookup`.
            |remaining| {
                self.record_db_multi_get("object_by_version", remaining.len())
                    .multi_object_exists_by_key(remaining)
                    .expect("db error")
            },
        )
    }
1558
1559 fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
1560 match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1561 CacheResult::Hit((version, entry)) => Some(match entry {
1562 ObjectEntry::Object(object) => object.compute_object_reference(),
1563 ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1564 ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1565 }),
1566 CacheResult::NegativeHit => None,
1567 CacheResult::Miss => self
1568 .record_db_get("latest_objref_or_tombstone")
1569 .get_latest_object_ref_or_tombstone(object_id)
1570 .expect("db error"),
1571 }
1572 }
1573
1574 fn get_latest_object_or_tombstone(
1575 &self,
1576 object_id: ObjectID,
1577 ) -> Option<(ObjectKey, ObjectOrTombstone)> {
1578 match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1579 CacheResult::Hit((version, entry)) => {
1580 let key = ObjectKey(object_id, version);
1581 Some(match entry {
1582 ObjectEntry::Object(object) => (key, object.into()),
1583 ObjectEntry::Deleted => (
1584 key,
1585 ObjectOrTombstone::Tombstone((
1586 object_id,
1587 version,
1588 ObjectDigest::OBJECT_DIGEST_DELETED,
1589 )),
1590 ),
1591 ObjectEntry::Wrapped => (
1592 key,
1593 ObjectOrTombstone::Tombstone((
1594 object_id,
1595 version,
1596 ObjectDigest::OBJECT_DIGEST_WRAPPED,
1597 )),
1598 ),
1599 })
1600 }
1601 CacheResult::NegativeHit => None,
1602 CacheResult::Miss => self
1603 .record_db_get("latest_object_or_tombstone")
1604 .get_latest_object_or_tombstone(object_id)
1605 .expect("db error"),
1606 }
1607 }
1608
1609 fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool> {
1610 keys.iter()
1611 .map(|key| {
1612 if key.is_cancelled() {
1613 true
1614 } else {
1615 match key {
1616 InputKey::VersionedObject { id, version } => {
1617 matches!(
1618 self.get_object_by_key_cache_only(&id.id(), *version),
1619 CacheResult::Hit(_)
1620 )
1621 }
1622 InputKey::Package { id } => self.packages.contains_key(id),
1623 }
1624 }
1625 })
1626 .collect()
1627 }
1628
1629 #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1630 fn find_object_lt_or_eq_version(
1631 &self,
1632 object_id: ObjectID,
1633 version_bound: SequenceNumber,
1634 ) -> Option<Object> {
1635 macro_rules! check_cache_entry {
1636 ($level: expr, $objects: expr) => {
1637 self.metrics
1638 .record_cache_request("object_lt_or_eq_version", $level);
1639 if let Some(objects) = $objects {
1640 if let Some((_, object)) = objects
1641 .all_versions_lt_or_eq_descending(&version_bound)
1642 .next()
1643 {
1644 if let ObjectEntry::Object(object) = object {
1645 self.metrics
1646 .record_cache_hit("object_lt_or_eq_version", $level);
1647 return Some(object.clone());
1648 } else {
1649 self.metrics
1651 .record_cache_negative_hit("object_lt_or_eq_version", $level);
1652 return None;
1653 }
1654 } else {
1655 self.metrics
1656 .record_cache_miss("object_lt_or_eq_version", $level);
1657 }
1658 }
1659 };
1660 }
1661
1662 self.metrics
1664 .record_cache_request("object_lt_or_eq_version", "object_by_id");
1665 let latest_cache_entry = self.object_by_id_cache.get(&object_id);
1666 if let Some(latest) = &latest_cache_entry {
1667 let latest = latest.lock();
1668 match &*latest {
1669 LatestObjectCacheEntry::Object(latest_version, object) => {
1670 if *latest_version <= version_bound {
1671 if let ObjectEntry::Object(object) = object {
1672 self.metrics
1673 .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1674 return Some(object.clone());
1675 } else {
1676 self.metrics.record_cache_negative_hit(
1678 "object_lt_or_eq_version",
1679 "object_by_id",
1680 );
1681 return None;
1682 }
1683 }
1684 }
1686 LatestObjectCacheEntry::NonExistent => {
1688 self.metrics
1689 .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1690 return None;
1691 }
1692 }
1693 }
1694 self.metrics
1695 .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1696
1697 Self::with_locked_cache_entries(
1698 &self.dirty.objects,
1699 &self.cached.object_cache,
1700 &object_id,
1701 |dirty_entry, cached_entry| {
1702 check_cache_entry!("committed", dirty_entry);
1703 check_cache_entry!("uncommitted", cached_entry);
1704
1705 let latest: Option<(SequenceNumber, ObjectEntry)> =
1726 if let Some(dirty_set) = dirty_entry {
1727 dirty_set
1728 .get_highest()
1729 .cloned()
1730 .tap_none(|| panic!("dirty set cannot be empty"))
1731 } else {
1732 self.record_db_get("object_lt_or_eq_version_latest")
1734 .get_latest_object_or_tombstone(object_id)
1735 .expect("db error")
1736 .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1737 (version, ObjectEntry::from(obj_or_tombstone))
1738 })
1739 };
1740
1741 if let Some((obj_version, obj_entry)) = latest {
1742 self.cache_latest_object_by_id(
1750 &object_id,
1751 LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1752 self.object_by_id_cache.get_ticket_for_read(&object_id),
1755 );
1756
1757 if obj_version <= version_bound {
1758 match obj_entry {
1759 ObjectEntry::Object(object) => Some(object),
1760 ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
1761 }
1762 } else {
1763 self.record_db_get("object_lt_or_eq_version_scan")
1767 .find_object_lt_or_eq_version(object_id, version_bound)
1768 .expect("db error")
1769 }
1770
1771 } else if let Some(latest_cache_entry) = latest_cache_entry {
1777 assert!(!latest_cache_entry.lock().is_alive());
1779 None
1780 } else {
1781 let highest = cached_entry.and_then(|c| c.get_highest());
1783 assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1784 self.cache_object_not_found(
1785 &object_id,
1786 self.object_by_id_cache.get_ticket_for_read(&object_id),
1788 );
1789 None
1790 }
1791 },
1792 )
1793 }
1794
    /// Loads the Sui system state by calling `get_sui_system_state` with this cache as the
    /// object source.
    fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
        get_sui_system_state(self)
    }
1798
    /// Loads the bridge object by calling `get_bridge` with this cache as the object source.
    fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge> {
        get_bridge(self)
    }
1802
    /// Returns the marker value stored for `object_key` in `epoch_id`.
    ///
    /// A cache hit is returned directly and a negative cache hit yields `None`; only a cache
    /// miss falls through to the store (reached via `record_db_get`).
    ///
    /// # Panics
    ///
    /// Panics with "db error" if the store lookup fails.
    fn get_marker_value(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> Option<MarkerValue> {
        match self.get_marker_value_cache_only(object_key, epoch_id) {
            CacheResult::Hit(marker) => Some(marker),
            CacheResult::NegativeHit => None,
            CacheResult::Miss => self
                .record_db_get("marker_by_version")
                .get_marker_value(object_key, epoch_id)
                .expect("db error"),
        }
    }
1817
    /// Returns the latest `(version, marker)` pair stored for `object_id` in `epoch_id`.
    ///
    /// A cache hit is returned directly; a cache miss falls through to the store (reached via
    /// `record_db_get`).
    ///
    /// # Panics
    ///
    /// Panics if the cache-only lookup reports a negative hit, which this method treats as an
    /// invalid state, and with "db error" if the store lookup fails.
    fn get_latest_marker(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> Option<(SequenceNumber, MarkerValue)> {
        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
            CacheResult::Hit((v, marker)) => Some((v, marker)),
            CacheResult::NegativeHit => {
                panic!("cannot have negative hit when getting latest marker")
            }
            CacheResult::Miss => self
                .record_db_get("marker_latest")
                .get_latest_marker(object_id, epoch_id)
                .expect("db error"),
        }
    }
1834
1835 fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult {
1836 let cur_epoch = epoch_store.epoch();
1837 match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
1838 CacheResult::Hit((_, obj)) => {
1839 let actual_objref = obj.compute_object_reference();
1840 if obj_ref != actual_objref {
1841 Ok(ObjectLockStatus::LockedAtDifferentVersion {
1842 locked_ref: actual_objref,
1843 })
1844 } else {
1845 Ok(
1847 match self
1848 .object_locks
1849 .get_transaction_lock(&obj_ref, epoch_store)?
1850 {
1851 Some(tx_digest) => ObjectLockStatus::LockedToTx {
1852 locked_by_tx: LockDetailsDeprecated {
1853 epoch: cur_epoch,
1854 tx_digest,
1855 },
1856 },
1857 None => ObjectLockStatus::Initialized,
1858 },
1859 )
1860 }
1861 }
1862 CacheResult::NegativeHit => {
1863 Err(SuiError::from(UserInputError::ObjectNotFound {
1864 object_id: obj_ref.0,
1865 version: None,
1868 }))
1869 }
1870 CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1871 }
1872 }
1873
1874 fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef> {
1875 let obj = self.get_object_impl("live_objref", &object_id).ok_or(
1876 UserInputError::ObjectNotFound {
1877 object_id,
1878 version: None,
1879 },
1880 )?;
1881 Ok(obj.compute_object_reference())
1882 }
1883
    /// Verifies that every reference in `owned_object_refs` matches the object currently
    /// found for its id.
    ///
    /// References that the caches cannot decide are passed to the store's own
    /// `check_owned_objects_are_live` by `do_fallback_lookup_fallible`.
    ///
    /// # Errors
    ///
    /// Returns `ObjectVersionUnavailableForConsumption` when a cached object's reference
    /// differs from the provided one, `ObjectNotFound` on a negative cache hit, and any error
    /// produced by the store check.
    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
        do_fallback_lookup_fallible(
            owned_object_refs,
            // Cache pass: decide each reference from the object-by-id cache where possible.
            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
                CacheResult::Hit((version, obj)) => {
                    if obj.compute_object_reference() != *obj_ref {
                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: version,
                        }
                        .into())
                    } else {
                        Ok(CacheResult::Hit(()))
                    }
                }
                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
                    version: None,
                }
                .into()),
                CacheResult::Miss => Ok(CacheResult::Miss),
            },
            // Store pass: the store check either fails as a whole or vouches for every
            // remaining reference, hence one unit value per reference.
            |remaining| {
                self.record_db_multi_get("object_is_live", remaining.len())
                    .check_owned_objects_are_live(remaining)?;
                Ok(vec![(); remaining.len()])
            },
        )?;
        Ok(())
    }
1914
    /// Reads the highest pruned checkpoint from the store's perpetual tables.
    ///
    /// # Panics
    ///
    /// Panics with "db error" if the read fails.
    fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber> {
        self.store
            .perpetual_tables
            .get_highest_pruned_checkpoint()
            .expect("db error")
    }
1921
1922 fn notify_read_input_objects<'a>(
1923 &'a self,
1924 input_and_receiving_keys: &'a [InputKey],
1925 receiving_keys: &'a HashSet<InputKey>,
1926 epoch: EpochId,
1927 ) -> BoxFuture<'a, ()> {
1928 self.object_notify_read
1929 .read(
1930 "notify_read_input_objects",
1931 input_and_receiving_keys,
1932 move |keys| {
1933 self.multi_input_objects_available(keys, receiving_keys, epoch)
1934 .into_iter()
1935 .map(|available| if available { Some(()) } else { None })
1936 .collect::<Vec<_>>()
1937 },
1938 )
1939 .map(|_| ())
1940 .boxed()
1941 }
1942}
1943
1944impl TransactionCacheRead for WritebackCache {
    /// Looks up the transactions for `digests`.
    ///
    /// Each digest is answered from the pending (uncommitted) writes first, then from the
    /// committed transaction cache; digests still undecided are fetched from the store in one
    /// multi-get. Digests the store does not know are inserted into the committed cache as
    /// `None`.
    ///
    /// # Panics
    ///
    /// Panics with "db error" if the store multi-get fails.
    fn multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<Arc<VerifiedTransaction>>> {
        // Tickets are taken up front, before any lookup, and used for the inserts below.
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_block", "uncommitted");
                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_block", "uncommitted");
                    return CacheResult::Hit(Some(tx.transaction.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_block", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_block", "committed");

                match self
                    .cached
                    .transactions
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(tx)) => {
                        self.metrics
                            .record_cache_hit("transaction_block", "committed");
                        CacheResult::Hit(Some(tx))
                    }
                    // A cached `None` is a remembered absence.
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_block", "committed");

                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results: Vec<_> = self
                    .record_db_multi_get("transaction_block", remaining.len())
                    .multi_get_transaction_blocks(&remaining_digests)
                    .expect("db error")
                    .into_iter()
                    .map(|o| o.map(Arc::new))
                    .collect();
                // Remember absences; the insert is best-effort and its result is discarded.
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached.transactions.insert(digest, None, *ticket).ok();
                    }
                }
                results
            },
        )
    }
2007
    /// Looks up the executed-effects digests for the transactions in `digests`.
    ///
    /// Each digest is answered from the dirty (uncommitted) map first, then from the committed
    /// cache; digests still undecided are fetched from the store in one multi-get. Digests the
    /// store does not know are inserted into the committed cache as `None`.
    ///
    /// # Panics
    ///
    /// Panics with "db error" if the store multi-get fails.
    fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEffectsDigest>> {
        // Tickets are taken up front, before any lookup, and used for the inserts below.
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| {
                (
                    *d,
                    self.cached.executed_effects_digests.get_ticket_for_read(d),
                )
            })
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("executed_effects_digests", "uncommitted");
                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
                    self.metrics
                        .record_cache_hit("executed_effects_digests", "uncommitted");
                    return CacheResult::Hit(Some(*digest));
                }
                self.metrics
                    .record_cache_miss("executed_effects_digests", "uncommitted");

                self.metrics
                    .record_cache_request("executed_effects_digests", "committed");
                match self
                    .cached
                    .executed_effects_digests
                    .get(digest)
                    .map(|l| *l.lock())
                {
                    Some(PointCacheItem::Some(digest)) => {
                        self.metrics
                            .record_cache_hit("executed_effects_digests", "committed");
                        CacheResult::Hit(Some(digest))
                    }
                    // A cached `None` is a remembered absence.
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("executed_effects_digests", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("executed_effects_digests", remaining.len())
                    .multi_get_executed_effects_digests(&remaining_digests)
                    .expect("db error");
                // Remember absences; the insert is best-effort and its result is discarded.
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .executed_effects_digests
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }
2073
    /// Looks up the transaction effects for the effects digests in `digests`.
    ///
    /// Each digest is answered from the dirty (uncommitted) map first, then from the committed
    /// cache; digests still undecided are fetched from the store in one multi-get. Digests the
    /// store does not know are inserted into the committed cache as `None`.
    ///
    /// # Panics
    ///
    /// Panics with "db error" if the store multi-get fails.
    fn multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> Vec<Option<TransactionEffects>> {
        // Tickets are taken up front, before any lookup, and used for the inserts below.
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_effects", "uncommitted");
                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_effects", "uncommitted");
                    return CacheResult::Hit(Some(effects.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_effects", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_effects", "committed");
                match self
                    .cached
                    .transaction_effects
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(effects)) => {
                        self.metrics
                            .record_cache_hit("transaction_effects", "committed");
                        CacheResult::Hit(Some((*effects).clone()))
                    }
                    // A cached `None` is a remembered absence.
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_effects", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("transaction_effects", remaining.len())
                    .multi_get_effects(remaining_digests.iter())
                    .expect("db error");
                // Remember absences; the insert is best-effort and its result is discarded.
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_effects
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }
2134
2135 fn transaction_executed_in_last_epoch(
2136 &self,
2137 digest: &TransactionDigest,
2138 current_epoch: EpochId,
2139 ) -> bool {
2140 if current_epoch == 0 {
2141 return false;
2142 }
2143 let last_epoch = current_epoch - 1;
2144 let cache_key = (last_epoch, *digest);
2145
2146 let ticket = self
2147 .cached
2148 .transaction_executed_in_last_epoch
2149 .get_ticket_for_read(&cache_key);
2150
2151 if let Some(cached) = self
2152 .cached
2153 .transaction_executed_in_last_epoch
2154 .get(&cache_key)
2155 {
2156 return cached.lock().is_some();
2157 }
2158
2159 let was_executed = self
2160 .store
2161 .perpetual_tables
2162 .was_transaction_executed_in_last_epoch(digest, current_epoch);
2163
2164 let value = if was_executed { Some(()) } else { None };
2165 self.cached
2166 .transaction_executed_in_last_epoch
2167 .insert(&cache_key, value, ticket)
2168 .ok();
2169
2170 was_executed
2171 }
2172
    /// Returns a future built on `executed_effects_digests_notify_read` for `digests`, labelled
    /// with `task_name`. Lookups are performed with `multi_get_executed_effects_digests`.
    fn notify_read_executed_effects_digests<'a>(
        &'a self,
        task_name: &'static str,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>> {
        self.executed_effects_digests_notify_read
            .read(task_name, digests, |digests| {
                self.multi_get_executed_effects_digests(digests)
            })
            .boxed()
    }
2184
2185 fn multi_get_events(
2186 &self,
2187 event_digests: &[TransactionDigest],
2188 ) -> Vec<Option<TransactionEvents>> {
2189 fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
2190 if events.data.is_empty() {
2191 None
2192 } else {
2193 Some(events)
2194 }
2195 }
2196
2197 let digests_and_tickets: Vec<_> = event_digests
2198 .iter()
2199 .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
2200 .collect();
2201 do_fallback_lookup(
2202 &digests_and_tickets,
2203 |(digest, _)| {
2204 self.metrics
2205 .record_cache_request("transaction_events", "uncommitted");
2206 if let Some(events) = self.dirty.transaction_events.get(digest).map(|e| e.clone()) {
2207 self.metrics
2208 .record_cache_hit("transaction_events", "uncommitted");
2209
2210 return CacheResult::Hit(map_events(events));
2211 }
2212 self.metrics
2213 .record_cache_miss("transaction_events", "uncommitted");
2214
2215 self.metrics
2216 .record_cache_request("transaction_events", "committed");
2217 match self
2218 .cached
2219 .transaction_events
2220 .get(digest)
2221 .map(|l| l.lock().clone())
2222 {
2223 Some(PointCacheItem::Some(events)) => {
2224 self.metrics
2225 .record_cache_hit("transaction_events", "committed");
2226 CacheResult::Hit(map_events((*events).clone()))
2227 }
2228 Some(PointCacheItem::None) => CacheResult::NegativeHit,
2229 None => {
2230 self.metrics
2231 .record_cache_miss("transaction_events", "committed");
2232
2233 CacheResult::Miss
2234 }
2235 }
2236 },
2237 |remaining| {
2238 let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2239 let results = self
2240 .store
2241 .multi_get_events(&remaining_digests)
2242 .expect("db error");
2243 for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2244 if result.is_none() {
2245 self.cached
2246 .transaction_events
2247 .insert(digest, None, *ticket)
2248 .ok();
2249 }
2250 }
2251 results
2252 },
2253 )
2254 }
2255
2256 fn get_unchanged_loaded_runtime_objects(
2257 &self,
2258 digest: &TransactionDigest,
2259 ) -> Option<Vec<ObjectKey>> {
2260 self.dirty
2261 .unchanged_loaded_runtime_objects
2262 .get(digest)
2263 .map(|b| b.clone())
2264 .or_else(|| {
2265 self.store
2266 .get_unchanged_loaded_runtime_objects(digest)
2267 .expect("db error")
2268 })
2269 }
2270
    /// Calls `take_accumulator_events` on the pending transaction output for `digest`.
    ///
    /// Returns `None` when there is no pending write for `digest`; only the dirty set is
    /// consulted.
    fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
        self.dirty
            .pending_transaction_writes
            .get(digest)
            .map(|transaction_output| transaction_output.take_accumulator_events())
    }
2277}
2278
impl ExecutionCacheWrite for WritebackCache {
    /// Delegates to `ObjectLocks::validate_owned_object_versions`, passing this cache as the
    /// first argument.
    fn validate_owned_object_versions(&self, owned_input_objects: &[ObjectRef]) -> SuiResult {
        ObjectLocks::validate_owned_object_versions(self, owned_input_objects)
    }

    /// Forwards to `WritebackCache::write_transaction_outputs`.
    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs);
    }

    /// Test-only helper: writes `object` as an object entry under its own id and version.
    #[cfg(test)]
    fn write_object_entry_for_test(&self, object: Object) {
        self.write_object_entry(&object.id(), object.version(), object.into());
    }
}
2293
// NOTE(review): presumably expands to trait impls for `WritebackCache` that forward to the
// underlying store; confirm against the macro definition, which is not visible here.
implement_passthrough_traits!(WritebackCache);
2295
2296impl GlobalStateHashStore for WritebackCache {
2297 fn get_object_ref_prior_to_key_deprecated(
2298 &self,
2299 object_id: &ObjectID,
2300 version: SequenceNumber,
2301 ) -> SuiResult<Option<ObjectRef>> {
2302 let mut candidates = Vec::new();
2306
2307 let check_versions =
2308 |versions: &CachedVersionMap<ObjectEntry>| match versions.get_prior_to(&version) {
2309 Some((version, object_entry)) => match object_entry {
2310 ObjectEntry::Object(object) => {
2311 assert_eq!(object.version(), version);
2312 Some(object.compute_object_reference())
2313 }
2314 ObjectEntry::Deleted => {
2315 Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED))
2316 }
2317 ObjectEntry::Wrapped => {
2318 Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED))
2319 }
2320 },
2321 None => None,
2322 };
2323
2324 if let Some(objects) = self.dirty.objects.get(object_id)
2326 && let Some(prior) = check_versions(&objects)
2327 {
2328 candidates.push(prior);
2329 }
2330
2331 if let Some(objects) = self.cached.object_cache.get(object_id)
2332 && let Some(prior) = check_versions(&objects.lock())
2333 {
2334 candidates.push(prior);
2335 }
2336
2337 if let Some(prior) = self
2338 .store
2339 .get_object_ref_prior_to_key_deprecated(object_id, version)?
2340 {
2341 candidates.push(prior);
2342 }
2343
2344 candidates.sort_by_key(|(_, version, _)| *version);
2346 Ok(candidates.pop())
2347 }
2348
    /// Forwards to the store's `get_root_state_hash_for_epoch`; no cache is consulted.
    fn get_root_state_hash_for_epoch(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
        self.store.get_root_state_hash_for_epoch(epoch)
    }
2355
    /// Forwards to the store's `get_root_state_hash_for_highest_epoch`; no cache is consulted.
    fn get_root_state_hash_for_highest_epoch(
        &self,
    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
        self.store.get_root_state_hash_for_highest_epoch()
    }
2361
2362 fn insert_state_hash_for_epoch(
2363 &self,
2364 epoch: EpochId,
2365 checkpoint_seq_num: &CheckpointSequenceNumber,
2366 acc: &GlobalStateHash,
2367 ) -> SuiResult {
2368 self.store
2369 .insert_state_hash_for_epoch(epoch, checkpoint_seq_num, acc)
2370 }
2371
2372 fn iter_live_object_set(
2373 &self,
2374 include_wrapped_tombstone: bool,
2375 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2376 assert!(
2380 self.dirty.is_empty(),
2381 "cannot iterate live object set with dirty data"
2382 );
2383 self.store.iter_live_object_set(include_wrapped_tombstone)
2384 }
2385
2386 fn iter_cached_live_object_set_for_testing(
2390 &self,
2391 include_wrapped_tombstone: bool,
2392 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2393 let iter = self.dirty.objects.iter();
2395 let mut dirty_objects = BTreeMap::new();
2396
2397 for obj in self.store.iter_live_object_set(include_wrapped_tombstone) {
2399 dirty_objects.insert(obj.object_id(), obj);
2400 }
2401
2402 for entry in iter {
2404 let id = *entry.key();
2405 let value = entry.value();
2406 match value.get_highest().unwrap() {
2407 (_, ObjectEntry::Object(object)) => {
2408 dirty_objects.insert(id, LiveObject::Normal(object.clone()));
2409 }
2410 (version, ObjectEntry::Wrapped) => {
2411 if include_wrapped_tombstone {
2412 dirty_objects.insert(id, LiveObject::Wrapped(ObjectKey(id, *version)));
2413 } else {
2414 dirty_objects.remove(&id);
2415 }
2416 }
2417 (_, ObjectEntry::Deleted) => {
2418 dirty_objects.remove(&id);
2419 }
2420 }
2421 }
2422
2423 Box::new(dirty_objects.into_values())
2424 }
2425}
2426
2427impl StateSyncAPI for WritebackCache {
2433 fn insert_transaction_and_effects(
2434 &self,
2435 transaction: &VerifiedTransaction,
2436 transaction_effects: &TransactionEffects,
2437 ) {
2438 self.store
2439 .insert_transaction_and_effects(transaction, transaction_effects)
2440 .expect("db error");
2441 self.cached
2442 .transactions
2443 .insert(
2444 transaction.digest(),
2445 PointCacheItem::Some(Arc::new(transaction.clone())),
2446 Ticket::Write,
2447 )
2448 .ok();
2449 self.cached
2450 .transaction_effects
2451 .insert(
2452 &transaction_effects.digest(),
2453 PointCacheItem::Some(Arc::new(transaction_effects.clone())),
2454 Ticket::Write,
2455 )
2456 .ok();
2457 }
2458
2459 fn multi_insert_transaction_and_effects(
2460 &self,
2461 transactions_and_effects: &[VerifiedExecutionData],
2462 ) {
2463 self.store
2464 .multi_insert_transaction_and_effects(transactions_and_effects.iter())
2465 .expect("db error");
2466 for VerifiedExecutionData {
2467 transaction,
2468 effects,
2469 } in transactions_and_effects
2470 {
2471 self.cached
2472 .transactions
2473 .insert(
2474 transaction.digest(),
2475 PointCacheItem::Some(Arc::new(transaction.clone())),
2476 Ticket::Write,
2477 )
2478 .ok();
2479 self.cached
2480 .transaction_effects
2481 .insert(
2482 &effects.digest(),
2483 PointCacheItem::Some(Arc::new(effects.clone())),
2484 Ticket::Write,
2485 )
2486 .ok();
2487 }
2488 }
2489}