1use crate::accumulators::funds_read::AccountFundsRead;
41use crate::authority::AuthorityStore;
42use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
43use crate::authority::authority_store::{
44 ExecutionLockWriteGuard, LockDetailsDeprecated, ObjectLockStatus, SuiLockResult,
45};
46use crate::authority::authority_store_tables::LiveObject;
47use crate::authority::backpressure::BackpressureManager;
48use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
49use crate::fallback_fetch::{do_fallback_lookup, do_fallback_lookup_fallible};
50use crate::global_state_hasher::GlobalStateHashStore;
51use crate::transaction_outputs::TransactionOutputs;
52
53use dashmap::DashMap;
54use dashmap::mapref::entry::Entry as DashMapEntry;
55use futures::{FutureExt, future::BoxFuture};
56use moka::sync::SegmentedCache as MokaCache;
57use mysten_common::ZipDebugEqIteratorExt;
58use mysten_common::debug_fatal;
59use mysten_common::random_util::randomize_cache_capacity_in_tests;
60use mysten_common::sync::notify_read::NotifyRead;
61use parking_lot::Mutex;
62use rayon::prelude::*;
63use std::collections::{BTreeMap, HashSet};
64use std::hash::Hash;
65use std::sync::Arc;
66use std::sync::atomic::AtomicU64;
67use sui_config::ExecutionCacheConfig;
68use sui_macros::fail_point;
69use sui_protocol_config::ProtocolVersion;
70use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
71use sui_types::accumulator_event::AccumulatorEvent;
72use sui_types::accumulator_root::{AccumulatorObjId, AccumulatorValue};
73use sui_types::base_types::{
74 EpochId, FullObjectID, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData,
75};
76use sui_types::bridge::{Bridge, get_bridge};
77use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest};
78use sui_types::effects::{TransactionEffects, TransactionEvents};
79use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
80use sui_types::executable_transaction::VerifiedExecutableTransaction;
81use sui_types::global_state_hash::GlobalStateHash;
82use sui_types::message_envelope::Message;
83use sui_types::messages_checkpoint::CheckpointSequenceNumber;
84use sui_types::object::Object;
85use sui_types::storage::{
86 FullObjectKey, InputKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject,
87};
88use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
89use sui_types::transaction::{TransactionDataAPI, VerifiedTransaction};
90use tap::TapOptional;
91use tracing::{debug, info, instrument, trace, warn};
92
93use super::ExecutionCacheAPI;
94use super::cache_types::Ticket;
95use super::{
96 Batch, CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
97 ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
98 cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache},
99 implement_passthrough_traits,
100 object_locks::ObjectLocks,
101};
102
// Unit tests live in sibling files under `unit_tests/`; the `#[path]`
// attribute wires them in for test builds only.
#[cfg(test)]
#[path = "unit_tests/writeback_cache_tests.rs"]
pub mod writeback_cache_tests;

#[cfg(test)]
#[path = "unit_tests/notify_read_input_objects_tests.rs"]
mod notify_read_input_objects_tests;
110
/// Cached state of a single object version: either a live object, or a
/// tombstone recording that the version was deleted or wrapped.
#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    /// A live object at this version.
    Object(Object),
    /// The object was deleted at this version.
    Deleted,
    /// The object was wrapped (stored inside another object) at this version.
    Wrapped,
}
117
118impl ObjectEntry {
119 #[cfg(test)]
120 fn unwrap_object(&self) -> &Object {
121 match self {
122 ObjectEntry::Object(o) => o,
123 _ => panic!("unwrap_object called on non-Object"),
124 }
125 }
126
127 fn is_tombstone(&self) -> bool {
128 match self {
129 ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
130 ObjectEntry::Object(_) => false,
131 }
132 }
133}
134
135impl std::fmt::Debug for ObjectEntry {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 match self {
138 ObjectEntry::Object(o) => {
139 write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
140 }
141 ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
142 ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
143 }
144 }
145}
146
/// A live object converts directly into the `Object` cache-entry variant.
impl From<Object> for ObjectEntry {
    fn from(object: Object) -> Self {
        ObjectEntry::Object(object)
    }
}
152
153impl From<ObjectOrTombstone> for ObjectEntry {
154 fn from(object: ObjectOrTombstone) -> Self {
155 match object {
156 ObjectOrTombstone::Object(o) => o.into(),
157 ObjectOrTombstone::Tombstone(obj_ref) => {
158 if obj_ref.2.is_deleted() {
159 ObjectEntry::Deleted
160 } else if obj_ref.2.is_wrapped() {
161 ObjectEntry::Wrapped
162 } else {
163 panic!("tombstone digest must either be deleted or wrapped");
164 }
165 }
166 }
167 }
168}
169
/// Cache entry tracking the latest known version of an object, or the fact
/// that the object does not exist at all.
#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    /// Latest version together with its entry (live object or tombstone).
    Object(SequenceNumber, ObjectEntry),
    /// The object is known not to exist.
    NonExistent,
}
175
176impl LatestObjectCacheEntry {
177 #[cfg(test)]
178 fn version(&self) -> Option<SequenceNumber> {
179 match self {
180 LatestObjectCacheEntry::Object(version, _) => Some(*version),
181 LatestObjectCacheEntry::NonExistent => None,
182 }
183 }
184
185 fn is_alive(&self) -> bool {
186 match self {
187 LatestObjectCacheEntry::Object(_, entry) => !entry.is_tombstone(),
188 LatestObjectCacheEntry::NonExistent => false,
189 }
190 }
191}
192
193impl IsNewer for LatestObjectCacheEntry {
194 fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
195 match (self, other) {
196 (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
197 v1 > v2
198 }
199 (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
200 _ => false,
201 }
202 }
203}
204
/// Marker tables are keyed by epoch plus the full object id.
type MarkerKey = (EpochId, FullObjectID);
206
/// Data written by executed-but-not-yet-committed transactions. Entries are
/// moved from here into the committed caches when the corresponding
/// transaction is committed to the database.
struct UncommittedData {
    /// Object versions written by uncommitted transactions, keyed by id with
    /// a per-id map of version -> entry.
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

    /// Marker values written by uncommitted transactions, keyed by
    /// (epoch, full object id).
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    /// Effects of uncommitted transactions, keyed by effects digest.
    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    /// Events of uncommitted transactions, keyed by transaction digest.
    transaction_events: DashMap<TransactionDigest, TransactionEvents>,

    /// Loaded-but-unchanged runtime objects recorded per uncommitted
    /// transaction.
    unchanged_loaded_runtime_objects: DashMap<TransactionDigest, Vec<ObjectKey>>,

    /// Executed effects digest per uncommitted transaction.
    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    /// Complete output bundles of uncommitted transactions. Presence of a key
    /// here is the authoritative "is this transaction pending" signal (see
    /// `is_empty`).
    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,

    /// Monotonic counters used to approximate the pending (inserted minus
    /// committed) transaction count for backpressure.
    total_transaction_inserts: AtomicU64,
    total_transaction_commits: AtomicU64,
}
245
impl UncommittedData {
    /// Creates empty dirty state. Shard counts are raised well above the
    /// dashmap default to reduce shard-lock contention.
    fn new() -> Self {
        Self {
            objects: DashMap::with_shard_amount(2048),
            markers: DashMap::with_shard_amount(2048),
            transaction_effects: DashMap::with_shard_amount(2048),
            executed_effects_digests: DashMap::with_shard_amount(2048),
            pending_transaction_writes: DashMap::with_shard_amount(2048),
            transaction_events: DashMap::with_shard_amount(2048),
            unchanged_loaded_runtime_objects: DashMap::with_shard_amount(2048),
            total_transaction_inserts: AtomicU64::new(0),
            total_transaction_commits: AtomicU64::new(0),
        }
    }

    /// Drops all dirty entries and zeroes the insert/commit counters.
    fn clear(&self) {
        self.objects.clear();
        self.markers.clear();
        self.transaction_effects.clear();
        self.executed_effects_digests.clear();
        self.pending_transaction_writes.clear();
        self.transaction_events.clear();
        self.unchanged_loaded_runtime_objects.clear();
        self.total_transaction_inserts
            .store(0, std::sync::atomic::Ordering::Relaxed);
        self.total_transaction_commits
            .store(0, std::sync::atomic::Ordering::Relaxed);
    }

    /// True when no transactions are pending commit.
    /// `pending_transaction_writes` is the authoritative signal; in debug
    /// builds we additionally assert that every other table is empty and the
    /// insert/commit counters balance whenever it is empty.
    fn is_empty(&self) -> bool {
        let empty = self.pending_transaction_writes.is_empty();
        if empty && cfg!(debug_assertions) {
            assert!(
                self.objects.is_empty()
                    && self.markers.is_empty()
                    && self.transaction_effects.is_empty()
                    && self.executed_effects_digests.is_empty()
                    && self.transaction_events.is_empty()
                    && self.unchanged_loaded_runtime_objects.is_empty()
                    && self
                        .total_transaction_inserts
                        .load(std::sync::atomic::Ordering::Relaxed)
                        == self
                            .total_transaction_commits
                            .load(std::sync::atomic::Ordering::Relaxed),
            );
        }
        empty
    }
}
296
/// Cache slot for point-lookup (digest-keyed) data: `Some` holds the value,
/// `None` records a confirmed absence.
type PointCacheItem<T> = Option<T>;

impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
    fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
        match (self, other) {
            // A present value supersedes a recorded absence.
            (Some(_), None) => true,

            (Some(a), Some(b)) => {
                // Point data is keyed by digest, so two `Some` values for the
                // same key must be identical; neither is newer.
                debug_assert_eq!(a, b);
                false
            }

            // `None` never supersedes an existing entry.
            _ => false,
        }
    }
}
317
/// Caches over data that has already been committed to the database.
struct CachedCommittedData {
    /// Committed object versions, keyed by id, each holding a small
    /// version -> entry map behind a mutex.
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    /// Committed marker values, keyed by (epoch, full object id).
    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    /// Committed transactions by digest.
    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,

    /// Committed effects by effects digest.
    transaction_effects:
        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,

    /// Committed events by transaction digest.
    transaction_events: MonotonicCache<TransactionDigest, PointCacheItem<Arc<TransactionEvents>>>,

    /// Executed effects digest by transaction digest.
    executed_effects_digests:
        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,

    /// Records that a transaction executed in a given epoch.
    // NOTE(review): the exact epoch semantics are not visible in this chunk —
    // confirm against `transaction_executed_in_last_epoch`.
    transaction_executed_in_last_epoch:
        MonotonicCache<(EpochId, TransactionDigest), PointCacheItem<()>>,

    // NOTE(review): underscore-prefixed and only constructed/invalidated in
    // this chunk — appears unused; confirm before removing.
    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}
343
impl CachedCommittedData {
    /// Builds all committed-data caches, sizing each from `config`
    /// (capacities are randomized in test builds).
    fn new(config: &ExecutionCacheConfig) -> Self {
        // Moka caches are segmented 8 ways to reduce lock contention.
        let object_cache = MokaCache::builder(8)
            .max_capacity(randomize_cache_capacity_in_tests(
                config.object_cache_size(),
            ))
            .build();
        let marker_cache = MokaCache::builder(8)
            .max_capacity(randomize_cache_capacity_in_tests(
                config.marker_cache_size(),
            ))
            .build();

        let transactions = MonotonicCache::new(randomize_cache_capacity_in_tests(
            config.transaction_cache_size(),
        ));
        let transaction_effects = MonotonicCache::new(randomize_cache_capacity_in_tests(
            config.effect_cache_size(),
        ));
        let transaction_events = MonotonicCache::new(randomize_cache_capacity_in_tests(
            config.events_cache_size(),
        ));
        let executed_effects_digests = MonotonicCache::new(randomize_cache_capacity_in_tests(
            config.executed_effect_cache_size(),
        ));

        let transaction_objects = MokaCache::builder(8)
            .max_capacity(randomize_cache_capacity_in_tests(
                config.transaction_objects_cache_size(),
            ))
            .build();

        // Shares the executed-effects size parameter; it has no dedicated
        // config knob.
        let transaction_executed_in_last_epoch = MonotonicCache::new(
            randomize_cache_capacity_in_tests(config.executed_effect_cache_size()),
        );

        Self {
            object_cache,
            marker_cache,
            transactions,
            transaction_effects,
            transaction_events,
            executed_effects_digests,
            transaction_executed_in_last_epoch,
            _transaction_objects: transaction_objects,
        }
    }

    /// Invalidates every cache and asserts that each is observably empty
    /// afterwards.
    fn clear_and_assert_empty(&self) {
        self.object_cache.invalidate_all();
        self.marker_cache.invalidate_all();
        self.transactions.invalidate_all();
        self.transaction_effects.invalidate_all();
        self.transaction_events.invalidate_all();
        self.executed_effects_digests.invalidate_all();
        self.transaction_executed_in_last_epoch.invalidate_all();
        self._transaction_objects.invalidate_all();

        assert_empty(&self.object_cache);
        assert_empty(&self.marker_cache);
        assert!(self.transactions.is_empty());
        assert!(self.transaction_effects.is_empty());
        assert!(self.transaction_events.is_empty());
        assert!(self.executed_effects_digests.is_empty());
        assert!(self.transaction_executed_in_last_epoch.is_empty());
        assert_empty(&self._transaction_objects);
    }
}
412
413fn assert_empty<K, V>(cache: &MokaCache<K, V>)
414where
415 K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
416 V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
417{
418 if cache.iter().next().is_some() {
419 panic!("cache should be empty");
420 }
421}
422
/// Writeback cache for transaction execution output.
///
/// Writes land in `dirty` (uncommitted) state first; committing a
/// transaction persists its data to `store` and migrates it into `cached`
/// (committed) state.
pub struct WritebackCache {
    /// Output of executed-but-not-yet-committed transactions.
    dirty: UncommittedData,
    /// Caches over data already committed to the store.
    cached: CachedCommittedData,

    /// Latest known version (or non-existence) of each object, guarded by
    /// read tickets to reject stale fills.
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    /// Cache of package objects by id.
    packages: MokaCache<ObjectID, PackageObject>,

    /// Per-object transaction lock state.
    object_locks: ObjectLocks,

    /// Waiters for a transaction's executed effects digest.
    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    /// Waiters for availability of input objects / packages.
    object_notify_read: NotifyRead<InputKey, ()>,

    /// Persistent store backing this cache.
    store: Arc<AuthorityStore>,
    /// Pending-transaction count above which backpressure is engaged.
    backpressure_threshold: u64,
    backpressure_manager: Arc<BackpressureManager>,
    metrics: Arc<ExecutionCacheMetrics>,
}
457
/// Probes one cache layer (`$cache`, an `Option<&CachedVersionMap<_>>`) for
/// an exact `$version`, recording hit/miss/negative-hit metrics under
/// `$table`/`$level`.
///
/// NOTE: expands inline in the caller and `return`s from the *enclosing
/// function* on a hit or negative hit; on a miss it records the miss and
/// falls through.
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            // The requested version is above the oldest version held for
            // this key yet is not present, so it is reported as known-absent.
            // NOTE(review): this relies on the version map retaining a
            // contiguous window of recent versions — confirm against
            // CachedVersionMap semantics.
            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
479
/// Probes one cache layer for the highest version of a key, recording
/// metrics under `$table`/`$level`.
///
/// NOTE: like `check_cache_entry_by_version`, this `return`s from the
/// enclosing function on a hit. A present-but-empty version map is an
/// invariant violation: empty maps must be removed from the cache.
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some((version, entry)) = cache.get_highest() {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit((*version, entry.clone()));
            } else {
                panic!("empty CachedVersionMap should have been removed");
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
494
495impl WritebackCache {
    /// Constructs the cache over `store`, sizing all internal caches from
    /// `config` (capacities are randomized in test builds).
    pub fn new(
        config: &ExecutionCacheConfig,
        store: Arc<AuthorityStore>,
        metrics: Arc<ExecutionCacheMetrics>,
        backpressure_manager: Arc<BackpressureManager>,
    ) -> Self {
        // Package cache is segmented 8 ways, like the other moka caches.
        let packages = MokaCache::builder(8)
            .max_capacity(randomize_cache_capacity_in_tests(
                config.package_cache_size(),
            ))
            .build();
        Self {
            dirty: UncommittedData::new(),
            cached: CachedCommittedData::new(config),
            object_by_id_cache: MonotonicCache::new(randomize_cache_capacity_in_tests(
                config.object_by_id_cache_size(),
            )),
            packages,
            object_locks: ObjectLocks::new(),
            executed_effects_digests_notify_read: NotifyRead::new(),
            object_notify_read: NotifyRead::new(),
            store,
            backpressure_manager,
            backpressure_threshold: config.backpressure_threshold(),
            metrics,
        }
    }
523
524 pub fn new_for_tests(store: Arc<AuthorityStore>) -> Self {
525 Self::new(
526 &Default::default(),
527 store,
528 ExecutionCacheMetrics::new(&prometheus::Registry::new()).into(),
529 BackpressureManager::new_for_tests(),
530 )
531 }
532
    /// Test-only: replaces all cache state with a freshly-constructed cache
    /// over the same store, metrics, and backpressure manager.
    #[cfg(test)]
    pub fn reset_for_test(&mut self) {
        let mut new = Self::new(
            &Default::default(),
            self.store.clone(),
            self.metrics.clone(),
            self.backpressure_manager.clone(),
        );
        // Swap the fresh instance into place; the old state is dropped when
        // `new` goes out of scope.
        std::mem::swap(self, &mut new);
    }
543
544 pub fn evict_executed_effects_from_cache_for_testing(&self, tx_digest: &TransactionDigest) {
545 self.cached.executed_effects_digests.invalidate(tx_digest);
546 self.cached.transaction_events.invalidate(tx_digest);
547 self.cached.transactions.invalidate(tx_digest);
548 }
549
    /// Records one written object version in the dirty state, updates the
    /// latest-object cache, and wakes tasks waiting on this object as a
    /// transaction input.
    fn write_object_entry(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        object: ObjectEntry,
    ) {
        trace!(?object_id, ?version, ?object, "inserting object entry");
        self.metrics.record_cache_write("object");

        // Take (and hold) the dirty-map entry for this id before updating the
        // object_by_id cache.
        // NOTE(review): presumably this ordering prevents a concurrent reader
        // from installing a stale "latest" entry — confirm against
        // MonotonicCache/Ticket semantics.
        let mut entry = self.dirty.objects.entry(*object_id).or_default();

        // A failed insert (expired ticket) is intentionally ignored.
        self.object_by_id_cache
            .insert(
                object_id,
                LatestObjectCacheEntry::Object(version, object.clone()),
                Ticket::Write,
            )
            .ok();

        entry.insert(version, object.clone());

        // Wake input-waiters: packages are keyed by id only; top-level
        // (non-child) objects by exact (full id, version).
        if let ObjectEntry::Object(object) = &object {
            if object.is_package() {
                self.object_notify_read
                    .notify(&InputKey::Package { id: *object_id }, &());
            } else if !object.is_child_object() {
                self.object_notify_read.notify(
                    &InputKey::VersionedObject {
                        id: object.full_id(),
                        version: object.version(),
                    },
                    &(),
                );
            }
        }
    }
607
    /// Records a marker value for `object_key` in the dirty state for the
    /// given epoch.
    fn write_marker_value(
        &self,
        epoch_id: EpochId,
        object_key: FullObjectKey,
        marker_value: MarkerValue,
    ) {
        tracing::trace!("inserting marker value {object_key:?}: {marker_value:?}",);
        self.metrics.record_cache_write("marker");
        self.dirty
            .markers
            .entry((epoch_id, object_key.id()))
            .or_default()
            .value_mut()
            .insert(object_key.version(), marker_value);
        // A stream-ended marker also notifies input-waiters on this exact
        // (id, version) key.
        // NOTE(review): presumably so tasks blocked on the input can observe
        // that the consensus stream has ended — confirm against the
        // notify_read_input_objects logic.
        if matches!(marker_value, MarkerValue::ConsensusStreamEnded(_)) {
            self.object_notify_read.notify(
                &InputKey::VersionedObject {
                    id: object_key.id(),
                    version: object_key.version(),
                },
                &(),
            );
        }
    }
636
    /// Runs `cb` with the dirty and committed version maps for `key`, looked
    /// up and locked together: the dirty-map shard entry is pinned and the
    /// committed map's mutex is held for the duration of the callback, giving
    /// it a consistent view across both layers.
    fn with_locked_cache_entries<K, V, R>(
        dirty_map: &DashMap<K, CachedVersionMap<V>>,
        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: &K,
        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
    ) -> R
    where
        K: Copy + Eq + Hash + Send + Sync + 'static,
        V: Send + Sync + 'static,
    {
        // `entry` (rather than `get`) locks the shard entry even when the key
        // is absent, so no dirty write for this key can interleave while the
        // committed cache is consulted below.
        let dirty_entry = dirty_map.entry(*key);
        let dirty_entry = match &dirty_entry {
            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
            DashMapEntry::Vacant(_) => None,
        };

        // Lock the committed map (if present) while the dirty entry is still
        // held; the guard lives until `cb` returns.
        let cached_entry = cached_map.get(key);
        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
        let cached_entry = cached_lock.as_deref();

        cb(dirty_entry, cached_entry)
    }
662
    /// Cache-only lookup of an object entry at an exact version: dirty layer
    /// first, then the committed layer. Never touches the database.
    /// (The macros below `return` on a hit or negative hit.)
    fn get_object_entry_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<ObjectEntry> {
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }
693
694 fn get_object_by_key_cache_only(
695 &self,
696 object_id: &ObjectID,
697 version: SequenceNumber,
698 ) -> CacheResult<Object> {
699 match self.get_object_entry_by_key_cache_only(object_id, version) {
700 CacheResult::Hit(entry) => match entry {
701 ObjectEntry::Object(object) => CacheResult::Hit(object),
702 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
703 },
704 CacheResult::Miss => CacheResult::Miss,
705 CacheResult::NegativeHit => CacheResult::NegativeHit,
706 }
707 }
708
    /// Cache-only lookup of the *latest* entry for an object id: tries the
    /// object_by_id cache first, then the per-version dirty/committed maps.
    /// Never touches the database (except for the debug-only coherence check
    /// below).
    fn get_object_entry_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
        self.metrics
            .record_cache_request(request_type, "object_by_id");
        let entry = self.object_by_id_cache.get(object_id);

        // Debug-only coherence check: the object_by_id cache must agree with
        // the highest entry in the dirty map or, failing that, the store.
        if cfg!(debug_assertions)
            && let Some(entry) = &entry
        {
            let highest: Option<ObjectEntry> = self
                .dirty
                .objects
                .get(object_id)
                .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
                .or_else(|| {
                    let obj: Option<ObjectEntry> = self
                        .store
                        .get_latest_object_or_tombstone(*object_id)
                        .unwrap()
                        .map(|(_, o)| o.into());
                    obj
                });

            let cache_entry = match &*entry.lock() {
                LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
                LatestObjectCacheEntry::NonExistent => None,
            };

            // A cached tombstone with nothing in the store is tolerated: the
            // tombstone may have been pruned from the store.
            let tombstone_possibly_pruned = highest.is_none()
                && cache_entry
                    .as_ref()
                    .map(|e| e.is_tombstone())
                    .unwrap_or(false);

            if highest != cache_entry && !tombstone_possibly_pruned {
                tracing::error!(
                    ?highest,
                    ?cache_entry,
                    ?tombstone_possibly_pruned,
                    "object_by_id cache is incoherent for {:?}",
                    object_id
                );
                panic!("object_by_id cache is incoherent for {:?}", object_id);
            }
        }

        if let Some(entry) = entry {
            let entry = entry.lock();
            match &*entry {
                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
                    self.metrics.record_cache_hit(request_type, "object_by_id");
                    return CacheResult::Hit((*latest_version, latest_object.clone()));
                }
                // A cached NonExistent entry is a definitive negative answer.
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit(request_type, "object_by_id");
                    return CacheResult::NegativeHit;
                }
            }
        } else {
            self.metrics.record_cache_miss(request_type, "object_by_id");
        }

        // Not in the by-id cache: consult the per-version maps for the
        // highest known version. (The macros `return` on a hit.)
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }
788
789 fn get_object_by_id_cache_only(
790 &self,
791 request_type: &'static str,
792 object_id: &ObjectID,
793 ) -> CacheResult<(SequenceNumber, Object)> {
794 match self.get_object_entry_by_id_cache_only(request_type, object_id) {
795 CacheResult::Hit((version, entry)) => match entry {
796 ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
797 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
798 },
799 CacheResult::NegativeHit => CacheResult::NegativeHit,
800 CacheResult::Miss => CacheResult::Miss,
801 }
802 }
803
    /// Cache-only lookup of the marker value at an exact object version for
    /// the given epoch. (The macros `return` on a hit or negative hit.)
    fn get_marker_value_cache_only(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> CacheResult<MarkerValue> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, object_key.id()),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "uncommitted",
                    dirty_entry,
                    object_key.version()
                );
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "committed",
                    cached_entry,
                    object_key.version()
                );
                CacheResult::Miss
            },
        )
    }
832
    /// Cache-only lookup of the highest-versioned marker value for an object
    /// in the given epoch. (The macros `return` on a hit.)
    fn get_latest_marker_value_cache_only(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }
849
    /// Reads the latest live object by id, falling back to the store on a
    /// cache miss and populating the object_by_id cache with the result (or
    /// with the fact of non-existence). Returns `None` for tombstones and
    /// unknown ids.
    fn get_object_impl(&self, request_type: &'static str, id: &ObjectID) -> Option<Object> {
        // The ticket is taken *before* the cache read so that a write landing
        // between our read and our fill invalidates the (now stale) fill.
        let ticket = self.object_by_id_cache.get_ticket_for_read(id);
        match self.get_object_entry_by_id_cache_only(request_type, id) {
            CacheResult::Hit((_, entry)) => match entry {
                ObjectEntry::Object(object) => Some(object),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
            },
            CacheResult::NegativeHit => None,
            CacheResult::Miss => {
                let obj = self
                    .store
                    .get_latest_object_or_tombstone(*id)
                    .expect("db error");
                match obj {
                    Some((key, obj)) => {
                        // Fill the by-id cache with whatever the store
                        // returned, live object or tombstone alike.
                        self.cache_latest_object_by_id(
                            id,
                            LatestObjectCacheEntry::Object(key.1, obj.clone().into()),
                            ticket,
                        );
                        match obj {
                            ObjectOrTombstone::Object(object) => Some(object),
                            ObjectOrTombstone::Tombstone(_) => None,
                        }
                    }
                    None => {
                        // Cache the negative result as well.
                        self.cache_object_not_found(id, ticket);
                        None
                    }
                }
            }
        }
    }
883
884 fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
885 self.metrics.record_cache_request(request_type, "db");
886 &self.store
887 }
888
889 fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
890 self.metrics
891 .record_cache_multi_request(request_type, "db", count);
892 &self.store
893 }
894
    /// Writes all outputs of an executed transaction into the dirty
    /// (uncommitted) state, notifies waiters, and recomputes backpressure
    /// from the approximate pending-transaction count.
    #[instrument(level = "debug", skip_all)]
    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
        let tx_digest = *tx_outputs.transaction.digest();
        trace!(?tx_digest, "writing transaction outputs to cache");

        // Guard against writing outputs for a transaction that already
        // executed in the previous epoch.
        assert!(
            !self.transaction_executed_in_last_epoch(&tx_digest, epoch_id),
            "Transaction {:?} was already executed in epoch {}",
            tx_digest,
            epoch_id.saturating_sub(1)
        );

        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            unchanged_loaded_runtime_objects,
            ..
        } = &*tx_outputs;

        // Record deletion/wrap tombstones for removed object versions.
        for ObjectKey(id, version) in deleted.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Deleted);
        }

        for ObjectKey(id, version) in wrapped.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
        }

        for (object_key, marker_value) in markers.iter() {
            self.write_marker_value(epoch_id, *object_key, *marker_value);
        }

        // Child objects are written before top-level objects.
        // NOTE(review): presumably so a reader that observes a parent can
        // always find its children — confirm.
        for (object_id, object) in written.iter() {
            if object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
            }
        }
        for (object_id, object) in written.iter() {
            if !object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
                if object.is_package() {
                    debug!("caching package: {:?}", object.compute_object_reference());
                    self.packages
                        .insert(*object_id, PackageObject::new(object.clone()));
                }
            }
        }

        let tx_digest = *transaction.digest();
        debug!(
            ?tx_digest,
            "Writing transaction output objects to cache: {:?}",
            written
                .values()
                .map(|o| (o.id(), o.version()))
                .collect::<Vec<_>>(),
        );
        let effects_digest = effects.digest();

        // Record the transaction's metadata in the dirty maps.
        self.metrics.record_cache_write("transaction_block");
        self.dirty
            .pending_transaction_writes
            .insert(tx_digest, tx_outputs.clone());

        self.metrics.record_cache_write("transaction_effects");
        self.dirty
            .transaction_effects
            .insert(effects_digest, effects.clone());

        self.metrics.record_cache_write("transaction_events");
        self.dirty
            .transaction_events
            .insert(tx_digest, events.clone());

        self.metrics
            .record_cache_write("unchanged_loaded_runtime_objects");
        self.dirty
            .unchanged_loaded_runtime_objects
            .insert(tx_digest, unchanged_loaded_runtime_objects.clone());

        self.metrics.record_cache_write("executed_effects_digests");
        self.dirty
            .executed_effects_digests
            .insert(tx_digest, effects_digest);

        // Wake tasks waiting for this transaction's effects.
        self.executed_effects_digests_notify_read
            .notify(&tx_digest, &effects_digest);

        self.metrics
            .pending_notify_read
            .set(self.executed_effects_digests_notify_read.num_pending() as i64);

        // Bump the insert counter and refresh backpressure from the new
        // approximate pending count.
        let prev = self
            .dirty
            .total_transaction_inserts
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let pending_count = (prev + 1).saturating_sub(
            self.dirty
                .total_transaction_commits
                .load(std::sync::atomic::Ordering::Relaxed),
        );

        self.set_backpressure(pending_count);
    }
1016
    /// Collects the pending outputs for `digests` and builds (but does not
    /// write) the corresponding db batch. Digests with no pending entry are
    /// skipped with a warning.
    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
        let _metrics_guard = mysten_metrics::monitored_scope("WritebackCache::build_db_batch");
        let mut all_outputs = Vec::with_capacity(digests.len());
        for tx in digests {
            let Some(outputs) = self
                .dirty
                .pending_transaction_writes
                .get(tx)
                .map(|o| o.clone())
            else {
                // NOTE(review): tolerated rather than fatal — presumably the
                // transaction was already committed by a concurrent commit
                // path; confirm.
                warn!("Attempt to commit unknown transaction {:?}", tx);
                continue;
            };
            all_outputs.push(outputs);
        }

        let batch = self
            .store
            .build_db_batch(epoch, &all_outputs)
            .expect("db error");
        (all_outputs, batch)
    }
1044
    /// Writes a previously-built db batch and then migrates the committed
    /// transactions' data from dirty state into the committed caches.
    #[instrument(level = "debug", skip_all)]
    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        (all_outputs, db_batch): Batch,
        digests: &[TransactionDigest],
    ) {
        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs");
        fail_point!("writeback-cache-commit");
        trace!(?digests);

        // Persist to the database before mutating cache state, so dirty
        // entries are only removed once the data is durable.
        db_batch.write().expect("db error");

        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs::flush");
        // Remove pending entries and flush per-transaction metadata in
        // parallel; every digest must still be pending (single-commit
        // invariant).
        all_outputs.par_iter().with_min_len(16).for_each(|outputs| {
            let tx_digest = outputs.transaction.digest();
            assert!(
                self.dirty
                    .pending_transaction_writes
                    .remove(tx_digest)
                    .is_some()
            );
            self.flush_tx_metadata_from_dirty_to_cached(*tx_digest, outputs);
        });

        // Objects and markers are migrated sequentially, after all metadata.
        for outputs in all_outputs.iter() {
            self.flush_objects_from_dirty_to_cached(epoch, outputs);
        }

        // Advance the committed counter and recompute backpressure from the
        // remaining pending count.
        let num_outputs = all_outputs.len() as u64;
        let num_commits = self
            .dirty
            .total_transaction_commits
            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
            + num_outputs;

        let pending_count = self
            .dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits);

        self.set_backpressure(pending_count);
    }
1101
1102 fn approximate_pending_transaction_count(&self) -> u64 {
1103 let num_commits = self
1104 .dirty
1105 .total_transaction_commits
1106 .load(std::sync::atomic::Ordering::Relaxed);
1107
1108 self.dirty
1109 .total_transaction_inserts
1110 .load(std::sync::atomic::Ordering::Relaxed)
1111 .saturating_sub(num_commits)
1112 }
1113
1114 fn set_backpressure(&self, pending_count: u64) {
1115 let backpressure = pending_count > self.backpressure_threshold;
1116 let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
1117 if backpressure_changed {
1118 self.metrics.backpressure_toggles.inc();
1119 }
1120 self.metrics
1121 .backpressure_status
1122 .set(if backpressure { 1 } else { 0 });
1123 }
1124
    /// Moves a committed transaction's metadata (transaction, effects,
    /// events, executed-effects digest) from the dirty maps into the
    /// committed point caches.
    fn flush_tx_metadata_from_dirty_to_cached(
        &self,
        tx_digest: TransactionDigest,
        outputs: &TransactionOutputs,
    ) {
        let TransactionOutputs {
            transaction,
            effects,
            events,
            ..
        } = outputs;

        let effects_digest = effects.digest();

        // Populate the committed caches first, and only then clear the dirty
        // entries (insert-before-remove order).
        // NOTE(review): presumably so a concurrent reader always finds the
        // data in at least one layer — confirm.
        self.cached
            .transactions
            .insert(
                &tx_digest,
                PointCacheItem::Some(transaction.clone()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_effects
            .insert(
                &effects_digest,
                PointCacheItem::Some(effects.clone().into()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .executed_effects_digests
            .insert(
                &tx_digest,
                PointCacheItem::Some(effects_digest),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_events
            .insert(
                &tx_digest,
                PointCacheItem::Some(events.clone().into()),
                Ticket::Write,
            )
            .ok();

        // The expects enforce that each transaction's dirty metadata is
        // present and removed exactly once.
        self.dirty
            .transaction_effects
            .remove(&effects_digest)
            .expect("effects must exist");

        self.dirty
            .transaction_events
            .remove(&tx_digest)
            .expect("events must exist");

        self.dirty
            .unchanged_loaded_runtime_objects
            .remove(&tx_digest)
            .expect("unchanged_loaded_runtime_objects must exist");

        self.dirty
            .executed_effects_digests
            .remove(&tx_digest)
            .expect("executed effects must exist");
    }
1198
    /// Moves a committed transaction's object writes, tombstones, and markers
    /// from the dirty maps into the committed caches, one version at a time.
    fn flush_objects_from_dirty_to_cached(&self, epoch: EpochId, outputs: &TransactionOutputs) {
        let TransactionOutputs {
            markers,
            written,
            deleted,
            wrapped,
            ..
        } = outputs;

        for (object_key, marker_value) in markers.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.markers,
                &self.cached.marker_cache,
                (epoch, object_key.id()),
                object_key.version(),
                marker_value,
            );
        }

        for (object_id, object) in written.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                object.version(),
                &ObjectEntry::Object(object.clone()),
            );
        }

        for ObjectKey(object_id, version) in deleted.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Deleted,
            );
        }

        for ObjectKey(object_id, version) in wrapped.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Wrapped,
            );
        }
    }
1251
    /// Moves a single `(key, version)` entry from the dirty map into the
    /// bounded cache, asserting it was present in the dirty map.
    ///
    /// The dirty entry is acquired (locking its dashmap shard) *before* the
    /// value is inserted into the cache. NOTE(review): this ordering appears
    /// intended to keep the version visible in at least one of the two maps
    /// to concurrent readers — confirm against the readers' lookup order.
    fn move_version_from_dirty_to_cache<K, V>(
        dirty: &DashMap<K, CachedVersionMap<V>>,
        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: K,
        version: SequenceNumber,
        value: &V,
    ) where
        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
    {
        // Cap on versions retained per key once an entry reaches the cache.
        static MAX_VERSIONS: usize = 3;

        let dirty_entry = dirty.entry(key);
        let cache_entry = cache.entry(key).or_default();
        let mut cache_map = cache_entry.value().lock();

        // Insert into the cache first, then evict versions beyond the cap.
        cache_map.insert(version, value.clone());
        cache_map.truncate_to(MAX_VERSIONS);

        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
            panic!("dirty map must exist");
        };

        // The flushed version must be the oldest dirty version for this key,
        // and its value must match exactly what we just cached.
        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);

        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");

        // Remove the per-key dirty map once its last version is flushed.
        if occupied_dirty_entry.get().is_empty() {
            occupied_dirty_entry.remove();
        }
    }
1290
1291 fn cache_latest_object_by_id(
1293 &self,
1294 object_id: &ObjectID,
1295 object: LatestObjectCacheEntry,
1296 ticket: Ticket,
1297 ) {
1298 trace!("caching object by id: {:?} {:?}", object_id, object);
1299 if self
1300 .object_by_id_cache
1301 .insert(object_id, object, ticket)
1302 .is_ok()
1303 {
1304 self.metrics.record_cache_write("object_by_id");
1305 } else {
1306 trace!("discarded cache write due to expired ticket");
1307 self.metrics.record_ticket_expiry();
1308 }
1309 }
1310
    /// Caches the fact that `object_id` does not currently exist so repeated
    /// lookups for a missing object can be answered without a DB read.
    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
    }
1314
    /// Tears down all epoch-scoped in-memory state at an epoch boundary.
    ///
    /// Caller must hold the execution write lock (witnessed by
    /// `execution_guard`) so no transaction executes concurrently with the
    /// teardown.
    fn clear_state_end_of_epoch_impl(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
        info!("clearing state at end of epoch");

        // Any transaction still pending (executed but never committed) is
        // discarded. Only single-writer (owned-object) transactions are
        // expected to be in this state at an epoch boundary.
        for r in self.dirty.pending_transaction_writes.iter() {
            let outputs = r.value();
            if !outputs
                .transaction
                .transaction_data()
                .shared_input_objects()
                .is_empty()
            {
                debug_fatal!("transaction must be single writer");
            }
            info!(
                "clearing state for transaction {:?}",
                outputs.transaction.digest()
            );
            // Evict everything the discarded transaction touched so no
            // reverted version can be observed from the caches.
            for (object_id, object) in outputs.written.iter() {
                if object.is_package() {
                    info!("removing non-finalized package from cache: {:?}", object_id);
                    self.packages.invalidate(object_id);
                }
                self.object_by_id_cache.invalidate(object_id);
                self.cached.object_cache.invalidate(object_id);
            }

            for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
                self.object_by_id_cache.invalidate(object_id);
                self.cached.object_cache.invalidate(object_id);
            }
        }

        // Drop all uncommitted state wholesale.
        self.dirty.clear();

        info!("clearing old transaction locks");
        self.object_locks.clear();
        info!("clearing object per epoch marker table");
        self.store
            .clear_object_per_epoch_marker_table(execution_guard)
            .expect("db error");
    }
1358
1359 fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) {
1360 self.store
1361 .bulk_insert_genesis_objects(objects)
1362 .expect("db error");
1363 for obj in objects {
1364 self.cached.object_cache.invalidate(&obj.id());
1365 self.object_by_id_cache.invalidate(&obj.id());
1366 }
1367 }
1368
    /// Writes a single genesis object to the store, invalidating any cache
    /// entries for its id.
    fn insert_genesis_object_impl(&self, object: Object) {
        // NOTE(review): invalidation happens before the store write here but
        // after the write in `bulk_insert_genesis_objects_impl` — confirm
        // the ordering is not significant during genesis.
        self.object_by_id_cache.invalidate(&object.id());
        self.cached.object_cache.invalidate(&object.id());
        self.store.insert_genesis_object(object).expect("db error");
    }
1374
1375 pub fn clear_caches_and_assert_empty(&self) {
1376 info!("clearing caches");
1377 self.cached.clear_and_assert_empty();
1378 self.object_by_id_cache.invalidate_all();
1379 assert!(&self.object_by_id_cache.is_empty());
1380 self.packages.invalidate_all();
1381 assert_empty(&self.packages);
1382 }
1383}
1384
1385impl AccountFundsRead for WritebackCache {
    /// Returns the latest balance of the accumulator account together with
    /// the version at which it was read.
    ///
    /// A missing account object is only trusted as "zero balance" if the
    /// accumulator root's version did not change across the read; otherwise
    /// the account may have been created concurrently and the read retries.
    fn get_latest_account_amount(&self, account_id: &AccumulatorObjId) -> (u128, SequenceNumber) {
        let mut pre_root_version =
            ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
                .unwrap()
                .version();
        let mut loop_iter = 0;
        loop {
            let account_obj = ObjectCacheRead::get_object(self, account_id.inner());
            if let Some(account_obj) = account_obj {
                // NOTE(review): assumes the accumulator value for balance
                // accounts is always the `U128` variant — confirm.
                let (_, AccumulatorValue::U128(value)) =
                    account_obj.data.try_as_move().unwrap().try_into().unwrap();
                return (value.value, account_obj.version());
            }
            let post_root_version =
                ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
                    .unwrap()
                    .version();
            if pre_root_version == post_root_version {
                return (0, pre_root_version);
            }
            debug!(
                "Root version changed from {} to {} while reading account amount, retrying",
                pre_root_version, post_root_version
            );
            pre_root_version = post_root_version;
            loop_iter += 1;
            // NOTE(review): `debug_fatal!` panics only in debug builds; in
            // release this loop keeps retrying after logging — confirm that
            // is the intended behavior.
            if loop_iter >= 3 {
                debug_fatal!("Unable to get a stable version after 3 iterations");
            }
        }
    }
1417
1418 fn get_account_amount_at_version(
1419 &self,
1420 account_id: &AccumulatorObjId,
1421 version: SequenceNumber,
1422 ) -> u128 {
1423 let account_obj = self.find_object_lt_or_eq_version(*account_id.inner(), version);
1424 if let Some(account_obj) = account_obj {
1425 let (_, AccumulatorValue::U128(value)) =
1426 account_obj.data.try_as_move().unwrap().try_into().unwrap();
1427 value.value
1428 } else {
1429 0
1430 }
1431 }
1432}
1433
// Empty marker impl: all behavior comes from the component traits implemented
// in this file. NOTE(review): presumably `ExecutionCacheAPI` is a supertrait
// bundle — confirm against its definition.
impl ExecutionCacheAPI for WritebackCache {}
1435
1436impl ExecutionCacheCommit for WritebackCache {
1437 fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1438 self.build_db_batch(epoch, digests)
1439 }
1440
1441 fn commit_transaction_outputs(
1442 &self,
1443 epoch: EpochId,
1444 batch: Batch,
1445 digests: &[TransactionDigest],
1446 ) {
1447 WritebackCache::commit_transaction_outputs(self, epoch, batch, digests)
1448 }
1449
1450 fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) {
1451 self.store.persist_transaction(tx).expect("db error");
1452 }
1453
1454 fn approximate_pending_transaction_count(&self) -> u64 {
1455 WritebackCache::approximate_pending_transaction_count(self)
1456 }
1457}
1458
1459impl ObjectCacheRead for WritebackCache {
    /// Fetches a package object, served from the dedicated package cache when
    /// possible and inserted into it on first load.
    ///
    /// Returns `Err(MoveObjectAsPackage)` if the id resolves to a non-package
    /// object, and `Ok(None)` if the id does not exist.
    fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
        self.metrics
            .record_cache_request("package", "package_cache");
        if let Some(p) = self.packages.get(package_id) {
            // Debug builds cross-check the package cache against the
            // canonical state (dirty set first, then store) since a stale
            // package would be a serious correctness bug.
            if cfg!(debug_assertions) {
                let canonical_package = self
                    .dirty
                    .objects
                    .get(package_id)
                    .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
                        Some(ObjectEntry::Object(object)) => Some(object),
                        _ => None,
                    })
                    .or_else(|| self.store.get_object(package_id));

                if let Some(canonical_package) = canonical_package {
                    assert_eq!(
                        canonical_package.digest(),
                        p.object().digest(),
                        "Package object cache is inconsistent for package {:?}",
                        package_id
                    );
                }
            }
            self.metrics.record_cache_hit("package", "package_cache");
            return Ok(Some(p));
        } else {
            self.metrics.record_cache_miss("package", "package_cache");
        }

        // Fall back to the general object path and populate the package
        // cache on success.
        if let Some(p) = self.get_object_impl("package", package_id) {
            if p.is_package() {
                let p = PackageObject::new(p);
                tracing::trace!(
                    "caching package: {:?}",
                    p.object().compute_object_reference()
                );
                self.metrics.record_cache_write("package");
                self.packages.insert(*package_id, p.clone());
                Ok(Some(p))
            } else {
                Err(SuiErrorKind::UserInputError {
                    error: UserInputError::MoveObjectAsPackage {
                        object_id: *package_id,
                    },
                }
                .into())
            }
        } else {
            Ok(None)
        }
    }
1515
    /// Intentionally a no-op for the writeback cache.
    ///
    /// NOTE(review): presumably safe because this cache keeps the package
    /// cache coherent on every write (and validates it against canonical
    /// state in debug builds via `get_package_object`) — confirm.
    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
    }
1520
    /// Returns the latest live version of the object, if any.
    fn get_object(&self, id: &ObjectID) -> Option<Object> {
        self.get_object_impl("object_latest", id)
    }
1526
1527 fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object> {
1528 match self.get_object_by_key_cache_only(object_id, version) {
1529 CacheResult::Hit(object) => Some(object),
1530 CacheResult::NegativeHit => None,
1531 CacheResult::Miss => self
1532 .record_db_get("object_by_version")
1533 .get_object_by_key(object_id, version),
1534 }
1535 }
1536
1537 fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
1538 do_fallback_lookup(
1539 object_keys,
1540 |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1541 CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1542 CacheResult::NegativeHit => CacheResult::NegativeHit,
1543 CacheResult::Miss => CacheResult::Miss,
1544 },
1545 |remaining| {
1546 self.record_db_multi_get("object_by_version", remaining.len())
1547 .multi_get_objects_by_key(remaining)
1548 .expect("db error")
1549 },
1550 )
1551 }
1552
1553 fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool {
1554 match self.get_object_by_key_cache_only(object_id, version) {
1555 CacheResult::Hit(_) => true,
1556 CacheResult::NegativeHit => false,
1557 CacheResult::Miss => self
1558 .record_db_get("object_by_version")
1559 .object_exists_by_key(object_id, version)
1560 .expect("db error"),
1561 }
1562 }
1563
1564 fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool> {
1565 do_fallback_lookup(
1566 object_keys,
1567 |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1568 CacheResult::Hit(_) => CacheResult::Hit(true),
1569 CacheResult::NegativeHit => CacheResult::Hit(false),
1570 CacheResult::Miss => CacheResult::Miss,
1571 },
1572 |remaining| {
1573 self.record_db_multi_get("object_by_version", remaining.len())
1574 .multi_object_exists_by_key(remaining)
1575 .expect("db error")
1576 },
1577 )
1578 }
1579
1580 fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
1581 match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1582 CacheResult::Hit((version, entry)) => Some(match entry {
1583 ObjectEntry::Object(object) => object.compute_object_reference(),
1584 ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1585 ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1586 }),
1587 CacheResult::NegativeHit => None,
1588 CacheResult::Miss => self
1589 .record_db_get("latest_objref_or_tombstone")
1590 .get_latest_object_ref_or_tombstone(object_id)
1591 .expect("db error"),
1592 }
1593 }
1594
1595 fn get_latest_object_or_tombstone(
1596 &self,
1597 object_id: ObjectID,
1598 ) -> Option<(ObjectKey, ObjectOrTombstone)> {
1599 match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1600 CacheResult::Hit((version, entry)) => {
1601 let key = ObjectKey(object_id, version);
1602 Some(match entry {
1603 ObjectEntry::Object(object) => (key, object.into()),
1604 ObjectEntry::Deleted => (
1605 key,
1606 ObjectOrTombstone::Tombstone((
1607 object_id,
1608 version,
1609 ObjectDigest::OBJECT_DIGEST_DELETED,
1610 )),
1611 ),
1612 ObjectEntry::Wrapped => (
1613 key,
1614 ObjectOrTombstone::Tombstone((
1615 object_id,
1616 version,
1617 ObjectDigest::OBJECT_DIGEST_WRAPPED,
1618 )),
1619 ),
1620 })
1621 }
1622 CacheResult::NegativeHit => None,
1623 CacheResult::Miss => self
1624 .record_db_get("latest_object_or_tombstone")
1625 .get_latest_object_or_tombstone(object_id)
1626 .expect("db error"),
1627 }
1628 }
1629
1630 fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool> {
1631 keys.iter()
1632 .map(|key| {
1633 if key.is_cancelled() {
1634 true
1635 } else {
1636 match key {
1637 InputKey::VersionedObject { id, version } => {
1638 matches!(
1639 self.get_object_by_key_cache_only(&id.id(), *version),
1640 CacheResult::Hit(_)
1641 )
1642 }
1643 InputKey::Package { id } => self.packages.contains_key(id),
1644 }
1645 }
1646 })
1647 .collect()
1648 }
1649
    /// Finds the newest version of `object_id` that is <= `version_bound`.
    ///
    /// Lookup order: latest-by-id cache, dirty version map, cached version
    /// map, then DB — while opportunistically repopulating the latest-by-id
    /// cache (guarded by a read ticket so concurrent writers win any race).
    #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
    fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version_bound: SequenceNumber,
    ) -> Option<Object> {
        // Checks one version map: a tombstone at or below the bound is a
        // definitive "gone" answer; no entry at or below the bound is a miss.
        macro_rules! check_cache_entry {
            ($level: expr, $objects: expr) => {
                self.metrics
                    .record_cache_request("object_lt_or_eq_version", $level);
                if let Some(objects) = $objects {
                    if let Some((_, object)) = objects
                        .all_versions_lt_or_eq_descending(&version_bound)
                        .next()
                    {
                        if let ObjectEntry::Object(object) = object {
                            self.metrics
                                .record_cache_hit("object_lt_or_eq_version", $level);
                            return Some(object.clone());
                        } else {
                            self.metrics
                                .record_cache_negative_hit("object_lt_or_eq_version", $level);
                            return None;
                        }
                    } else {
                        self.metrics
                            .record_cache_miss("object_lt_or_eq_version", $level);
                    }
                }
            };
        }

        // Fast path: the latest-by-id cache can answer directly when the
        // latest known version is already within the bound, or when the
        // object is known not to exist at all.
        self.metrics
            .record_cache_request("object_lt_or_eq_version", "object_by_id");
        let latest_cache_entry = self.object_by_id_cache.get(&object_id);
        if let Some(latest) = &latest_cache_entry {
            let latest = latest.lock();
            match &*latest {
                LatestObjectCacheEntry::Object(latest_version, object) => {
                    if *latest_version <= version_bound {
                        if let ObjectEntry::Object(object) = object {
                            self.metrics
                                .record_cache_hit("object_lt_or_eq_version", "object_by_id");
                            return Some(object.clone());
                        } else {
                            self.metrics.record_cache_negative_hit(
                                "object_lt_or_eq_version",
                                "object_by_id",
                            );
                            return None;
                        }
                    }
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
                    return None;
                }
            }
        }
        self.metrics
            .record_cache_miss("object_lt_or_eq_version", "object_by_id");

        // NOTE(review): the metric labels below look swapped — the dirty set
        // holds *uncommitted* data and the cached set holds *committed* data.
        // Metric-label change only; confirm before renaming.
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            &object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry!("committed", dirty_entry);
                check_cache_entry!("uncommitted", cached_entry);

                // Neither version map could answer within the bound, so find
                // the latest version from the dirty set (authoritative when
                // present) or from the DB.
                let latest: Option<(SequenceNumber, ObjectEntry)> =
                    if let Some(dirty_set) = dirty_entry {
                        dirty_set
                            .get_highest()
                            .cloned()
                            .tap_none(|| panic!("dirty set cannot be empty"))
                    } else {
                        self.record_db_get("object_lt_or_eq_version_latest")
                            .get_latest_object_or_tombstone(object_id)
                            .expect("db error")
                            .map(|(ObjectKey(_, version), obj_or_tombstone)| {
                                (version, ObjectEntry::from(obj_or_tombstone))
                            })
                    };

                if let Some((obj_version, obj_entry)) = latest {
                    // Repopulate the latest-by-id cache; the ticket discards
                    // this write if a newer write raced past us.
                    self.cache_latest_object_by_id(
                        &object_id,
                        LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
                        self.object_by_id_cache.get_ticket_for_read(&object_id),
                    );

                    if obj_version <= version_bound {
                        match obj_entry {
                            ObjectEntry::Object(object) => Some(object),
                            ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
                        }
                    } else {
                        // Latest is above the bound: scan the DB for an older
                        // version within the bound.
                        self.record_db_get("object_lt_or_eq_version_scan")
                            .find_object_lt_or_eq_version(object_id, version_bound)
                            .expect("db error")
                    }

                } else if let Some(latest_cache_entry) = latest_cache_entry {
                    // No version anywhere, but a by-id entry existed at the
                    // start: it must record non-existence.
                    assert!(!latest_cache_entry.lock().is_alive());
                    None
                } else {
                    // Nothing known anywhere; cache the negative result.
                    // Any cached versions must be tombstones.
                    let highest = cached_entry.and_then(|c| c.get_highest());
                    assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
                    self.cache_object_not_found(
                        &object_id,
                        self.object_by_id_cache.get_ticket_for_read(&object_id),
                    );
                    None
                }
            },
        )
    }
1815
    /// Reads the system state object via the shared helper. NOTE(review):
    /// "unsafe" presumably refers to reading the latest state without
    /// consistency/version guarantees, not memory safety — confirm.
    fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
        get_sui_system_state(self)
    }
1819
    /// Reads the bridge object via the shared helper; same caveat on
    /// "unsafe" as `get_sui_system_state_object_unsafe`.
    fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge> {
        get_bridge(self)
    }
1823
1824 fn get_marker_value(
1825 &self,
1826 object_key: FullObjectKey,
1827 epoch_id: EpochId,
1828 ) -> Option<MarkerValue> {
1829 match self.get_marker_value_cache_only(object_key, epoch_id) {
1830 CacheResult::Hit(marker) => Some(marker),
1831 CacheResult::NegativeHit => None,
1832 CacheResult::Miss => self
1833 .record_db_get("marker_by_version")
1834 .get_marker_value(object_key, epoch_id)
1835 .expect("db error"),
1836 }
1837 }
1838
1839 fn get_latest_marker(
1840 &self,
1841 object_id: FullObjectID,
1842 epoch_id: EpochId,
1843 ) -> Option<(SequenceNumber, MarkerValue)> {
1844 match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1845 CacheResult::Hit((v, marker)) => Some((v, marker)),
1846 CacheResult::NegativeHit => {
1847 panic!("cannot have negative hit when getting latest marker")
1848 }
1849 CacheResult::Miss => self
1850 .record_db_get("marker_latest")
1851 .get_latest_marker(object_id, epoch_id)
1852 .expect("db error"),
1853 }
1854 }
1855
    /// Returns the lock status of `obj_ref` for the current epoch.
    ///
    /// If the live version of the object differs from the provided ref,
    /// reports `LockedAtDifferentVersion`; otherwise reports whether a
    /// transaction holds the lock this epoch, or that it is merely
    /// initialized.
    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult {
        let cur_epoch = epoch_store.epoch();
        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
            CacheResult::Hit((_, obj)) => {
                let actual_objref = obj.compute_object_reference();
                if obj_ref != actual_objref {
                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
                        locked_ref: actual_objref,
                    })
                } else {
                    // Ref matches the live object; consult the lock table.
                    Ok(
                        match self
                            .object_locks
                            .get_transaction_lock(&obj_ref, epoch_store)?
                        {
                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
                                locked_by_tx: LockDetailsDeprecated {
                                    epoch: cur_epoch,
                                    tx_digest,
                                },
                            },
                            None => ObjectLockStatus::Initialized,
                        },
                    )
                }
            }
            CacheResult::NegativeHit => {
                Err(SuiError::from(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
                    version: None,
                }))
            }
            CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
        }
    }
1894
1895 fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef> {
1896 let obj = self.get_object_impl("live_objref", &object_id).ok_or(
1897 UserInputError::ObjectNotFound {
1898 object_id,
1899 version: None,
1900 },
1901 )?;
1902 Ok(obj.compute_object_reference())
1903 }
1904
    /// Verifies that every owned object ref is live at exactly the provided
    /// version; the error identifies the first offending ref.
    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
        do_fallback_lookup_fallible(
            owned_object_refs,
            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
                CacheResult::Hit((version, obj)) => {
                    // Live, but possibly at a different version/digest than
                    // the caller provided.
                    if obj.compute_object_reference() != *obj_ref {
                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: version,
                        }
                        .into())
                    } else {
                        Ok(CacheResult::Hit(()))
                    }
                }
                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
                    version: None,
                }
                .into()),
                CacheResult::Miss => Ok(CacheResult::Miss),
            },
            |remaining| {
                // The store call errors out if any remaining ref is not
                // live; unit results per ref satisfy the lookup contract.
                self.record_db_multi_get("object_is_live", remaining.len())
                    .check_owned_objects_are_live(remaining)?;
                Ok(vec![(); remaining.len()])
            },
        )?;
        Ok(())
    }
1935
    /// Passthrough to the perpetual tables; pruning state is not cached.
    fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber> {
        self.store
            .perpetual_tables
            .get_highest_pruned_checkpoint()
            .expect("db error")
    }
1942
    /// Future that resolves once every input (and receiving) key is
    /// available; availability is rechecked via the closure whenever a
    /// relevant object write is signalled.
    fn notify_read_input_objects<'a>(
        &'a self,
        input_and_receiving_keys: &'a [InputKey],
        receiving_keys: &'a HashSet<InputKey>,
        epoch: EpochId,
    ) -> BoxFuture<'a, ()> {
        self.object_notify_read
            .read(
                "notify_read_input_objects",
                input_and_receiving_keys,
                move |keys| {
                    // `NotifyRead` wants Option per key: Some(()) = ready.
                    self.multi_input_objects_available(keys, receiving_keys, epoch)
                        .into_iter()
                        .map(|available| if available { Some(()) } else { None })
                        .collect::<Vec<_>>()
                },
            )
            .map(|_| ())
            .boxed()
    }
1963}
1964
1965impl TransactionCacheRead for WritebackCache {
    /// Batched transaction lookup: dirty (uncommitted) writes first, then the
    /// committed point cache, then the DB.
    ///
    /// Read tickets are taken up front so that negative results from the DB
    /// can be cached without racing a concurrent write (the ticket expires if
    /// anything wrote the key in between, discarding the stale `None`).
    fn multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<Arc<VerifiedTransaction>>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_block", "uncommitted");
                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_block", "uncommitted");
                    return CacheResult::Hit(Some(tx.transaction.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_block", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_block", "committed");

                match self
                    .cached
                    .transactions
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(tx)) => {
                        self.metrics
                            .record_cache_hit("transaction_block", "committed");
                        CacheResult::Hit(Some(tx))
                    }
                    // Cached knowledge that the tx is absent from the DB.
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_block", "committed");

                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results: Vec<_> = self
                    .record_db_multi_get("transaction_block", remaining.len())
                    .multi_get_transaction_blocks(&remaining_digests)
                    .expect("db error")
                    .into_iter()
                    .map(|o| o.map(Arc::new))
                    .collect();
                // Cache negative results, guarded by the pre-read tickets.
                for ((digest, ticket), result) in remaining.iter().zip_debug_eq(results.iter()) {
                    if result.is_none() {
                        self.cached.transactions.insert(digest, None, *ticket).ok();
                    }
                }
                results
            },
        )
    }
2028
    /// Batched lookup of executed-effects digests: dirty set first, then the
    /// committed point cache, then the DB. Negative DB results are cached
    /// under the pre-read tickets (same race discipline as
    /// `multi_get_transaction_blocks`).
    fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEffectsDigest>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| {
                (
                    *d,
                    self.cached.executed_effects_digests.get_ticket_for_read(d),
                )
            })
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("executed_effects_digests", "uncommitted");
                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
                    self.metrics
                        .record_cache_hit("executed_effects_digests", "uncommitted");
                    return CacheResult::Hit(Some(*digest));
                }
                self.metrics
                    .record_cache_miss("executed_effects_digests", "uncommitted");

                self.metrics
                    .record_cache_request("executed_effects_digests", "committed");
                match self
                    .cached
                    .executed_effects_digests
                    .get(digest)
                    .map(|l| *l.lock())
                {
                    Some(PointCacheItem::Some(digest)) => {
                        self.metrics
                            .record_cache_hit("executed_effects_digests", "committed");
                        CacheResult::Hit(Some(digest))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("executed_effects_digests", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("executed_effects_digests", remaining.len())
                    .multi_get_executed_effects_digests(&remaining_digests)
                    .expect("db error");
                for ((digest, ticket), result) in remaining.iter().zip_debug_eq(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .executed_effects_digests
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }
2094
    /// Batched effects lookup by effects digest: dirty set first, then the
    /// committed point cache, then the DB; negative DB results are cached
    /// under the pre-read tickets.
    fn multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> Vec<Option<TransactionEffects>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_effects", "uncommitted");
                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_effects", "uncommitted");
                    return CacheResult::Hit(Some(effects.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_effects", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_effects", "committed");
                match self
                    .cached
                    .transaction_effects
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(effects)) => {
                        self.metrics
                            .record_cache_hit("transaction_effects", "committed");
                        // The cache stores an Arc; callers want an owned copy.
                        CacheResult::Hit(Some((*effects).clone()))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_effects", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("transaction_effects", remaining.len())
                    .multi_get_effects(remaining_digests.iter())
                    .expect("db error");
                for ((digest, ticket), result) in remaining.iter().zip_debug_eq(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_effects
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }
2155
    /// Returns whether `digest` executed in the epoch immediately preceding
    /// `current_epoch`, memoizing the answer keyed by (last_epoch, digest).
    fn transaction_executed_in_last_epoch(
        &self,
        digest: &TransactionDigest,
        current_epoch: EpochId,
    ) -> bool {
        // Epoch 0 has no predecessor.
        if current_epoch == 0 {
            return false;
        }
        let last_epoch = current_epoch - 1;
        let cache_key = (last_epoch, *digest);

        // Take the ticket before probing so that a concurrent write between
        // the DB read and the insert below invalidates our insert.
        let ticket = self
            .cached
            .transaction_executed_in_last_epoch
            .get_ticket_for_read(&cache_key);

        if let Some(cached) = self
            .cached
            .transaction_executed_in_last_epoch
            .get(&cache_key)
        {
            // Present value (`Some(())`) encodes "was executed".
            return cached.lock().is_some();
        }

        let was_executed = self
            .store
            .perpetual_tables
            .was_transaction_executed_in_last_epoch(digest, current_epoch);

        let value = if was_executed { Some(()) } else { None };
        self.cached
            .transaction_executed_in_last_epoch
            .insert(&cache_key, value, ticket)
            .ok();

        was_executed
    }
2193
    /// Future resolving to the executed-effects digest of each transaction,
    /// waiting for execution where necessary.
    fn notify_read_executed_effects_digests<'a>(
        &'a self,
        task_name: &'static str,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>> {
        self.executed_effects_digests_notify_read
            .read(task_name, digests, |digests| {
                self.multi_get_executed_effects_digests(digests)
            })
            .boxed()
    }
2205
    /// Batched events lookup. NOTE(review): despite the parameter name,
    /// events are keyed by *transaction* digest here — the name looks
    /// historical; confirm before renaming.
    ///
    /// Empty event lists are normalized to `None` for callers.
    fn multi_get_events(
        &self,
        event_digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEvents>> {
        // Collapse "present but empty" into `None`.
        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
            if events.data.is_empty() {
                None
            } else {
                Some(events)
            }
        }

        let digests_and_tickets: Vec<_> = event_digests
            .iter()
            .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_events", "uncommitted");
                if let Some(events) = self.dirty.transaction_events.get(digest).map(|e| e.clone()) {
                    self.metrics
                        .record_cache_hit("transaction_events", "uncommitted");

                    return CacheResult::Hit(map_events(events));
                }
                self.metrics
                    .record_cache_miss("transaction_events", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_events", "committed");
                match self
                    .cached
                    .transaction_events
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(events)) => {
                        self.metrics
                            .record_cache_hit("transaction_events", "committed");
                        CacheResult::Hit(map_events((*events).clone()))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_events", "committed");

                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .store
                    .multi_get_events(&remaining_digests)
                    .expect("db error");
                // Cache negative results, guarded by the pre-read tickets.
                for ((digest, ticket), result) in remaining.iter().zip_debug_eq(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_events
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }
2276
2277 fn get_unchanged_loaded_runtime_objects(
2278 &self,
2279 digest: &TransactionDigest,
2280 ) -> Option<Vec<ObjectKey>> {
2281 self.dirty
2282 .unchanged_loaded_runtime_objects
2283 .get(digest)
2284 .map(|b| b.clone())
2285 .or_else(|| {
2286 self.store
2287 .get_unchanged_loaded_runtime_objects(digest)
2288 .expect("db error")
2289 })
2290 }
2291
2292 fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
2293 self.dirty
2294 .pending_transaction_writes
2295 .get(digest)
2296 .map(|transaction_output| transaction_output.take_accumulator_events())
2297 }
2298}
2299
impl ExecutionCacheWrite for WritebackCache {
    /// Validates that each owned input ref matches its current live version.
    fn validate_owned_object_versions(&self, owned_input_objects: &[ObjectRef]) -> SuiResult {
        ObjectLocks::validate_owned_object_versions(self, owned_input_objects)
    }

    /// Records a transaction's outputs into the dirty set (write-back path).
    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs);
    }

    /// Test-only hook to inject a single object entry into the dirty set.
    #[cfg(test)]
    fn write_object_entry_for_test(&self, object: Object) {
        self.write_object_entry(&object.id(), object.version(), object.into());
    }
}
2314
// Generates the remaining trait impls that forward directly to the underlying
// store. NOTE(review): see the macro definition for the exact trait list.
implement_passthrough_traits!(WritebackCache);
2316
2317impl GlobalStateHashStore for WritebackCache {
    /// Finds the object ref with the highest version strictly below
    /// `version`, considering the dirty set, the cached version maps, and
    /// the store; the highest-versioned candidate across all three wins.
    fn get_object_ref_prior_to_key_deprecated(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<Option<ObjectRef>> {
        // At most one candidate per source.
        let mut candidates = Vec::new();

        // Maps a prior version entry to its object ref, converting tombstone
        // entries into the corresponding tombstone digests.
        let check_versions =
            |versions: &CachedVersionMap<ObjectEntry>| match versions.get_prior_to(&version) {
                Some((version, object_entry)) => match object_entry {
                    ObjectEntry::Object(object) => {
                        assert_eq!(object.version(), version);
                        Some(object.compute_object_reference())
                    }
                    ObjectEntry::Deleted => {
                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED))
                    }
                    ObjectEntry::Wrapped => {
                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED))
                    }
                },
                None => None,
            };

        if let Some(objects) = self.dirty.objects.get(object_id)
            && let Some(prior) = check_versions(&objects)
        {
            candidates.push(prior);
        }

        if let Some(objects) = self.cached.object_cache.get(object_id)
            && let Some(prior) = check_versions(&objects.lock())
        {
            candidates.push(prior);
        }

        if let Some(prior) = self
            .store
            .get_object_ref_prior_to_key_deprecated(object_id, version)?
        {
            candidates.push(prior);
        }

        // Sort ascending by version; the last element is the newest prior ref.
        candidates.sort_by_key(|(_, version, _)| *version);
        Ok(candidates.pop())
    }
2369
2370 fn get_root_state_hash_for_epoch(
2371 &self,
2372 epoch: EpochId,
2373 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
2374 self.store.get_root_state_hash_for_epoch(epoch)
2375 }
2376
2377 fn get_root_state_hash_for_highest_epoch(
2378 &self,
2379 ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
2380 self.store.get_root_state_hash_for_highest_epoch()
2381 }
2382
2383 fn insert_state_hash_for_epoch(
2384 &self,
2385 epoch: EpochId,
2386 checkpoint_seq_num: &CheckpointSequenceNumber,
2387 acc: &GlobalStateHash,
2388 ) -> SuiResult {
2389 self.store
2390 .insert_state_hash_for_epoch(epoch, checkpoint_seq_num, acc)
2391 }
2392
2393 fn iter_live_object_set(
2394 &self,
2395 include_wrapped_tombstone: bool,
2396 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2397 assert!(
2401 self.dirty.is_empty(),
2402 "cannot iterate live object set with dirty data"
2403 );
2404 self.store.iter_live_object_set(include_wrapped_tombstone)
2405 }
2406
2407 fn iter_cached_live_object_set_for_testing(
2411 &self,
2412 include_wrapped_tombstone: bool,
2413 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2414 let iter = self.dirty.objects.iter();
2416 let mut dirty_objects = BTreeMap::new();
2417
2418 for obj in self.store.iter_live_object_set(include_wrapped_tombstone) {
2420 dirty_objects.insert(obj.object_id(), obj);
2421 }
2422
2423 for entry in iter {
2425 let id = *entry.key();
2426 let value = entry.value();
2427 match value.get_highest().unwrap() {
2428 (_, ObjectEntry::Object(object)) => {
2429 dirty_objects.insert(id, LiveObject::Normal(object.clone()));
2430 }
2431 (version, ObjectEntry::Wrapped) => {
2432 if include_wrapped_tombstone {
2433 dirty_objects.insert(id, LiveObject::Wrapped(ObjectKey(id, *version)));
2434 } else {
2435 dirty_objects.remove(&id);
2436 }
2437 }
2438 (_, ObjectEntry::Deleted) => {
2439 dirty_objects.remove(&id);
2440 }
2441 }
2442 }
2443
2444 Box::new(dirty_objects.into_values())
2445 }
2446}
2447
2448impl StateSyncAPI for WritebackCache {
2454 fn insert_transaction_and_effects(
2455 &self,
2456 transaction: &VerifiedTransaction,
2457 transaction_effects: &TransactionEffects,
2458 ) {
2459 self.store
2460 .insert_transaction_and_effects(transaction, transaction_effects)
2461 .expect("db error");
2462 self.cached
2463 .transactions
2464 .insert(
2465 transaction.digest(),
2466 PointCacheItem::Some(Arc::new(transaction.clone())),
2467 Ticket::Write,
2468 )
2469 .ok();
2470 self.cached
2471 .transaction_effects
2472 .insert(
2473 &transaction_effects.digest(),
2474 PointCacheItem::Some(Arc::new(transaction_effects.clone())),
2475 Ticket::Write,
2476 )
2477 .ok();
2478 }
2479
2480 fn multi_insert_transaction_and_effects(
2481 &self,
2482 transactions_and_effects: &[VerifiedExecutionData],
2483 ) {
2484 self.store
2485 .multi_insert_transaction_and_effects(transactions_and_effects.iter())
2486 .expect("db error");
2487 for VerifiedExecutionData {
2488 transaction,
2489 effects,
2490 } in transactions_and_effects
2491 {
2492 self.cached
2493 .transactions
2494 .insert(
2495 transaction.digest(),
2496 PointCacheItem::Some(Arc::new(transaction.clone())),
2497 Ticket::Write,
2498 )
2499 .ok();
2500 self.cached
2501 .transaction_effects
2502 .insert(
2503 &effects.digest(),
2504 PointCacheItem::Some(Arc::new(effects.clone())),
2505 Ticket::Write,
2506 )
2507 .ok();
2508 }
2509 }
2510}