sui_core/execution_cache/
writeback_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! MemoryCache is a cache for the transaction execution which delays writes to the database until
5//! transaction results are certified (i.e. they appear in a certified checkpoint, or an effects cert
6//! is observed by a fullnode). The cache also stores committed data in memory in order to serve
7//! future reads without hitting the database.
8//!
9//! For storing uncommitted transaction outputs, we cannot evict the data at all until it is written
10//! to disk. Committed data not only can be evicted, but it is also unbounded (imagine a stream of
11//! transactions that keep splitting a coin into smaller coins).
12//!
13//! We also want to be able to support negative cache hits (i.e. the case where we can determine an
14//! object does not exist without hitting the database).
15//!
//! To achieve both of these goals, we split the cache data into two pieces, a dirty set and a cached
//! set. The dirty set has no automatic evictions; data is only removed after being committed. The
//! cached set is in a bounded-size cache with automatic evictions. In order to support negative
//! cache hits, we treat the two halves of the cache as a FIFO queue. Newly written (dirty) versions
//! are inserted at one end of the dirty queue. As versions are committed to disk, they are
//! removed from the other end of the dirty queue and inserted into the cache queue. The cache queue
//! is truncated if it exceeds its maximum size, by removing all but the N newest versions.
23//!
24//! This gives us the property that the sequence of versions in the dirty and cached queues are the
25//! most recent versions of the object, i.e. there can be no "gaps". This allows for the following:
26//!
27//!   - Negative cache hits: If the queried version is not in memory, but is higher than the smallest
28//!     version in the cached queue, it does not exist in the db either.
29//!   - Bounded reads: When reading the most recent version that is <= some version bound, we can
30//!     correctly satisfy this query from the cache, or determine that we must go to the db.
31//!
32//! Note that at any time, either or both the dirty or the cached queue may be non-existent. There may be no
33//! dirty versions of the objects, in which case there will be no dirty queue. And, the cached queue
34//! may be evicted from the cache, in which case there will be no cached queue. Because only the cached
35//! queue can be evicted (the dirty queue can only become empty by moving versions from it to the cached
36//! queue), the "highest versions" property still holds in all cases.
37//!
38//! The above design is used for both objects and markers.
39
40use crate::accumulators::funds_read::AccountFundsRead;
41use crate::authority::AuthorityStore;
42use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
43use crate::authority::authority_store::{
44    ExecutionLockWriteGuard, LockDetailsDeprecated, ObjectLockStatus, SuiLockResult,
45};
46use crate::authority::authority_store_tables::LiveObject;
47use crate::authority::backpressure::BackpressureManager;
48use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
49use crate::fallback_fetch::{do_fallback_lookup, do_fallback_lookup_fallible};
50use crate::global_state_hasher::GlobalStateHashStore;
51use crate::transaction_outputs::TransactionOutputs;
52
53use dashmap::DashMap;
54use dashmap::mapref::entry::Entry as DashMapEntry;
55use futures::{FutureExt, future::BoxFuture};
56use moka::sync::SegmentedCache as MokaCache;
57use mysten_common::ZipDebugEqIteratorExt;
58use mysten_common::debug_fatal;
59use mysten_common::random_util::randomize_cache_capacity_in_tests;
60use mysten_common::sync::notify_read::NotifyRead;
61use parking_lot::Mutex;
62use rayon::prelude::*;
63use std::collections::{BTreeMap, HashSet};
64use std::hash::Hash;
65use std::sync::Arc;
66use std::sync::atomic::AtomicU64;
67use sui_config::ExecutionCacheConfig;
68use sui_macros::fail_point;
69use sui_protocol_config::ProtocolVersion;
70use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
71use sui_types::accumulator_event::AccumulatorEvent;
72use sui_types::accumulator_root::{AccumulatorObjId, AccumulatorValue};
73use sui_types::base_types::{
74    EpochId, FullObjectID, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData,
75};
76use sui_types::bridge::{Bridge, get_bridge};
77use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest};
78use sui_types::effects::{TransactionEffects, TransactionEvents};
79use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
80use sui_types::executable_transaction::VerifiedExecutableTransaction;
81use sui_types::global_state_hash::GlobalStateHash;
82use sui_types::message_envelope::Message;
83use sui_types::messages_checkpoint::CheckpointSequenceNumber;
84use sui_types::object::Object;
85use sui_types::storage::{
86    FullObjectKey, InputKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject,
87};
88use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
89use sui_types::transaction::{TransactionDataAPI, VerifiedTransaction};
90use tap::TapOptional;
91use tracing::{debug, info, instrument, trace, warn};
92
93use super::ExecutionCacheAPI;
94use super::cache_types::Ticket;
95use super::{
96    Batch, CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
97    ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
98    cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache},
99    implement_passthrough_traits,
100    object_locks::ObjectLocks,
101};
102
103#[cfg(test)]
104#[path = "unit_tests/writeback_cache_tests.rs"]
105pub mod writeback_cache_tests;
106
107#[cfg(test)]
108#[path = "unit_tests/notify_read_input_objects_tests.rs"]
109mod notify_read_input_objects_tests;
110
111#[derive(Clone, PartialEq, Eq)]
112enum ObjectEntry {
113    Object(Object),
114    Deleted,
115    Wrapped,
116}
117
118impl ObjectEntry {
119    #[cfg(test)]
120    fn unwrap_object(&self) -> &Object {
121        match self {
122            ObjectEntry::Object(o) => o,
123            _ => panic!("unwrap_object called on non-Object"),
124        }
125    }
126
127    fn is_tombstone(&self) -> bool {
128        match self {
129            ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
130            ObjectEntry::Object(_) => false,
131        }
132    }
133}
134
135impl std::fmt::Debug for ObjectEntry {
136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137        match self {
138            ObjectEntry::Object(o) => {
139                write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
140            }
141            ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
142            ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
143        }
144    }
145}
146
147impl From<Object> for ObjectEntry {
148    fn from(object: Object) -> Self {
149        ObjectEntry::Object(object)
150    }
151}
152
153impl From<ObjectOrTombstone> for ObjectEntry {
154    fn from(object: ObjectOrTombstone) -> Self {
155        match object {
156            ObjectOrTombstone::Object(o) => o.into(),
157            ObjectOrTombstone::Tombstone(obj_ref) => {
158                if obj_ref.2.is_deleted() {
159                    ObjectEntry::Deleted
160                } else if obj_ref.2.is_wrapped() {
161                    ObjectEntry::Wrapped
162                } else {
163                    panic!("tombstone digest must either be deleted or wrapped");
164                }
165            }
166        }
167    }
168}
169
170#[derive(Debug, Clone, PartialEq, Eq)]
171enum LatestObjectCacheEntry {
172    Object(SequenceNumber, ObjectEntry),
173    NonExistent,
174}
175
176impl LatestObjectCacheEntry {
177    #[cfg(test)]
178    fn version(&self) -> Option<SequenceNumber> {
179        match self {
180            LatestObjectCacheEntry::Object(version, _) => Some(*version),
181            LatestObjectCacheEntry::NonExistent => None,
182        }
183    }
184
185    fn is_alive(&self) -> bool {
186        match self {
187            LatestObjectCacheEntry::Object(_, entry) => !entry.is_tombstone(),
188            LatestObjectCacheEntry::NonExistent => false,
189        }
190    }
191}
192
193impl IsNewer for LatestObjectCacheEntry {
194    fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
195        match (self, other) {
196            (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
197                v1 > v2
198            }
199            (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
200            _ => false,
201        }
202    }
203}
204
205type MarkerKey = (EpochId, FullObjectID);
206
207/// UncommittedData stores execution outputs that are not yet written to the db. Entries in this
208/// struct can only be purged after they are committed.
209struct UncommittedData {
210    /// The object dirty set. All writes go into this table first. After we flush the data to the
211    /// db, the data is removed from this table and inserted into the object_cache.
212    ///
213    /// This table may contain both live and dead objects, since we flush both live and dead
214    /// objects to the db in order to support past object queries on fullnodes.
215    ///
216    /// Further, we only remove objects in FIFO order, which ensures that the cached
217    /// sequence of objects has no gaps. In other words, if we have versions 4, 8, 13 of
218    /// an object, we can deduce that version 9 does not exist. This also makes child object
219    /// reads efficient. `object_cache` cannot contain a more recent version of an object than
220    /// `objects`, and neither can have any gaps. Therefore if there is any object <= the version
221    /// bound for a child read in objects, it is the correct object to return.
222    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,
223
224    // Markers for received objects and deleted shared objects. This contains all of the dirty
225    // marker state, which is committed to the db at the same time as other transaction data.
226    // After markers are committed to the db we remove them from this table and insert them into
227    // marker_cache.
228    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,
229
230    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,
231
232    transaction_events: DashMap<TransactionDigest, TransactionEvents>,
233
234    unchanged_loaded_runtime_objects: DashMap<TransactionDigest, Vec<ObjectKey>>,
235
236    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,
237
238    // Transaction outputs that have not yet been written to the DB. Items are removed from this
239    // table as they are flushed to the db.
240    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,
241
242    total_transaction_inserts: AtomicU64,
243    total_transaction_commits: AtomicU64,
244}
245
246impl UncommittedData {
247    fn new() -> Self {
248        Self {
249            objects: DashMap::with_shard_amount(2048),
250            markers: DashMap::with_shard_amount(2048),
251            transaction_effects: DashMap::with_shard_amount(2048),
252            executed_effects_digests: DashMap::with_shard_amount(2048),
253            pending_transaction_writes: DashMap::with_shard_amount(2048),
254            transaction_events: DashMap::with_shard_amount(2048),
255            unchanged_loaded_runtime_objects: DashMap::with_shard_amount(2048),
256            total_transaction_inserts: AtomicU64::new(0),
257            total_transaction_commits: AtomicU64::new(0),
258        }
259    }
260
261    fn clear(&self) {
262        self.objects.clear();
263        self.markers.clear();
264        self.transaction_effects.clear();
265        self.executed_effects_digests.clear();
266        self.pending_transaction_writes.clear();
267        self.transaction_events.clear();
268        self.unchanged_loaded_runtime_objects.clear();
269        self.total_transaction_inserts
270            .store(0, std::sync::atomic::Ordering::Relaxed);
271        self.total_transaction_commits
272            .store(0, std::sync::atomic::Ordering::Relaxed);
273    }
274
275    fn is_empty(&self) -> bool {
276        let empty = self.pending_transaction_writes.is_empty();
277        if empty && cfg!(debug_assertions) {
278            assert!(
279                self.objects.is_empty()
280                    && self.markers.is_empty()
281                    && self.transaction_effects.is_empty()
282                    && self.executed_effects_digests.is_empty()
283                    && self.transaction_events.is_empty()
284                    && self.unchanged_loaded_runtime_objects.is_empty()
285                    && self
286                        .total_transaction_inserts
287                        .load(std::sync::atomic::Ordering::Relaxed)
288                        == self
289                            .total_transaction_commits
290                            .load(std::sync::atomic::Ordering::Relaxed),
291            );
292        }
293        empty
294    }
295}
296
297// Point items (anything without a version number) can be negatively cached as None
298type PointCacheItem<T> = Option<T>;
299
300// PointCacheItem can only be used for insert-only collections, so a Some entry
301// is always newer than a None entry.
302impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
303    fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
304        match (self, other) {
305            (Some(_), None) => true,
306
307            (Some(a), Some(b)) => {
308                // conflicting inserts should never happen
309                debug_assert_eq!(a, b);
310                false
311            }
312
313            _ => false,
314        }
315    }
316}
317
318/// CachedData stores data that has been committed to the db, but is likely to be read soon.
319struct CachedCommittedData {
320    // See module level comment for an explanation of caching strategy.
321    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,
322
323    // See module level comment for an explanation of caching strategy.
324    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,
325
326    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,
327
328    transaction_effects:
329        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,
330
331    transaction_events: MonotonicCache<TransactionDigest, PointCacheItem<Arc<TransactionEvents>>>,
332
333    executed_effects_digests:
334        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,
335
336    transaction_executed_in_last_epoch:
337        MonotonicCache<(EpochId, TransactionDigest), PointCacheItem<()>>,
338
339    // Objects that were read at transaction signing time - allows us to access them again at
340    // execution time with a single lock / hash lookup
341    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
342}
343
344impl CachedCommittedData {
345    fn new(config: &ExecutionCacheConfig) -> Self {
346        let object_cache = MokaCache::builder(8)
347            .max_capacity(randomize_cache_capacity_in_tests(
348                config.object_cache_size(),
349            ))
350            .build();
351        let marker_cache = MokaCache::builder(8)
352            .max_capacity(randomize_cache_capacity_in_tests(
353                config.marker_cache_size(),
354            ))
355            .build();
356
357        let transactions = MonotonicCache::new(randomize_cache_capacity_in_tests(
358            config.transaction_cache_size(),
359        ));
360        let transaction_effects = MonotonicCache::new(randomize_cache_capacity_in_tests(
361            config.effect_cache_size(),
362        ));
363        let transaction_events = MonotonicCache::new(randomize_cache_capacity_in_tests(
364            config.events_cache_size(),
365        ));
366        let executed_effects_digests = MonotonicCache::new(randomize_cache_capacity_in_tests(
367            config.executed_effect_cache_size(),
368        ));
369
370        let transaction_objects = MokaCache::builder(8)
371            .max_capacity(randomize_cache_capacity_in_tests(
372                config.transaction_objects_cache_size(),
373            ))
374            .build();
375
376        let transaction_executed_in_last_epoch = MonotonicCache::new(
377            randomize_cache_capacity_in_tests(config.executed_effect_cache_size()),
378        );
379
380        Self {
381            object_cache,
382            marker_cache,
383            transactions,
384            transaction_effects,
385            transaction_events,
386            executed_effects_digests,
387            transaction_executed_in_last_epoch,
388            _transaction_objects: transaction_objects,
389        }
390    }
391
392    fn clear_and_assert_empty(&self) {
393        self.object_cache.invalidate_all();
394        self.marker_cache.invalidate_all();
395        self.transactions.invalidate_all();
396        self.transaction_effects.invalidate_all();
397        self.transaction_events.invalidate_all();
398        self.executed_effects_digests.invalidate_all();
399        self.transaction_executed_in_last_epoch.invalidate_all();
400        self._transaction_objects.invalidate_all();
401
402        assert_empty(&self.object_cache);
403        assert_empty(&self.marker_cache);
404        assert!(self.transactions.is_empty());
405        assert!(self.transaction_effects.is_empty());
406        assert!(self.transaction_events.is_empty());
407        assert!(self.executed_effects_digests.is_empty());
408        assert!(self.transaction_executed_in_last_epoch.is_empty());
409        assert_empty(&self._transaction_objects);
410    }
411}
412
413fn assert_empty<K, V>(cache: &MokaCache<K, V>)
414where
415    K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
416    V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
417{
418    if cache.iter().next().is_some() {
419        panic!("cache should be empty");
420    }
421}
422
/// The writeback execution cache: a dirty (uncommitted) set, a committed-data cache, a few
/// auxiliary caches, and handles to the backing store, backpressure manager and metrics.
pub struct WritebackCache {
    // Outputs not yet written to the db.
    dirty: UncommittedData,
    // Data already committed to the db and kept in memory for reads.
    cached: CachedCommittedData,

    // We separately cache the latest version of each object. Although this seems
    // redundant, it is the only way to support populating the cache after a read.
    // We cannot simply insert objects that we read off the disk into `object_cache`,
    // since that may violate the no-missing-versions property.
    // `object_by_id_cache` is also written to on writes so that it is always coherent.
    // Hence it contains both committed and dirty object data.
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    // The packages cache is treated separately from objects, because they are immutable and can be
    // used by any number of transactions. Additionally, many operations require loading large
    // numbers of packages (due to dependencies), so we want to try to keep all packages in memory.
    //
    // Also, this cache can contain packages that are dirty or committed, so it does not live in
    // UncommittedData or CachedCommittedData. The cache is populated in two ways:
    // - when packages are written (in which case they will also be present in the dirty set)
    // - after a cache miss. Because package IDs are unique (only one version exists for each ID)
    //   we do not need to worry about the contiguous version property.
    // - note that we removed any unfinalized packages from the cache during revert_state_update().
    packages: MokaCache<ObjectID, PackageObject>,

    object_locks: ObjectLocks,

    // Notification channels: one keyed by transaction digest (carrying the effects digest),
    // one keyed by input key, which is notified when objects and certain markers are written.
    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    object_notify_read: NotifyRead<InputKey, ()>,

    // Backing store handle.
    store: Arc<AuthorityStore>,
    // Taken from `ExecutionCacheConfig::backpressure_threshold()` at construction.
    backpressure_threshold: u64,
    backpressure_manager: Arc<BackpressureManager>,
    metrics: Arc<ExecutionCacheMetrics>,
}
457
/// Looks up `$version` in one level (`$level`) of a versioned cache (`$cache`, an
/// `Option` of a version map) and records the matching metrics for `$table`.
///
/// The macro `return`s from the enclosing function/closure with `CacheResult::Hit` when the
/// exact version is present, or `CacheResult::NegativeHit` when it is absent but the smallest
/// cached version is lower than `$version`. Otherwise it records a miss and falls through so
/// the caller can try the next level.
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(versions) = $cache {
            match versions.get(&$version) {
                Some(found) => {
                    $self.metrics.record_cache_hit($table, $level);
                    return CacheResult::Hit(found.clone());
                }
                None => {
                    // If the version is greater than the least version in the cache, then we
                    // know that the object does not exist anywhere
                    let above_least =
                        matches!(versions.get_least(), Some(least) if least.0 < $version);
                    if above_least {
                        $self.metrics.record_cache_negative_hit($table, $level);
                        return CacheResult::NegativeHit;
                    }
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
479
/// Fetches the highest version from one level (`$level`) of a versioned cache (`$cache`, an
/// `Option` of a version map) and records the matching metrics for `$table`.
///
/// When the map is present, the macro `return`s `CacheResult::Hit((version, entry))` from the
/// enclosing function/closure. When it is absent, a miss is recorded and execution falls
/// through.
///
/// Panics if the map is present but empty, since empty maps are expected to be removed.
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(versions) = $cache {
            match versions.get_highest() {
                Some((newest_version, newest_entry)) => {
                    $self.metrics.record_cache_hit($table, $level);
                    return CacheResult::Hit((*newest_version, newest_entry.clone()));
                }
                None => panic!("empty CachedVersionMap should have been removed"),
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
494
495impl WritebackCache {
496    pub fn new(
497        config: &ExecutionCacheConfig,
498        store: Arc<AuthorityStore>,
499        metrics: Arc<ExecutionCacheMetrics>,
500        backpressure_manager: Arc<BackpressureManager>,
501    ) -> Self {
502        let packages = MokaCache::builder(8)
503            .max_capacity(randomize_cache_capacity_in_tests(
504                config.package_cache_size(),
505            ))
506            .build();
507        Self {
508            dirty: UncommittedData::new(),
509            cached: CachedCommittedData::new(config),
510            object_by_id_cache: MonotonicCache::new(randomize_cache_capacity_in_tests(
511                config.object_by_id_cache_size(),
512            )),
513            packages,
514            object_locks: ObjectLocks::new(),
515            executed_effects_digests_notify_read: NotifyRead::new(),
516            object_notify_read: NotifyRead::new(),
517            store,
518            backpressure_manager,
519            backpressure_threshold: config.backpressure_threshold(),
520            metrics,
521        }
522    }
523
524    pub fn new_for_tests(store: Arc<AuthorityStore>) -> Self {
525        Self::new(
526            &Default::default(),
527            store,
528            ExecutionCacheMetrics::new(&prometheus::Registry::new()).into(),
529            BackpressureManager::new_for_tests(),
530        )
531    }
532
533    #[cfg(test)]
534    pub fn reset_for_test(&mut self) {
535        let mut new = Self::new(
536            &Default::default(),
537            self.store.clone(),
538            self.metrics.clone(),
539            self.backpressure_manager.clone(),
540        );
541        std::mem::swap(self, &mut new);
542    }
543
544    pub fn evict_executed_effects_from_cache_for_testing(&self, tx_digest: &TransactionDigest) {
545        self.cached.executed_effects_digests.invalidate(tx_digest);
546        self.cached.transaction_events.invalidate(tx_digest);
547        self.cached.transactions.invalidate(tx_digest);
548    }
549
550    fn write_object_entry(
551        &self,
552        object_id: &ObjectID,
553        version: SequenceNumber,
554        object: ObjectEntry,
555    ) {
556        trace!(?object_id, ?version, ?object, "inserting object entry");
557        self.metrics.record_cache_write("object");
558
559        // We must hold the lock for the object entry while inserting to the
560        // object_by_id_cache. Otherwise, a surprising bug can occur:
561        //
562        // 1. A thread executing TX1 can write object (O,1) to the dirty set and then pause.
563        // 2. TX2, which reads (O,1) can begin executing, because ExecutionScheduler immediately
564        //    schedules transactions if their inputs are available. It does not matter that TX1
565        //    hasn't finished executing yet.
566        // 3. TX2 can write (O,2) to both the dirty set and the object_by_id_cache.
567        // 4. The thread executing TX1 can resume and write (O,1) to the object_by_id_cache.
568        //
569        // Now, any subsequent attempt to get the latest version of O will return (O,1) instead of
570        // (O,2).
571        //
572        // This seems very unlikely, but it may be possible under the following circumstances:
573        // - While a thread is unlikely to pause for so long, moka cache uses optimistic
574        //   lock-free algorithms that have retry loops. Possibly, under high contention, this
575        //   code might spin for a surprisingly long time.
576        // - Additionally, many concurrent re-executions of the same tx could happen due to
577        //   the tx finalizer, plus checkpoint executor, consensus, and RPCs from fullnodes.
578        let mut entry = self.dirty.objects.entry(*object_id).or_default();
579
580        self.object_by_id_cache
581            .insert(
582                object_id,
583                LatestObjectCacheEntry::Object(version, object.clone()),
584                Ticket::Write,
585            )
586            // While Ticket::Write cannot expire, this insert may still fail.
587            // See the comment in `MonotonicCache::insert`.
588            .ok();
589
590        entry.insert(version, object.clone());
591
592        if let ObjectEntry::Object(object) = &object {
593            if object.is_package() {
594                self.object_notify_read
595                    .notify(&InputKey::Package { id: *object_id }, &());
596            } else if !object.is_child_object() {
597                self.object_notify_read.notify(
598                    &InputKey::VersionedObject {
599                        id: object.full_id(),
600                        version: object.version(),
601                    },
602                    &(),
603                );
604            }
605        }
606    }
607
608    fn write_marker_value(
609        &self,
610        epoch_id: EpochId,
611        object_key: FullObjectKey,
612        marker_value: MarkerValue,
613    ) {
614        tracing::trace!("inserting marker value {object_key:?}: {marker_value:?}",);
615        self.metrics.record_cache_write("marker");
616        self.dirty
617            .markers
618            .entry((epoch_id, object_key.id()))
619            .or_default()
620            .value_mut()
621            .insert(object_key.version(), marker_value);
622        // It is possible for a transaction to use a consensus stream ended
623        // object in the input, hence we must notify that it is now available
624        // at the assigned version, so that any transaction waiting for this
625        // object version can start execution.
626        if matches!(marker_value, MarkerValue::ConsensusStreamEnded(_)) {
627            self.object_notify_read.notify(
628                &InputKey::VersionedObject {
629                    id: object_key.id(),
630                    version: object_key.version(),
631                },
632                &(),
633            );
634        }
635    }
636
637    // lock both the dirty and committed sides of the cache, and then pass the entries to
638    // the callback. Written with the `with` pattern because any other way of doing this
639    // creates lifetime hell.
640    fn with_locked_cache_entries<K, V, R>(
641        dirty_map: &DashMap<K, CachedVersionMap<V>>,
642        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
643        key: &K,
644        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
645    ) -> R
646    where
647        K: Copy + Eq + Hash + Send + Sync + 'static,
648        V: Send + Sync + 'static,
649    {
650        let dirty_entry = dirty_map.entry(*key);
651        let dirty_entry = match &dirty_entry {
652            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
653            DashMapEntry::Vacant(_) => None,
654        };
655
656        let cached_entry = cached_map.get(key);
657        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
658        let cached_entry = cached_lock.as_deref();
659
660        cb(dirty_entry, cached_entry)
661    }
662
663    // Attempt to get an object from the cache. The DB is not consulted.
664    // Can return Hit, Miss, or NegativeHit (if the object is known to not exist).
665    fn get_object_entry_by_key_cache_only(
666        &self,
667        object_id: &ObjectID,
668        version: SequenceNumber,
669    ) -> CacheResult<ObjectEntry> {
670        Self::with_locked_cache_entries(
671            &self.dirty.objects,
672            &self.cached.object_cache,
673            object_id,
674            |dirty_entry, cached_entry| {
675                check_cache_entry_by_version!(
676                    self,
677                    "object_by_version",
678                    "uncommitted",
679                    dirty_entry,
680                    version
681                );
682                check_cache_entry_by_version!(
683                    self,
684                    "object_by_version",
685                    "committed",
686                    cached_entry,
687                    version
688                );
689                CacheResult::Miss
690            },
691        )
692    }
693
694    fn get_object_by_key_cache_only(
695        &self,
696        object_id: &ObjectID,
697        version: SequenceNumber,
698    ) -> CacheResult<Object> {
699        match self.get_object_entry_by_key_cache_only(object_id, version) {
700            CacheResult::Hit(entry) => match entry {
701                ObjectEntry::Object(object) => CacheResult::Hit(object),
702                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
703            },
704            CacheResult::Miss => CacheResult::Miss,
705            CacheResult::NegativeHit => CacheResult::NegativeHit,
706        }
707    }
708
709    fn get_object_entry_by_id_cache_only(
710        &self,
711        request_type: &'static str,
712        object_id: &ObjectID,
713    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
714        self.metrics
715            .record_cache_request(request_type, "object_by_id");
716        let entry = self.object_by_id_cache.get(object_id);
717
718        if cfg!(debug_assertions)
719            && let Some(entry) = &entry
720        {
721            // check that cache is coherent
722            let highest: Option<ObjectEntry> = self
723                .dirty
724                .objects
725                .get(object_id)
726                .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
727                .or_else(|| {
728                    let obj: Option<ObjectEntry> = self
729                        .store
730                        .get_latest_object_or_tombstone(*object_id)
731                        .unwrap()
732                        .map(|(_, o)| o.into());
733                    obj
734                });
735
736            let cache_entry = match &*entry.lock() {
737                LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
738                LatestObjectCacheEntry::NonExistent => None,
739            };
740
741            // If the cache entry is a tombstone, the db entry may be missing if it was pruned.
742            let tombstone_possibly_pruned = highest.is_none()
743                && cache_entry
744                    .as_ref()
745                    .map(|e| e.is_tombstone())
746                    .unwrap_or(false);
747
748            if highest != cache_entry && !tombstone_possibly_pruned {
749                tracing::error!(
750                    ?highest,
751                    ?cache_entry,
752                    ?tombstone_possibly_pruned,
753                    "object_by_id cache is incoherent for {:?}",
754                    object_id
755                );
756                panic!("object_by_id cache is incoherent for {:?}", object_id);
757            }
758        }
759
760        if let Some(entry) = entry {
761            let entry = entry.lock();
762            match &*entry {
763                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
764                    self.metrics.record_cache_hit(request_type, "object_by_id");
765                    return CacheResult::Hit((*latest_version, latest_object.clone()));
766                }
767                LatestObjectCacheEntry::NonExistent => {
768                    self.metrics
769                        .record_cache_negative_hit(request_type, "object_by_id");
770                    return CacheResult::NegativeHit;
771                }
772            }
773        } else {
774            self.metrics.record_cache_miss(request_type, "object_by_id");
775        }
776
777        Self::with_locked_cache_entries(
778            &self.dirty.objects,
779            &self.cached.object_cache,
780            object_id,
781            |dirty_entry, cached_entry| {
782                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
783                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
784                CacheResult::Miss
785            },
786        )
787    }
788
789    fn get_object_by_id_cache_only(
790        &self,
791        request_type: &'static str,
792        object_id: &ObjectID,
793    ) -> CacheResult<(SequenceNumber, Object)> {
794        match self.get_object_entry_by_id_cache_only(request_type, object_id) {
795            CacheResult::Hit((version, entry)) => match entry {
796                ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
797                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
798            },
799            CacheResult::NegativeHit => CacheResult::NegativeHit,
800            CacheResult::Miss => CacheResult::Miss,
801        }
802    }
803
804    fn get_marker_value_cache_only(
805        &self,
806        object_key: FullObjectKey,
807        epoch_id: EpochId,
808    ) -> CacheResult<MarkerValue> {
809        Self::with_locked_cache_entries(
810            &self.dirty.markers,
811            &self.cached.marker_cache,
812            &(epoch_id, object_key.id()),
813            |dirty_entry, cached_entry| {
814                check_cache_entry_by_version!(
815                    self,
816                    "marker_by_version",
817                    "uncommitted",
818                    dirty_entry,
819                    object_key.version()
820                );
821                check_cache_entry_by_version!(
822                    self,
823                    "marker_by_version",
824                    "committed",
825                    cached_entry,
826                    object_key.version()
827                );
828                CacheResult::Miss
829            },
830        )
831    }
832
833    fn get_latest_marker_value_cache_only(
834        &self,
835        object_id: FullObjectID,
836        epoch_id: EpochId,
837    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
838        Self::with_locked_cache_entries(
839            &self.dirty.markers,
840            &self.cached.marker_cache,
841            &(epoch_id, object_id),
842            |dirty_entry, cached_entry| {
843                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
844                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
845                CacheResult::Miss
846            },
847        )
848    }
849
850    fn get_object_impl(&self, request_type: &'static str, id: &ObjectID) -> Option<Object> {
851        let ticket = self.object_by_id_cache.get_ticket_for_read(id);
852        match self.get_object_entry_by_id_cache_only(request_type, id) {
853            CacheResult::Hit((_, entry)) => match entry {
854                ObjectEntry::Object(object) => Some(object),
855                ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
856            },
857            CacheResult::NegativeHit => None,
858            CacheResult::Miss => {
859                let obj = self
860                    .store
861                    .get_latest_object_or_tombstone(*id)
862                    .expect("db error");
863                match obj {
864                    Some((key, obj)) => {
865                        self.cache_latest_object_by_id(
866                            id,
867                            LatestObjectCacheEntry::Object(key.1, obj.clone().into()),
868                            ticket,
869                        );
870                        match obj {
871                            ObjectOrTombstone::Object(object) => Some(object),
872                            ObjectOrTombstone::Tombstone(_) => None,
873                        }
874                    }
875                    None => {
876                        self.cache_object_not_found(id, ticket);
877                        None
878                    }
879                }
880            }
881        }
882    }
883
    // Records a single cache request at the "db" level for `request_type` and returns a
    // reference to the backing store, so callers can chain the db read onto this call.
    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
        self.metrics.record_cache_request(request_type, "db");
        &self.store
    }
888
    // Multi-get counterpart of `record_db_get`: records a "db"-level multi request of `count`
    // items for `request_type` and returns a reference to the backing store.
    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
        self.metrics
            .record_cache_multi_request(request_type, "db", count);
        &self.store
    }
894
    // Writes the outputs of an executed transaction into the dirty (uncommitted) set.
    //
    // Steps, in order:
    //   1. assert that the transaction was not already executed in the last epoch;
    //   2. write tombstones for deleted and wrapped objects;
    //   3. write marker values;
    //   4. write child objects, then all other written objects (packages are also inserted
    //      into the package cache);
    //   5. record the transaction outputs, effects, events, unchanged loaded runtime objects
    //      and the executed-effects digest, then notify waiters on the effects digest;
    //   6. bump the insert counter and re-evaluate backpressure.
    //
    // The relative order of these writes matters to concurrent readers; see the comments on
    // the individual steps below.
    #[instrument(level = "debug", skip_all)]
    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
        let tx_digest = *tx_outputs.transaction.digest();
        trace!(?tx_digest, "writing transaction outputs to cache");

        assert!(
            !self.transaction_executed_in_last_epoch(&tx_digest, epoch_id),
            "Transaction {:?} was already executed in epoch {}",
            tx_digest,
            epoch_id.saturating_sub(1)
        );

        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            unchanged_loaded_runtime_objects,
            ..
        } = &*tx_outputs;

        // Deletions and wraps must be written first. The reason is that one of the deletes
        // may be a child object, and if we write the parent object first, a reader may or may
        // not see the previous version of the child object, instead of the deleted/wrapped
        // tombstone, which would cause an execution fork
        for ObjectKey(id, version) in deleted.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Deleted);
        }

        for ObjectKey(id, version) in wrapped.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
        }

        // Update all markers
        for (object_key, marker_value) in markers.iter() {
            self.write_marker_value(epoch_id, *object_key, *marker_value);
        }

        // Write children before parents to ensure that readers do not observe a parent object
        // before its most recent children are visible.
        for (object_id, object) in written.iter() {
            if object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
            }
        }
        for (object_id, object) in written.iter() {
            if !object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
                if object.is_package() {
                    debug!("caching package: {:?}", object.compute_object_reference());
                    self.packages
                        .insert(*object_id, PackageObject::new(object.clone()));
                }
            }
        }

        // Same digest as the one computed at the top of the function, re-read from the
        // destructured `transaction`.
        let tx_digest = *transaction.digest();
        debug!(
            ?tx_digest,
            "Writing transaction output objects to cache: {:?}",
            written
                .values()
                .map(|o| (o.id(), o.version()))
                .collect::<Vec<_>>(),
        );
        let effects_digest = effects.digest();

        self.metrics.record_cache_write("transaction_block");
        self.dirty
            .pending_transaction_writes
            .insert(tx_digest, tx_outputs.clone());

        // insert transaction effects before executed_effects_digests so that there
        // are never dangling entries in executed_effects_digests
        self.metrics.record_cache_write("transaction_effects");
        self.dirty
            .transaction_effects
            .insert(effects_digest, effects.clone());

        // note: if events.data.is_empty(), then there are no events for this transaction. We
        // store it anyway to avoid special cases in commit_transaction_outputs, and translate
        // an empty events structure to None when reading.
        self.metrics.record_cache_write("transaction_events");
        self.dirty
            .transaction_events
            .insert(tx_digest, events.clone());

        self.metrics
            .record_cache_write("unchanged_loaded_runtime_objects");
        self.dirty
            .unchanged_loaded_runtime_objects
            .insert(tx_digest, unchanged_loaded_runtime_objects.clone());

        self.metrics.record_cache_write("executed_effects_digests");
        self.dirty
            .executed_effects_digests
            .insert(tx_digest, effects_digest);

        // Wake up anyone waiting for this transaction's effects digest, then publish the
        // number of still-pending waiters as a gauge.
        self.executed_effects_digests_notify_read
            .notify(&tx_digest, &effects_digest);

        self.metrics
            .pending_notify_read
            .set(self.executed_effects_digests_notify_read.num_pending() as i64);

        // `fetch_add` returns the previous value, so `prev + 1` is the insert count that
        // includes this transaction.
        let prev = self
            .dirty
            .total_transaction_inserts
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let pending_count = (prev + 1).saturating_sub(
            self.dirty
                .total_transaction_commits
                .load(std::sync::atomic::Ordering::Relaxed),
        );

        self.set_backpressure(pending_count);
    }
1016
1017    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1018        let _metrics_guard = mysten_metrics::monitored_scope("WritebackCache::build_db_batch");
1019        let mut all_outputs = Vec::with_capacity(digests.len());
1020        for tx in digests {
1021            let Some(outputs) = self
1022                .dirty
1023                .pending_transaction_writes
1024                .get(tx)
1025                .map(|o| o.clone())
1026            else {
1027                // This can happen in the following rare case:
1028                // All transactions in the checkpoint are committed to the db (by commit_transaction_outputs,
1029                // called in CheckpointExecutor::process_executed_transactions), but the process crashes before
1030                // the checkpoint water mark is bumped. We will then re-commit the checkpoint at startup,
1031                // despite that all transactions are already executed.
1032                warn!("Attempt to commit unknown transaction {:?}", tx);
1033                continue;
1034            };
1035            all_outputs.push(outputs);
1036        }
1037
1038        let batch = self
1039            .store
1040            .build_db_batch(epoch, &all_outputs)
1041            .expect("db error");
1042        (all_outputs, batch)
1043    }
1044
    // Commits dirty data for the given TransactionDigest to the db.
    //
    // Takes the `(outputs, db batch)` pair produced by `build_db_batch`, writes the batch to
    // the db, and only then moves the corresponding entries from the dirty set into the
    // committed caches. Finally the commit counter is advanced and backpressure re-evaluated.
    // `digests` is only used for tracing here; the set of transactions actually flushed is
    // taken from the batch's outputs.
    #[instrument(level = "debug", skip_all)]
    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        (all_outputs, db_batch): Batch,
        digests: &[TransactionDigest],
    ) {
        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs");
        fail_point!("writeback-cache-commit");
        trace!(?digests);

        // Flush writes to disk before removing anything from dirty set. otherwise,
        // a cache eviction could cause a value to disappear briefly, even if we insert to the
        // cache before removing from the dirty set.
        db_batch.write().expect("db error");

        // A second monitored scope covering only the dirty -> cached flush below. Shadowing the
        // binding does not drop the first guard; both live until the end of the function.
        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs::flush");
        // Parallel phase: tx-level metadata is keyed by unique tx_digest/effects_digest,
        // so there are no cross-transaction ordering constraints.
        all_outputs.par_iter().with_min_len(16).for_each(|outputs| {
            let tx_digest = outputs.transaction.digest();
            // `assert!` is compiled in all build profiles, so the `remove` always runs; the
            // assertion additionally checks that the pending entry was still present.
            assert!(
                self.dirty
                    .pending_transaction_writes
                    .remove(tx_digest)
                    .is_some()
            );
            self.flush_tx_metadata_from_dirty_to_cached(*tx_digest, outputs);
        });

        // Sequential phase: object/marker versions must be popped in causal order
        // (oldest first) per object_id. Multiple transactions in the batch can touch
        // the same shared object at consecutive versions, so this loop must preserve
        // the order of all_outputs.
        for outputs in all_outputs.iter() {
            self.flush_objects_from_dirty_to_cached(epoch, outputs);
        }

        // `fetch_add` returns the previous value; adding `num_outputs` yields the commit count
        // that includes this batch.
        let num_outputs = all_outputs.len() as u64;
        let num_commits = self
            .dirty
            .total_transaction_commits
            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
            + num_outputs;

        let pending_count = self
            .dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits);

        self.set_backpressure(pending_count);
    }
1101
1102    fn approximate_pending_transaction_count(&self) -> u64 {
1103        let num_commits = self
1104            .dirty
1105            .total_transaction_commits
1106            .load(std::sync::atomic::Ordering::Relaxed);
1107
1108        self.dirty
1109            .total_transaction_inserts
1110            .load(std::sync::atomic::Ordering::Relaxed)
1111            .saturating_sub(num_commits)
1112    }
1113
1114    fn set_backpressure(&self, pending_count: u64) {
1115        let backpressure = pending_count > self.backpressure_threshold;
1116        let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
1117        if backpressure_changed {
1118            self.metrics.backpressure_toggles.inc();
1119        }
1120        self.metrics
1121            .backpressure_status
1122            .set(if backpressure { 1 } else { 0 });
1123    }
1124
1125    // Flushes tx-level metadata for a single transaction from dirty to cache.
1126    // All keys are unique per transaction (tx_digest, effects_digest), so this
1127    // is safe to call in parallel across transactions.
1128    fn flush_tx_metadata_from_dirty_to_cached(
1129        &self,
1130        tx_digest: TransactionDigest,
1131        outputs: &TransactionOutputs,
1132    ) {
1133        // TODO: outputs should have a strong count of 1 so we should be able to move out of it
1134        let TransactionOutputs {
1135            transaction,
1136            effects,
1137            events,
1138            ..
1139        } = outputs;
1140
1141        let effects_digest = effects.digest();
1142
1143        // Update cache before removing from self.dirty to avoid
1144        // unnecessary cache misses
1145        self.cached
1146            .transactions
1147            .insert(
1148                &tx_digest,
1149                PointCacheItem::Some(transaction.clone()),
1150                Ticket::Write,
1151            )
1152            .ok();
1153        self.cached
1154            .transaction_effects
1155            .insert(
1156                &effects_digest,
1157                PointCacheItem::Some(effects.clone().into()),
1158                Ticket::Write,
1159            )
1160            .ok();
1161        self.cached
1162            .executed_effects_digests
1163            .insert(
1164                &tx_digest,
1165                PointCacheItem::Some(effects_digest),
1166                Ticket::Write,
1167            )
1168            .ok();
1169        self.cached
1170            .transaction_events
1171            .insert(
1172                &tx_digest,
1173                PointCacheItem::Some(events.clone().into()),
1174                Ticket::Write,
1175            )
1176            .ok();
1177
1178        self.dirty
1179            .transaction_effects
1180            .remove(&effects_digest)
1181            .expect("effects must exist");
1182
1183        self.dirty
1184            .transaction_events
1185            .remove(&tx_digest)
1186            .expect("events must exist");
1187
1188        self.dirty
1189            .unchanged_loaded_runtime_objects
1190            .remove(&tx_digest)
1191            .expect("unchanged_loaded_runtime_objects must exist");
1192
1193        self.dirty
1194            .executed_effects_digests
1195            .remove(&tx_digest)
1196            .expect("executed effects must exist");
1197    }
1198
1199    // Flushes object and marker versions for a single transaction from dirty to cache.
1200    // Multiple transactions in the same batch can modify the same shared object at
1201    // consecutive versions, so callers must invoke this in causal (checkpoint) order.
1202    fn flush_objects_from_dirty_to_cached(&self, epoch: EpochId, outputs: &TransactionOutputs) {
1203        let TransactionOutputs {
1204            markers,
1205            written,
1206            deleted,
1207            wrapped,
1208            ..
1209        } = outputs;
1210
1211        for (object_key, marker_value) in markers.iter() {
1212            Self::move_version_from_dirty_to_cache(
1213                &self.dirty.markers,
1214                &self.cached.marker_cache,
1215                (epoch, object_key.id()),
1216                object_key.version(),
1217                marker_value,
1218            );
1219        }
1220
1221        for (object_id, object) in written.iter() {
1222            Self::move_version_from_dirty_to_cache(
1223                &self.dirty.objects,
1224                &self.cached.object_cache,
1225                *object_id,
1226                object.version(),
1227                &ObjectEntry::Object(object.clone()),
1228            );
1229        }
1230
1231        for ObjectKey(object_id, version) in deleted.iter() {
1232            Self::move_version_from_dirty_to_cache(
1233                &self.dirty.objects,
1234                &self.cached.object_cache,
1235                *object_id,
1236                *version,
1237                &ObjectEntry::Deleted,
1238            );
1239        }
1240
1241        for ObjectKey(object_id, version) in wrapped.iter() {
1242            Self::move_version_from_dirty_to_cache(
1243                &self.dirty.objects,
1244                &self.cached.object_cache,
1245                *object_id,
1246                *version,
1247                &ObjectEntry::Wrapped,
1248            );
1249        }
1250    }
1251
1252    // Move the oldest/least entry from the dirty queue to the cache queue.
1253    // This is called after the entry is committed to the db.
1254    fn move_version_from_dirty_to_cache<K, V>(
1255        dirty: &DashMap<K, CachedVersionMap<V>>,
1256        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
1257        key: K,
1258        version: SequenceNumber,
1259        value: &V,
1260    ) where
1261        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
1262        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
1263    {
1264        static MAX_VERSIONS: usize = 3;
1265
1266        // IMPORTANT: lock both the dirty set entry and the cache entry before modifying either.
1267        // this ensures that readers cannot see a value temporarily disappear.
1268        let dirty_entry = dirty.entry(key);
1269        let cache_entry = cache.entry(key).or_default();
1270        let mut cache_map = cache_entry.value().lock();
1271
1272        // insert into cache and drop old versions.
1273        cache_map.insert(version, value.clone());
1274        // TODO: make this automatic by giving CachedVersionMap an optional max capacity
1275        cache_map.truncate_to(MAX_VERSIONS);
1276
1277        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
1278            panic!("dirty map must exist");
1279        };
1280
1281        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);
1282
1283        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");
1284
1285        // if there are no versions remaining, remove the map entry
1286        if occupied_dirty_entry.get().is_empty() {
1287            occupied_dirty_entry.remove();
1288        }
1289    }
1290
1291    // Updates the latest object id cache with an entry that was read from the db.
1292    fn cache_latest_object_by_id(
1293        &self,
1294        object_id: &ObjectID,
1295        object: LatestObjectCacheEntry,
1296        ticket: Ticket,
1297    ) {
1298        trace!("caching object by id: {:?} {:?}", object_id, object);
1299        if self
1300            .object_by_id_cache
1301            .insert(object_id, object, ticket)
1302            .is_ok()
1303        {
1304            self.metrics.record_cache_write("object_by_id");
1305        } else {
1306            trace!("discarded cache write due to expired ticket");
1307            self.metrics.record_ticket_expiry();
1308        }
1309    }
1310
    // Records in the latest-object-by-id cache that `object_id` does not exist, by caching a
    // `NonExistent` entry under the given read `ticket`.
    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
    }
1314
1315    fn clear_state_end_of_epoch_impl(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
1316        info!("clearing state at end of epoch");
1317
1318        // Note: there cannot be any concurrent writes to self.dirty while we are in this function,
1319        // as all transaction execution is paused.
1320        for r in self.dirty.pending_transaction_writes.iter() {
1321            let outputs = r.value();
1322            if !outputs
1323                .transaction
1324                .transaction_data()
1325                .shared_input_objects()
1326                .is_empty()
1327            {
1328                debug_fatal!("transaction must be single writer");
1329            }
1330            info!(
1331                "clearing state for transaction {:?}",
1332                outputs.transaction.digest()
1333            );
1334            for (object_id, object) in outputs.written.iter() {
1335                if object.is_package() {
1336                    info!("removing non-finalized package from cache: {:?}", object_id);
1337                    self.packages.invalidate(object_id);
1338                }
1339                self.object_by_id_cache.invalidate(object_id);
1340                self.cached.object_cache.invalidate(object_id);
1341            }
1342
1343            for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
1344                self.object_by_id_cache.invalidate(object_id);
1345                self.cached.object_cache.invalidate(object_id);
1346            }
1347        }
1348
1349        self.dirty.clear();
1350
1351        info!("clearing old transaction locks");
1352        self.object_locks.clear();
1353        info!("clearing object per epoch marker table");
1354        self.store
1355            .clear_object_per_epoch_marker_table(execution_guard)
1356            .expect("db error");
1357    }
1358
1359    fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) {
1360        self.store
1361            .bulk_insert_genesis_objects(objects)
1362            .expect("db error");
1363        for obj in objects {
1364            self.cached.object_cache.invalidate(&obj.id());
1365            self.object_by_id_cache.invalidate(&obj.id());
1366        }
1367    }
1368
1369    fn insert_genesis_object_impl(&self, object: Object) {
1370        self.object_by_id_cache.invalidate(&object.id());
1371        self.cached.object_cache.invalidate(&object.id());
1372        self.store.insert_genesis_object(object).expect("db error");
1373    }
1374
1375    pub fn clear_caches_and_assert_empty(&self) {
1376        info!("clearing caches");
1377        self.cached.clear_and_assert_empty();
1378        self.object_by_id_cache.invalidate_all();
1379        assert!(&self.object_by_id_cache.is_empty());
1380        self.packages.invalidate_all();
1381        assert_empty(&self.packages);
1382    }
1383}
1384
1385impl AccountFundsRead for WritebackCache {
1386    fn get_latest_account_amount(&self, account_id: &AccumulatorObjId) -> (u128, SequenceNumber) {
1387        // Settlement is not atomic. A settlement transaction writes the accumulator
1388        // objects at version V+1 first, and then a later barrier transaction bumps the
1389        // root from V to V+1. A reader that observes the state in between sees
1390        // post-settlement account objects alongside the pre-settlement root version,
1391        // and reading the "latest" account can therefore disagree with the root we
1392        // just captured.
1393        //
1394        // We handle this with two pieces:
1395        //
1396        // 1. MVCC read capped at the captured root version (see the call site below).
1397        //    By construction of the settlement/barrier ordering, every account object's
1398        //    version is <= the root version after the corresponding barrier runs, so
1399        //    capping at the captured root strips away any newer-settlement writes that
1400        //    have raced ahead of the barrier — we get the balance consistent with the
1401        //    root version we captured.
1402        //
1403        // 2. Root-version stability check (pre == post). `get_account_amount_at_version`
1404        //    is only safe to call when the target version has not been pruned. Pruning
1405        //    is tied to root advancement, so by reading the root before and after the
1406        //    MVCC read and retrying on mismatch, we ensure that no root advance (and
1407        //    therefore no pruning of the version we read at) could have happened while
1408        //    we were reading — the data we read is still live in the system (memory or
1409        //    db) throughout the call.
1410        let mut pre_root_version =
1411            ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
1412                .unwrap()
1413                .version();
1414        let mut loop_iter = 0;
1415        loop {
1416            // Safe because of (1) and (2) above: the stability check below bounds the
1417            // lifetime of `pre_root_version` to a window in which no pruning happens.
1418            let value = self.get_account_amount_at_version(account_id, pre_root_version);
1419            let post_root_version =
1420                ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
1421                    .unwrap()
1422                    .version();
1423            if pre_root_version == post_root_version {
1424                return (value, pre_root_version);
1425            }
1426            debug!(
1427                "Root version changed from {} to {} during MVCC read, retrying",
1428                pre_root_version, post_root_version
1429            );
1430            pre_root_version = post_root_version;
1431            loop_iter += 1;
1432            if loop_iter >= 3 {
1433                debug_fatal!("Unable to get a stable version after 3 iterations");
1434            }
1435        }
1436    }
1437
1438    fn get_account_amount_at_version(
1439        &self,
1440        account_id: &AccumulatorObjId,
1441        version: SequenceNumber,
1442    ) -> u128 {
1443        let account_obj = self.find_object_lt_or_eq_version(*account_id.inner(), version);
1444        if let Some(account_obj) = account_obj {
1445            let (_, AccumulatorValue::U128(value)) =
1446                account_obj.data.try_as_move().unwrap().try_into().unwrap();
1447            value.value
1448        } else {
1449            0
1450        }
1451    }
1452}
1453
// Empty impl: `WritebackCache` opts into `ExecutionCacheAPI` without defining any items here.
// NOTE(review): presumably `ExecutionCacheAPI` is an umbrella trait over the other cache
// traits implemented in this file -- confirm against the trait definition.
impl ExecutionCacheAPI for WritebackCache {}
1455
1456impl ExecutionCacheCommit for WritebackCache {
1457    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1458        self.build_db_batch(epoch, digests)
1459    }
1460
1461    fn set_highest_committed_checkpoint_in_batch(
1462        &self,
1463        batch: &mut Batch,
1464        checkpoint: CheckpointSequenceNumber,
1465    ) {
1466        self.store
1467            .perpetual_tables
1468            .set_highest_committed_checkpoint(&mut batch.1, checkpoint)
1469            .expect("db error");
1470    }
1471
1472    fn commit_transaction_outputs(
1473        &self,
1474        epoch: EpochId,
1475        batch: Batch,
1476        digests: &[TransactionDigest],
1477    ) {
1478        WritebackCache::commit_transaction_outputs(self, epoch, batch, digests)
1479    }
1480
1481    fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) {
1482        self.store.persist_transaction(tx).expect("db error");
1483    }
1484
1485    fn approximate_pending_transaction_count(&self) -> u64 {
1486        WritebackCache::approximate_pending_transaction_count(self)
1487    }
1488}
1489
1490impl ObjectCacheRead for WritebackCache {
1491    fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
1492        self.metrics
1493            .record_cache_request("package", "package_cache");
1494        if let Some(p) = self.packages.get(package_id) {
1495            if cfg!(debug_assertions) {
1496                let canonical_package = self
1497                    .dirty
1498                    .objects
1499                    .get(package_id)
1500                    .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
1501                        Some(ObjectEntry::Object(object)) => Some(object),
1502                        _ => None,
1503                    })
1504                    .or_else(|| self.store.get_object(package_id));
1505
1506                if let Some(canonical_package) = canonical_package {
1507                    assert_eq!(
1508                        canonical_package.digest(),
1509                        p.object().digest(),
1510                        "Package object cache is inconsistent for package {:?}",
1511                        package_id
1512                    );
1513                }
1514            }
1515            self.metrics.record_cache_hit("package", "package_cache");
1516            return Ok(Some(p));
1517        } else {
1518            self.metrics.record_cache_miss("package", "package_cache");
1519        }
1520
1521        // We try the dirty objects cache as well before going to the database. This is necessary
1522        // because the package could be evicted from the package cache before it is committed
1523        // to the database.
1524        if let Some(p) = self.get_object_impl("package", package_id) {
1525            if p.is_package() {
1526                let p = PackageObject::new(p);
1527                tracing::trace!(
1528                    "caching package: {:?}",
1529                    p.object().compute_object_reference()
1530                );
1531                self.metrics.record_cache_write("package");
1532                self.packages.insert(*package_id, p.clone());
1533                Ok(Some(p))
1534            } else {
1535                Err(SuiErrorKind::UserInputError {
1536                    error: UserInputError::MoveObjectAsPackage {
1537                        object_id: *package_id,
1538                    },
1539                }
1540                .into())
1541            }
1542        } else {
1543            Ok(None)
1544        }
1545    }
1546
    /// Intentionally does nothing; the `_system_package_ids` argument is ignored.
    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
        // This is a no-op because all writes go through the cache, therefore it can never
        // be incoherent
    }
1551
1552    // get_object and variants.
1553
    /// Returns the object for `id` as resolved by `get_object_impl`, using the
    /// "object_latest" request label.
    fn get_object(&self, id: &ObjectID) -> Option<Object> {
        self.get_object_impl("object_latest", id)
    }
1557
1558    fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object> {
1559        match self.get_object_by_key_cache_only(object_id, version) {
1560            CacheResult::Hit(object) => Some(object),
1561            CacheResult::NegativeHit => None,
1562            CacheResult::Miss => self
1563                .record_db_get("object_by_version")
1564                .get_object_by_key(object_id, version),
1565        }
1566    }
1567
1568    fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
1569        do_fallback_lookup(
1570            object_keys,
1571            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1572                CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1573                CacheResult::NegativeHit => CacheResult::NegativeHit,
1574                CacheResult::Miss => CacheResult::Miss,
1575            },
1576            |remaining| {
1577                self.record_db_multi_get("object_by_version", remaining.len())
1578                    .multi_get_objects_by_key(remaining)
1579                    .expect("db error")
1580            },
1581        )
1582    }
1583
1584    fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool {
1585        match self.get_object_by_key_cache_only(object_id, version) {
1586            CacheResult::Hit(_) => true,
1587            CacheResult::NegativeHit => false,
1588            CacheResult::Miss => self
1589                .record_db_get("object_by_version")
1590                .object_exists_by_key(object_id, version)
1591                .expect("db error"),
1592        }
1593    }
1594
1595    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool> {
1596        do_fallback_lookup(
1597            object_keys,
1598            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1599                CacheResult::Hit(_) => CacheResult::Hit(true),
1600                CacheResult::NegativeHit => CacheResult::Hit(false),
1601                CacheResult::Miss => CacheResult::Miss,
1602            },
1603            |remaining| {
1604                self.record_db_multi_get("object_by_version", remaining.len())
1605                    .multi_object_exists_by_key(remaining)
1606                    .expect("db error")
1607            },
1608        )
1609    }
1610
1611    fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
1612        match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1613            CacheResult::Hit((version, entry)) => Some(match entry {
1614                ObjectEntry::Object(object) => object.compute_object_reference(),
1615                ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1616                ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1617            }),
1618            CacheResult::NegativeHit => None,
1619            CacheResult::Miss => self
1620                .record_db_get("latest_objref_or_tombstone")
1621                .get_latest_object_ref_or_tombstone(object_id)
1622                .expect("db error"),
1623        }
1624    }
1625
1626    fn get_latest_object_or_tombstone(
1627        &self,
1628        object_id: ObjectID,
1629    ) -> Option<(ObjectKey, ObjectOrTombstone)> {
1630        match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1631            CacheResult::Hit((version, entry)) => {
1632                let key = ObjectKey(object_id, version);
1633                Some(match entry {
1634                    ObjectEntry::Object(object) => (key, object.into()),
1635                    ObjectEntry::Deleted => (
1636                        key,
1637                        ObjectOrTombstone::Tombstone((
1638                            object_id,
1639                            version,
1640                            ObjectDigest::OBJECT_DIGEST_DELETED,
1641                        )),
1642                    ),
1643                    ObjectEntry::Wrapped => (
1644                        key,
1645                        ObjectOrTombstone::Tombstone((
1646                            object_id,
1647                            version,
1648                            ObjectDigest::OBJECT_DIGEST_WRAPPED,
1649                        )),
1650                    ),
1651                })
1652            }
1653            CacheResult::NegativeHit => None,
1654            CacheResult::Miss => self
1655                .record_db_get("latest_object_or_tombstone")
1656                .get_latest_object_or_tombstone(object_id)
1657                .expect("db error"),
1658        }
1659    }
1660
1661    fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool> {
1662        keys.iter()
1663            .map(|key| {
1664                if key.is_cancelled() {
1665                    true
1666                } else {
1667                    match key {
1668                        InputKey::VersionedObject { id, version } => {
1669                            matches!(
1670                                self.get_object_by_key_cache_only(&id.id(), *version),
1671                                CacheResult::Hit(_)
1672                            )
1673                        }
1674                        InputKey::Package { id } => self.packages.contains_key(id),
1675                    }
1676                }
1677            })
1678            .collect()
1679    }
1680
1681    #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1682    fn find_object_lt_or_eq_version(
1683        &self,
1684        object_id: ObjectID,
1685        version_bound: SequenceNumber,
1686    ) -> Option<Object> {
1687        macro_rules! check_cache_entry {
1688            ($level: expr, $objects: expr) => {
1689                self.metrics
1690                    .record_cache_request("object_lt_or_eq_version", $level);
1691                if let Some(objects) = $objects {
1692                    if let Some((_, object)) = objects
1693                        .all_versions_lt_or_eq_descending(&version_bound)
1694                        .next()
1695                    {
1696                        if let ObjectEntry::Object(object) = object {
1697                            self.metrics
1698                                .record_cache_hit("object_lt_or_eq_version", $level);
1699                            return Some(object.clone());
1700                        } else {
1701                            // if we find a tombstone, the object does not exist
1702                            self.metrics
1703                                .record_cache_negative_hit("object_lt_or_eq_version", $level);
1704                            return None;
1705                        }
1706                    } else {
1707                        self.metrics
1708                            .record_cache_miss("object_lt_or_eq_version", $level);
1709                    }
1710                }
1711            };
1712        }
1713
1714        // if we have the latest version cached, and it is within the bound, we are done
1715        self.metrics
1716            .record_cache_request("object_lt_or_eq_version", "object_by_id");
1717        let latest_cache_entry = self.object_by_id_cache.get(&object_id);
1718        if let Some(latest) = &latest_cache_entry {
1719            let latest = latest.lock();
1720            match &*latest {
1721                LatestObjectCacheEntry::Object(latest_version, object) => {
1722                    if *latest_version <= version_bound {
1723                        if let ObjectEntry::Object(object) = object {
1724                            self.metrics
1725                                .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1726                            return Some(object.clone());
1727                        } else {
1728                            // object is a tombstone, but is still within the version bound
1729                            self.metrics.record_cache_negative_hit(
1730                                "object_lt_or_eq_version",
1731                                "object_by_id",
1732                            );
1733                            return None;
1734                        }
1735                    }
1736                    // latest object is not within the version bound. fall through.
1737                }
1738                // No object by this ID exists at all
1739                LatestObjectCacheEntry::NonExistent => {
1740                    self.metrics
1741                        .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1742                    return None;
1743                }
1744            }
1745        }
1746        self.metrics
1747            .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1748
1749        Self::with_locked_cache_entries(
1750            &self.dirty.objects,
1751            &self.cached.object_cache,
1752            &object_id,
1753            |dirty_entry, cached_entry| {
1754                check_cache_entry!("committed", dirty_entry);
1755                check_cache_entry!("uncommitted", cached_entry);
1756
1757                // Much of the time, the query will be for the very latest object version, so
1758                // try that first. But we have to be careful:
1759                // 1. We must load the tombstone if it is present, because its version may exceed
1760                //    the version_bound, in which case we must do a scan.
1761                // 2. You might think we could just call `self.store.get_latest_object_or_tombstone` here.
1762                //    But we cannot, because there may be a more recent version in the dirty set, which
1763                //    we skipped over in check_cache_entry! because of the version bound. However, if we
1764                //    skipped it above, we will skip it here as well, again due to the version bound.
1765                // 3. Despite that, we really want to warm the cache here. Why? Because if the object is
1766                //    cold (not being written to), then we will very soon be able to start serving reads
1767                //    of it from the object_by_id cache, IF we can warm the cache. If we don't warm the
1768                //    the cache here, and no writes to the object occur, then we will always have to go
1769                //    to the db for the object.
1770                //
1771                // Lastly, it is important to understand the rationale for all this: If the object is
1772                // write-hot, we will serve almost all reads to it from the dirty set (or possibly the
1773                // cached set if it is only written to once every few checkpoints). If the object is
1774                // write-cold (or non-existent) and read-hot, then we will serve almost all reads to it
1775                // from the object_by_id cache check above.  Most of the apparently wasteful code here
1776                // exists only to ensure correctness in all the edge cases.
1777                let latest: Option<(SequenceNumber, ObjectEntry)> =
1778                    if let Some(dirty_set) = dirty_entry {
1779                        dirty_set
1780                            .get_highest()
1781                            .cloned()
1782                            .tap_none(|| panic!("dirty set cannot be empty"))
1783                    } else {
1784                        // TODO: we should try not to read from the db while holding the locks.
1785                        self.record_db_get("object_lt_or_eq_version_latest")
1786                            .get_latest_object_or_tombstone(object_id)
1787                            .expect("db error")
1788                            .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1789                                (version, ObjectEntry::from(obj_or_tombstone))
1790                            })
1791                    };
1792
1793                if let Some((obj_version, obj_entry)) = latest {
1794                    // we can always cache the latest object (or tombstone), even if it is not within the
1795                    // version_bound. This is done in order to warm the cache in the case where a sequence
1796                    // of transactions all read the same child object without writing to it.
1797
1798                    // Note: no need to call with_object_by_id_cache_update here, because we are holding
1799                    // the lock on the dirty cache entry, and `latest` cannot become out-of-date
1800                    // while we hold that lock.
1801                    self.cache_latest_object_by_id(
1802                        &object_id,
1803                        LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1804                        // We can get a ticket at the last second, because we are holding the lock
1805                        // on dirty, so there cannot be any concurrent writes.
1806                        self.object_by_id_cache.get_ticket_for_read(&object_id),
1807                    );
1808
1809                    if obj_version <= version_bound {
1810                        match obj_entry {
1811                            ObjectEntry::Object(object) => Some(object),
1812                            ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
1813                        }
1814                    } else {
1815                        // The latest object exceeded the bound, so now we have to do a scan
1816                        // But we already know there is no dirty entry within the bound,
1817                        // so we go to the db.
1818                        self.record_db_get("object_lt_or_eq_version_scan")
1819                            .find_object_lt_or_eq_version(object_id, version_bound)
1820                            .expect("db error")
1821                    }
1822
1823                // no object found in dirty set or db, object does not exist
1824                // When this is called from a read api (i.e. not the execution path) it is
1825                // possible that the object has been deleted and pruned. In this case,
1826                // there would be no entry at all on disk, but we may have a tombstone in the
1827                // cache
1828                } else if let Some(latest_cache_entry) = latest_cache_entry {
1829                    // If there is a latest cache entry, it had better not be a live object!
1830                    assert!(!latest_cache_entry.lock().is_alive());
1831                    None
1832                } else {
1833                    // If there is no latest cache entry, we can insert one.
1834                    let highest = cached_entry.and_then(|c| c.get_highest());
1835                    assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1836                    self.cache_object_not_found(
1837                        &object_id,
1838                        // okay to get ticket at last second - see above
1839                        self.object_by_id_cache.get_ticket_for_read(&object_id),
1840                    );
1841                    None
1842                }
1843            },
1844        )
1845    }
1846
    /// Returns the result of `get_sui_system_state`, called with this cache as its argument.
    fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
        get_sui_system_state(self)
    }
1850
    /// Returns the result of `get_bridge`, called with this cache as its argument.
    fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge> {
        get_bridge(self)
    }
1854
1855    fn get_marker_value(
1856        &self,
1857        object_key: FullObjectKey,
1858        epoch_id: EpochId,
1859    ) -> Option<MarkerValue> {
1860        match self.get_marker_value_cache_only(object_key, epoch_id) {
1861            CacheResult::Hit(marker) => Some(marker),
1862            CacheResult::NegativeHit => None,
1863            CacheResult::Miss => self
1864                .record_db_get("marker_by_version")
1865                .get_marker_value(object_key, epoch_id)
1866                .expect("db error"),
1867        }
1868    }
1869
1870    fn get_latest_marker(
1871        &self,
1872        object_id: FullObjectID,
1873        epoch_id: EpochId,
1874    ) -> Option<(SequenceNumber, MarkerValue)> {
1875        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1876            CacheResult::Hit((v, marker)) => Some((v, marker)),
1877            CacheResult::NegativeHit => {
1878                panic!("cannot have negative hit when getting latest marker")
1879            }
1880            CacheResult::Miss => self
1881                .record_db_get("marker_latest")
1882                .get_latest_marker(object_id, epoch_id)
1883                .expect("db error"),
1884        }
1885    }
1886
1887    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult {
1888        let cur_epoch = epoch_store.epoch();
1889        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
1890            CacheResult::Hit((_, obj)) => {
1891                let actual_objref = obj.compute_object_reference();
1892                if obj_ref != actual_objref {
1893                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
1894                        locked_ref: actual_objref,
1895                    })
1896                } else {
1897                    // requested object ref is live, check if there is a lock
1898                    Ok(
1899                        match self
1900                            .object_locks
1901                            .get_transaction_lock(&obj_ref, epoch_store)?
1902                        {
1903                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
1904                                locked_by_tx: LockDetailsDeprecated {
1905                                    epoch: cur_epoch,
1906                                    tx_digest,
1907                                },
1908                            },
1909                            None => ObjectLockStatus::Initialized,
1910                        },
1911                    )
1912                }
1913            }
1914            CacheResult::NegativeHit => {
1915                Err(SuiError::from(UserInputError::ObjectNotFound {
1916                    object_id: obj_ref.0,
1917                    // even though we know the requested version, we leave it as None to indicate
1918                    // that the object does not exist at any version
1919                    version: None,
1920                }))
1921            }
1922            CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1923        }
1924    }
1925
1926    fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef> {
1927        let obj = self.get_object_impl("live_objref", &object_id).ok_or(
1928            UserInputError::ObjectNotFound {
1929                object_id,
1930                version: None,
1931            },
1932        )?;
1933        Ok(obj.compute_object_reference())
1934    }
1935
1936    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1937        do_fallback_lookup_fallible(
1938            owned_object_refs,
1939            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
1940                CacheResult::Hit((version, obj)) => {
1941                    if obj.compute_object_reference() != *obj_ref {
1942                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
1943                            provided_obj_ref: *obj_ref,
1944                            current_version: version,
1945                        }
1946                        .into())
1947                    } else {
1948                        Ok(CacheResult::Hit(()))
1949                    }
1950                }
1951                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
1952                    object_id: obj_ref.0,
1953                    version: None,
1954                }
1955                .into()),
1956                CacheResult::Miss => Ok(CacheResult::Miss),
1957            },
1958            |remaining| {
1959                self.record_db_multi_get("object_is_live", remaining.len())
1960                    .check_owned_objects_are_live(remaining)?;
1961                Ok(vec![(); remaining.len()])
1962            },
1963        )?;
1964        Ok(())
1965    }
1966
1967    fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber> {
1968        self.store
1969            .perpetual_tables
1970            .get_highest_pruned_checkpoint()
1971            .expect("db error")
1972    }
1973
1974    fn notify_read_input_objects<'a>(
1975        &'a self,
1976        input_and_receiving_keys: &'a [InputKey],
1977        receiving_keys: &'a HashSet<InputKey>,
1978        epoch: EpochId,
1979    ) -> BoxFuture<'a, ()> {
1980        self.object_notify_read
1981            .read(
1982                "notify_read_input_objects",
1983                input_and_receiving_keys,
1984                move |keys| {
1985                    self.multi_input_objects_available(keys, receiving_keys, epoch)
1986                        .into_iter()
1987                        .map(|available| if available { Some(()) } else { None })
1988                        .collect::<Vec<_>>()
1989                },
1990            )
1991            .map(|_| ())
1992            .boxed()
1993    }
1994}
1995
1996impl TransactionCacheRead for WritebackCache {
1997    fn multi_get_transaction_blocks(
1998        &self,
1999        digests: &[TransactionDigest],
2000    ) -> Vec<Option<Arc<VerifiedTransaction>>> {
2001        let digests_and_tickets: Vec<_> = digests
2002            .iter()
2003            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
2004            .collect();
2005        do_fallback_lookup(
2006            &digests_and_tickets,
2007            |(digest, _)| {
2008                self.metrics
2009                    .record_cache_request("transaction_block", "uncommitted");
2010                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
2011                    self.metrics
2012                        .record_cache_hit("transaction_block", "uncommitted");
2013                    return CacheResult::Hit(Some(tx.transaction.clone()));
2014                }
2015                self.metrics
2016                    .record_cache_miss("transaction_block", "uncommitted");
2017
2018                self.metrics
2019                    .record_cache_request("transaction_block", "committed");
2020
2021                match self
2022                    .cached
2023                    .transactions
2024                    .get(digest)
2025                    .map(|l| l.lock().clone())
2026                {
2027                    Some(PointCacheItem::Some(tx)) => {
2028                        self.metrics
2029                            .record_cache_hit("transaction_block", "committed");
2030                        CacheResult::Hit(Some(tx))
2031                    }
2032                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
2033                    None => {
2034                        self.metrics
2035                            .record_cache_miss("transaction_block", "committed");
2036
2037                        CacheResult::Miss
2038                    }
2039                }
2040            },
2041            |remaining| {
2042                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2043                let results: Vec<_> = self
2044                    .record_db_multi_get("transaction_block", remaining.len())
2045                    .multi_get_transaction_blocks(&remaining_digests)
2046                    .expect("db error")
2047                    .into_iter()
2048                    .map(|o| o.map(Arc::new))
2049                    .collect();
2050                for ((digest, ticket), result) in remaining.iter().zip_debug_eq(results.iter()) {
2051                    if result.is_none() {
2052                        self.cached.transactions.insert(digest, None, *ticket).ok();
2053                    }
2054                }
2055                results
2056            },
2057        )
2058    }
2059
2060    fn multi_get_executed_effects_digests(
2061        &self,
2062        digests: &[TransactionDigest],
2063    ) -> Vec<Option<TransactionEffectsDigest>> {
2064        let digests_and_tickets: Vec<_> = digests
2065            .iter()
2066            .map(|d| {
2067                (
2068                    *d,
2069                    self.cached.executed_effects_digests.get_ticket_for_read(d),
2070                )
2071            })
2072            .collect();
2073        do_fallback_lookup(
2074            &digests_and_tickets,
2075            |(digest, _)| {
2076                self.metrics
2077                    .record_cache_request("executed_effects_digests", "uncommitted");
2078                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
2079                    self.metrics
2080                        .record_cache_hit("executed_effects_digests", "uncommitted");
2081                    return CacheResult::Hit(Some(*digest));
2082                }
2083                self.metrics
2084                    .record_cache_miss("executed_effects_digests", "uncommitted");
2085
2086                self.metrics
2087                    .record_cache_request("executed_effects_digests", "committed");
2088                match self
2089                    .cached
2090                    .executed_effects_digests
2091                    .get(digest)
2092                    .map(|l| *l.lock())
2093                {
2094                    Some(PointCacheItem::Some(digest)) => {
2095                        self.metrics
2096                            .record_cache_hit("executed_effects_digests", "committed");
2097                        CacheResult::Hit(Some(digest))
2098                    }
2099                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
2100                    None => {
2101                        self.metrics
2102                            .record_cache_miss("executed_effects_digests", "committed");
2103                        CacheResult::Miss
2104                    }
2105                }
2106            },
2107            |remaining| {
2108                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2109                let results = self
2110                    .record_db_multi_get("executed_effects_digests", remaining.len())
2111                    .multi_get_executed_effects_digests(&remaining_digests)
2112                    .expect("db error");
2113                for ((digest, ticket), result) in remaining.iter().zip_debug_eq(results.iter()) {
2114                    if result.is_none() {
2115                        self.cached
2116                            .executed_effects_digests
2117                            .insert(digest, None, *ticket)
2118                            .ok();
2119                    }
2120                }
2121                results
2122            },
2123        )
2124    }
2125
2126    fn multi_get_effects(
2127        &self,
2128        digests: &[TransactionEffectsDigest],
2129    ) -> Vec<Option<TransactionEffects>> {
2130        let digests_and_tickets: Vec<_> = digests
2131            .iter()
2132            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
2133            .collect();
2134        do_fallback_lookup(
2135            &digests_and_tickets,
2136            |(digest, _)| {
2137                self.metrics
2138                    .record_cache_request("transaction_effects", "uncommitted");
2139                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
2140                    self.metrics
2141                        .record_cache_hit("transaction_effects", "uncommitted");
2142                    return CacheResult::Hit(Some(effects.clone()));
2143                }
2144                self.metrics
2145                    .record_cache_miss("transaction_effects", "uncommitted");
2146
2147                self.metrics
2148                    .record_cache_request("transaction_effects", "committed");
2149                match self
2150                    .cached
2151                    .transaction_effects
2152                    .get(digest)
2153                    .map(|l| l.lock().clone())
2154                {
2155                    Some(PointCacheItem::Some(effects)) => {
2156                        self.metrics
2157                            .record_cache_hit("transaction_effects", "committed");
2158                        CacheResult::Hit(Some((*effects).clone()))
2159                    }
2160                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
2161                    None => {
2162                        self.metrics
2163                            .record_cache_miss("transaction_effects", "committed");
2164                        CacheResult::Miss
2165                    }
2166                }
2167            },
2168            |remaining| {
2169                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2170                let results = self
2171                    .record_db_multi_get("transaction_effects", remaining.len())
2172                    .multi_get_effects(remaining_digests.iter())
2173                    .expect("db error");
2174                for ((digest, ticket), result) in remaining.iter().zip_debug_eq(results.iter()) {
2175                    if result.is_none() {
2176                        self.cached
2177                            .transaction_effects
2178                            .insert(digest, None, *ticket)
2179                            .ok();
2180                    }
2181                }
2182                results
2183            },
2184        )
2185    }
2186
2187    fn transaction_executed_in_last_epoch(
2188        &self,
2189        digest: &TransactionDigest,
2190        current_epoch: EpochId,
2191    ) -> bool {
2192        if current_epoch == 0 {
2193            return false;
2194        }
2195        let last_epoch = current_epoch - 1;
2196        let cache_key = (last_epoch, *digest);
2197
2198        let ticket = self
2199            .cached
2200            .transaction_executed_in_last_epoch
2201            .get_ticket_for_read(&cache_key);
2202
2203        if let Some(cached) = self
2204            .cached
2205            .transaction_executed_in_last_epoch
2206            .get(&cache_key)
2207        {
2208            return cached.lock().is_some();
2209        }
2210
2211        let was_executed = self
2212            .store
2213            .perpetual_tables
2214            .was_transaction_executed_in_last_epoch(digest, current_epoch);
2215
2216        let value = if was_executed { Some(()) } else { None };
2217        self.cached
2218            .transaction_executed_in_last_epoch
2219            .insert(&cache_key, value, ticket)
2220            .ok();
2221
2222        was_executed
2223    }
2224
2225    fn notify_read_executed_effects_digests<'a>(
2226        &'a self,
2227        task_name: &'static str,
2228        digests: &'a [TransactionDigest],
2229    ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>> {
2230        self.executed_effects_digests_notify_read
2231            .read(task_name, digests, |digests| {
2232                self.multi_get_executed_effects_digests(digests)
2233            })
2234            .boxed()
2235    }
2236
2237    fn multi_get_events(
2238        &self,
2239        event_digests: &[TransactionDigest],
2240    ) -> Vec<Option<TransactionEvents>> {
2241        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
2242            if events.data.is_empty() {
2243                None
2244            } else {
2245                Some(events)
2246            }
2247        }
2248
2249        let digests_and_tickets: Vec<_> = event_digests
2250            .iter()
2251            .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
2252            .collect();
2253        do_fallback_lookup(
2254            &digests_and_tickets,
2255            |(digest, _)| {
2256                self.metrics
2257                    .record_cache_request("transaction_events", "uncommitted");
2258                if let Some(events) = self.dirty.transaction_events.get(digest).map(|e| e.clone()) {
2259                    self.metrics
2260                        .record_cache_hit("transaction_events", "uncommitted");
2261
2262                    return CacheResult::Hit(map_events(events));
2263                }
2264                self.metrics
2265                    .record_cache_miss("transaction_events", "uncommitted");
2266
2267                self.metrics
2268                    .record_cache_request("transaction_events", "committed");
2269                match self
2270                    .cached
2271                    .transaction_events
2272                    .get(digest)
2273                    .map(|l| l.lock().clone())
2274                {
2275                    Some(PointCacheItem::Some(events)) => {
2276                        self.metrics
2277                            .record_cache_hit("transaction_events", "committed");
2278                        CacheResult::Hit(map_events((*events).clone()))
2279                    }
2280                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
2281                    None => {
2282                        self.metrics
2283                            .record_cache_miss("transaction_events", "committed");
2284
2285                        CacheResult::Miss
2286                    }
2287                }
2288            },
2289            |remaining| {
2290                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2291                let results = self
2292                    .store
2293                    .multi_get_events(&remaining_digests)
2294                    .expect("db error");
2295                for ((digest, ticket), result) in remaining.iter().zip_debug_eq(results.iter()) {
2296                    if result.is_none() {
2297                        self.cached
2298                            .transaction_events
2299                            .insert(digest, None, *ticket)
2300                            .ok();
2301                    }
2302                }
2303                results
2304            },
2305        )
2306    }
2307
2308    fn get_unchanged_loaded_runtime_objects(
2309        &self,
2310        digest: &TransactionDigest,
2311    ) -> Option<Vec<ObjectKey>> {
2312        self.dirty
2313            .unchanged_loaded_runtime_objects
2314            .get(digest)
2315            .map(|b| b.clone())
2316            .or_else(|| {
2317                self.store
2318                    .get_unchanged_loaded_runtime_objects(digest)
2319                    .expect("db error")
2320            })
2321    }
2322
2323    fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
2324        self.dirty
2325            .pending_transaction_writes
2326            .get(digest)
2327            .map(|transaction_output| transaction_output.take_accumulator_events())
2328    }
2329}
2330
2331impl ExecutionCacheWrite for WritebackCache {
2332    fn validate_owned_object_versions(&self, owned_input_objects: &[ObjectRef]) -> SuiResult {
2333        ObjectLocks::validate_owned_object_versions(self, owned_input_objects)
2334    }
2335
2336    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
2337        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs);
2338    }
2339
2340    #[cfg(test)]
2341    fn write_object_entry_for_test(&self, object: Object) {
2342        self.write_object_entry(&object.id(), object.version(), object.into());
2343    }
2344}
2345
2346implement_passthrough_traits!(WritebackCache);
2347
2348impl GlobalStateHashStore for WritebackCache {
2349    fn get_object_ref_prior_to_key_deprecated(
2350        &self,
2351        object_id: &ObjectID,
2352        version: SequenceNumber,
2353    ) -> SuiResult<Option<ObjectRef>> {
2354        // There is probably a more efficient way to implement this, but since this is only used by
2355        // old protocol versions, it is better to do the simple thing that is obviously correct.
2356        // In this case we previous version from all sources and choose the highest
2357        let mut candidates = Vec::new();
2358
2359        let check_versions =
2360            |versions: &CachedVersionMap<ObjectEntry>| match versions.get_prior_to(&version) {
2361                Some((version, object_entry)) => match object_entry {
2362                    ObjectEntry::Object(object) => {
2363                        assert_eq!(object.version(), version);
2364                        Some(object.compute_object_reference())
2365                    }
2366                    ObjectEntry::Deleted => {
2367                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED))
2368                    }
2369                    ObjectEntry::Wrapped => {
2370                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED))
2371                    }
2372                },
2373                None => None,
2374            };
2375
2376        // first check dirty data
2377        if let Some(objects) = self.dirty.objects.get(object_id)
2378            && let Some(prior) = check_versions(&objects)
2379        {
2380            candidates.push(prior);
2381        }
2382
2383        if let Some(objects) = self.cached.object_cache.get(object_id)
2384            && let Some(prior) = check_versions(&objects.lock())
2385        {
2386            candidates.push(prior);
2387        }
2388
2389        if let Some(prior) = self
2390            .store
2391            .get_object_ref_prior_to_key_deprecated(object_id, version)?
2392        {
2393            candidates.push(prior);
2394        }
2395
2396        // sort candidates by version, and return the highest
2397        candidates.sort_by_key(|(_, version, _)| *version);
2398        Ok(candidates.pop())
2399    }
2400
2401    fn get_root_state_hash_for_epoch(
2402        &self,
2403        epoch: EpochId,
2404    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
2405        self.store.get_root_state_hash_for_epoch(epoch)
2406    }
2407
2408    fn get_root_state_hash_for_highest_epoch(
2409        &self,
2410    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
2411        self.store.get_root_state_hash_for_highest_epoch()
2412    }
2413
2414    fn insert_state_hash_for_epoch(
2415        &self,
2416        epoch: EpochId,
2417        checkpoint_seq_num: &CheckpointSequenceNumber,
2418        acc: &GlobalStateHash,
2419    ) -> SuiResult {
2420        self.store
2421            .insert_state_hash_for_epoch(epoch, checkpoint_seq_num, acc)
2422    }
2423
2424    fn iter_live_object_set(
2425        &self,
2426        include_wrapped_tombstone: bool,
2427    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2428        // The only time it is safe to iterate the live object set is at an epoch boundary,
2429        // at which point the db is consistent and the dirty cache is empty. So this does
2430        // read the cache
2431        assert!(
2432            self.dirty.is_empty(),
2433            "cannot iterate live object set with dirty data"
2434        );
2435        self.store.iter_live_object_set(include_wrapped_tombstone)
2436    }
2437
2438    // A version of iter_live_object_set that reads the cache. Only use for testing. If used
2439    // on a live validator, can cause the server to block for as long as it takes to iterate
2440    // the entire live object set.
2441    fn iter_cached_live_object_set_for_testing(
2442        &self,
2443        include_wrapped_tombstone: bool,
2444    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2445        // hold iter until we are finished to prevent any concurrent inserts/deletes
2446        let iter = self.dirty.objects.iter();
2447        let mut dirty_objects = BTreeMap::new();
2448
2449        // add everything from the store
2450        for obj in self.store.iter_live_object_set(include_wrapped_tombstone) {
2451            dirty_objects.insert(obj.object_id(), obj);
2452        }
2453
2454        // add everything from the cache, but also remove deletions
2455        for entry in iter {
2456            let id = *entry.key();
2457            let value = entry.value();
2458            match value.get_highest().unwrap() {
2459                (_, ObjectEntry::Object(object)) => {
2460                    dirty_objects.insert(id, LiveObject::Normal(object.clone()));
2461                }
2462                (version, ObjectEntry::Wrapped) => {
2463                    if include_wrapped_tombstone {
2464                        dirty_objects.insert(id, LiveObject::Wrapped(ObjectKey(id, *version)));
2465                    } else {
2466                        dirty_objects.remove(&id);
2467                    }
2468                }
2469                (_, ObjectEntry::Deleted) => {
2470                    dirty_objects.remove(&id);
2471                }
2472            }
2473        }
2474
2475        Box::new(dirty_objects.into_values())
2476    }
2477}
2478
2479// TODO: For correctness, we must at least invalidate the cache when items are written through this
2480// trait (since they could be negatively cached as absent). But it may or may not be optimal to
2481// actually insert them into the cache. For instance if state sync is running ahead of execution,
2482// they might evict other items that are about to be read. This could be an area for tuning in the
2483// future.
2484impl StateSyncAPI for WritebackCache {
2485    fn insert_transaction_and_effects(
2486        &self,
2487        transaction: &VerifiedTransaction,
2488        transaction_effects: &TransactionEffects,
2489    ) {
2490        self.store
2491            .insert_transaction_and_effects(transaction, transaction_effects)
2492            .expect("db error");
2493        self.cached
2494            .transactions
2495            .insert(
2496                transaction.digest(),
2497                PointCacheItem::Some(Arc::new(transaction.clone())),
2498                Ticket::Write,
2499            )
2500            .ok();
2501        self.cached
2502            .transaction_effects
2503            .insert(
2504                &transaction_effects.digest(),
2505                PointCacheItem::Some(Arc::new(transaction_effects.clone())),
2506                Ticket::Write,
2507            )
2508            .ok();
2509    }
2510
2511    fn multi_insert_transaction_and_effects(
2512        &self,
2513        transactions_and_effects: &[VerifiedExecutionData],
2514    ) {
2515        self.store
2516            .multi_insert_transaction_and_effects(transactions_and_effects.iter())
2517            .expect("db error");
2518        for VerifiedExecutionData {
2519            transaction,
2520            effects,
2521        } in transactions_and_effects
2522        {
2523            self.cached
2524                .transactions
2525                .insert(
2526                    transaction.digest(),
2527                    PointCacheItem::Some(Arc::new(transaction.clone())),
2528                    Ticket::Write,
2529                )
2530                .ok();
2531            self.cached
2532                .transaction_effects
2533                .insert(
2534                    &effects.digest(),
2535                    PointCacheItem::Some(Arc::new(effects.clone())),
2536                    Ticket::Write,
2537                )
2538                .ok();
2539        }
2540    }
2541}