1use crate::accumulators::funds_read::AccountFundsRead;
41use crate::authority::AuthorityStore;
42use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
43use crate::authority::authority_store::{
44 ExecutionLockWriteGuard, LockDetailsDeprecated, ObjectLockStatus, SuiLockResult,
45};
46use crate::authority::authority_store_tables::LiveObject;
47use crate::authority::backpressure::BackpressureManager;
48use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
49use crate::fallback_fetch::{do_fallback_lookup, do_fallback_lookup_fallible};
50use crate::global_state_hasher::GlobalStateHashStore;
51use crate::transaction_outputs::TransactionOutputs;
52
53use dashmap::DashMap;
54use dashmap::mapref::entry::Entry as DashMapEntry;
55use futures::{FutureExt, future::BoxFuture};
56use moka::sync::SegmentedCache as MokaCache;
57use mysten_common::ZipDebugEqIteratorExt;
58use mysten_common::debug_fatal;
59use mysten_common::random_util::randomize_cache_capacity_in_tests;
60use mysten_common::sync::notify_read::NotifyRead;
61use parking_lot::Mutex;
62use rayon::prelude::*;
63use std::collections::{BTreeMap, HashSet};
64use std::hash::Hash;
65use std::sync::Arc;
66use std::sync::atomic::AtomicU64;
67use sui_config::ExecutionCacheConfig;
68use sui_macros::fail_point;
69use sui_protocol_config::ProtocolVersion;
70use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
71use sui_types::accumulator_event::AccumulatorEvent;
72use sui_types::accumulator_root::{AccumulatorObjId, AccumulatorValue};
73use sui_types::base_types::{
74 EpochId, FullObjectID, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData,
75};
76use sui_types::bridge::{Bridge, get_bridge};
77use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest};
78use sui_types::effects::{TransactionEffects, TransactionEvents};
79use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
80use sui_types::executable_transaction::VerifiedExecutableTransaction;
81use sui_types::global_state_hash::GlobalStateHash;
82use sui_types::message_envelope::Message;
83use sui_types::messages_checkpoint::CheckpointSequenceNumber;
84use sui_types::object::Object;
85use sui_types::storage::{
86 FullObjectKey, InputKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject,
87};
88use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
89use sui_types::transaction::{TransactionDataAPI, VerifiedTransaction};
90use tap::TapOptional;
91use tracing::{debug, info, instrument, trace, warn};
92
93use super::ExecutionCacheAPI;
94use super::cache_types::Ticket;
95use super::{
96 Batch, CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
97 ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
98 cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache},
99 implement_passthrough_traits,
100 object_locks::ObjectLocks,
101};
102
103#[cfg(test)]
104#[path = "unit_tests/writeback_cache_tests.rs"]
105pub mod writeback_cache_tests;
106
107#[cfg(test)]
108#[path = "unit_tests/notify_read_input_objects_tests.rs"]
109mod notify_read_input_objects_tests;
110
111#[derive(Clone, PartialEq, Eq)]
112enum ObjectEntry {
113 Object(Object),
114 Deleted,
115 Wrapped,
116}
117
118impl ObjectEntry {
119 #[cfg(test)]
120 fn unwrap_object(&self) -> &Object {
121 match self {
122 ObjectEntry::Object(o) => o,
123 _ => panic!("unwrap_object called on non-Object"),
124 }
125 }
126
127 fn is_tombstone(&self) -> bool {
128 match self {
129 ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
130 ObjectEntry::Object(_) => false,
131 }
132 }
133}
134
135impl std::fmt::Debug for ObjectEntry {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 match self {
138 ObjectEntry::Object(o) => {
139 write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
140 }
141 ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
142 ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
143 }
144 }
145}
146
147impl From<Object> for ObjectEntry {
148 fn from(object: Object) -> Self {
149 ObjectEntry::Object(object)
150 }
151}
152
153impl From<ObjectOrTombstone> for ObjectEntry {
154 fn from(object: ObjectOrTombstone) -> Self {
155 match object {
156 ObjectOrTombstone::Object(o) => o.into(),
157 ObjectOrTombstone::Tombstone(obj_ref) => {
158 if obj_ref.2.is_deleted() {
159 ObjectEntry::Deleted
160 } else if obj_ref.2.is_wrapped() {
161 ObjectEntry::Wrapped
162 } else {
163 panic!("tombstone digest must either be deleted or wrapped");
164 }
165 }
166 }
167 }
168}
169
170#[derive(Debug, Clone, PartialEq, Eq)]
171enum LatestObjectCacheEntry {
172 Object(SequenceNumber, ObjectEntry),
173 NonExistent,
174}
175
176impl LatestObjectCacheEntry {
177 #[cfg(test)]
178 fn version(&self) -> Option<SequenceNumber> {
179 match self {
180 LatestObjectCacheEntry::Object(version, _) => Some(*version),
181 LatestObjectCacheEntry::NonExistent => None,
182 }
183 }
184
185 fn is_alive(&self) -> bool {
186 match self {
187 LatestObjectCacheEntry::Object(_, entry) => !entry.is_tombstone(),
188 LatestObjectCacheEntry::NonExistent => false,
189 }
190 }
191}
192
193impl IsNewer for LatestObjectCacheEntry {
194 fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
195 match (self, other) {
196 (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
197 v1 > v2
198 }
199 (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
200 _ => false,
201 }
202 }
203}
204
205type MarkerKey = (EpochId, FullObjectID);
206
207struct UncommittedData {
210 objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,
223
224 markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,
229
230 transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,
231
232 transaction_events: DashMap<TransactionDigest, TransactionEvents>,
233
234 unchanged_loaded_runtime_objects: DashMap<TransactionDigest, Vec<ObjectKey>>,
235
236 executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,
237
238 pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,
241
242 total_transaction_inserts: AtomicU64,
243 total_transaction_commits: AtomicU64,
244}
245
246impl UncommittedData {
247 fn new() -> Self {
248 Self {
249 objects: DashMap::with_shard_amount(2048),
250 markers: DashMap::with_shard_amount(2048),
251 transaction_effects: DashMap::with_shard_amount(2048),
252 executed_effects_digests: DashMap::with_shard_amount(2048),
253 pending_transaction_writes: DashMap::with_shard_amount(2048),
254 transaction_events: DashMap::with_shard_amount(2048),
255 unchanged_loaded_runtime_objects: DashMap::with_shard_amount(2048),
256 total_transaction_inserts: AtomicU64::new(0),
257 total_transaction_commits: AtomicU64::new(0),
258 }
259 }
260
261 fn clear(&self) {
262 self.objects.clear();
263 self.markers.clear();
264 self.transaction_effects.clear();
265 self.executed_effects_digests.clear();
266 self.pending_transaction_writes.clear();
267 self.transaction_events.clear();
268 self.unchanged_loaded_runtime_objects.clear();
269 self.total_transaction_inserts
270 .store(0, std::sync::atomic::Ordering::Relaxed);
271 self.total_transaction_commits
272 .store(0, std::sync::atomic::Ordering::Relaxed);
273 }
274
275 fn is_empty(&self) -> bool {
276 let empty = self.pending_transaction_writes.is_empty();
277 if empty && cfg!(debug_assertions) {
278 assert!(
279 self.objects.is_empty()
280 && self.markers.is_empty()
281 && self.transaction_effects.is_empty()
282 && self.executed_effects_digests.is_empty()
283 && self.transaction_events.is_empty()
284 && self.unchanged_loaded_runtime_objects.is_empty()
285 && self
286 .total_transaction_inserts
287 .load(std::sync::atomic::Ordering::Relaxed)
288 == self
289 .total_transaction_commits
290 .load(std::sync::atomic::Ordering::Relaxed),
291 );
292 }
293 empty
294 }
295}
296
297type PointCacheItem<T> = Option<T>;
299
300impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
303 fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
304 match (self, other) {
305 (Some(_), None) => true,
306
307 (Some(a), Some(b)) => {
308 debug_assert_eq!(a, b);
310 false
311 }
312
313 _ => false,
314 }
315 }
316}
317
318struct CachedCommittedData {
320 object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,
322
323 marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,
325
326 transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,
327
328 transaction_effects:
329 MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,
330
331 transaction_events: MonotonicCache<TransactionDigest, PointCacheItem<Arc<TransactionEvents>>>,
332
333 executed_effects_digests:
334 MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,
335
336 transaction_executed_in_last_epoch:
337 MonotonicCache<(EpochId, TransactionDigest), PointCacheItem<()>>,
338
339 _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
342}
343
344impl CachedCommittedData {
345 fn new(config: &ExecutionCacheConfig) -> Self {
346 let object_cache = MokaCache::builder(8)
347 .max_capacity(randomize_cache_capacity_in_tests(
348 config.object_cache_size(),
349 ))
350 .build();
351 let marker_cache = MokaCache::builder(8)
352 .max_capacity(randomize_cache_capacity_in_tests(
353 config.marker_cache_size(),
354 ))
355 .build();
356
357 let transactions = MonotonicCache::new(randomize_cache_capacity_in_tests(
358 config.transaction_cache_size(),
359 ));
360 let transaction_effects = MonotonicCache::new(randomize_cache_capacity_in_tests(
361 config.effect_cache_size(),
362 ));
363 let transaction_events = MonotonicCache::new(randomize_cache_capacity_in_tests(
364 config.events_cache_size(),
365 ));
366 let executed_effects_digests = MonotonicCache::new(randomize_cache_capacity_in_tests(
367 config.executed_effect_cache_size(),
368 ));
369
370 let transaction_objects = MokaCache::builder(8)
371 .max_capacity(randomize_cache_capacity_in_tests(
372 config.transaction_objects_cache_size(),
373 ))
374 .build();
375
376 let transaction_executed_in_last_epoch = MonotonicCache::new(
377 randomize_cache_capacity_in_tests(config.executed_effect_cache_size()),
378 );
379
380 Self {
381 object_cache,
382 marker_cache,
383 transactions,
384 transaction_effects,
385 transaction_events,
386 executed_effects_digests,
387 transaction_executed_in_last_epoch,
388 _transaction_objects: transaction_objects,
389 }
390 }
391
392 fn clear_and_assert_empty(&self) {
393 self.object_cache.invalidate_all();
394 self.marker_cache.invalidate_all();
395 self.transactions.invalidate_all();
396 self.transaction_effects.invalidate_all();
397 self.transaction_events.invalidate_all();
398 self.executed_effects_digests.invalidate_all();
399 self.transaction_executed_in_last_epoch.invalidate_all();
400 self._transaction_objects.invalidate_all();
401
402 assert_empty(&self.object_cache);
403 assert_empty(&self.marker_cache);
404 assert!(self.transactions.is_empty());
405 assert!(self.transaction_effects.is_empty());
406 assert!(self.transaction_events.is_empty());
407 assert!(self.executed_effects_digests.is_empty());
408 assert!(self.transaction_executed_in_last_epoch.is_empty());
409 assert_empty(&self._transaction_objects);
410 }
411}
412
413fn assert_empty<K, V>(cache: &MokaCache<K, V>)
414where
415 K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
416 V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
417{
418 if cache.iter().next().is_some() {
419 panic!("cache should be empty");
420 }
421}
422
423pub struct WritebackCache {
424 dirty: UncommittedData,
425 cached: CachedCommittedData,
426
427 object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,
434
435 packages: MokaCache<ObjectID, PackageObject>,
446
447 object_locks: ObjectLocks,
448
449 executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
450 object_notify_read: NotifyRead<InputKey, ()>,
451
452 store: Arc<AuthorityStore>,
453 backpressure_threshold: u64,
454 backpressure_manager: Arc<BackpressureManager>,
455 metrics: Arc<ExecutionCacheMetrics>,
456}
457
458macro_rules! check_cache_entry_by_version {
459 ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
460 $self.metrics.record_cache_request($table, $level);
461 if let Some(cache) = $cache {
462 if let Some(entry) = cache.get(&$version) {
463 $self.metrics.record_cache_hit($table, $level);
464 return CacheResult::Hit(entry.clone());
465 }
466
467 if let Some(least_version) = cache.get_least() {
468 if least_version.0 < $version {
469 $self.metrics.record_cache_negative_hit($table, $level);
472 return CacheResult::NegativeHit;
473 }
474 }
475 }
476 $self.metrics.record_cache_miss($table, $level);
477 };
478}
479
480macro_rules! check_cache_entry_by_latest {
481 ($self: ident, $table: expr, $level: expr, $cache: expr) => {
482 $self.metrics.record_cache_request($table, $level);
483 if let Some(cache) = $cache {
484 if let Some((version, entry)) = cache.get_highest() {
485 $self.metrics.record_cache_hit($table, $level);
486 return CacheResult::Hit((*version, entry.clone()));
487 } else {
488 panic!("empty CachedVersionMap should have been removed");
489 }
490 }
491 $self.metrics.record_cache_miss($table, $level);
492 };
493}
494
495impl WritebackCache {
496 pub fn new(
497 config: &ExecutionCacheConfig,
498 store: Arc<AuthorityStore>,
499 metrics: Arc<ExecutionCacheMetrics>,
500 backpressure_manager: Arc<BackpressureManager>,
501 ) -> Self {
502 let packages = MokaCache::builder(8)
503 .max_capacity(randomize_cache_capacity_in_tests(
504 config.package_cache_size(),
505 ))
506 .build();
507 Self {
508 dirty: UncommittedData::new(),
509 cached: CachedCommittedData::new(config),
510 object_by_id_cache: MonotonicCache::new(randomize_cache_capacity_in_tests(
511 config.object_by_id_cache_size(),
512 )),
513 packages,
514 object_locks: ObjectLocks::new(),
515 executed_effects_digests_notify_read: NotifyRead::new(),
516 object_notify_read: NotifyRead::new(),
517 store,
518 backpressure_manager,
519 backpressure_threshold: config.backpressure_threshold(),
520 metrics,
521 }
522 }
523
524 pub fn new_for_tests(store: Arc<AuthorityStore>) -> Self {
525 Self::new(
526 &Default::default(),
527 store,
528 ExecutionCacheMetrics::new(&prometheus::Registry::new()).into(),
529 BackpressureManager::new_for_tests(),
530 )
531 }
532
533 #[cfg(test)]
534 pub fn reset_for_test(&mut self) {
535 let mut new = Self::new(
536 &Default::default(),
537 self.store.clone(),
538 self.metrics.clone(),
539 self.backpressure_manager.clone(),
540 );
541 std::mem::swap(self, &mut new);
542 }
543
544 pub fn evict_executed_effects_from_cache_for_testing(&self, tx_digest: &TransactionDigest) {
545 self.cached.executed_effects_digests.invalidate(tx_digest);
546 self.cached.transaction_events.invalidate(tx_digest);
547 self.cached.transactions.invalidate(tx_digest);
548 }
549
550 fn write_object_entry(
551 &self,
552 object_id: &ObjectID,
553 version: SequenceNumber,
554 object: ObjectEntry,
555 ) {
556 trace!(?object_id, ?version, ?object, "inserting object entry");
557 self.metrics.record_cache_write("object");
558
559 let mut entry = self.dirty.objects.entry(*object_id).or_default();
579
580 self.object_by_id_cache
581 .insert(
582 object_id,
583 LatestObjectCacheEntry::Object(version, object.clone()),
584 Ticket::Write,
585 )
586 .ok();
589
590 entry.insert(version, object.clone());
591
592 if let ObjectEntry::Object(object) = &object {
593 if object.is_package() {
594 self.object_notify_read
595 .notify(&InputKey::Package { id: *object_id }, &());
596 } else if !object.is_child_object() {
597 self.object_notify_read.notify(
598 &InputKey::VersionedObject {
599 id: object.full_id(),
600 version: object.version(),
601 },
602 &(),
603 );
604 }
605 }
606 }
607
608 fn write_marker_value(
609 &self,
610 epoch_id: EpochId,
611 object_key: FullObjectKey,
612 marker_value: MarkerValue,
613 ) {
614 tracing::trace!("inserting marker value {object_key:?}: {marker_value:?}",);
615 self.metrics.record_cache_write("marker");
616 self.dirty
617 .markers
618 .entry((epoch_id, object_key.id()))
619 .or_default()
620 .value_mut()
621 .insert(object_key.version(), marker_value);
622 if matches!(marker_value, MarkerValue::ConsensusStreamEnded(_)) {
627 self.object_notify_read.notify(
628 &InputKey::VersionedObject {
629 id: object_key.id(),
630 version: object_key.version(),
631 },
632 &(),
633 );
634 }
635 }
636
637 fn with_locked_cache_entries<K, V, R>(
641 dirty_map: &DashMap<K, CachedVersionMap<V>>,
642 cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
643 key: &K,
644 cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
645 ) -> R
646 where
647 K: Copy + Eq + Hash + Send + Sync + 'static,
648 V: Send + Sync + 'static,
649 {
650 let dirty_entry = dirty_map.entry(*key);
651 let dirty_entry = match &dirty_entry {
652 DashMapEntry::Occupied(occupied) => Some(occupied.get()),
653 DashMapEntry::Vacant(_) => None,
654 };
655
656 let cached_entry = cached_map.get(key);
657 let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
658 let cached_entry = cached_lock.as_deref();
659
660 cb(dirty_entry, cached_entry)
661 }
662
663 fn get_object_entry_by_key_cache_only(
666 &self,
667 object_id: &ObjectID,
668 version: SequenceNumber,
669 ) -> CacheResult<ObjectEntry> {
670 Self::with_locked_cache_entries(
671 &self.dirty.objects,
672 &self.cached.object_cache,
673 object_id,
674 |dirty_entry, cached_entry| {
675 check_cache_entry_by_version!(
676 self,
677 "object_by_version",
678 "uncommitted",
679 dirty_entry,
680 version
681 );
682 check_cache_entry_by_version!(
683 self,
684 "object_by_version",
685 "committed",
686 cached_entry,
687 version
688 );
689 CacheResult::Miss
690 },
691 )
692 }
693
694 fn get_object_by_key_cache_only(
695 &self,
696 object_id: &ObjectID,
697 version: SequenceNumber,
698 ) -> CacheResult<Object> {
699 match self.get_object_entry_by_key_cache_only(object_id, version) {
700 CacheResult::Hit(entry) => match entry {
701 ObjectEntry::Object(object) => CacheResult::Hit(object),
702 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
703 },
704 CacheResult::Miss => CacheResult::Miss,
705 CacheResult::NegativeHit => CacheResult::NegativeHit,
706 }
707 }
708
709 fn get_object_entry_by_id_cache_only(
710 &self,
711 request_type: &'static str,
712 object_id: &ObjectID,
713 ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
714 self.metrics
715 .record_cache_request(request_type, "object_by_id");
716 let entry = self.object_by_id_cache.get(object_id);
717
718 if cfg!(debug_assertions)
719 && let Some(entry) = &entry
720 {
721 let highest: Option<ObjectEntry> = self
723 .dirty
724 .objects
725 .get(object_id)
726 .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
727 .or_else(|| {
728 let obj: Option<ObjectEntry> = self
729 .store
730 .get_latest_object_or_tombstone(*object_id)
731 .unwrap()
732 .map(|(_, o)| o.into());
733 obj
734 });
735
736 let cache_entry = match &*entry.lock() {
737 LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
738 LatestObjectCacheEntry::NonExistent => None,
739 };
740
741 let tombstone_possibly_pruned = highest.is_none()
743 && cache_entry
744 .as_ref()
745 .map(|e| e.is_tombstone())
746 .unwrap_or(false);
747
748 if highest != cache_entry && !tombstone_possibly_pruned {
749 tracing::error!(
750 ?highest,
751 ?cache_entry,
752 ?tombstone_possibly_pruned,
753 "object_by_id cache is incoherent for {:?}",
754 object_id
755 );
756 panic!("object_by_id cache is incoherent for {:?}", object_id);
757 }
758 }
759
760 if let Some(entry) = entry {
761 let entry = entry.lock();
762 match &*entry {
763 LatestObjectCacheEntry::Object(latest_version, latest_object) => {
764 self.metrics.record_cache_hit(request_type, "object_by_id");
765 return CacheResult::Hit((*latest_version, latest_object.clone()));
766 }
767 LatestObjectCacheEntry::NonExistent => {
768 self.metrics
769 .record_cache_negative_hit(request_type, "object_by_id");
770 return CacheResult::NegativeHit;
771 }
772 }
773 } else {
774 self.metrics.record_cache_miss(request_type, "object_by_id");
775 }
776
777 Self::with_locked_cache_entries(
778 &self.dirty.objects,
779 &self.cached.object_cache,
780 object_id,
781 |dirty_entry, cached_entry| {
782 check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
783 check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
784 CacheResult::Miss
785 },
786 )
787 }
788
789 fn get_object_by_id_cache_only(
790 &self,
791 request_type: &'static str,
792 object_id: &ObjectID,
793 ) -> CacheResult<(SequenceNumber, Object)> {
794 match self.get_object_entry_by_id_cache_only(request_type, object_id) {
795 CacheResult::Hit((version, entry)) => match entry {
796 ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
797 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
798 },
799 CacheResult::NegativeHit => CacheResult::NegativeHit,
800 CacheResult::Miss => CacheResult::Miss,
801 }
802 }
803
804 fn get_marker_value_cache_only(
805 &self,
806 object_key: FullObjectKey,
807 epoch_id: EpochId,
808 ) -> CacheResult<MarkerValue> {
809 Self::with_locked_cache_entries(
810 &self.dirty.markers,
811 &self.cached.marker_cache,
812 &(epoch_id, object_key.id()),
813 |dirty_entry, cached_entry| {
814 check_cache_entry_by_version!(
815 self,
816 "marker_by_version",
817 "uncommitted",
818 dirty_entry,
819 object_key.version()
820 );
821 check_cache_entry_by_version!(
822 self,
823 "marker_by_version",
824 "committed",
825 cached_entry,
826 object_key.version()
827 );
828 CacheResult::Miss
829 },
830 )
831 }
832
833 fn get_latest_marker_value_cache_only(
834 &self,
835 object_id: FullObjectID,
836 epoch_id: EpochId,
837 ) -> CacheResult<(SequenceNumber, MarkerValue)> {
838 Self::with_locked_cache_entries(
839 &self.dirty.markers,
840 &self.cached.marker_cache,
841 &(epoch_id, object_id),
842 |dirty_entry, cached_entry| {
843 check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
844 check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
845 CacheResult::Miss
846 },
847 )
848 }
849
850 fn get_object_impl(&self, request_type: &'static str, id: &ObjectID) -> Option<Object> {
851 let ticket = self.object_by_id_cache.get_ticket_for_read(id);
852 match self.get_object_entry_by_id_cache_only(request_type, id) {
853 CacheResult::Hit((_, entry)) => match entry {
854 ObjectEntry::Object(object) => Some(object),
855 ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
856 },
857 CacheResult::NegativeHit => None,
858 CacheResult::Miss => {
859 let obj = self
860 .store
861 .get_latest_object_or_tombstone(*id)
862 .expect("db error");
863 match obj {
864 Some((key, obj)) => {
865 self.cache_latest_object_by_id(
866 id,
867 LatestObjectCacheEntry::Object(key.1, obj.clone().into()),
868 ticket,
869 );
870 match obj {
871 ObjectOrTombstone::Object(object) => Some(object),
872 ObjectOrTombstone::Tombstone(_) => None,
873 }
874 }
875 None => {
876 self.cache_object_not_found(id, ticket);
877 None
878 }
879 }
880 }
881 }
882 }
883
884 fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
885 self.metrics.record_cache_request(request_type, "db");
886 &self.store
887 }
888
889 fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
890 self.metrics
891 .record_cache_multi_request(request_type, "db", count);
892 &self.store
893 }
894
895 #[instrument(level = "debug", skip_all)]
896 fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
897 let tx_digest = *tx_outputs.transaction.digest();
898 trace!(?tx_digest, "writing transaction outputs to cache");
899
900 assert!(
901 !self.transaction_executed_in_last_epoch(&tx_digest, epoch_id),
902 "Transaction {:?} was already executed in epoch {}",
903 tx_digest,
904 epoch_id.saturating_sub(1)
905 );
906
907 let TransactionOutputs {
908 transaction,
909 effects,
910 markers,
911 written,
912 deleted,
913 wrapped,
914 events,
915 unchanged_loaded_runtime_objects,
916 ..
917 } = &*tx_outputs;
918
919 for ObjectKey(id, version) in deleted.iter() {
924 self.write_object_entry(id, *version, ObjectEntry::Deleted);
925 }
926
927 for ObjectKey(id, version) in wrapped.iter() {
928 self.write_object_entry(id, *version, ObjectEntry::Wrapped);
929 }
930
931 for (object_key, marker_value) in markers.iter() {
933 self.write_marker_value(epoch_id, *object_key, *marker_value);
934 }
935
936 for (object_id, object) in written.iter() {
939 if object.is_child_object() {
940 self.write_object_entry(object_id, object.version(), object.clone().into());
941 }
942 }
943 for (object_id, object) in written.iter() {
944 if !object.is_child_object() {
945 self.write_object_entry(object_id, object.version(), object.clone().into());
946 if object.is_package() {
947 debug!("caching package: {:?}", object.compute_object_reference());
948 self.packages
949 .insert(*object_id, PackageObject::new(object.clone()));
950 }
951 }
952 }
953
954 let tx_digest = *transaction.digest();
955 debug!(
956 ?tx_digest,
957 "Writing transaction output objects to cache: {:?}",
958 written
959 .values()
960 .map(|o| (o.id(), o.version()))
961 .collect::<Vec<_>>(),
962 );
963 let effects_digest = effects.digest();
964
965 self.metrics.record_cache_write("transaction_block");
966 self.dirty
967 .pending_transaction_writes
968 .insert(tx_digest, tx_outputs.clone());
969
970 self.metrics.record_cache_write("transaction_effects");
973 self.dirty
974 .transaction_effects
975 .insert(effects_digest, effects.clone());
976
977 self.metrics.record_cache_write("transaction_events");
981 self.dirty
982 .transaction_events
983 .insert(tx_digest, events.clone());
984
985 self.metrics
986 .record_cache_write("unchanged_loaded_runtime_objects");
987 self.dirty
988 .unchanged_loaded_runtime_objects
989 .insert(tx_digest, unchanged_loaded_runtime_objects.clone());
990
991 self.metrics.record_cache_write("executed_effects_digests");
992 self.dirty
993 .executed_effects_digests
994 .insert(tx_digest, effects_digest);
995
996 self.executed_effects_digests_notify_read
997 .notify(&tx_digest, &effects_digest);
998
999 self.metrics
1000 .pending_notify_read
1001 .set(self.executed_effects_digests_notify_read.num_pending() as i64);
1002
1003 let prev = self
1004 .dirty
1005 .total_transaction_inserts
1006 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1007
1008 let pending_count = (prev + 1).saturating_sub(
1009 self.dirty
1010 .total_transaction_commits
1011 .load(std::sync::atomic::Ordering::Relaxed),
1012 );
1013
1014 self.set_backpressure(pending_count);
1015 }
1016
1017 fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1018 let _metrics_guard = mysten_metrics::monitored_scope("WritebackCache::build_db_batch");
1019 let mut all_outputs = Vec::with_capacity(digests.len());
1020 for tx in digests {
1021 let Some(outputs) = self
1022 .dirty
1023 .pending_transaction_writes
1024 .get(tx)
1025 .map(|o| o.clone())
1026 else {
1027 warn!("Attempt to commit unknown transaction {:?}", tx);
1033 continue;
1034 };
1035 all_outputs.push(outputs);
1036 }
1037
1038 let batch = self
1039 .store
1040 .build_db_batch(epoch, &all_outputs)
1041 .expect("db error");
1042 (all_outputs, batch)
1043 }
1044
1045 #[instrument(level = "debug", skip_all)]
1047 fn commit_transaction_outputs(
1048 &self,
1049 epoch: EpochId,
1050 (all_outputs, db_batch): Batch,
1051 digests: &[TransactionDigest],
1052 ) {
1053 let _metrics_guard =
1054 mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs");
1055 fail_point!("writeback-cache-commit");
1056 trace!(?digests);
1057
1058 db_batch.write().expect("db error");
1062
1063 let _metrics_guard =
1064 mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs::flush");
1065 all_outputs.par_iter().with_min_len(16).for_each(|outputs| {
1068 let tx_digest = outputs.transaction.digest();
1069 assert!(
1070 self.dirty
1071 .pending_transaction_writes
1072 .remove(tx_digest)
1073 .is_some()
1074 );
1075 self.flush_tx_metadata_from_dirty_to_cached(*tx_digest, outputs);
1076 });
1077
1078 for outputs in all_outputs.iter() {
1083 self.flush_objects_from_dirty_to_cached(epoch, outputs);
1084 }
1085
1086 let num_outputs = all_outputs.len() as u64;
1087 let num_commits = self
1088 .dirty
1089 .total_transaction_commits
1090 .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
1091 + num_outputs;
1092
1093 let pending_count = self
1094 .dirty
1095 .total_transaction_inserts
1096 .load(std::sync::atomic::Ordering::Relaxed)
1097 .saturating_sub(num_commits);
1098
1099 self.set_backpressure(pending_count);
1100 }
1101
1102 fn approximate_pending_transaction_count(&self) -> u64 {
1103 let num_commits = self
1104 .dirty
1105 .total_transaction_commits
1106 .load(std::sync::atomic::Ordering::Relaxed);
1107
1108 self.dirty
1109 .total_transaction_inserts
1110 .load(std::sync::atomic::Ordering::Relaxed)
1111 .saturating_sub(num_commits)
1112 }
1113
1114 fn set_backpressure(&self, pending_count: u64) {
1115 let backpressure = pending_count > self.backpressure_threshold;
1116 let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
1117 if backpressure_changed {
1118 self.metrics.backpressure_toggles.inc();
1119 }
1120 self.metrics
1121 .backpressure_status
1122 .set(if backpressure { 1 } else { 0 });
1123 }
1124
1125 fn flush_tx_metadata_from_dirty_to_cached(
1129 &self,
1130 tx_digest: TransactionDigest,
1131 outputs: &TransactionOutputs,
1132 ) {
1133 let TransactionOutputs {
1135 transaction,
1136 effects,
1137 events,
1138 ..
1139 } = outputs;
1140
1141 let effects_digest = effects.digest();
1142
1143 self.cached
1146 .transactions
1147 .insert(
1148 &tx_digest,
1149 PointCacheItem::Some(transaction.clone()),
1150 Ticket::Write,
1151 )
1152 .ok();
1153 self.cached
1154 .transaction_effects
1155 .insert(
1156 &effects_digest,
1157 PointCacheItem::Some(effects.clone().into()),
1158 Ticket::Write,
1159 )
1160 .ok();
1161 self.cached
1162 .executed_effects_digests
1163 .insert(
1164 &tx_digest,
1165 PointCacheItem::Some(effects_digest),
1166 Ticket::Write,
1167 )
1168 .ok();
1169 self.cached
1170 .transaction_events
1171 .insert(
1172 &tx_digest,
1173 PointCacheItem::Some(events.clone().into()),
1174 Ticket::Write,
1175 )
1176 .ok();
1177
1178 self.dirty
1179 .transaction_effects
1180 .remove(&effects_digest)
1181 .expect("effects must exist");
1182
1183 self.dirty
1184 .transaction_events
1185 .remove(&tx_digest)
1186 .expect("events must exist");
1187
1188 self.dirty
1189 .unchanged_loaded_runtime_objects
1190 .remove(&tx_digest)
1191 .expect("unchanged_loaded_runtime_objects must exist");
1192
1193 self.dirty
1194 .executed_effects_digests
1195 .remove(&tx_digest)
1196 .expect("executed effects must exist");
1197 }
1198
1199 fn flush_objects_from_dirty_to_cached(&self, epoch: EpochId, outputs: &TransactionOutputs) {
1203 let TransactionOutputs {
1204 markers,
1205 written,
1206 deleted,
1207 wrapped,
1208 ..
1209 } = outputs;
1210
1211 for (object_key, marker_value) in markers.iter() {
1212 Self::move_version_from_dirty_to_cache(
1213 &self.dirty.markers,
1214 &self.cached.marker_cache,
1215 (epoch, object_key.id()),
1216 object_key.version(),
1217 marker_value,
1218 );
1219 }
1220
1221 for (object_id, object) in written.iter() {
1222 Self::move_version_from_dirty_to_cache(
1223 &self.dirty.objects,
1224 &self.cached.object_cache,
1225 *object_id,
1226 object.version(),
1227 &ObjectEntry::Object(object.clone()),
1228 );
1229 }
1230
1231 for ObjectKey(object_id, version) in deleted.iter() {
1232 Self::move_version_from_dirty_to_cache(
1233 &self.dirty.objects,
1234 &self.cached.object_cache,
1235 *object_id,
1236 *version,
1237 &ObjectEntry::Deleted,
1238 );
1239 }
1240
1241 for ObjectKey(object_id, version) in wrapped.iter() {
1242 Self::move_version_from_dirty_to_cache(
1243 &self.dirty.objects,
1244 &self.cached.object_cache,
1245 *object_id,
1246 *version,
1247 &ObjectEntry::Wrapped,
1248 );
1249 }
1250 }
1251
1252 fn move_version_from_dirty_to_cache<K, V>(
1255 dirty: &DashMap<K, CachedVersionMap<V>>,
1256 cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
1257 key: K,
1258 version: SequenceNumber,
1259 value: &V,
1260 ) where
1261 K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
1262 V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
1263 {
1264 static MAX_VERSIONS: usize = 3;
1265
1266 let dirty_entry = dirty.entry(key);
1269 let cache_entry = cache.entry(key).or_default();
1270 let mut cache_map = cache_entry.value().lock();
1271
1272 cache_map.insert(version, value.clone());
1274 cache_map.truncate_to(MAX_VERSIONS);
1276
1277 let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
1278 panic!("dirty map must exist");
1279 };
1280
1281 let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);
1282
1283 assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");
1284
1285 if occupied_dirty_entry.get().is_empty() {
1287 occupied_dirty_entry.remove();
1288 }
1289 }
1290
1291 fn cache_latest_object_by_id(
1293 &self,
1294 object_id: &ObjectID,
1295 object: LatestObjectCacheEntry,
1296 ticket: Ticket,
1297 ) {
1298 trace!("caching object by id: {:?} {:?}", object_id, object);
1299 if self
1300 .object_by_id_cache
1301 .insert(object_id, object, ticket)
1302 .is_ok()
1303 {
1304 self.metrics.record_cache_write("object_by_id");
1305 } else {
1306 trace!("discarded cache write due to expired ticket");
1307 self.metrics.record_ticket_expiry();
1308 }
1309 }
1310
1311 fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
1312 self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
1313 }
1314
1315 fn clear_state_end_of_epoch_impl(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
1316 info!("clearing state at end of epoch");
1317
1318 for r in self.dirty.pending_transaction_writes.iter() {
1321 let outputs = r.value();
1322 if !outputs
1323 .transaction
1324 .transaction_data()
1325 .shared_input_objects()
1326 .is_empty()
1327 {
1328 debug_fatal!("transaction must be single writer");
1329 }
1330 info!(
1331 "clearing state for transaction {:?}",
1332 outputs.transaction.digest()
1333 );
1334 for (object_id, object) in outputs.written.iter() {
1335 if object.is_package() {
1336 info!("removing non-finalized package from cache: {:?}", object_id);
1337 self.packages.invalidate(object_id);
1338 }
1339 self.object_by_id_cache.invalidate(object_id);
1340 self.cached.object_cache.invalidate(object_id);
1341 }
1342
1343 for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
1344 self.object_by_id_cache.invalidate(object_id);
1345 self.cached.object_cache.invalidate(object_id);
1346 }
1347 }
1348
1349 self.dirty.clear();
1350
1351 info!("clearing old transaction locks");
1352 self.object_locks.clear();
1353 info!("clearing object per epoch marker table");
1354 self.store
1355 .clear_object_per_epoch_marker_table(execution_guard)
1356 .expect("db error");
1357 }
1358
1359 fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) {
1360 self.store
1361 .bulk_insert_genesis_objects(objects)
1362 .expect("db error");
1363 for obj in objects {
1364 self.cached.object_cache.invalidate(&obj.id());
1365 self.object_by_id_cache.invalidate(&obj.id());
1366 }
1367 }
1368
1369 fn insert_genesis_object_impl(&self, object: Object) {
1370 self.object_by_id_cache.invalidate(&object.id());
1371 self.cached.object_cache.invalidate(&object.id());
1372 self.store.insert_genesis_object(object).expect("db error");
1373 }
1374
1375 pub fn clear_caches_and_assert_empty(&self) {
1376 info!("clearing caches");
1377 self.cached.clear_and_assert_empty();
1378 self.object_by_id_cache.invalidate_all();
1379 assert!(&self.object_by_id_cache.is_empty());
1380 self.packages.invalidate_all();
1381 assert_empty(&self.packages);
1382 }
1383}
1384
1385impl AccountFundsRead for WritebackCache {
1386 fn get_latest_account_amount(&self, account_id: &AccumulatorObjId) -> (u128, SequenceNumber) {
1387 let mut pre_root_version =
1411 ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
1412 .unwrap()
1413 .version();
1414 let mut loop_iter = 0;
1415 loop {
1416 let value = self.get_account_amount_at_version(account_id, pre_root_version);
1419 let post_root_version =
1420 ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
1421 .unwrap()
1422 .version();
1423 if pre_root_version == post_root_version {
1424 return (value, pre_root_version);
1425 }
1426 debug!(
1427 "Root version changed from {} to {} during MVCC read, retrying",
1428 pre_root_version, post_root_version
1429 );
1430 pre_root_version = post_root_version;
1431 loop_iter += 1;
1432 if loop_iter >= 3 {
1433 debug_fatal!("Unable to get a stable version after 3 iterations");
1434 }
1435 }
1436 }
1437
1438 fn get_account_amount_at_version(
1439 &self,
1440 account_id: &AccumulatorObjId,
1441 version: SequenceNumber,
1442 ) -> u128 {
1443 let account_obj = self.find_object_lt_or_eq_version(*account_id.inner(), version);
1444 if let Some(account_obj) = account_obj {
1445 let (_, AccumulatorValue::U128(value)) =
1446 account_obj.data.try_as_move().unwrap().try_into().unwrap();
1447 value.value
1448 } else {
1449 0
1450 }
1451 }
1452}
1453
1454impl ExecutionCacheAPI for WritebackCache {}
1455
1456impl ExecutionCacheCommit for WritebackCache {
1457 fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1458 self.build_db_batch(epoch, digests)
1459 }
1460
1461 fn commit_transaction_outputs(
1462 &self,
1463 epoch: EpochId,
1464 batch: Batch,
1465 digests: &[TransactionDigest],
1466 ) {
1467 WritebackCache::commit_transaction_outputs(self, epoch, batch, digests)
1468 }
1469
1470 fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) {
1471 self.store.persist_transaction(tx).expect("db error");
1472 }
1473
1474 fn approximate_pending_transaction_count(&self) -> u64 {
1475 WritebackCache::approximate_pending_transaction_count(self)
1476 }
1477}
1478
1479impl ObjectCacheRead for WritebackCache {
1480 fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
1481 self.metrics
1482 .record_cache_request("package", "package_cache");
1483 if let Some(p) = self.packages.get(package_id) {
1484 if cfg!(debug_assertions) {
1485 let canonical_package = self
1486 .dirty
1487 .objects
1488 .get(package_id)
1489 .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
1490 Some(ObjectEntry::Object(object)) => Some(object),
1491 _ => None,
1492 })
1493 .or_else(|| self.store.get_object(package_id));
1494
1495 if let Some(canonical_package) = canonical_package {
1496 assert_eq!(
1497 canonical_package.digest(),
1498 p.object().digest(),
1499 "Package object cache is inconsistent for package {:?}",
1500 package_id
1501 );
1502 }
1503 }
1504 self.metrics.record_cache_hit("package", "package_cache");
1505 return Ok(Some(p));
1506 } else {
1507 self.metrics.record_cache_miss("package", "package_cache");
1508 }
1509
1510 if let Some(p) = self.get_object_impl("package", package_id) {
1514 if p.is_package() {
1515 let p = PackageObject::new(p);
1516 tracing::trace!(
1517 "caching package: {:?}",
1518 p.object().compute_object_reference()
1519 );
1520 self.metrics.record_cache_write("package");
1521 self.packages.insert(*package_id, p.clone());
1522 Ok(Some(p))
1523 } else {
1524 Err(SuiErrorKind::UserInputError {
1525 error: UserInputError::MoveObjectAsPackage {
1526 object_id: *package_id,
1527 },
1528 }
1529 .into())
1530 }
1531 } else {
1532 Ok(None)
1533 }
1534 }
1535
1536 fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
1537 }
1540
1541 fn get_object(&self, id: &ObjectID) -> Option<Object> {
1544 self.get_object_impl("object_latest", id)
1545 }
1546
1547 fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object> {
1548 match self.get_object_by_key_cache_only(object_id, version) {
1549 CacheResult::Hit(object) => Some(object),
1550 CacheResult::NegativeHit => None,
1551 CacheResult::Miss => self
1552 .record_db_get("object_by_version")
1553 .get_object_by_key(object_id, version),
1554 }
1555 }
1556
1557 fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
1558 do_fallback_lookup(
1559 object_keys,
1560 |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1561 CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1562 CacheResult::NegativeHit => CacheResult::NegativeHit,
1563 CacheResult::Miss => CacheResult::Miss,
1564 },
1565 |remaining| {
1566 self.record_db_multi_get("object_by_version", remaining.len())
1567 .multi_get_objects_by_key(remaining)
1568 .expect("db error")
1569 },
1570 )
1571 }
1572
1573 fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool {
1574 match self.get_object_by_key_cache_only(object_id, version) {
1575 CacheResult::Hit(_) => true,
1576 CacheResult::NegativeHit => false,
1577 CacheResult::Miss => self
1578 .record_db_get("object_by_version")
1579 .object_exists_by_key(object_id, version)
1580 .expect("db error"),
1581 }
1582 }
1583
1584 fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool> {
1585 do_fallback_lookup(
1586 object_keys,
1587 |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1588 CacheResult::Hit(_) => CacheResult::Hit(true),
1589 CacheResult::NegativeHit => CacheResult::Hit(false),
1590 CacheResult::Miss => CacheResult::Miss,
1591 },
1592 |remaining| {
1593 self.record_db_multi_get("object_by_version", remaining.len())
1594 .multi_object_exists_by_key(remaining)
1595 .expect("db error")
1596 },
1597 )
1598 }
1599
1600 fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
1601 match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1602 CacheResult::Hit((version, entry)) => Some(match entry {
1603 ObjectEntry::Object(object) => object.compute_object_reference(),
1604 ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1605 ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1606 }),
1607 CacheResult::NegativeHit => None,
1608 CacheResult::Miss => self
1609 .record_db_get("latest_objref_or_tombstone")
1610 .get_latest_object_ref_or_tombstone(object_id)
1611 .expect("db error"),
1612 }
1613 }
1614
1615 fn get_latest_object_or_tombstone(
1616 &self,
1617 object_id: ObjectID,
1618 ) -> Option<(ObjectKey, ObjectOrTombstone)> {
1619 match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1620 CacheResult::Hit((version, entry)) => {
1621 let key = ObjectKey(object_id, version);
1622 Some(match entry {
1623 ObjectEntry::Object(object) => (key, object.into()),
1624 ObjectEntry::Deleted => (
1625 key,
1626 ObjectOrTombstone::Tombstone((
1627 object_id,
1628 version,
1629 ObjectDigest::OBJECT_DIGEST_DELETED,
1630 )),
1631 ),
1632 ObjectEntry::Wrapped => (
1633 key,
1634 ObjectOrTombstone::Tombstone((
1635 object_id,
1636 version,
1637 ObjectDigest::OBJECT_DIGEST_WRAPPED,
1638 )),
1639 ),
1640 })
1641 }
1642 CacheResult::NegativeHit => None,
1643 CacheResult::Miss => self
1644 .record_db_get("latest_object_or_tombstone")
1645 .get_latest_object_or_tombstone(object_id)
1646 .expect("db error"),
1647 }
1648 }
1649
1650 fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool> {
1651 keys.iter()
1652 .map(|key| {
1653 if key.is_cancelled() {
1654 true
1655 } else {
1656 match key {
1657 InputKey::VersionedObject { id, version } => {
1658 matches!(
1659 self.get_object_by_key_cache_only(&id.id(), *version),
1660 CacheResult::Hit(_)
1661 )
1662 }
1663 InputKey::Package { id } => self.packages.contains_key(id),
1664 }
1665 }
1666 })
1667 .collect()
1668 }
1669
1670 #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1671 fn find_object_lt_or_eq_version(
1672 &self,
1673 object_id: ObjectID,
1674 version_bound: SequenceNumber,
1675 ) -> Option<Object> {
1676 macro_rules! check_cache_entry {
1677 ($level: expr, $objects: expr) => {
1678 self.metrics
1679 .record_cache_request("object_lt_or_eq_version", $level);
1680 if let Some(objects) = $objects {
1681 if let Some((_, object)) = objects
1682 .all_versions_lt_or_eq_descending(&version_bound)
1683 .next()
1684 {
1685 if let ObjectEntry::Object(object) = object {
1686 self.metrics
1687 .record_cache_hit("object_lt_or_eq_version", $level);
1688 return Some(object.clone());
1689 } else {
1690 self.metrics
1692 .record_cache_negative_hit("object_lt_or_eq_version", $level);
1693 return None;
1694 }
1695 } else {
1696 self.metrics
1697 .record_cache_miss("object_lt_or_eq_version", $level);
1698 }
1699 }
1700 };
1701 }
1702
1703 self.metrics
1705 .record_cache_request("object_lt_or_eq_version", "object_by_id");
1706 let latest_cache_entry = self.object_by_id_cache.get(&object_id);
1707 if let Some(latest) = &latest_cache_entry {
1708 let latest = latest.lock();
1709 match &*latest {
1710 LatestObjectCacheEntry::Object(latest_version, object) => {
1711 if *latest_version <= version_bound {
1712 if let ObjectEntry::Object(object) = object {
1713 self.metrics
1714 .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1715 return Some(object.clone());
1716 } else {
1717 self.metrics.record_cache_negative_hit(
1719 "object_lt_or_eq_version",
1720 "object_by_id",
1721 );
1722 return None;
1723 }
1724 }
1725 }
1727 LatestObjectCacheEntry::NonExistent => {
1729 self.metrics
1730 .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1731 return None;
1732 }
1733 }
1734 }
1735 self.metrics
1736 .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1737
1738 Self::with_locked_cache_entries(
1739 &self.dirty.objects,
1740 &self.cached.object_cache,
1741 &object_id,
1742 |dirty_entry, cached_entry| {
1743 check_cache_entry!("committed", dirty_entry);
1744 check_cache_entry!("uncommitted", cached_entry);
1745
1746 let latest: Option<(SequenceNumber, ObjectEntry)> =
1767 if let Some(dirty_set) = dirty_entry {
1768 dirty_set
1769 .get_highest()
1770 .cloned()
1771 .tap_none(|| panic!("dirty set cannot be empty"))
1772 } else {
1773 self.record_db_get("object_lt_or_eq_version_latest")
1775 .get_latest_object_or_tombstone(object_id)
1776 .expect("db error")
1777 .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1778 (version, ObjectEntry::from(obj_or_tombstone))
1779 })
1780 };
1781
1782 if let Some((obj_version, obj_entry)) = latest {
1783 self.cache_latest_object_by_id(
1791 &object_id,
1792 LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1793 self.object_by_id_cache.get_ticket_for_read(&object_id),
1796 );
1797
1798 if obj_version <= version_bound {
1799 match obj_entry {
1800 ObjectEntry::Object(object) => Some(object),
1801 ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
1802 }
1803 } else {
1804 self.record_db_get("object_lt_or_eq_version_scan")
1808 .find_object_lt_or_eq_version(object_id, version_bound)
1809 .expect("db error")
1810 }
1811
1812 } else if let Some(latest_cache_entry) = latest_cache_entry {
1818 assert!(!latest_cache_entry.lock().is_alive());
1820 None
1821 } else {
1822 let highest = cached_entry.and_then(|c| c.get_highest());
1824 assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1825 self.cache_object_not_found(
1826 &object_id,
1827 self.object_by_id_cache.get_ticket_for_read(&object_id),
1829 );
1830 None
1831 }
1832 },
1833 )
1834 }
1835
1836 fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
1837 get_sui_system_state(self)
1838 }
1839
1840 fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge> {
1841 get_bridge(self)
1842 }
1843
1844 fn get_marker_value(
1845 &self,
1846 object_key: FullObjectKey,
1847 epoch_id: EpochId,
1848 ) -> Option<MarkerValue> {
1849 match self.get_marker_value_cache_only(object_key, epoch_id) {
1850 CacheResult::Hit(marker) => Some(marker),
1851 CacheResult::NegativeHit => None,
1852 CacheResult::Miss => self
1853 .record_db_get("marker_by_version")
1854 .get_marker_value(object_key, epoch_id)
1855 .expect("db error"),
1856 }
1857 }
1858
1859 fn get_latest_marker(
1860 &self,
1861 object_id: FullObjectID,
1862 epoch_id: EpochId,
1863 ) -> Option<(SequenceNumber, MarkerValue)> {
1864 match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1865 CacheResult::Hit((v, marker)) => Some((v, marker)),
1866 CacheResult::NegativeHit => {
1867 panic!("cannot have negative hit when getting latest marker")
1868 }
1869 CacheResult::Miss => self
1870 .record_db_get("marker_latest")
1871 .get_latest_marker(object_id, epoch_id)
1872 .expect("db error"),
1873 }
1874 }
1875
1876 fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult {
1877 let cur_epoch = epoch_store.epoch();
1878 match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
1879 CacheResult::Hit((_, obj)) => {
1880 let actual_objref = obj.compute_object_reference();
1881 if obj_ref != actual_objref {
1882 Ok(ObjectLockStatus::LockedAtDifferentVersion {
1883 locked_ref: actual_objref,
1884 })
1885 } else {
1886 Ok(
1888 match self
1889 .object_locks
1890 .get_transaction_lock(&obj_ref, epoch_store)?
1891 {
1892 Some(tx_digest) => ObjectLockStatus::LockedToTx {
1893 locked_by_tx: LockDetailsDeprecated {
1894 epoch: cur_epoch,
1895 tx_digest,
1896 },
1897 },
1898 None => ObjectLockStatus::Initialized,
1899 },
1900 )
1901 }
1902 }
1903 CacheResult::NegativeHit => {
1904 Err(SuiError::from(UserInputError::ObjectNotFound {
1905 object_id: obj_ref.0,
1906 version: None,
1909 }))
1910 }
1911 CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1912 }
1913 }
1914
1915 fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef> {
1916 let obj = self.get_object_impl("live_objref", &object_id).ok_or(
1917 UserInputError::ObjectNotFound {
1918 object_id,
1919 version: None,
1920 },
1921 )?;
1922 Ok(obj.compute_object_reference())
1923 }
1924
1925 fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1926 do_fallback_lookup_fallible(
1927 owned_object_refs,
1928 |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
1929 CacheResult::Hit((version, obj)) => {
1930 if obj.compute_object_reference() != *obj_ref {
1931 Err(UserInputError::ObjectVersionUnavailableForConsumption {
1932 provided_obj_ref: *obj_ref,
1933 current_version: version,
1934 }
1935 .into())
1936 } else {
1937 Ok(CacheResult::Hit(()))
1938 }
1939 }
1940 CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
1941 object_id: obj_ref.0,
1942 version: None,
1943 }
1944 .into()),
1945 CacheResult::Miss => Ok(CacheResult::Miss),
1946 },
1947 |remaining| {
1948 self.record_db_multi_get("object_is_live", remaining.len())
1949 .check_owned_objects_are_live(remaining)?;
1950 Ok(vec![(); remaining.len()])
1951 },
1952 )?;
1953 Ok(())
1954 }
1955
1956 fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber> {
1957 self.store
1958 .perpetual_tables
1959 .get_highest_pruned_checkpoint()
1960 .expect("db error")
1961 }
1962
1963 fn notify_read_input_objects<'a>(
1964 &'a self,
1965 input_and_receiving_keys: &'a [InputKey],
1966 receiving_keys: &'a HashSet<InputKey>,
1967 epoch: EpochId,
1968 ) -> BoxFuture<'a, ()> {
1969 self.object_notify_read
1970 .read(
1971 "notify_read_input_objects",
1972 input_and_receiving_keys,
1973 move |keys| {
1974 self.multi_input_objects_available(keys, receiving_keys, epoch)
1975 .into_iter()
1976 .map(|available| if available { Some(()) } else { None })
1977 .collect::<Vec<_>>()
1978 },
1979 )
1980 .map(|_| ())
1981 .boxed()
1982 }
1983}
1984
1985impl TransactionCacheRead for WritebackCache {
1986 fn multi_get_transaction_blocks(
1987 &self,
1988 digests: &[TransactionDigest],
1989 ) -> Vec<Option<Arc<VerifiedTransaction>>> {
1990 let digests_and_tickets: Vec<_> = digests
1991 .iter()
1992 .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
1993 .collect();
1994 do_fallback_lookup(
1995 &digests_and_tickets,
1996 |(digest, _)| {
1997 self.metrics
1998 .record_cache_request("transaction_block", "uncommitted");
1999 if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
2000 self.metrics
2001 .record_cache_hit("transaction_block", "uncommitted");
2002 return CacheResult::Hit(Some(tx.transaction.clone()));
2003 }
2004 self.metrics
2005 .record_cache_miss("transaction_block", "uncommitted");
2006
2007 self.metrics
2008 .record_cache_request("transaction_block", "committed");
2009
2010 match self
2011 .cached
2012 .transactions
2013 .get(digest)
2014 .map(|l| l.lock().clone())
2015 {
2016 Some(PointCacheItem::Some(tx)) => {
2017 self.metrics
2018 .record_cache_hit("transaction_block", "committed");
2019 CacheResult::Hit(Some(tx))
2020 }
2021 Some(PointCacheItem::None) => CacheResult::NegativeHit,
2022 None => {
2023 self.metrics
2024 .record_cache_miss("transaction_block", "committed");
2025
2026 CacheResult::Miss
2027 }
2028 }
2029 },
2030 |remaining| {
2031 let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2032 let results: Vec<_> = self
2033 .record_db_multi_get("transaction_block", remaining.len())
2034 .multi_get_transaction_blocks(&remaining_digests)
2035 .expect("db error")
2036 .into_iter()
2037 .map(|o| o.map(Arc::new))
2038 .collect();
2039 for ((digest, ticket), result) in remaining.iter().zip_debug_eq(results.iter()) {
2040 if result.is_none() {
2041 self.cached.transactions.insert(digest, None, *ticket).ok();
2042 }
2043 }
2044 results
2045 },
2046 )
2047 }
2048
2049 fn multi_get_executed_effects_digests(
2050 &self,
2051 digests: &[TransactionDigest],
2052 ) -> Vec<Option<TransactionEffectsDigest>> {
2053 let digests_and_tickets: Vec<_> = digests
2054 .iter()
2055 .map(|d| {
2056 (
2057 *d,
2058 self.cached.executed_effects_digests.get_ticket_for_read(d),
2059 )
2060 })
2061 .collect();
2062 do_fallback_lookup(
2063 &digests_and_tickets,
2064 |(digest, _)| {
2065 self.metrics
2066 .record_cache_request("executed_effects_digests", "uncommitted");
2067 if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
2068 self.metrics
2069 .record_cache_hit("executed_effects_digests", "uncommitted");
2070 return CacheResult::Hit(Some(*digest));
2071 }
2072 self.metrics
2073 .record_cache_miss("executed_effects_digests", "uncommitted");
2074
2075 self.metrics
2076 .record_cache_request("executed_effects_digests", "committed");
2077 match self
2078 .cached
2079 .executed_effects_digests
2080 .get(digest)
2081 .map(|l| *l.lock())
2082 {
2083 Some(PointCacheItem::Some(digest)) => {
2084 self.metrics
2085 .record_cache_hit("executed_effects_digests", "committed");
2086 CacheResult::Hit(Some(digest))
2087 }
2088 Some(PointCacheItem::None) => CacheResult::NegativeHit,
2089 None => {
2090 self.metrics
2091 .record_cache_miss("executed_effects_digests", "committed");
2092 CacheResult::Miss
2093 }
2094 }
2095 },
2096 |remaining| {
2097 let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2098 let results = self
2099 .record_db_multi_get("executed_effects_digests", remaining.len())
2100 .multi_get_executed_effects_digests(&remaining_digests)
2101 .expect("db error");
2102 for ((digest, ticket), result) in remaining.iter().zip_debug_eq(results.iter()) {
2103 if result.is_none() {
2104 self.cached
2105 .executed_effects_digests
2106 .insert(digest, None, *ticket)
2107 .ok();
2108 }
2109 }
2110 results
2111 },
2112 )
2113 }
2114
2115 fn multi_get_effects(
2116 &self,
2117 digests: &[TransactionEffectsDigest],
2118 ) -> Vec<Option<TransactionEffects>> {
2119 let digests_and_tickets: Vec<_> = digests
2120 .iter()
2121 .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
2122 .collect();
2123 do_fallback_lookup(
2124 &digests_and_tickets,
2125 |(digest, _)| {
2126 self.metrics
2127 .record_cache_request("transaction_effects", "uncommitted");
2128 if let Some(effects) = self.dirty.transaction_effects.get(digest) {
2129 self.metrics
2130 .record_cache_hit("transaction_effects", "uncommitted");
2131 return CacheResult::Hit(Some(effects.clone()));
2132 }
2133 self.metrics
2134 .record_cache_miss("transaction_effects", "uncommitted");
2135
2136 self.metrics
2137 .record_cache_request("transaction_effects", "committed");
2138 match self
2139 .cached
2140 .transaction_effects
2141 .get(digest)
2142 .map(|l| l.lock().clone())
2143 {
2144 Some(PointCacheItem::Some(effects)) => {
2145 self.metrics
2146 .record_cache_hit("transaction_effects", "committed");
2147 CacheResult::Hit(Some((*effects).clone()))
2148 }
2149 Some(PointCacheItem::None) => CacheResult::NegativeHit,
2150 None => {
2151 self.metrics
2152 .record_cache_miss("transaction_effects", "committed");
2153 CacheResult::Miss
2154 }
2155 }
2156 },
2157 |remaining| {
2158 let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2159 let results = self
2160 .record_db_multi_get("transaction_effects", remaining.len())
2161 .multi_get_effects(remaining_digests.iter())
2162 .expect("db error");
2163 for ((digest, ticket), result) in remaining.iter().zip_debug_eq(results.iter()) {
2164 if result.is_none() {
2165 self.cached
2166 .transaction_effects
2167 .insert(digest, None, *ticket)
2168 .ok();
2169 }
2170 }
2171 results
2172 },
2173 )
2174 }
2175
2176 fn transaction_executed_in_last_epoch(
2177 &self,
2178 digest: &TransactionDigest,
2179 current_epoch: EpochId,
2180 ) -> bool {
2181 if current_epoch == 0 {
2182 return false;
2183 }
2184 let last_epoch = current_epoch - 1;
2185 let cache_key = (last_epoch, *digest);
2186
2187 let ticket = self
2188 .cached
2189 .transaction_executed_in_last_epoch
2190 .get_ticket_for_read(&cache_key);
2191
2192 if let Some(cached) = self
2193 .cached
2194 .transaction_executed_in_last_epoch
2195 .get(&cache_key)
2196 {
2197 return cached.lock().is_some();
2198 }
2199
2200 let was_executed = self
2201 .store
2202 .perpetual_tables
2203 .was_transaction_executed_in_last_epoch(digest, current_epoch);
2204
2205 let value = if was_executed { Some(()) } else { None };
2206 self.cached
2207 .transaction_executed_in_last_epoch
2208 .insert(&cache_key, value, ticket)
2209 .ok();
2210
2211 was_executed
2212 }
2213
2214 fn notify_read_executed_effects_digests<'a>(
2215 &'a self,
2216 task_name: &'static str,
2217 digests: &'a [TransactionDigest],
2218 ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>> {
2219 self.executed_effects_digests_notify_read
2220 .read(task_name, digests, |digests| {
2221 self.multi_get_executed_effects_digests(digests)
2222 })
2223 .boxed()
2224 }
2225
2226 fn multi_get_events(
2227 &self,
2228 event_digests: &[TransactionDigest],
2229 ) -> Vec<Option<TransactionEvents>> {
2230 fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
2231 if events.data.is_empty() {
2232 None
2233 } else {
2234 Some(events)
2235 }
2236 }
2237
2238 let digests_and_tickets: Vec<_> = event_digests
2239 .iter()
2240 .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
2241 .collect();
2242 do_fallback_lookup(
2243 &digests_and_tickets,
2244 |(digest, _)| {
2245 self.metrics
2246 .record_cache_request("transaction_events", "uncommitted");
2247 if let Some(events) = self.dirty.transaction_events.get(digest).map(|e| e.clone()) {
2248 self.metrics
2249 .record_cache_hit("transaction_events", "uncommitted");
2250
2251 return CacheResult::Hit(map_events(events));
2252 }
2253 self.metrics
2254 .record_cache_miss("transaction_events", "uncommitted");
2255
2256 self.metrics
2257 .record_cache_request("transaction_events", "committed");
2258 match self
2259 .cached
2260 .transaction_events
2261 .get(digest)
2262 .map(|l| l.lock().clone())
2263 {
2264 Some(PointCacheItem::Some(events)) => {
2265 self.metrics
2266 .record_cache_hit("transaction_events", "committed");
2267 CacheResult::Hit(map_events((*events).clone()))
2268 }
2269 Some(PointCacheItem::None) => CacheResult::NegativeHit,
2270 None => {
2271 self.metrics
2272 .record_cache_miss("transaction_events", "committed");
2273
2274 CacheResult::Miss
2275 }
2276 }
2277 },
2278 |remaining| {
2279 let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2280 let results = self
2281 .store
2282 .multi_get_events(&remaining_digests)
2283 .expect("db error");
2284 for ((digest, ticket), result) in remaining.iter().zip_debug_eq(results.iter()) {
2285 if result.is_none() {
2286 self.cached
2287 .transaction_events
2288 .insert(digest, None, *ticket)
2289 .ok();
2290 }
2291 }
2292 results
2293 },
2294 )
2295 }
2296
2297 fn get_unchanged_loaded_runtime_objects(
2298 &self,
2299 digest: &TransactionDigest,
2300 ) -> Option<Vec<ObjectKey>> {
2301 self.dirty
2302 .unchanged_loaded_runtime_objects
2303 .get(digest)
2304 .map(|b| b.clone())
2305 .or_else(|| {
2306 self.store
2307 .get_unchanged_loaded_runtime_objects(digest)
2308 .expect("db error")
2309 })
2310 }
2311
2312 fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
2313 self.dirty
2314 .pending_transaction_writes
2315 .get(digest)
2316 .map(|transaction_output| transaction_output.take_accumulator_events())
2317 }
2318}
2319
2320impl ExecutionCacheWrite for WritebackCache {
2321 fn validate_owned_object_versions(&self, owned_input_objects: &[ObjectRef]) -> SuiResult {
2322 ObjectLocks::validate_owned_object_versions(self, owned_input_objects)
2323 }
2324
2325 fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
2326 WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs);
2327 }
2328
2329 #[cfg(test)]
2330 fn write_object_entry_for_test(&self, object: Object) {
2331 self.write_object_entry(&object.id(), object.version(), object.into());
2332 }
2333}
2334
2335implement_passthrough_traits!(WritebackCache);
2336
2337impl GlobalStateHashStore for WritebackCache {
2338 fn get_object_ref_prior_to_key_deprecated(
2339 &self,
2340 object_id: &ObjectID,
2341 version: SequenceNumber,
2342 ) -> SuiResult<Option<ObjectRef>> {
2343 let mut candidates = Vec::new();
2347
2348 let check_versions =
2349 |versions: &CachedVersionMap<ObjectEntry>| match versions.get_prior_to(&version) {
2350 Some((version, object_entry)) => match object_entry {
2351 ObjectEntry::Object(object) => {
2352 assert_eq!(object.version(), version);
2353 Some(object.compute_object_reference())
2354 }
2355 ObjectEntry::Deleted => {
2356 Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED))
2357 }
2358 ObjectEntry::Wrapped => {
2359 Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED))
2360 }
2361 },
2362 None => None,
2363 };
2364
2365 if let Some(objects) = self.dirty.objects.get(object_id)
2367 && let Some(prior) = check_versions(&objects)
2368 {
2369 candidates.push(prior);
2370 }
2371
2372 if let Some(objects) = self.cached.object_cache.get(object_id)
2373 && let Some(prior) = check_versions(&objects.lock())
2374 {
2375 candidates.push(prior);
2376 }
2377
2378 if let Some(prior) = self
2379 .store
2380 .get_object_ref_prior_to_key_deprecated(object_id, version)?
2381 {
2382 candidates.push(prior);
2383 }
2384
2385 candidates.sort_by_key(|(_, version, _)| *version);
2387 Ok(candidates.pop())
2388 }
2389
2390 fn get_root_state_hash_for_epoch(
2391 &self,
2392 epoch: EpochId,
2393 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
2394 self.store.get_root_state_hash_for_epoch(epoch)
2395 }
2396
2397 fn get_root_state_hash_for_highest_epoch(
2398 &self,
2399 ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
2400 self.store.get_root_state_hash_for_highest_epoch()
2401 }
2402
2403 fn insert_state_hash_for_epoch(
2404 &self,
2405 epoch: EpochId,
2406 checkpoint_seq_num: &CheckpointSequenceNumber,
2407 acc: &GlobalStateHash,
2408 ) -> SuiResult {
2409 self.store
2410 .insert_state_hash_for_epoch(epoch, checkpoint_seq_num, acc)
2411 }
2412
2413 fn iter_live_object_set(
2414 &self,
2415 include_wrapped_tombstone: bool,
2416 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2417 assert!(
2421 self.dirty.is_empty(),
2422 "cannot iterate live object set with dirty data"
2423 );
2424 self.store.iter_live_object_set(include_wrapped_tombstone)
2425 }
2426
2427 fn iter_cached_live_object_set_for_testing(
2431 &self,
2432 include_wrapped_tombstone: bool,
2433 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2434 let iter = self.dirty.objects.iter();
2436 let mut dirty_objects = BTreeMap::new();
2437
2438 for obj in self.store.iter_live_object_set(include_wrapped_tombstone) {
2440 dirty_objects.insert(obj.object_id(), obj);
2441 }
2442
2443 for entry in iter {
2445 let id = *entry.key();
2446 let value = entry.value();
2447 match value.get_highest().unwrap() {
2448 (_, ObjectEntry::Object(object)) => {
2449 dirty_objects.insert(id, LiveObject::Normal(object.clone()));
2450 }
2451 (version, ObjectEntry::Wrapped) => {
2452 if include_wrapped_tombstone {
2453 dirty_objects.insert(id, LiveObject::Wrapped(ObjectKey(id, *version)));
2454 } else {
2455 dirty_objects.remove(&id);
2456 }
2457 }
2458 (_, ObjectEntry::Deleted) => {
2459 dirty_objects.remove(&id);
2460 }
2461 }
2462 }
2463
2464 Box::new(dirty_objects.into_values())
2465 }
2466}
2467
2468impl StateSyncAPI for WritebackCache {
2474 fn insert_transaction_and_effects(
2475 &self,
2476 transaction: &VerifiedTransaction,
2477 transaction_effects: &TransactionEffects,
2478 ) {
2479 self.store
2480 .insert_transaction_and_effects(transaction, transaction_effects)
2481 .expect("db error");
2482 self.cached
2483 .transactions
2484 .insert(
2485 transaction.digest(),
2486 PointCacheItem::Some(Arc::new(transaction.clone())),
2487 Ticket::Write,
2488 )
2489 .ok();
2490 self.cached
2491 .transaction_effects
2492 .insert(
2493 &transaction_effects.digest(),
2494 PointCacheItem::Some(Arc::new(transaction_effects.clone())),
2495 Ticket::Write,
2496 )
2497 .ok();
2498 }
2499
2500 fn multi_insert_transaction_and_effects(
2501 &self,
2502 transactions_and_effects: &[VerifiedExecutionData],
2503 ) {
2504 self.store
2505 .multi_insert_transaction_and_effects(transactions_and_effects.iter())
2506 .expect("db error");
2507 for VerifiedExecutionData {
2508 transaction,
2509 effects,
2510 } in transactions_and_effects
2511 {
2512 self.cached
2513 .transactions
2514 .insert(
2515 transaction.digest(),
2516 PointCacheItem::Some(Arc::new(transaction.clone())),
2517 Ticket::Write,
2518 )
2519 .ok();
2520 self.cached
2521 .transaction_effects
2522 .insert(
2523 &effects.digest(),
2524 PointCacheItem::Some(Arc::new(effects.clone())),
2525 Ticket::Write,
2526 )
2527 .ok();
2528 }
2529 }
2530}