use crate::authority::AuthorityStore;
use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::authority::authority_store::{
    ExecutionLockWriteGuard, LockDetailsDeprecated, ObjectLockStatus, SuiLockResult,
};
use crate::authority::authority_store_tables::LiveObject;
use crate::authority::backpressure::BackpressureManager;
use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
use crate::fallback_fetch::{do_fallback_lookup, do_fallback_lookup_fallible};
use crate::global_state_hasher::GlobalStateHashStore;
use crate::transaction_outputs::TransactionOutputs;

use dashmap::DashMap;
use dashmap::mapref::entry::Entry as DashMapEntry;
use futures::{FutureExt, future::BoxFuture};
use moka::sync::SegmentedCache as MokaCache;
use mysten_common::debug_fatal;
use mysten_common::random_util::randomize_cache_capacity_in_tests;
use mysten_common::sync::notify_read::NotifyRead;
use parking_lot::Mutex;
use std::collections::{BTreeMap, HashSet};
use std::hash::Hash;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use sui_config::ExecutionCacheConfig;
use sui_macros::fail_point;
use sui_protocol_config::ProtocolVersion;
use sui_types::accumulator_event::AccumulatorEvent;
use sui_types::base_types::{
    EpochId, FullObjectID, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData,
};
use sui_types::bridge::{Bridge, get_bridge};
use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest};
use sui_types::effects::{TransactionEffects, TransactionEvents};
use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::global_state_hash::GlobalStateHash;
use sui_types::message_envelope::Message;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use sui_types::object::Object;
use sui_types::storage::{
    FullObjectKey, InputKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject,
};
use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
use sui_types::transaction::{TransactionDataAPI, VerifiedSignedTransaction, VerifiedTransaction};
use tap::TapOptional;
use tracing::{debug, info, instrument, trace, warn};

use super::ExecutionCacheAPI;
use super::cache_types::Ticket;
use super::{
    Batch, CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
    ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
    cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache},
    implement_passthrough_traits,
    object_locks::ObjectLocks,
};

#[cfg(test)]
#[path = "unit_tests/writeback_cache_tests.rs"]
pub mod writeback_cache_tests;

#[cfg(test)]
#[path = "unit_tests/notify_read_input_objects_tests.rs"]
mod notify_read_input_objects_tests;

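/// An object as represented in the cache: either the object itself, or a tombstone
/// recording that the object was deleted or wrapped at the associated version.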
#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    Object(Object),
    Deleted,
    Wrapped,
}

impl ObjectEntry {
    #[cfg(test)]
    fn unwrap_object(&self) -> &Object {
        match self {
            ObjectEntry::Object(o) => o,
            _ => panic!("unwrap_object called on non-Object"),
        }
    }

    fn is_tombstone(&self) -> bool {
        match self {
            ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
            ObjectEntry::Object(_) => false,
        }
    }
}

impl std::fmt::Debug for ObjectEntry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ObjectEntry::Object(o) => {
                write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
            }
            ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
            ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
        }
    }
}

impl From<Object> for ObjectEntry {
    fn from(object: Object) -> Self {
        ObjectEntry::Object(object)
    }
}

impl From<ObjectOrTombstone> for ObjectEntry {
    fn from(object: ObjectOrTombstone) -> Self {
        match object {
            ObjectOrTombstone::Object(o) => o.into(),
            ObjectOrTombstone::Tombstone(obj_ref) => {
                if obj_ref.2.is_deleted() {
                    ObjectEntry::Deleted
                } else if obj_ref.2.is_wrapped() {
                    ObjectEntry::Wrapped
                } else {
                    panic!("tombstone digest must either be deleted or wrapped");
                }
            }
        }
    }
}

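/// The latest known state of an object id: its most recent version (which may itself
/// be a tombstone), or a record that no version of the object exists at all.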
#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    Object(SequenceNumber, ObjectEntry),
    NonExistent,
}

impl LatestObjectCacheEntry {
    #[cfg(test)]
    fn version(&self) -> Option<SequenceNumber> {
        match self {
            LatestObjectCacheEntry::Object(version, _) => Some(*version),
            LatestObjectCacheEntry::NonExistent => None,
        }
    }

    fn is_alive(&self) -> bool {
        match self {
            LatestObjectCacheEntry::Object(_, entry) => !entry.is_tombstone(),
            LatestObjectCacheEntry::NonExistent => false,
        }
    }
}

impl IsNewer for LatestObjectCacheEntry {
    fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
        match (self, other) {
            (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
                v1 > v2
            }
            (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
            _ => false,
        }
    }
}

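/// Markers are keyed by (epoch, full object id). Keeping the epoch in the key lets
/// marker state from prior epochs be dropped wholesale at reconfiguration (see
/// `clear_object_per_epoch_marker_table` below).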
type MarkerKey = (EpochId, FullObjectID);

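/// UncommittedData is the "dirty" half of the cache: everything written by executed
/// transactions that has not yet been durably committed to the database.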
struct UncommittedData {
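    /// Objects (including tombstones) written since the last commit, keyed by id and
    /// then by version.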
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

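    /// Per-epoch marker values (such as `MarkerValue::ConsensusStreamEnded`) written
    /// since the last commit.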
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    transaction_events: DashMap<TransactionDigest, TransactionEvents>,

    unchanged_loaded_runtime_objects: DashMap<TransactionDigest, Vec<ObjectKey>>,

    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,

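    /// Outputs of Mysticeti fastpath-certified transactions, buffered until the
    /// transaction is finalized; `write_transaction_outputs` removes the entry when
    /// the outputs move into the dirty set proper.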
    fastpath_transaction_outputs: MokaCache<TransactionDigest, Arc<TransactionOutputs>>,

    total_transaction_inserts: AtomicU64,
    total_transaction_commits: AtomicU64,
}

impl UncommittedData {
    fn new(config: &ExecutionCacheConfig) -> Self {
        Self {
            objects: DashMap::with_shard_amount(2048),
            markers: DashMap::with_shard_amount(2048),
            transaction_effects: DashMap::with_shard_amount(2048),
            executed_effects_digests: DashMap::with_shard_amount(2048),
            pending_transaction_writes: DashMap::with_shard_amount(2048),
            fastpath_transaction_outputs: MokaCache::builder(8)
                .max_capacity(randomize_cache_capacity_in_tests(
                    config.fastpath_transaction_outputs_cache_size(),
                ))
                .build(),
            transaction_events: DashMap::with_shard_amount(2048),
            unchanged_loaded_runtime_objects: DashMap::with_shard_amount(2048),
            total_transaction_inserts: AtomicU64::new(0),
            total_transaction_commits: AtomicU64::new(0),
        }
    }

    fn clear(&self) {
        self.objects.clear();
        self.markers.clear();
        self.transaction_effects.clear();
        self.executed_effects_digests.clear();
        self.pending_transaction_writes.clear();
        self.fastpath_transaction_outputs.invalidate_all();
        self.transaction_events.clear();
        self.unchanged_loaded_runtime_objects.clear();
        self.total_transaction_inserts
            .store(0, std::sync::atomic::Ordering::Relaxed);
        self.total_transaction_commits
            .store(0, std::sync::atomic::Ordering::Relaxed);
    }

    fn is_empty(&self) -> bool {
        let empty = self.pending_transaction_writes.is_empty();
        if empty && cfg!(debug_assertions) {
            assert!(
                self.objects.is_empty()
                    && self.markers.is_empty()
                    && self.transaction_effects.is_empty()
                    && self.executed_effects_digests.is_empty()
                    && self.transaction_events.is_empty()
                    && self.unchanged_loaded_runtime_objects.is_empty()
                    && self
                        .total_transaction_inserts
                        .load(std::sync::atomic::Ordering::Relaxed)
                        == self
                            .total_transaction_commits
                            .load(std::sync::atomic::Ordering::Relaxed),
            );
        }
        empty
    }
}

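/// A point-lookup cache entry: `Some` caches the value itself, while `None` is a
/// negative entry recording that the value is known not to exist.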
type PointCacheItem<T> = Option<T>;

impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
    fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
        match (self, other) {
            (Some(_), None) => true,

            (Some(a), Some(b)) => {
                debug_assert_eq!(a, b);
                false
            }

            _ => false,
        }
    }
}

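/// CachedCommittedData is the "clean" half of the cache: data that has already been
/// committed to the database and is retained only to avoid db reads.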
struct CachedCommittedData {
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,

    transaction_effects:
        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,

    transaction_events: MonotonicCache<TransactionDigest, PointCacheItem<Arc<TransactionEvents>>>,

    executed_effects_digests:
        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,

    transaction_executed_in_last_epoch:
        MonotonicCache<(EpochId, TransactionDigest), PointCacheItem<()>>,

    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}

impl CachedCommittedData {
    fn new(config: &ExecutionCacheConfig) -> Self {
        let object_cache = MokaCache::builder(8)
            .max_capacity(randomize_cache_capacity_in_tests(
                config.object_cache_size(),
            ))
            .build();
        let marker_cache = MokaCache::builder(8)
            .max_capacity(randomize_cache_capacity_in_tests(
                config.marker_cache_size(),
            ))
            .build();

        let transactions = MonotonicCache::new(randomize_cache_capacity_in_tests(
            config.transaction_cache_size(),
        ));
        let transaction_effects = MonotonicCache::new(randomize_cache_capacity_in_tests(
            config.effect_cache_size(),
        ));
        let transaction_events = MonotonicCache::new(randomize_cache_capacity_in_tests(
            config.events_cache_size(),
        ));
        let executed_effects_digests = MonotonicCache::new(randomize_cache_capacity_in_tests(
            config.executed_effect_cache_size(),
        ));

        let transaction_objects = MokaCache::builder(8)
            .max_capacity(randomize_cache_capacity_in_tests(
                config.transaction_objects_cache_size(),
            ))
            .build();

        let transaction_executed_in_last_epoch = MonotonicCache::new(
            randomize_cache_capacity_in_tests(config.executed_effect_cache_size()),
        );

        Self {
            object_cache,
            marker_cache,
            transactions,
            transaction_effects,
            transaction_events,
            executed_effects_digests,
            transaction_executed_in_last_epoch,
            _transaction_objects: transaction_objects,
        }
    }

    fn clear_and_assert_empty(&self) {
        self.object_cache.invalidate_all();
        self.marker_cache.invalidate_all();
        self.transactions.invalidate_all();
        self.transaction_effects.invalidate_all();
        self.transaction_events.invalidate_all();
        self.executed_effects_digests.invalidate_all();
        self.transaction_executed_in_last_epoch.invalidate_all();
        self._transaction_objects.invalidate_all();

        assert_empty(&self.object_cache);
        assert_empty(&self.marker_cache);
        assert!(self.transactions.is_empty());
        assert!(self.transaction_effects.is_empty());
        assert!(self.transaction_events.is_empty());
        assert!(self.executed_effects_digests.is_empty());
        assert!(self.transaction_executed_in_last_epoch.is_empty());
        assert_empty(&self._transaction_objects);
    }
}

fn assert_empty<K, V>(cache: &MokaCache<K, V>)
where
    K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
    V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
{
    if cache.iter().next().is_some() {
        panic!("cache should be empty");
    }
}

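/// WritebackCache implements a write-back caching strategy on top of AuthorityStore:
/// transaction outputs are first recorded in the in-memory dirty set
/// (`UncommittedData`) and moved to the committed cache (`CachedCommittedData`) only
/// after the corresponding db batch has been durably written. Reads consult the dirty
/// set, then the committed cache, and finally fall through to the database.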
pub struct WritebackCache {
    dirty: UncommittedData,
    cached: CachedCommittedData,

    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    packages: MokaCache<ObjectID, PackageObject>,

    object_locks: ObjectLocks,

    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    object_notify_read: NotifyRead<InputKey, ()>,
    fastpath_transaction_outputs_notify_read:
        NotifyRead<TransactionDigest, Arc<TransactionOutputs>>,

    store: Arc<AuthorityStore>,
    backpressure_threshold: u64,
    backpressure_manager: Arc<BackpressureManager>,
    metrics: Arc<ExecutionCacheMetrics>,
}

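// Checks one cache level (dirty or committed) for an exact version of an entry. A
// negative hit is returned when the requested version is absent but an older version
// is present: CachedVersionMap appears to hold a contiguous run of the most recent
// versions, so in that case the requested version cannot exist.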
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}

macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some((version, entry)) = cache.get_highest() {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit((*version, entry.clone()));
            } else {
                panic!("empty CachedVersionMap should have been removed");
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}

impl WritebackCache {
    pub fn new(
        config: &ExecutionCacheConfig,
        store: Arc<AuthorityStore>,
        metrics: Arc<ExecutionCacheMetrics>,
        backpressure_manager: Arc<BackpressureManager>,
    ) -> Self {
        let packages = MokaCache::builder(8)
            .max_capacity(randomize_cache_capacity_in_tests(
                config.package_cache_size(),
            ))
            .build();
        Self {
            dirty: UncommittedData::new(config),
            cached: CachedCommittedData::new(config),
            object_by_id_cache: MonotonicCache::new(randomize_cache_capacity_in_tests(
                config.object_by_id_cache_size(),
            )),
            packages,
            object_locks: ObjectLocks::new(),
            executed_effects_digests_notify_read: NotifyRead::new(),
            object_notify_read: NotifyRead::new(),
            fastpath_transaction_outputs_notify_read: NotifyRead::new(),
            store,
            backpressure_manager,
            backpressure_threshold: config.backpressure_threshold(),
            metrics,
        }
    }

    pub fn new_for_tests(store: Arc<AuthorityStore>) -> Self {
        Self::new(
            &Default::default(),
            store,
            ExecutionCacheMetrics::new(&prometheus::Registry::new()).into(),
            BackpressureManager::new_for_tests(),
        )
    }

    #[cfg(test)]
    pub fn reset_for_test(&mut self) {
        let mut new = Self::new(
            &Default::default(),
            self.store.clone(),
            self.metrics.clone(),
            self.backpressure_manager.clone(),
        );
        std::mem::swap(self, &mut new);
    }

    pub fn evict_executed_effects_from_cache_for_testing(&self, tx_digest: &TransactionDigest) {
        self.cached.executed_effects_digests.invalidate(tx_digest);
        self.cached.transaction_events.invalidate(tx_digest);
        self.cached.transactions.invalidate(tx_digest);
    }

    fn write_object_entry(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        object: ObjectEntry,
    ) {
        trace!(?object_id, ?version, ?object, "inserting object entry");
        self.metrics.record_cache_write("object");

        let mut entry = self.dirty.objects.entry(*object_id).or_default();

        // Update the by-id cache while still holding the dirty entry guard; holding
        // the guard across both writes appears to be what keeps the two views
        // coherent for concurrent readers.
        self.object_by_id_cache
            .insert(
                object_id,
                LatestObjectCacheEntry::Object(version, object.clone()),
                Ticket::Write,
            )
            .ok();

        entry.insert(version, object.clone());

        if let ObjectEntry::Object(object) = &object {
            if object.is_package() {
                self.object_notify_read
                    .notify(&InputKey::Package { id: *object_id }, &());
            } else if !object.is_child_object() {
                self.object_notify_read.notify(
                    &InputKey::VersionedObject {
                        id: object.full_id(),
                        version: object.version(),
                    },
                    &(),
                );
            }
        }
    }

    fn write_marker_value(
        &self,
        epoch_id: EpochId,
        object_key: FullObjectKey,
        marker_value: MarkerValue,
    ) {
        tracing::trace!("inserting marker value {object_key:?}: {marker_value:?}");
        self.metrics.record_cache_write("marker");
        self.dirty
            .markers
            .entry((epoch_id, object_key.id()))
            .or_default()
            .value_mut()
            .insert(object_key.version(), marker_value);
        if matches!(marker_value, MarkerValue::ConsensusStreamEnded(_)) {
            // A stream-ended marker satisfies waiters on this input key, since they
            // will now observe the marker rather than the object version itself.
            self.object_notify_read.notify(
                &InputKey::VersionedObject {
                    id: object_key.id(),
                    version: object_key.version(),
                },
                &(),
            );
        }
    }

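    /// Fetches the dirty and committed entries for `key` and passes both to `cb`,
    /// holding the dashmap shard guard and the committed map's mutex for the duration
    /// of the callback so that it observes a consistent snapshot of both levels.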
    fn with_locked_cache_entries<K, V, R>(
        dirty_map: &DashMap<K, CachedVersionMap<V>>,
        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: &K,
        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
    ) -> R
    where
        K: Copy + Eq + Hash + Send + Sync + 'static,
        V: Send + Sync + 'static,
    {
        let dirty_entry = dirty_map.entry(*key);
        let dirty_entry = match &dirty_entry {
            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
            DashMapEntry::Vacant(_) => None,
        };

        let cached_entry = cached_map.get(key);
        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
        let cached_entry = cached_lock.as_deref();

        cb(dirty_entry, cached_entry)
    }

    fn get_object_entry_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<ObjectEntry> {
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }

    fn get_object_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<Object> {
        match self.get_object_entry_by_key_cache_only(object_id, version) {
            CacheResult::Hit(entry) => match entry {
                ObjectEntry::Object(object) => CacheResult::Hit(object),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
            },
            CacheResult::Miss => CacheResult::Miss,
            CacheResult::NegativeHit => CacheResult::NegativeHit,
        }
    }

    fn get_object_entry_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
        self.metrics
            .record_cache_request(request_type, "object_by_id");
        let entry = self.object_by_id_cache.get(object_id);

        if cfg!(debug_assertions)
            && let Some(entry) = &entry
        {
            // Verify that the by-id cache agrees with the authoritative data (dirty
            // set first, then the store).
            let highest: Option<ObjectEntry> = self
                .dirty
                .objects
                .get(object_id)
                .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
                .or_else(|| {
                    let obj: Option<ObjectEntry> = self
                        .store
                        .get_latest_object_or_tombstone(*object_id)
                        .unwrap()
                        .map(|(_, o)| o.into());
                    obj
                });

            let cache_entry = match &*entry.lock() {
                LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
                LatestObjectCacheEntry::NonExistent => None,
            };

            // A cached tombstone may have been pruned from the db, in which case the
            // two views are allowed to disagree.
            let tombstone_possibly_pruned = highest.is_none()
                && cache_entry
                    .as_ref()
                    .map(|e| e.is_tombstone())
                    .unwrap_or(false);

            if highest != cache_entry && !tombstone_possibly_pruned {
                tracing::error!(
                    ?highest,
                    ?cache_entry,
                    ?tombstone_possibly_pruned,
                    "object_by_id cache is incoherent for {:?}",
                    object_id
                );
                panic!("object_by_id cache is incoherent for {:?}", object_id);
            }
        }

        if let Some(entry) = entry {
            let entry = entry.lock();
            match &*entry {
                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
                    self.metrics.record_cache_hit(request_type, "object_by_id");
                    return CacheResult::Hit((*latest_version, latest_object.clone()));
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit(request_type, "object_by_id");
                    return CacheResult::NegativeHit;
                }
            }
        } else {
            self.metrics.record_cache_miss(request_type, "object_by_id");
        }

        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }

    fn get_object_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, Object)> {
        match self.get_object_entry_by_id_cache_only(request_type, object_id) {
            CacheResult::Hit((version, entry)) => match entry {
                ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
            },
            CacheResult::NegativeHit => CacheResult::NegativeHit,
            CacheResult::Miss => CacheResult::Miss,
        }
    }

    fn get_marker_value_cache_only(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> CacheResult<MarkerValue> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, object_key.id()),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "uncommitted",
                    dirty_entry,
                    object_key.version()
                );
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "committed",
                    cached_entry,
                    object_key.version()
                );
                CacheResult::Miss
            },
        )
    }

    fn get_latest_marker_value_cache_only(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }

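    /// Reads the latest version of an object, falling back to the store on a miss.
    /// The read ticket is taken before the cache lookup so that if a concurrent write
    /// updates `object_by_id_cache` between our db read and our insert, the stale
    /// insert is discarded instead of clobbering the newer entry.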
    fn get_object_impl(&self, request_type: &'static str, id: &ObjectID) -> Option<Object> {
        let ticket = self.object_by_id_cache.get_ticket_for_read(id);
        match self.get_object_entry_by_id_cache_only(request_type, id) {
            CacheResult::Hit((_, entry)) => match entry {
                ObjectEntry::Object(object) => Some(object),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
            },
            CacheResult::NegativeHit => None,
            CacheResult::Miss => {
                let obj = self
                    .store
                    .get_latest_object_or_tombstone(*id)
                    .expect("db error");
                match obj {
                    Some((key, obj)) => {
                        self.cache_latest_object_by_id(
                            id,
                            LatestObjectCacheEntry::Object(key.1, obj.clone().into()),
                            ticket,
                        );
                        match obj {
                            ObjectOrTombstone::Object(object) => Some(object),
                            ObjectOrTombstone::Tombstone(_) => None,
                        }
                    }
                    None => {
                        self.cache_object_not_found(id, ticket);
                        None
                    }
                }
            }
        }
    }

    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
        self.metrics.record_cache_request(request_type, "db");
        &self.store
    }

    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
        self.metrics
            .record_cache_multi_request(request_type, "db", count);
        &self.store
    }

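    /// Writes the outputs of an executed transaction to the dirty set. Nothing is
    /// written to the database here; that happens later via `build_db_batch` and
    /// `commit_transaction_outputs`.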
    #[instrument(level = "debug", skip_all)]
    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
        let tx_digest = *tx_outputs.transaction.digest();
        trace!(?tx_digest, "writing transaction outputs to cache");

        assert!(
            !self.transaction_executed_in_last_epoch(&tx_digest, epoch_id),
            "Transaction {:?} was already executed in epoch {}",
            tx_digest,
            epoch_id.saturating_sub(1)
        );

        self.dirty.fastpath_transaction_outputs.remove(&tx_digest);

        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            unchanged_loaded_runtime_objects,
            ..
        } = &*tx_outputs;

        for ObjectKey(id, version) in deleted.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Deleted);
        }

        for ObjectKey(id, version) in wrapped.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
        }

        for (object_key, marker_value) in markers.iter() {
            self.write_marker_value(epoch_id, *object_key, *marker_value);
        }

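        // Write child objects before their parents (hence the two passes over
        // `written`); this ordering appears intended so that a reader never observes
        // a parent object whose child objects are not yet visible.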
        for (object_id, object) in written.iter() {
            if object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
            }
        }
        for (object_id, object) in written.iter() {
            if !object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
                if object.is_package() {
                    debug!("caching package: {:?}", object.compute_object_reference());
                    self.packages
                        .insert(*object_id, PackageObject::new(object.clone()));
                }
            }
        }

        let tx_digest = *transaction.digest();
        debug!(
            ?tx_digest,
            "Writing transaction output objects to cache: {:?}",
            written
                .values()
                .map(|o| (o.id(), o.version()))
                .collect::<Vec<_>>(),
        );
        let effects_digest = effects.digest();

        self.metrics.record_cache_write("transaction_block");
        self.dirty
            .pending_transaction_writes
            .insert(tx_digest, tx_outputs.clone());

        self.metrics.record_cache_write("transaction_effects");
        self.dirty
            .transaction_effects
            .insert(effects_digest, effects.clone());

        self.metrics.record_cache_write("transaction_events");
        self.dirty
            .transaction_events
            .insert(tx_digest, events.clone());

        self.metrics
            .record_cache_write("unchanged_loaded_runtime_objects");
        self.dirty
            .unchanged_loaded_runtime_objects
            .insert(tx_digest, unchanged_loaded_runtime_objects.clone());

        self.metrics.record_cache_write("executed_effects_digests");
        self.dirty
            .executed_effects_digests
            .insert(tx_digest, effects_digest);

        self.executed_effects_digests_notify_read
            .notify(&tx_digest, &effects_digest);

        self.metrics
            .pending_notify_read
            .set(self.executed_effects_digests_notify_read.num_pending() as i64);

        let prev = self
            .dirty
            .total_transaction_inserts
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let pending_count = (prev + 1).saturating_sub(
            self.dirty
                .total_transaction_commits
                .load(std::sync::atomic::Ordering::Relaxed),
        );

        self.set_backpressure(pending_count);
    }

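    /// Gathers the pending outputs for `digests` and builds (but does not yet write)
    /// the db batch that will commit them.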
    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
        let _metrics_guard = mysten_metrics::monitored_scope("WritebackCache::build_db_batch");
        let mut all_outputs = Vec::with_capacity(digests.len());
        for tx in digests {
            let Some(outputs) = self
                .dirty
                .pending_transaction_writes
                .get(tx)
                .map(|o| o.clone())
            else {
                warn!("Attempt to commit unknown transaction {:?}", tx);
                continue;
            };
            all_outputs.push(outputs);
        }

        let batch = self
            .store
            .build_db_batch(epoch, &all_outputs)
            .expect("db error");
        (all_outputs, batch)
    }

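    /// Durably writes a batch built by `build_db_batch`, then moves the committed
    /// entries from the dirty set into the committed cache.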
    #[instrument(level = "debug", skip_all)]
    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        (all_outputs, db_batch): Batch,
        digests: &[TransactionDigest],
    ) {
        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs");
        fail_point!("writeback-cache-commit");
        trace!(?digests);

        db_batch.write().expect("db error");

        // Now that the batch is durable, the entries can safely migrate from the
        // dirty set to the committed cache.
        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs::flush");
        for outputs in all_outputs.iter() {
            let tx_digest = outputs.transaction.digest();
            assert!(
                self.dirty
                    .pending_transaction_writes
                    .remove(tx_digest)
                    .is_some()
            );
            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
        }

        let num_outputs = all_outputs.len() as u64;
        let num_commits = self
            .dirty
            .total_transaction_commits
            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
            + num_outputs;

        let pending_count = self
            .dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits);

        self.set_backpressure(pending_count);
    }

    fn approximate_pending_transaction_count(&self) -> u64 {
        let num_commits = self
            .dirty
            .total_transaction_commits
            .load(std::sync::atomic::Ordering::Relaxed);

        self.dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits)
    }

    fn set_backpressure(&self, pending_count: u64) {
        let backpressure = pending_count > self.backpressure_threshold;
        let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
        if backpressure_changed {
            self.metrics.backpressure_toggles.inc();
        }
        self.metrics
            .backpressure_status
            .set(if backpressure { 1 } else { 0 });
    }

    fn flush_transactions_from_dirty_to_cached(
        &self,
        epoch: EpochId,
        tx_digest: TransactionDigest,
        outputs: &TransactionOutputs,
    ) {
        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = outputs;

        let effects_digest = effects.digest();

        self.cached
            .transactions
            .insert(
                &tx_digest,
                PointCacheItem::Some(transaction.clone()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_effects
            .insert(
                &effects_digest,
                PointCacheItem::Some(effects.clone().into()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .executed_effects_digests
            .insert(
                &tx_digest,
                PointCacheItem::Some(effects_digest),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_events
            .insert(
                &tx_digest,
                PointCacheItem::Some(events.clone().into()),
                Ticket::Write,
            )
            .ok();

        self.dirty
            .transaction_effects
            .remove(&effects_digest)
            .expect("effects must exist");

        self.dirty
            .transaction_events
            .remove(&tx_digest)
            .expect("events must exist");

        self.dirty
            .unchanged_loaded_runtime_objects
            .remove(&tx_digest)
            .expect("unchanged_loaded_runtime_objects must exist");

        self.dirty
            .executed_effects_digests
            .remove(&tx_digest)
            .expect("executed effects must exist");

        for (object_key, marker_value) in markers.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.markers,
                &self.cached.marker_cache,
                (epoch, object_key.id()),
                object_key.version(),
                marker_value,
            );
        }

        for (object_id, object) in written.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                object.version(),
                &ObjectEntry::Object(object.clone()),
            );
        }

        for ObjectKey(object_id, version) in deleted.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Deleted,
            );
        }

        for ObjectKey(object_id, version) in wrapped.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Wrapped,
            );
        }
    }

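    /// Moves one (key, version) entry from the dirty map into the committed cache,
    /// retaining at most `MAX_VERSIONS` recent versions per key on the committed side.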
    fn move_version_from_dirty_to_cache<K, V>(
        dirty: &DashMap<K, CachedVersionMap<V>>,
        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: K,
        version: SequenceNumber,
        value: &V,
    ) where
        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
    {
        static MAX_VERSIONS: usize = 3;

        let dirty_entry = dirty.entry(key);
        let cache_entry = cache.entry(key).or_default();
        let mut cache_map = cache_entry.value().lock();

        cache_map.insert(version, value.clone());
        cache_map.truncate_to(MAX_VERSIONS);

        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
            panic!("dirty map must exist");
        };

        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);

        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");

        if occupied_dirty_entry.get().is_empty() {
            occupied_dirty_entry.remove();
        }
    }

    fn cache_latest_object_by_id(
        &self,
        object_id: &ObjectID,
        object: LatestObjectCacheEntry,
        ticket: Ticket,
    ) {
        trace!("caching object by id: {:?} {:?}", object_id, object);
        if self
            .object_by_id_cache
            .insert(object_id, object, ticket)
            .is_ok()
        {
            self.metrics.record_cache_write("object_by_id");
        } else {
            trace!("discarded cache write due to expired ticket");
            self.metrics.record_ticket_expiry();
        }
    }

    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
    }

    fn clear_state_end_of_epoch_impl(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
        info!("clearing state at end of epoch");

        for r in self.dirty.pending_transaction_writes.iter() {
            let outputs = r.value();
            if !outputs
                .transaction
                .transaction_data()
                .shared_input_objects()
                .is_empty()
            {
                debug_fatal!("transaction must be single writer");
            }
            info!(
                "clearing state for transaction {:?}",
                outputs.transaction.digest()
            );
            for (object_id, object) in outputs.written.iter() {
                if object.is_package() {
                    info!("removing non-finalized package from cache: {:?}", object_id);
                    self.packages.invalidate(object_id);
                }
                self.object_by_id_cache.invalidate(object_id);
                self.cached.object_cache.invalidate(object_id);
            }

            for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
                self.object_by_id_cache.invalidate(object_id);
                self.cached.object_cache.invalidate(object_id);
            }
        }

        self.dirty.clear();

        info!("clearing old transaction locks");
        self.object_locks.clear();
        info!("clearing object per epoch marker table");
        self.store
            .clear_object_per_epoch_marker_table(execution_guard)
            .expect("db error");
    }

    fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) {
        self.store
            .bulk_insert_genesis_objects(objects)
            .expect("db error");
        for obj in objects {
            self.cached.object_cache.invalidate(&obj.id());
            self.object_by_id_cache.invalidate(&obj.id());
        }
    }

    fn insert_genesis_object_impl(&self, object: Object) {
        self.object_by_id_cache.invalidate(&object.id());
        self.cached.object_cache.invalidate(&object.id());
        self.store.insert_genesis_object(object).expect("db error");
    }

    pub fn clear_caches_and_assert_empty(&self) {
        info!("clearing caches");
        self.cached.clear_and_assert_empty();
        self.object_by_id_cache.invalidate_all();
        assert!(self.object_by_id_cache.is_empty());
        self.packages.invalidate_all();
        assert_empty(&self.packages);
    }
}

impl ExecutionCacheAPI for WritebackCache {}

impl ExecutionCacheCommit for WritebackCache {
    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
        self.build_db_batch(epoch, digests)
    }

    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        batch: Batch,
        digests: &[TransactionDigest],
    ) {
        WritebackCache::commit_transaction_outputs(self, epoch, batch, digests)
    }

    fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) {
        self.store.persist_transaction(tx).expect("db error");
    }

    fn approximate_pending_transaction_count(&self) -> u64 {
        WritebackCache::approximate_pending_transaction_count(self)
    }
}

impl ObjectCacheRead for WritebackCache {
    fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
        self.metrics
            .record_cache_request("package", "package_cache");
        if let Some(p) = self.packages.get(package_id) {
            if cfg!(debug_assertions) {
                let canonical_package = self
                    .dirty
                    .objects
                    .get(package_id)
                    .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
                        Some(ObjectEntry::Object(object)) => Some(object),
                        _ => None,
                    })
                    .or_else(|| self.store.get_object(package_id));

                if let Some(canonical_package) = canonical_package {
                    assert_eq!(
                        canonical_package.digest(),
                        p.object().digest(),
                        "Package object cache is inconsistent for package {:?}",
                        package_id
                    );
                }
            }
            self.metrics.record_cache_hit("package", "package_cache");
            return Ok(Some(p));
        } else {
            self.metrics.record_cache_miss("package", "package_cache");
        }

        if let Some(p) = self.get_object_impl("package", package_id) {
            if p.is_package() {
                let p = PackageObject::new(p);
                tracing::trace!(
                    "caching package: {:?}",
                    p.object().compute_object_reference()
                );
                self.metrics.record_cache_write("package");
                self.packages.insert(*package_id, p.clone());
                Ok(Some(p))
            } else {
                Err(SuiErrorKind::UserInputError {
                    error: UserInputError::MoveObjectAsPackage {
                        object_id: *package_id,
                    },
                }
                .into())
            }
        } else {
            Ok(None)
        }
    }

    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
        // No-op: all package writes go through this cache, so it should never hold a
        // stale system package.
    }

    fn get_object(&self, id: &ObjectID) -> Option<Object> {
        self.get_object_impl("object_latest", id)
    }

    fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object> {
        match self.get_object_by_key_cache_only(object_id, version) {
            CacheResult::Hit(object) => Some(object),
            CacheResult::NegativeHit => None,
            CacheResult::Miss => self
                .record_db_get("object_by_version")
                .get_object_by_key(object_id, version),
        }
    }

    fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
        do_fallback_lookup(
            object_keys,
            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
                CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
                CacheResult::NegativeHit => CacheResult::NegativeHit,
                CacheResult::Miss => CacheResult::Miss,
            },
            |remaining| {
                self.record_db_multi_get("object_by_version", remaining.len())
                    .multi_get_objects_by_key(remaining)
                    .expect("db error")
            },
        )
    }

    fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool {
        match self.get_object_by_key_cache_only(object_id, version) {
            CacheResult::Hit(_) => true,
            CacheResult::NegativeHit => false,
            CacheResult::Miss => self
                .record_db_get("object_by_version")
                .object_exists_by_key(object_id, version)
                .expect("db error"),
        }
    }

    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool> {
        do_fallback_lookup(
            object_keys,
            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
                CacheResult::Hit(_) => CacheResult::Hit(true),
                CacheResult::NegativeHit => CacheResult::Hit(false),
                CacheResult::Miss => CacheResult::Miss,
            },
            |remaining| {
                self.record_db_multi_get("object_by_version", remaining.len())
                    .multi_object_exists_by_key(remaining)
                    .expect("db error")
            },
        )
    }

    fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
        match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
            CacheResult::Hit((version, entry)) => Some(match entry {
                ObjectEntry::Object(object) => object.compute_object_reference(),
                ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
                ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
            }),
            CacheResult::NegativeHit => None,
            CacheResult::Miss => self
                .record_db_get("latest_objref_or_tombstone")
                .get_latest_object_ref_or_tombstone(object_id)
                .expect("db error"),
        }
    }

    fn get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Option<(ObjectKey, ObjectOrTombstone)> {
        match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
            CacheResult::Hit((version, entry)) => {
                let key = ObjectKey(object_id, version);
                Some(match entry {
                    ObjectEntry::Object(object) => (key, object.into()),
                    ObjectEntry::Deleted => (
                        key,
                        ObjectOrTombstone::Tombstone((
                            object_id,
                            version,
                            ObjectDigest::OBJECT_DIGEST_DELETED,
                        )),
                    ),
                    ObjectEntry::Wrapped => (
                        key,
                        ObjectOrTombstone::Tombstone((
                            object_id,
                            version,
                            ObjectDigest::OBJECT_DIGEST_WRAPPED,
                        )),
                    ),
                })
            }
            CacheResult::NegativeHit => None,
            CacheResult::Miss => self
                .record_db_get("latest_object_or_tombstone")
                .get_latest_object_or_tombstone(object_id)
                .expect("db error"),
        }
    }

    fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool> {
        keys.iter()
            .map(|key| {
                if key.is_cancelled() {
                    true
                } else {
                    match key {
                        InputKey::VersionedObject { id, version } => {
                            matches!(
                                self.get_object_by_key_cache_only(&id.id(), *version),
                                CacheResult::Hit(_)
                            )
                        }
                        InputKey::Package { id } => self.packages.contains_key(id),
                    }
                }
            })
            .collect()
    }

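    /// Returns the live object with the greatest version less than or equal to
    /// `version_bound`, checking the latest-object cache, then both versioned cache
    /// levels, and finally the database.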
    #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
    fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version_bound: SequenceNumber,
    ) -> Option<Object> {
        macro_rules! check_cache_entry {
            ($level: expr, $objects: expr) => {
                self.metrics
                    .record_cache_request("object_lt_or_eq_version", $level);
                if let Some(objects) = $objects {
                    if let Some((_, object)) = objects
                        .all_versions_lt_or_eq_descending(&version_bound)
                        .next()
                    {
                        if let ObjectEntry::Object(object) = object {
                            self.metrics
                                .record_cache_hit("object_lt_or_eq_version", $level);
                            return Some(object.clone());
                        } else {
                            self.metrics
                                .record_cache_negative_hit("object_lt_or_eq_version", $level);
                            return None;
                        }
                    } else {
                        self.metrics
                            .record_cache_miss("object_lt_or_eq_version", $level);
                    }
                }
            };
        }

        self.metrics
            .record_cache_request("object_lt_or_eq_version", "object_by_id");
        let latest_cache_entry = self.object_by_id_cache.get(&object_id);
        if let Some(latest) = &latest_cache_entry {
            let latest = latest.lock();
            match &*latest {
                LatestObjectCacheEntry::Object(latest_version, object) => {
                    if *latest_version <= version_bound {
                        if let ObjectEntry::Object(object) = object {
                            self.metrics
                                .record_cache_hit("object_lt_or_eq_version", "object_by_id");
                            return Some(object.clone());
                        } else {
                            self.metrics.record_cache_negative_hit(
                                "object_lt_or_eq_version",
                                "object_by_id",
                            );
                            return None;
                        }
                    }
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
                    return None;
                }
            }
        }
        self.metrics
            .record_cache_miss("object_lt_or_eq_version", "object_by_id");

        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            &object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry!("uncommitted", dirty_entry);
                check_cache_entry!("committed", cached_entry);

                let latest: Option<(SequenceNumber, ObjectEntry)> =
                    if let Some(dirty_set) = dirty_entry {
                        dirty_set
                            .get_highest()
                            .cloned()
                            .tap_none(|| panic!("dirty set cannot be empty"))
                    } else {
                        self.record_db_get("object_lt_or_eq_version_latest")
                            .get_latest_object_or_tombstone(object_id)
                            .expect("db error")
                            .map(|(ObjectKey(_, version), obj_or_tombstone)| {
                                (version, ObjectEntry::from(obj_or_tombstone))
                            })
                    };

                if let Some((obj_version, obj_entry)) = latest {
                    self.cache_latest_object_by_id(
                        &object_id,
                        LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
                        self.object_by_id_cache.get_ticket_for_read(&object_id),
                    );

                    if obj_version <= version_bound {
                        match obj_entry {
                            ObjectEntry::Object(object) => Some(object),
                            ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
                        }
                    } else {
                        self.record_db_get("object_lt_or_eq_version_scan")
                            .find_object_lt_or_eq_version(object_id, version_bound)
                            .expect("db error")
                    }
                } else if let Some(latest_cache_entry) = latest_cache_entry {
                    // No version was found in the dirty set or the db, so if the by-id
                    // cache has an entry it must be a tombstone or a non-existence
                    // marker.
                    assert!(!latest_cache_entry.lock().is_alive());
                    None
                } else {
                    let highest = cached_entry.and_then(|c| c.get_highest());
                    assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
                    self.cache_object_not_found(
                        &object_id,
                        self.object_by_id_cache.get_ticket_for_read(&object_id),
                    );
                    None
                }
            },
        )
    }

    fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
        get_sui_system_state(self)
    }

    fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge> {
        get_bridge(self)
    }

    fn get_marker_value(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> Option<MarkerValue> {
        match self.get_marker_value_cache_only(object_key, epoch_id) {
            CacheResult::Hit(marker) => Some(marker),
            CacheResult::NegativeHit => None,
            CacheResult::Miss => self
                .record_db_get("marker_by_version")
                .get_marker_value(object_key, epoch_id)
                .expect("db error"),
        }
    }

    fn get_latest_marker(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> Option<(SequenceNumber, MarkerValue)> {
        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
            CacheResult::Hit((v, marker)) => Some((v, marker)),
            CacheResult::NegativeHit => {
                panic!("cannot have negative hit when getting latest marker")
            }
            CacheResult::Miss => self
                .record_db_get("marker_latest")
                .get_latest_marker(object_id, epoch_id)
                .expect("db error"),
        }
    }

    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult {
        let cur_epoch = epoch_store.epoch();
        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
            CacheResult::Hit((_, obj)) => {
                let actual_objref = obj.compute_object_reference();
                if obj_ref != actual_objref {
                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
                        locked_ref: actual_objref,
                    })
                } else {
                    Ok(
                        match self
                            .object_locks
                            .get_transaction_lock(&obj_ref, epoch_store)?
                        {
                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
                                locked_by_tx: LockDetailsDeprecated {
                                    epoch: cur_epoch,
                                    tx_digest,
                                },
                            },
                            None => ObjectLockStatus::Initialized,
                        },
                    )
                }
            }
            CacheResult::NegativeHit => {
                Err(SuiError::from(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
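                    // The object does not exist at any version, so no version is
                    // reported even though the request named one.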
1838 version: None,
1841 }))
1842 }
1843 CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1844 }
1845 }
1846
1847 fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef> {
1848 let obj = self.get_object_impl("live_objref", &object_id).ok_or(
1849 UserInputError::ObjectNotFound {
1850 object_id,
1851 version: None,
1852 },
1853 )?;
1854 Ok(obj.compute_object_reference())
1855 }
1856
1857 fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1858 do_fallback_lookup_fallible(
1859 owned_object_refs,
1860 |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
1861 CacheResult::Hit((version, obj)) => {
1862 if obj.compute_object_reference() != *obj_ref {
1863 Err(UserInputError::ObjectVersionUnavailableForConsumption {
1864 provided_obj_ref: *obj_ref,
1865 current_version: version,
1866 }
1867 .into())
1868 } else {
1869 Ok(CacheResult::Hit(()))
1870 }
1871 }
1872 CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
1873 object_id: obj_ref.0,
1874 version: None,
1875 }
1876 .into()),
1877 CacheResult::Miss => Ok(CacheResult::Miss),
1878 },
1879 |remaining| {
1880 self.record_db_multi_get("object_is_live", remaining.len())
1881 .check_owned_objects_are_live(remaining)?;
1882 Ok(vec![(); remaining.len()])
1883 },
1884 )?;
1885 Ok(())
1886 }
1887
1888 fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber> {
1889 self.store
1890 .perpetual_tables
1891 .get_highest_pruned_checkpoint()
1892 .expect("db error")
1893 }
1894
1895 fn notify_read_input_objects<'a>(
1896 &'a self,
1897 input_and_receiving_keys: &'a [InputKey],
1898 receiving_keys: &'a HashSet<InputKey>,
1899 epoch: EpochId,
1900 ) -> BoxFuture<'a, ()> {
1901 self.object_notify_read
1902 .read(
1903 "notify_read_input_objects",
1904 input_and_receiving_keys,
1905 move |keys| {
1906 self.multi_input_objects_available(keys, receiving_keys, epoch)
1907 .into_iter()
1908 .map(|available| if available { Some(()) } else { None })
1909 .collect::<Vec<_>>()
1910 },
1911 )
1912 .map(|_| ())
1913 .boxed()
1914 }
1915}
1916
impl TransactionCacheRead for WritebackCache {
    fn multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<Arc<VerifiedTransaction>>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_block", "uncommitted");
                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_block", "uncommitted");
                    return CacheResult::Hit(Some(tx.transaction.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_block", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_block", "committed");

                match self
                    .cached
                    .transactions
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(tx)) => {
                        self.metrics
                            .record_cache_hit("transaction_block", "committed");
                        CacheResult::Hit(Some(tx))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_block", "committed");

                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results: Vec<_> = self
                    .record_db_multi_get("transaction_block", remaining.len())
                    .multi_get_transaction_blocks(&remaining_digests)
                    .expect("db error")
                    .into_iter()
                    .map(|o| o.map(Arc::new))
                    .collect();
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached.transactions.insert(digest, None, *ticket).ok();
                    }
                }
                results
            },
        )
    }

    fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEffectsDigest>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| {
                (
                    *d,
                    self.cached.executed_effects_digests.get_ticket_for_read(d),
                )
            })
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("executed_effects_digests", "uncommitted");
                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
                    self.metrics
                        .record_cache_hit("executed_effects_digests", "uncommitted");
                    return CacheResult::Hit(Some(*digest));
                }
                self.metrics
                    .record_cache_miss("executed_effects_digests", "uncommitted");

                self.metrics
                    .record_cache_request("executed_effects_digests", "committed");
                match self
                    .cached
                    .executed_effects_digests
                    .get(digest)
                    .map(|l| *l.lock())
                {
                    Some(PointCacheItem::Some(digest)) => {
                        self.metrics
                            .record_cache_hit("executed_effects_digests", "committed");
                        CacheResult::Hit(Some(digest))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("executed_effects_digests", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("executed_effects_digests", remaining.len())
                    .multi_get_executed_effects_digests(&remaining_digests)
                    .expect("db error");
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .executed_effects_digests
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }

    fn multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> Vec<Option<TransactionEffects>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_effects", "uncommitted");
                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_effects", "uncommitted");
                    return CacheResult::Hit(Some(effects.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_effects", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_effects", "committed");
                match self
                    .cached
                    .transaction_effects
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(effects)) => {
                        self.metrics
                            .record_cache_hit("transaction_effects", "committed");
                        CacheResult::Hit(Some((*effects).clone()))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_effects", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("transaction_effects", remaining.len())
                    .multi_get_effects(remaining_digests.iter())
                    .expect("db error");
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_effects
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }

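    /// Checks (with a small cache in front of the perpetual tables) whether `digest`
    /// was executed in the epoch immediately before `current_epoch`; used to guard
    /// against re-executing a transaction across an epoch boundary.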
    fn transaction_executed_in_last_epoch(
        &self,
        digest: &TransactionDigest,
        current_epoch: EpochId,
    ) -> bool {
        if current_epoch == 0 {
            return false;
        }
        let last_epoch = current_epoch - 1;
        let cache_key = (last_epoch, *digest);

        // Acquire the ticket before reading, so that if a write races with the
        // db lookup below, the stale insert is rejected rather than cached.
        let ticket = self
            .cached
            .transaction_executed_in_last_epoch
            .get_ticket_for_read(&cache_key);

        if let Some(cached) = self
            .cached
            .transaction_executed_in_last_epoch
            .get(&cache_key)
        {
            return cached.lock().is_some();
        }

        let was_executed = self
            .store
            .perpetual_tables
            .was_transaction_executed_in_last_epoch(digest, current_epoch);

        // Memoize both positive and negative results.
        let value = if was_executed { Some(()) } else { None };
        self.cached
            .transaction_executed_in_last_epoch
            .insert(&cache_key, value, ticket)
            .ok();

        was_executed
    }

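    /// Resolves once executed effects digests are available for every entry in
    /// `digests`; entries that are already present resolve immediately.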
    fn notify_read_executed_effects_digests<'a>(
        &'a self,
        task_name: &'static str,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>> {
        self.executed_effects_digests_notify_read
            .read(task_name, digests, |digests| {
                self.multi_get_executed_effects_digests(digests)
            })
            .boxed()
    }

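    /// Fetch events for each transaction digest, using the same
    /// dirty -> committed-cache -> store lookup order as the other readers.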
    fn multi_get_events(
        &self,
        event_digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEvents>> {
        // Normalize empty event lists to `None` so callers never observe an
        // empty `TransactionEvents`.
        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
            if events.data.is_empty() {
                None
            } else {
                Some(events)
            }
        }

        let digests_and_tickets: Vec<_> = event_digests
            .iter()
            .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_events", "uncommitted");
                if let Some(events) = self
                    .dirty
                    .transaction_events
                    .get(digest)
                    .map(|e| e.clone())
                {
                    self.metrics
                        .record_cache_hit("transaction_events", "uncommitted");
                    return CacheResult::Hit(map_events(events));
                }
                self.metrics
                    .record_cache_miss("transaction_events", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_events", "committed");
                match self
                    .cached
                    .transaction_events
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(events)) => {
                        self.metrics
                            .record_cache_hit("transaction_events", "committed");
                        CacheResult::Hit(map_events((*events).clone()))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_events", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .store
                    .multi_get_events(&remaining_digests)
                    .expect("db error");
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_events
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }

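    /// Returns the unchanged loaded runtime objects recorded for `digest`,
    /// preferring an uncommitted entry over the store.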
    fn get_unchanged_loaded_runtime_objects(
        &self,
        digest: &TransactionDigest,
    ) -> Option<Vec<ObjectKey>> {
        self.dirty
            .unchanged_loaded_runtime_objects
            .get(digest)
            .map(|b| b.clone())
            .or_else(|| {
                self.store
                    .get_unchanged_loaded_runtime_objects(digest)
                    .expect("db error")
            })
    }

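    /// Returns outputs recorded by `write_fastpath_transaction_outputs` for
    /// `tx_digest`, if still present in the dirty set.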
    fn get_mysticeti_fastpath_outputs(
        &self,
        tx_digest: &TransactionDigest,
    ) -> Option<Arc<TransactionOutputs>> {
        self.dirty.fastpath_transaction_outputs.get(tx_digest)
    }

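    /// Resolves once fastpath outputs are available for every digest in
    /// `tx_digests`.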
    fn notify_read_fastpath_transaction_outputs<'a>(
        &'a self,
        tx_digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, Vec<Arc<TransactionOutputs>>> {
        self.fastpath_transaction_outputs_notify_read
            .read(
                "notify_read_fastpath_transaction_outputs",
                tx_digests,
                |tx_digests| {
                    tx_digests
                        .iter()
                        .map(|tx_digest| self.get_mysticeti_fastpath_outputs(tx_digest))
                        .collect()
                },
            )
            .boxed()
    }

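    /// Takes the accumulator events out of the pending (not yet committed)
    /// outputs for `digest`, if any.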
    fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
        self.dirty
            .pending_transaction_writes
            .get(digest)
            .map(|transaction_output| transaction_output.take_accumulator_events())
    }
}

impl ExecutionCacheWrite for WritebackCache {
    fn acquire_transaction_locks(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        owned_input_objects: &[ObjectRef],
        tx_digest: TransactionDigest,
        signed_transaction: Option<VerifiedSignedTransaction>,
    ) -> SuiResult {
        self.object_locks.acquire_transaction_locks(
            self,
            epoch_store,
            owned_input_objects,
            tx_digest,
            signed_transaction,
        )
    }

    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs);
    }

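    /// Record the outputs of a Mysticeti fastpath certified transaction and wake
    /// any tasks blocked in `notify_read_fastpath_transaction_outputs`.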
    fn write_fastpath_transaction_outputs(&self, tx_outputs: Arc<TransactionOutputs>) {
        let tx_digest = *tx_outputs.transaction.digest();
        debug!(
            ?tx_digest,
            "writing mysticeti fastpath certified transaction outputs"
        );
        self.dirty
            .fastpath_transaction_outputs
            .insert(tx_digest, tx_outputs.clone());
        self.fastpath_transaction_outputs_notify_read
            .notify(&tx_digest, &tx_outputs);
    }

    #[cfg(test)]
    fn write_object_entry_for_test(&self, object: Object) {
        self.write_object_entry(&object.id(), object.version(), object.into());
    }
}

implement_passthrough_traits!(WritebackCache);

impl GlobalStateHashStore for WritebackCache {
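    /// Returns the object ref (or tombstone ref) with the highest version strictly
    /// below `version`, taking the best candidate across the dirty set, the
    /// committed object cache, and the store.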
    fn get_object_ref_prior_to_key_deprecated(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<Option<ObjectRef>> {
        let mut candidates = Vec::new();

        let check_versions =
            |versions: &CachedVersionMap<ObjectEntry>| match versions.get_prior_to(&version) {
                Some((version, object_entry)) => match object_entry {
                    ObjectEntry::Object(object) => {
                        assert_eq!(object.version(), version);
                        Some(object.compute_object_reference())
                    }
                    ObjectEntry::Deleted => {
                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED))
                    }
                    ObjectEntry::Wrapped => {
                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED))
                    }
                },
                None => None,
            };

        if let Some(objects) = self.dirty.objects.get(object_id)
            && let Some(prior) = check_versions(&objects)
        {
            candidates.push(prior);
        }

        if let Some(objects) = self.cached.object_cache.get(object_id)
            && let Some(prior) = check_versions(&objects.lock())
        {
            candidates.push(prior);
        }

        if let Some(prior) = self
            .store
            .get_object_ref_prior_to_key_deprecated(object_id, version)?
        {
            candidates.push(prior);
        }

        candidates.sort_by_key(|(_, version, _)| *version);
        Ok(candidates.pop())
    }

    fn get_root_state_hash_for_epoch(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
        self.store.get_root_state_hash_for_epoch(epoch)
    }

    fn get_root_state_hash_for_highest_epoch(
        &self,
    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
        self.store.get_root_state_hash_for_highest_epoch()
    }

    fn insert_state_hash_for_epoch(
        &self,
        epoch: EpochId,
        checkpoint_seq_num: &CheckpointSequenceNumber,
        acc: &GlobalStateHash,
    ) -> SuiResult {
        self.store
            .insert_state_hash_for_epoch(epoch, checkpoint_seq_num, acc)
    }

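    /// Iterating the live object set directly from the store is only sound when
    /// there is no uncommitted data, which the assert below enforces.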
    fn iter_live_object_set(
        &self,
        include_wrapped_tombstone: bool,
    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        assert!(
            self.dirty.is_empty(),
            "cannot iterate live object set with dirty data"
        );
        self.store.iter_live_object_set(include_wrapped_tombstone)
    }

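    /// Test-only variant that overlays the dirty object set on the store's live
    /// set, so it remains usable while uncommitted data exists.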
    fn iter_cached_live_object_set_for_testing(
        &self,
        include_wrapped_tombstone: bool,
    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        let iter = self.dirty.objects.iter();
        let mut dirty_objects = BTreeMap::new();

        // Start from the store's live set...
        for obj in self.store.iter_live_object_set(include_wrapped_tombstone) {
            dirty_objects.insert(obj.object_id(), obj);
        }

        // ...then overlay the highest dirty entry for each object, which may
        // shadow or remove the stored version.
        for entry in iter {
            let id = *entry.key();
            let value = entry.value();
            match value.get_highest().unwrap() {
                (_, ObjectEntry::Object(object)) => {
                    dirty_objects.insert(id, LiveObject::Normal(object.clone()));
                }
                (version, ObjectEntry::Wrapped) => {
                    if include_wrapped_tombstone {
                        dirty_objects.insert(id, LiveObject::Wrapped(ObjectKey(id, *version)));
                    } else {
                        dirty_objects.remove(&id);
                    }
                }
                (_, ObjectEntry::Deleted) => {
                    dirty_objects.remove(&id);
                }
            }
        }

        Box::new(dirty_objects.into_values())
    }
}

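// State sync writes transactions and effects straight to the store (they are
// not dirty data that must later be flushed), then primes the committed point
// caches with `Ticket::Write`.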
impl StateSyncAPI for WritebackCache {
    fn insert_transaction_and_effects(
        &self,
        transaction: &VerifiedTransaction,
        transaction_effects: &TransactionEffects,
    ) {
        self.store
            .insert_transaction_and_effects(transaction, transaction_effects)
            .expect("db error");
        self.cached
            .transactions
            .insert(
                transaction.digest(),
                PointCacheItem::Some(Arc::new(transaction.clone())),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_effects
            .insert(
                &transaction_effects.digest(),
                PointCacheItem::Some(Arc::new(transaction_effects.clone())),
                Ticket::Write,
            )
            .ok();
    }

    fn multi_insert_transaction_and_effects(
        &self,
        transactions_and_effects: &[VerifiedExecutionData],
    ) {
        self.store
            .multi_insert_transaction_and_effects(transactions_and_effects.iter())
            .expect("db error");
        for VerifiedExecutionData {
            transaction,
            effects,
        } in transactions_and_effects
        {
            self.cached
                .transactions
                .insert(
                    transaction.digest(),
                    PointCacheItem::Some(Arc::new(transaction.clone())),
                    Ticket::Write,
                )
                .ok();
            self.cached
                .transaction_effects
                .insert(
                    &effects.digest(),
                    PointCacheItem::Some(Arc::new(effects.clone())),
                    Ticket::Write,
                )
                .ok();
        }
    }
}