sui_core/execution_cache/
writeback_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! MemoryCache is a cache for the transaction execution which delays writes to the database until
5//! transaction results are certified (i.e. they appear in a certified checkpoint, or an effects cert
6//! is observed by a fullnode). The cache also stores committed data in memory in order to serve
7//! future reads without hitting the database.
8//!
9//! For storing uncommitted transaction outputs, we cannot evict the data at all until it is written
10//! to disk. Committed data not only can be evicted, but it is also unbounded (imagine a stream of
11//! transactions that keep splitting a coin into smaller coins).
12//!
13//! We also want to be able to support negative cache hits (i.e. the case where we can determine an
14//! object does not exist without hitting the database).
15//!
16//! To achieve both of these goals, we split the cache data into two pieces, a dirty set and a cached
17//! set. The dirty set has no automatic evictions, data is only removed after being committed. The
18//! cached set is in a bounded-sized cache with automatic evictions. In order to support negative
//! cache hits, we treat the two halves of the cache as a single FIFO queue. Newly written (dirty) versions are
20//! inserted to one end of the dirty queue. As versions are committed to disk, they are
21//! removed from the other end of the dirty queue and inserted into the cache queue. The cache queue
22//! is truncated if it exceeds its maximum size, by removing all but the N newest versions.
23//!
24//! This gives us the property that the sequence of versions in the dirty and cached queues are the
25//! most recent versions of the object, i.e. there can be no "gaps". This allows for the following:
26//!
27//!   - Negative cache hits: If the queried version is not in memory, but is higher than the smallest
28//!     version in the cached queue, it does not exist in the db either.
29//!   - Bounded reads: When reading the most recent version that is <= some version bound, we can
30//!     correctly satisfy this query from the cache, or determine that we must go to the db.
31//!
//! Note that at any time, either or both of the dirty and cached queues may be non-existent. There may be no
33//! dirty versions of the objects, in which case there will be no dirty queue. And, the cached queue
34//! may be evicted from the cache, in which case there will be no cached queue. Because only the cached
35//! queue can be evicted (the dirty queue can only become empty by moving versions from it to the cached
36//! queue), the "highest versions" property still holds in all cases.
37//!
38//! The above design is used for both objects and markers.
39
40use crate::accumulators::funds_read::AccountFundsRead;
41use crate::authority::AuthorityStore;
42use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
43use crate::authority::authority_store::{
44    ExecutionLockWriteGuard, LockDetailsDeprecated, ObjectLockStatus, SuiLockResult,
45};
46use crate::authority::authority_store_tables::LiveObject;
47use crate::authority::backpressure::BackpressureManager;
48use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
49use crate::fallback_fetch::{do_fallback_lookup, do_fallback_lookup_fallible};
50use crate::global_state_hasher::GlobalStateHashStore;
51use crate::transaction_outputs::TransactionOutputs;
52
53use dashmap::DashMap;
54use dashmap::mapref::entry::Entry as DashMapEntry;
55use futures::{FutureExt, future::BoxFuture};
56use moka::sync::SegmentedCache as MokaCache;
57use mysten_common::debug_fatal;
58use mysten_common::random_util::randomize_cache_capacity_in_tests;
59use mysten_common::sync::notify_read::NotifyRead;
60use parking_lot::Mutex;
61use rayon::prelude::*;
62use std::collections::{BTreeMap, HashSet};
63use std::hash::Hash;
64use std::sync::Arc;
65use std::sync::atomic::AtomicU64;
66use sui_config::ExecutionCacheConfig;
67use sui_macros::fail_point;
68use sui_protocol_config::ProtocolVersion;
69use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
70use sui_types::accumulator_event::AccumulatorEvent;
71use sui_types::accumulator_root::{AccumulatorObjId, AccumulatorValue};
72use sui_types::base_types::{
73    EpochId, FullObjectID, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData,
74};
75use sui_types::bridge::{Bridge, get_bridge};
76use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest};
77use sui_types::effects::{TransactionEffects, TransactionEvents};
78use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
79use sui_types::executable_transaction::VerifiedExecutableTransaction;
80use sui_types::global_state_hash::GlobalStateHash;
81use sui_types::message_envelope::Message;
82use sui_types::messages_checkpoint::CheckpointSequenceNumber;
83use sui_types::object::Object;
84use sui_types::storage::{
85    FullObjectKey, InputKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject,
86};
87use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
88use sui_types::transaction::{TransactionDataAPI, VerifiedTransaction};
89use tap::TapOptional;
90use tracing::{debug, info, instrument, trace, warn};
91
92use super::ExecutionCacheAPI;
93use super::cache_types::Ticket;
94use super::{
95    Batch, CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
96    ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
97    cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache},
98    implement_passthrough_traits,
99    object_locks::ObjectLocks,
100};
101
102#[cfg(test)]
103#[path = "unit_tests/writeback_cache_tests.rs"]
104pub mod writeback_cache_tests;
105
106#[cfg(test)]
107#[path = "unit_tests/notify_read_input_objects_tests.rs"]
108mod notify_read_input_objects_tests;
109
/// A single cached version of an object: either the object contents, or a
/// tombstone recording that this version was deleted or wrapped.
#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    // A live object at this version.
    Object(Object),
    // Tombstone: the object was deleted at this version.
    Deleted,
    // Tombstone: the object was wrapped at this version (produced from a
    // wrapped tombstone digest in `From<ObjectOrTombstone>` below).
    Wrapped,
}
116
117impl ObjectEntry {
118    #[cfg(test)]
119    fn unwrap_object(&self) -> &Object {
120        match self {
121            ObjectEntry::Object(o) => o,
122            _ => panic!("unwrap_object called on non-Object"),
123        }
124    }
125
126    fn is_tombstone(&self) -> bool {
127        match self {
128            ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
129            ObjectEntry::Object(_) => false,
130        }
131    }
132}
133
134impl std::fmt::Debug for ObjectEntry {
135    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136        match self {
137            ObjectEntry::Object(o) => {
138                write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
139            }
140            ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
141            ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
142        }
143    }
144}
145
146impl From<Object> for ObjectEntry {
147    fn from(object: Object) -> Self {
148        ObjectEntry::Object(object)
149    }
150}
151
152impl From<ObjectOrTombstone> for ObjectEntry {
153    fn from(object: ObjectOrTombstone) -> Self {
154        match object {
155            ObjectOrTombstone::Object(o) => o.into(),
156            ObjectOrTombstone::Tombstone(obj_ref) => {
157                if obj_ref.2.is_deleted() {
158                    ObjectEntry::Deleted
159                } else if obj_ref.2.is_wrapped() {
160                    ObjectEntry::Wrapped
161                } else {
162                    panic!("tombstone digest must either be deleted or wrapped");
163                }
164            }
165        }
166    }
167}
168
/// The latest known state of an object id, as stored in `object_by_id_cache`:
/// either the highest known version (which may be a tombstone), or a negative
/// entry recording that the object does not exist.
#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    // Highest known version and its entry (possibly a tombstone).
    Object(SequenceNumber, ObjectEntry),
    // Negative cache entry: the object is known not to exist.
    NonExistent,
}
174
175impl LatestObjectCacheEntry {
176    #[cfg(test)]
177    fn version(&self) -> Option<SequenceNumber> {
178        match self {
179            LatestObjectCacheEntry::Object(version, _) => Some(*version),
180            LatestObjectCacheEntry::NonExistent => None,
181        }
182    }
183
184    fn is_alive(&self) -> bool {
185        match self {
186            LatestObjectCacheEntry::Object(_, entry) => !entry.is_tombstone(),
187            LatestObjectCacheEntry::NonExistent => false,
188        }
189    }
190}
191
192impl IsNewer for LatestObjectCacheEntry {
193    fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
194        match (self, other) {
195            (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
196                v1 > v2
197            }
198            (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
199            _ => false,
200        }
201    }
202}
203
/// Key for marker tables: markers are tracked per epoch, per full object id.
type MarkerKey = (EpochId, FullObjectID);
205
/// UncommittedData stores execution outputs that are not yet written to the db. Entries in this
/// struct can only be purged after they are committed.
struct UncommittedData {
    /// The object dirty set. All writes go into this table first. After we flush the data to the
    /// db, the data is removed from this table and inserted into the object_cache.
    ///
    /// This table may contain both live and dead objects, since we flush both live and dead
    /// objects to the db in order to support past object queries on fullnodes.
    ///
    /// Further, we only remove objects in FIFO order, which ensures that the cached
    /// sequence of objects has no gaps. In other words, if we have versions 4, 8, 13 of
    /// an object, we can deduce that version 9 does not exist. This also makes child object
    /// reads efficient. `object_cache` cannot contain a more recent version of an object than
    /// `objects`, and neither can have any gaps. Therefore if there is any object <= the version
    /// bound for a child read in objects, it is the correct object to return.
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

    // Markers for received objects and deleted shared objects. This contains all of the dirty
    // marker state, which is committed to the db at the same time as other transaction data.
    // After markers are committed to the db we remove them from this table and insert them into
    // marker_cache.
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    // Effects of not-yet-committed transactions, keyed by effects digest.
    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    // Events of not-yet-committed transactions, keyed by transaction digest.
    transaction_events: DashMap<TransactionDigest, TransactionEvents>,

    // Per-transaction object keys. NOTE(review): presumably the runtime-loaded
    // objects the transaction did not mutate — confirm with the code that
    // populates this table.
    unchanged_loaded_runtime_objects: DashMap<TransactionDigest, Vec<ObjectKey>>,

    // Maps executed (but not yet committed) transaction digests to the digest
    // of their effects.
    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    // Transaction outputs that have not yet been written to the DB. Items are removed from this
    // table as they are flushed to the db.
    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,

    // Running totals of inserted vs committed transactions. When the dirty set
    // is empty these must be equal (checked by `is_empty` in debug builds).
    total_transaction_inserts: AtomicU64,
    total_transaction_commits: AtomicU64,
}
244
245impl UncommittedData {
246    fn new() -> Self {
247        Self {
248            objects: DashMap::with_shard_amount(2048),
249            markers: DashMap::with_shard_amount(2048),
250            transaction_effects: DashMap::with_shard_amount(2048),
251            executed_effects_digests: DashMap::with_shard_amount(2048),
252            pending_transaction_writes: DashMap::with_shard_amount(2048),
253            transaction_events: DashMap::with_shard_amount(2048),
254            unchanged_loaded_runtime_objects: DashMap::with_shard_amount(2048),
255            total_transaction_inserts: AtomicU64::new(0),
256            total_transaction_commits: AtomicU64::new(0),
257        }
258    }
259
260    fn clear(&self) {
261        self.objects.clear();
262        self.markers.clear();
263        self.transaction_effects.clear();
264        self.executed_effects_digests.clear();
265        self.pending_transaction_writes.clear();
266        self.transaction_events.clear();
267        self.unchanged_loaded_runtime_objects.clear();
268        self.total_transaction_inserts
269            .store(0, std::sync::atomic::Ordering::Relaxed);
270        self.total_transaction_commits
271            .store(0, std::sync::atomic::Ordering::Relaxed);
272    }
273
274    fn is_empty(&self) -> bool {
275        let empty = self.pending_transaction_writes.is_empty();
276        if empty && cfg!(debug_assertions) {
277            assert!(
278                self.objects.is_empty()
279                    && self.markers.is_empty()
280                    && self.transaction_effects.is_empty()
281                    && self.executed_effects_digests.is_empty()
282                    && self.transaction_events.is_empty()
283                    && self.unchanged_loaded_runtime_objects.is_empty()
284                    && self
285                        .total_transaction_inserts
286                        .load(std::sync::atomic::Ordering::Relaxed)
287                        == self
288                            .total_transaction_commits
289                            .load(std::sync::atomic::Ordering::Relaxed),
290            );
291        }
292        empty
293    }
294}
295
// Point items (anything without a version number) can be negatively cached as None.
// A `None` entry means "known to not exist (yet)"; `Some` holds the value.
type PointCacheItem<T> = Option<T>;
298
299// PointCacheItem can only be used for insert-only collections, so a Some entry
300// is always newer than a None entry.
301impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
302    fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
303        match (self, other) {
304            (Some(_), None) => true,
305
306            (Some(a), Some(b)) => {
307                // conflicting inserts should never happen
308                debug_assert_eq!(a, b);
309                false
310            }
311
312            _ => false,
313        }
314    }
315}
316
/// CachedData stores data that has been committed to the db, but is likely to be read soon.
struct CachedCommittedData {
    // See module level comment for an explanation of caching strategy.
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    // See module level comment for an explanation of caching strategy.
    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    // Committed transactions by digest; `PointCacheItem` allows negative caching.
    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,

    // Committed effects, keyed by effects digest.
    transaction_effects:
        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEvents>>>,

    // Committed events, keyed by transaction digest.
    transaction_events: MonotonicCache<TransactionDigest, PointCacheItem<Arc<TransactionEvents>>>,

    // Maps a transaction digest to the digest of its executed effects.
    executed_effects_digests:
        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,

    // Records that a transaction was executed in a given epoch.
    // NOTE(review): exact semantics inferred from the field name — confirm
    // against the call sites that populate it.
    transaction_executed_in_last_epoch:
        MonotonicCache<(EpochId, TransactionDigest), PointCacheItem<()>>,

    // Objects that were read at transaction signing time - allows us to access them again at
    // execution time with a single lock / hash lookup
    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}
342
343impl CachedCommittedData {
344    fn new(config: &ExecutionCacheConfig) -> Self {
345        let object_cache = MokaCache::builder(8)
346            .max_capacity(randomize_cache_capacity_in_tests(
347                config.object_cache_size(),
348            ))
349            .build();
350        let marker_cache = MokaCache::builder(8)
351            .max_capacity(randomize_cache_capacity_in_tests(
352                config.marker_cache_size(),
353            ))
354            .build();
355
356        let transactions = MonotonicCache::new(randomize_cache_capacity_in_tests(
357            config.transaction_cache_size(),
358        ));
359        let transaction_effects = MonotonicCache::new(randomize_cache_capacity_in_tests(
360            config.effect_cache_size(),
361        ));
362        let transaction_events = MonotonicCache::new(randomize_cache_capacity_in_tests(
363            config.events_cache_size(),
364        ));
365        let executed_effects_digests = MonotonicCache::new(randomize_cache_capacity_in_tests(
366            config.executed_effect_cache_size(),
367        ));
368
369        let transaction_objects = MokaCache::builder(8)
370            .max_capacity(randomize_cache_capacity_in_tests(
371                config.transaction_objects_cache_size(),
372            ))
373            .build();
374
375        let transaction_executed_in_last_epoch = MonotonicCache::new(
376            randomize_cache_capacity_in_tests(config.executed_effect_cache_size()),
377        );
378
379        Self {
380            object_cache,
381            marker_cache,
382            transactions,
383            transaction_effects,
384            transaction_events,
385            executed_effects_digests,
386            transaction_executed_in_last_epoch,
387            _transaction_objects: transaction_objects,
388        }
389    }
390
391    fn clear_and_assert_empty(&self) {
392        self.object_cache.invalidate_all();
393        self.marker_cache.invalidate_all();
394        self.transactions.invalidate_all();
395        self.transaction_effects.invalidate_all();
396        self.transaction_events.invalidate_all();
397        self.executed_effects_digests.invalidate_all();
398        self.transaction_executed_in_last_epoch.invalidate_all();
399        self._transaction_objects.invalidate_all();
400
401        assert_empty(&self.object_cache);
402        assert_empty(&self.marker_cache);
403        assert!(self.transactions.is_empty());
404        assert!(self.transaction_effects.is_empty());
405        assert!(self.transaction_events.is_empty());
406        assert!(self.executed_effects_digests.is_empty());
407        assert!(self.transaction_executed_in_last_epoch.is_empty());
408        assert_empty(&self._transaction_objects);
409    }
410}
411
412fn assert_empty<K, V>(cache: &MokaCache<K, V>)
413where
414    K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
415    V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
416{
417    if cache.iter().next().is_some() {
418        panic!("cache should be empty");
419    }
420}
421
pub struct WritebackCache {
    // Uncommitted execution outputs; see `UncommittedData`.
    dirty: UncommittedData,
    // Committed data retained in memory for fast reads; see `CachedCommittedData`.
    cached: CachedCommittedData,

    // We separately cache the latest version of each object. Although this seems
    // redundant, it is the only way to support populating the cache after a read.
    // We cannot simply insert objects that we read off the disk into `object_cache`,
    // since that may violate the no-missing-versions property.
    // `object_by_id_cache` is also written to on writes so that it is always coherent.
    // Hence it contains both committed and dirty object data.
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    // The packages cache is treated separately from objects, because they are immutable and can be
    // used by any number of transactions. Additionally, many operations require loading large
    // numbers of packages (due to dependencies), so we want to try to keep all packages in memory.
    //
    // Also, this cache can contain packages that are dirty or committed, so it does not live in
    // UncommittedData or CachedCommittedData. The cache is populated in two ways:
    // - when packages are written (in which case they will also be present in the dirty set)
    // - after a cache miss. Because package IDs are unique (only one version exists for each ID)
    //   we do not need to worry about the contiguous version property.
    // - note that we removed any unfinalized packages from the cache during revert_state_update().
    packages: MokaCache<ObjectID, PackageObject>,

    // Object lock state; see `ObjectLocks`.
    object_locks: ObjectLocks,

    // Wakes tasks waiting for a transaction's executed effects digest.
    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    // Wakes tasks waiting for an input object/package to become available
    // (notified from `write_object_entry` / `write_marker_value`).
    object_notify_read: NotifyRead<InputKey, ()>,

    // Durable backing store that dirty data is eventually flushed to.
    store: Arc<AuthorityStore>,
    // Threshold for backpressure; sourced from
    // `ExecutionCacheConfig::backpressure_threshold`.
    backpressure_threshold: u64,
    backpressure_manager: Arc<BackpressureManager>,
    metrics: Arc<ExecutionCacheMetrics>,
}
456
// Check whether `$cache` (an `Option<&CachedVersionMap<_>>`) can answer a
// lookup for `$version`, recording request/hit/miss metrics under
// `$table`/`$level`.
//
// NOTE: on a hit or a provable negative hit this macro `return`s a
// `CacheResult` from the *enclosing function*; on a miss it falls through so
// the caller can consult the next cache level.
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    // If the version is greater than the least version in the cache, then we know
                    // that the object does not exist anywhere
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
478
// Like `check_cache_entry_by_version`, but looks up the highest cached
// version. On a hit this `return`s a `CacheResult::Hit` from the *enclosing
// function*; on a miss it falls through.
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            // A CachedVersionMap is removed as soon as it becomes empty, so a
            // present map always has a highest entry.
            let (version, entry) = cache
                .get_highest()
                .expect("empty CachedVersionMap should have been removed");
            $self.metrics.record_cache_hit($table, $level);
            return CacheResult::Hit((*version, entry.clone()));
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
493
494impl WritebackCache {
495    pub fn new(
496        config: &ExecutionCacheConfig,
497        store: Arc<AuthorityStore>,
498        metrics: Arc<ExecutionCacheMetrics>,
499        backpressure_manager: Arc<BackpressureManager>,
500    ) -> Self {
501        let packages = MokaCache::builder(8)
502            .max_capacity(randomize_cache_capacity_in_tests(
503                config.package_cache_size(),
504            ))
505            .build();
506        Self {
507            dirty: UncommittedData::new(),
508            cached: CachedCommittedData::new(config),
509            object_by_id_cache: MonotonicCache::new(randomize_cache_capacity_in_tests(
510                config.object_by_id_cache_size(),
511            )),
512            packages,
513            object_locks: ObjectLocks::new(),
514            executed_effects_digests_notify_read: NotifyRead::new(),
515            object_notify_read: NotifyRead::new(),
516            store,
517            backpressure_manager,
518            backpressure_threshold: config.backpressure_threshold(),
519            metrics,
520        }
521    }
522
523    pub fn new_for_tests(store: Arc<AuthorityStore>) -> Self {
524        Self::new(
525            &Default::default(),
526            store,
527            ExecutionCacheMetrics::new(&prometheus::Registry::new()).into(),
528            BackpressureManager::new_for_tests(),
529        )
530    }
531
532    #[cfg(test)]
533    pub fn reset_for_test(&mut self) {
534        let mut new = Self::new(
535            &Default::default(),
536            self.store.clone(),
537            self.metrics.clone(),
538            self.backpressure_manager.clone(),
539        );
540        std::mem::swap(self, &mut new);
541    }
542
543    pub fn evict_executed_effects_from_cache_for_testing(&self, tx_digest: &TransactionDigest) {
544        self.cached.executed_effects_digests.invalidate(tx_digest);
545        self.cached.transaction_events.invalidate(tx_digest);
546        self.cached.transactions.invalidate(tx_digest);
547    }
548
    /// Insert `object` at `version` for `object_id` into the dirty set and the
    /// object_by_id cache, then notify any task waiting on this write as a
    /// transaction input.
    fn write_object_entry(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        object: ObjectEntry,
    ) {
        trace!(?object_id, ?version, ?object, "inserting object entry");
        self.metrics.record_cache_write("object");

        // We must hold the lock for the object entry while inserting to the
        // object_by_id_cache. Otherwise, a surprising bug can occur:
        //
        // 1. A thread executing TX1 can write object (O,1) to the dirty set and then pause.
        // 2. TX2, which reads (O,1) can begin executing, because ExecutionScheduler immediately
        //    schedules transactions if their inputs are available. It does not matter that TX1
        //    hasn't finished executing yet.
        // 3. TX2 can write (O,2) to both the dirty set and the object_by_id_cache.
        // 4. The thread executing TX1 can resume and write (O,1) to the object_by_id_cache.
        //
        // Now, any subsequent attempt to get the latest version of O will return (O,1) instead of
        // (O,2).
        //
        // This seems very unlikely, but it may be possible under the following circumstances:
        // - While a thread is unlikely to pause for so long, moka cache uses optimistic
        //   lock-free algorithms that have retry loops. Possibly, under high contention, this
        //   code might spin for a surprisingly long time.
        // - Additionally, many concurrent re-executions of the same tx could happen due to
        //   the tx finalizer, plus checkpoint executor, consensus, and RPCs from fullnodes.
        //
        // The guard returned by entry() keeps this object's dirty-set shard
        // locked for the remainder of the function.
        let mut entry = self.dirty.objects.entry(*object_id).or_default();

        self.object_by_id_cache
            .insert(
                object_id,
                LatestObjectCacheEntry::Object(version, object.clone()),
                Ticket::Write,
            )
            // While Ticket::Write cannot expire, this insert may still fail.
            // See the comment in `MonotonicCache::insert`.
            .ok();

        entry.insert(version, object.clone());

        // Wake transactions waiting on this object as an input. Packages are
        // keyed by id alone. Child objects are not notified — NOTE(review):
        // presumably they are never awaited as versioned inputs; confirm with
        // the input-key waiters.
        if let ObjectEntry::Object(object) = &object {
            if object.is_package() {
                self.object_notify_read
                    .notify(&InputKey::Package { id: *object_id }, &());
            } else if !object.is_child_object() {
                self.object_notify_read.notify(
                    &InputKey::VersionedObject {
                        id: object.full_id(),
                        version: object.version(),
                    },
                    &(),
                );
            }
        }
    }
606
607    fn write_marker_value(
608        &self,
609        epoch_id: EpochId,
610        object_key: FullObjectKey,
611        marker_value: MarkerValue,
612    ) {
613        tracing::trace!("inserting marker value {object_key:?}: {marker_value:?}",);
614        self.metrics.record_cache_write("marker");
615        self.dirty
616            .markers
617            .entry((epoch_id, object_key.id()))
618            .or_default()
619            .value_mut()
620            .insert(object_key.version(), marker_value);
621        // It is possible for a transaction to use a consensus stream ended
622        // object in the input, hence we must notify that it is now available
623        // at the assigned version, so that any transaction waiting for this
624        // object version can start execution.
625        if matches!(marker_value, MarkerValue::ConsensusStreamEnded(_)) {
626            self.object_notify_read.notify(
627                &InputKey::VersionedObject {
628                    id: object_key.id(),
629                    version: object_key.version(),
630                },
631                &(),
632            );
633        }
634    }
635
    // lock both the dirty and committed sides of the cache, and then pass the entries to
    // the callback. Written with the `with` pattern because any other way of doing this
    // creates lifetime hell.
    fn with_locked_cache_entries<K, V, R>(
        dirty_map: &DashMap<K, CachedVersionMap<V>>,
        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: &K,
        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
    ) -> R
    where
        K: Copy + Eq + Hash + Send + Sync + 'static,
        V: Send + Sync + 'static,
    {
        // The entry() guard locks the dirty map's shard for this key. The
        // shadowing binding below borrows the guard, so it stays alive (and
        // the shard stays locked) until the end of this function.
        let dirty_entry = dirty_map.entry(*key);
        let dirty_entry = match &dirty_entry {
            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
            DashMapEntry::Vacant(_) => None,
        };

        // Fetch the Arc'd entry from the committed cache (if present), then
        // hold its mutex for the duration of the callback.
        let cached_entry = cached_map.get(key);
        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
        let cached_entry = cached_lock.as_deref();

        cb(dirty_entry, cached_entry)
    }
661
    // Attempt to get an object from the cache. The DB is not consulted.
    // Can return Hit, Miss, or NegativeHit (if the object is known to not exist).
    fn get_object_entry_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<ObjectEntry> {
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                // Check uncommitted versions first — dirty versions are newer
                // than anything in the committed cache (see module comment).
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                // Not enough information in memory; the caller must fall back
                // to the db.
                CacheResult::Miss
            },
        )
    }
692
693    fn get_object_by_key_cache_only(
694        &self,
695        object_id: &ObjectID,
696        version: SequenceNumber,
697    ) -> CacheResult<Object> {
698        match self.get_object_entry_by_key_cache_only(object_id, version) {
699            CacheResult::Hit(entry) => match entry {
700                ObjectEntry::Object(object) => CacheResult::Hit(object),
701                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
702            },
703            CacheResult::Miss => CacheResult::Miss,
704            CacheResult::NegativeHit => CacheResult::NegativeHit,
705        }
706    }
707
    /// Look up the latest version of an object by id, consulting only in-memory
    /// state (the point cache, the dirty set, and the committed cache) — never
    /// the database.
    ///
    /// Returns:
    /// - `Hit((version, entry))` when the latest version is known in memory,
    /// - `NegativeHit` when the cache proves the object does not exist,
    /// - `Miss` when the caller must fall back to the database.
    fn get_object_entry_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
        self.metrics
            .record_cache_request(request_type, "object_by_id");
        let entry = self.object_by_id_cache.get(object_id);

        // Debug-only coherence check: the point-cache entry must agree with
        // the canonical latest state, derived from the dirty set first and
        // the db as a fallback.
        if cfg!(debug_assertions)
            && let Some(entry) = &entry
        {
            // check that cache is coherent
            let highest: Option<ObjectEntry> = self
                .dirty
                .objects
                .get(object_id)
                .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
                .or_else(|| {
                    let obj: Option<ObjectEntry> = self
                        .store
                        .get_latest_object_or_tombstone(*object_id)
                        .unwrap()
                        .map(|(_, o)| o.into());
                    obj
                });

            let cache_entry = match &*entry.lock() {
                LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
                LatestObjectCacheEntry::NonExistent => None,
            };

            // If the cache entry is a tombstone, the db entry may be missing if it was pruned.
            let tombstone_possibly_pruned = highest.is_none()
                && cache_entry
                    .as_ref()
                    .map(|e| e.is_tombstone())
                    .unwrap_or(false);

            if highest != cache_entry && !tombstone_possibly_pruned {
                tracing::error!(
                    ?highest,
                    ?cache_entry,
                    ?tombstone_possibly_pruned,
                    "object_by_id cache is incoherent for {:?}",
                    object_id
                );
                panic!("object_by_id cache is incoherent for {:?}", object_id);
            }
        }

        // Fast path: the point cache answers directly, either positively or
        // with a proven non-existence.
        if let Some(entry) = entry {
            let entry = entry.lock();
            match &*entry {
                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
                    self.metrics.record_cache_hit(request_type, "object_by_id");
                    return CacheResult::Hit((*latest_version, latest_object.clone()));
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit(request_type, "object_by_id");
                    return CacheResult::NegativeHit;
                }
            }
        } else {
            self.metrics.record_cache_miss(request_type, "object_by_id");
        }

        // Slow path: probe the dirty (uncommitted) set, then the committed
        // cache, under their locks. The check_cache_entry_by_latest! macros
        // produce the result for a hit/negative hit; falling through both
        // means the caches cannot answer.
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }
787
788    fn get_object_by_id_cache_only(
789        &self,
790        request_type: &'static str,
791        object_id: &ObjectID,
792    ) -> CacheResult<(SequenceNumber, Object)> {
793        match self.get_object_entry_by_id_cache_only(request_type, object_id) {
794            CacheResult::Hit((version, entry)) => match entry {
795                ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
796                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
797            },
798            CacheResult::NegativeHit => CacheResult::NegativeHit,
799            CacheResult::Miss => CacheResult::Miss,
800        }
801    }
802
    /// Cache-only lookup of the marker value for a specific object version in
    /// the given epoch. Probes the dirty (uncommitted) set first, then the
    /// committed cache; the check_cache_entry_by_version! macros produce the
    /// result on a hit or provable negative, otherwise we fall through to Miss.
    fn get_marker_value_cache_only(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> CacheResult<MarkerValue> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, object_key.id()),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "uncommitted",
                    dirty_entry,
                    object_key.version()
                );
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "committed",
                    cached_entry,
                    object_key.version()
                );
                // Neither half of the cache could answer definitively.
                CacheResult::Miss
            },
        )
    }
831
    /// Cache-only lookup of the most recent marker (and its version) for an
    /// object in the given epoch. Dirty (uncommitted) data takes precedence
    /// over the committed cache; Miss means the caller must consult the db.
    fn get_latest_marker_value_cache_only(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }
848
    /// Fetch the latest version of an object, checking the caches first and
    /// falling back to the db on a miss. The db answer — including "not found"
    /// — is written back into the point cache for future reads.
    fn get_object_impl(&self, request_type: &'static str, id: &ObjectID) -> Option<Object> {
        // The read ticket is taken *before* probing the cache; the cache write
        // below is discarded if the ticket expired (see cache_latest_object_by_id),
        // which appears to guard against racing a concurrent writer with a
        // stale db result.
        let ticket = self.object_by_id_cache.get_ticket_for_read(id);
        match self.get_object_entry_by_id_cache_only(request_type, id) {
            CacheResult::Hit((_, entry)) => match entry {
                ObjectEntry::Object(object) => Some(object),
                // Tombstone at the latest version: object no longer exists.
                ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
            },
            CacheResult::NegativeHit => None,
            CacheResult::Miss => {
                let obj = self
                    .store
                    .get_latest_object_or_tombstone(*id)
                    .expect("db error");
                match obj {
                    Some((key, obj)) => {
                        // Cache whatever the db returned (live object or tombstone).
                        self.cache_latest_object_by_id(
                            id,
                            LatestObjectCacheEntry::Object(key.1, obj.clone().into()),
                            ticket,
                        );
                        match obj {
                            ObjectOrTombstone::Object(object) => Some(object),
                            ObjectOrTombstone::Tombstone(_) => None,
                        }
                    }
                    None => {
                        // Record the negative result so repeated lookups of a
                        // non-existent id avoid the db.
                        self.cache_object_not_found(id, ticket);
                        None
                    }
                }
            }
        }
    }
882
883    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
884        self.metrics.record_cache_request(request_type, "db");
885        &self.store
886    }
887
888    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
889        self.metrics
890            .record_cache_multi_request(request_type, "db", count);
891        &self.store
892    }
893
894    #[instrument(level = "debug", skip_all)]
895    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
896        let tx_digest = *tx_outputs.transaction.digest();
897        trace!(?tx_digest, "writing transaction outputs to cache");
898
899        assert!(
900            !self.transaction_executed_in_last_epoch(&tx_digest, epoch_id),
901            "Transaction {:?} was already executed in epoch {}",
902            tx_digest,
903            epoch_id.saturating_sub(1)
904        );
905
906        let TransactionOutputs {
907            transaction,
908            effects,
909            markers,
910            written,
911            deleted,
912            wrapped,
913            events,
914            unchanged_loaded_runtime_objects,
915            ..
916        } = &*tx_outputs;
917
918        // Deletions and wraps must be written first. The reason is that one of the deletes
919        // may be a child object, and if we write the parent object first, a reader may or may
920        // not see the previous version of the child object, instead of the deleted/wrapped
921        // tombstone, which would cause an execution fork
922        for ObjectKey(id, version) in deleted.iter() {
923            self.write_object_entry(id, *version, ObjectEntry::Deleted);
924        }
925
926        for ObjectKey(id, version) in wrapped.iter() {
927            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
928        }
929
930        // Update all markers
931        for (object_key, marker_value) in markers.iter() {
932            self.write_marker_value(epoch_id, *object_key, *marker_value);
933        }
934
935        // Write children before parents to ensure that readers do not observe a parent object
936        // before its most recent children are visible.
937        for (object_id, object) in written.iter() {
938            if object.is_child_object() {
939                self.write_object_entry(object_id, object.version(), object.clone().into());
940            }
941        }
942        for (object_id, object) in written.iter() {
943            if !object.is_child_object() {
944                self.write_object_entry(object_id, object.version(), object.clone().into());
945                if object.is_package() {
946                    debug!("caching package: {:?}", object.compute_object_reference());
947                    self.packages
948                        .insert(*object_id, PackageObject::new(object.clone()));
949                }
950            }
951        }
952
953        let tx_digest = *transaction.digest();
954        debug!(
955            ?tx_digest,
956            "Writing transaction output objects to cache: {:?}",
957            written
958                .values()
959                .map(|o| (o.id(), o.version()))
960                .collect::<Vec<_>>(),
961        );
962        let effects_digest = effects.digest();
963
964        self.metrics.record_cache_write("transaction_block");
965        self.dirty
966            .pending_transaction_writes
967            .insert(tx_digest, tx_outputs.clone());
968
969        // insert transaction effects before executed_effects_digests so that there
970        // are never dangling entries in executed_effects_digests
971        self.metrics.record_cache_write("transaction_effects");
972        self.dirty
973            .transaction_effects
974            .insert(effects_digest, effects.clone());
975
976        // note: if events.data.is_empty(), then there are no events for this transaction. We
977        // store it anyway to avoid special cases in commint_transaction_outputs, and translate
978        // an empty events structure to None when reading.
979        self.metrics.record_cache_write("transaction_events");
980        self.dirty
981            .transaction_events
982            .insert(tx_digest, events.clone());
983
984        self.metrics
985            .record_cache_write("unchanged_loaded_runtime_objects");
986        self.dirty
987            .unchanged_loaded_runtime_objects
988            .insert(tx_digest, unchanged_loaded_runtime_objects.clone());
989
990        self.metrics.record_cache_write("executed_effects_digests");
991        self.dirty
992            .executed_effects_digests
993            .insert(tx_digest, effects_digest);
994
995        self.executed_effects_digests_notify_read
996            .notify(&tx_digest, &effects_digest);
997
998        self.metrics
999            .pending_notify_read
1000            .set(self.executed_effects_digests_notify_read.num_pending() as i64);
1001
1002        let prev = self
1003            .dirty
1004            .total_transaction_inserts
1005            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1006
1007        let pending_count = (prev + 1).saturating_sub(
1008            self.dirty
1009                .total_transaction_commits
1010                .load(std::sync::atomic::Ordering::Relaxed),
1011        );
1012
1013        self.set_backpressure(pending_count);
1014    }
1015
1016    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1017        let _metrics_guard = mysten_metrics::monitored_scope("WritebackCache::build_db_batch");
1018        let mut all_outputs = Vec::with_capacity(digests.len());
1019        for tx in digests {
1020            let Some(outputs) = self
1021                .dirty
1022                .pending_transaction_writes
1023                .get(tx)
1024                .map(|o| o.clone())
1025            else {
1026                // This can happen in the following rare case:
1027                // All transactions in the checkpoint are committed to the db (by commit_transaction_outputs,
1028                // called in CheckpointExecutor::process_executed_transactions), but the process crashes before
1029                // the checkpoint water mark is bumped. We will then re-commit the checkpoint at startup,
1030                // despite that all transactions are already executed.
1031                warn!("Attempt to commit unknown transaction {:?}", tx);
1032                continue;
1033            };
1034            all_outputs.push(outputs);
1035        }
1036
1037        let batch = self
1038            .store
1039            .build_db_batch(epoch, &all_outputs)
1040            .expect("db error");
1041        (all_outputs, batch)
1042    }
1043
    // Commits dirty data for the given TransactionDigest to the db.
    //
    // Ordering is load-bearing throughout:
    //  1. the db batch is written BEFORE anything is removed from the dirty set,
    //  2. tx-level metadata is flushed in parallel (all keys unique per tx),
    //  3. object/marker versions are flushed sequentially, in batch order.
    #[instrument(level = "debug", skip_all)]
    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        (all_outputs, db_batch): Batch,
        digests: &[TransactionDigest],
    ) {
        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs");
        fail_point!("writeback-cache-commit");
        trace!(?digests);

        // Flush writes to disk before removing anything from dirty set. otherwise,
        // a cache eviction could cause a value to disappear briefly, even if we insert to the
        // cache before removing from the dirty set.
        db_batch.write().expect("db error");

        // Second scope guard covers the flush portion; the first guard above
        // stays alive until the end of the function (shadowing does not drop it).
        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs::flush");
        // Parallel phase: tx-level metadata is keyed by unique tx_digest/effects_digest,
        // so there are no cross-transaction ordering constraints.
        all_outputs.par_iter().with_min_len(16).for_each(|outputs| {
            let tx_digest = outputs.transaction.digest();
            // The pending write must still be present — it is only removed here.
            assert!(
                self.dirty
                    .pending_transaction_writes
                    .remove(tx_digest)
                    .is_some()
            );
            self.flush_tx_metadata_from_dirty_to_cached(*tx_digest, outputs);
        });

        // Sequential phase: object/marker versions must be popped in causal order
        // (oldest first) per object_id. Multiple transactions in the batch can touch
        // the same shared object at consecutive versions, so this loop must preserve
        // the order of all_outputs.
        for outputs in all_outputs.iter() {
            self.flush_objects_from_dirty_to_cached(epoch, outputs);
        }

        // Bump the commit counter and recompute backpressure from the new
        // pending (inserted-but-uncommitted) count.
        let num_outputs = all_outputs.len() as u64;
        let num_commits = self
            .dirty
            .total_transaction_commits
            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
            + num_outputs;

        let pending_count = self
            .dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits);

        self.set_backpressure(pending_count);
    }
1100
1101    fn approximate_pending_transaction_count(&self) -> u64 {
1102        let num_commits = self
1103            .dirty
1104            .total_transaction_commits
1105            .load(std::sync::atomic::Ordering::Relaxed);
1106
1107        self.dirty
1108            .total_transaction_inserts
1109            .load(std::sync::atomic::Ordering::Relaxed)
1110            .saturating_sub(num_commits)
1111    }
1112
1113    fn set_backpressure(&self, pending_count: u64) {
1114        let backpressure = pending_count > self.backpressure_threshold;
1115        let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
1116        if backpressure_changed {
1117            self.metrics.backpressure_toggles.inc();
1118        }
1119        self.metrics
1120            .backpressure_status
1121            .set(if backpressure { 1 } else { 0 });
1122    }
1123
    // Flushes tx-level metadata for a single transaction from dirty to cache.
    // All keys are unique per transaction (tx_digest, effects_digest), so this
    // is safe to call in parallel across transactions.
    //
    // Each datum is inserted into the committed cache *before* it is removed
    // from the dirty set (see comment below). The `.expect(...)` calls enforce
    // that everything written by write_transaction_outputs is present exactly
    // once at commit time.
    fn flush_tx_metadata_from_dirty_to_cached(
        &self,
        tx_digest: TransactionDigest,
        outputs: &TransactionOutputs,
    ) {
        // TODO: outputs should have a strong count of 1 so we should be able to move out of it
        let TransactionOutputs {
            transaction,
            effects,
            events,
            ..
        } = outputs;

        let effects_digest = effects.digest();

        // Update cache before removing from self.dirty to avoid
        // unnecessary cache misses
        self.cached
            .transactions
            .insert(
                &tx_digest,
                PointCacheItem::Some(transaction.clone()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_effects
            .insert(
                &effects_digest,
                PointCacheItem::Some(effects.clone().into()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .executed_effects_digests
            .insert(
                &tx_digest,
                PointCacheItem::Some(effects_digest),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_events
            .insert(
                &tx_digest,
                PointCacheItem::Some(events.clone().into()),
                Ticket::Write,
            )
            .ok();

        self.dirty
            .transaction_effects
            .remove(&effects_digest)
            .expect("effects must exist");

        self.dirty
            .transaction_events
            .remove(&tx_digest)
            .expect("events must exist");

        self.dirty
            .unchanged_loaded_runtime_objects
            .remove(&tx_digest)
            .expect("unchanged_loaded_runtime_objects must exist");

        self.dirty
            .executed_effects_digests
            .remove(&tx_digest)
            .expect("executed effects must exist");
    }
1197
1198    // Flushes object and marker versions for a single transaction from dirty to cache.
1199    // Multiple transactions in the same batch can modify the same shared object at
1200    // consecutive versions, so callers must invoke this in causal (checkpoint) order.
1201    fn flush_objects_from_dirty_to_cached(&self, epoch: EpochId, outputs: &TransactionOutputs) {
1202        let TransactionOutputs {
1203            markers,
1204            written,
1205            deleted,
1206            wrapped,
1207            ..
1208        } = outputs;
1209
1210        for (object_key, marker_value) in markers.iter() {
1211            Self::move_version_from_dirty_to_cache(
1212                &self.dirty.markers,
1213                &self.cached.marker_cache,
1214                (epoch, object_key.id()),
1215                object_key.version(),
1216                marker_value,
1217            );
1218        }
1219
1220        for (object_id, object) in written.iter() {
1221            Self::move_version_from_dirty_to_cache(
1222                &self.dirty.objects,
1223                &self.cached.object_cache,
1224                *object_id,
1225                object.version(),
1226                &ObjectEntry::Object(object.clone()),
1227            );
1228        }
1229
1230        for ObjectKey(object_id, version) in deleted.iter() {
1231            Self::move_version_from_dirty_to_cache(
1232                &self.dirty.objects,
1233                &self.cached.object_cache,
1234                *object_id,
1235                *version,
1236                &ObjectEntry::Deleted,
1237            );
1238        }
1239
1240        for ObjectKey(object_id, version) in wrapped.iter() {
1241            Self::move_version_from_dirty_to_cache(
1242                &self.dirty.objects,
1243                &self.cached.object_cache,
1244                *object_id,
1245                *version,
1246                &ObjectEntry::Wrapped,
1247            );
1248        }
1249    }
1250
1251    // Move the oldest/least entry from the dirty queue to the cache queue.
1252    // This is called after the entry is committed to the db.
1253    fn move_version_from_dirty_to_cache<K, V>(
1254        dirty: &DashMap<K, CachedVersionMap<V>>,
1255        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
1256        key: K,
1257        version: SequenceNumber,
1258        value: &V,
1259    ) where
1260        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
1261        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
1262    {
1263        static MAX_VERSIONS: usize = 3;
1264
1265        // IMPORTANT: lock both the dirty set entry and the cache entry before modifying either.
1266        // this ensures that readers cannot see a value temporarily disappear.
1267        let dirty_entry = dirty.entry(key);
1268        let cache_entry = cache.entry(key).or_default();
1269        let mut cache_map = cache_entry.value().lock();
1270
1271        // insert into cache and drop old versions.
1272        cache_map.insert(version, value.clone());
1273        // TODO: make this automatic by giving CachedVersionMap an optional max capacity
1274        cache_map.truncate_to(MAX_VERSIONS);
1275
1276        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
1277            panic!("dirty map must exist");
1278        };
1279
1280        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);
1281
1282        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");
1283
1284        // if there are no versions remaining, remove the map entry
1285        if occupied_dirty_entry.get().is_empty() {
1286            occupied_dirty_entry.remove();
1287        }
1288    }
1289
1290    // Updates the latest object id cache with an entry that was read from the db.
1291    fn cache_latest_object_by_id(
1292        &self,
1293        object_id: &ObjectID,
1294        object: LatestObjectCacheEntry,
1295        ticket: Ticket,
1296    ) {
1297        trace!("caching object by id: {:?} {:?}", object_id, object);
1298        if self
1299            .object_by_id_cache
1300            .insert(object_id, object, ticket)
1301            .is_ok()
1302        {
1303            self.metrics.record_cache_write("object_by_id");
1304        } else {
1305            trace!("discarded cache write due to expired ticket");
1306            self.metrics.record_ticket_expiry();
1307        }
1308    }
1309
    /// Record a negative db result ("no such object") in the latest-object
    /// cache so future lookups can answer without touching the db.
    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
    }
1313
    /// Clear all uncommitted (dirty) state at the end of an epoch, discarding
    /// transaction outputs that never reached the db, and purge the affected
    /// cache entries so stale versions cannot be served in the new epoch.
    fn clear_state_end_of_epoch_impl(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
        info!("clearing state at end of epoch");

        // Note: there cannot be any concurrent writes to self.dirty while we are in this function,
        // as all transaction execution is paused.
        for r in self.dirty.pending_transaction_writes.iter() {
            let outputs = r.value();
            // Only single-writer (owned-object) transactions are expected to
            // remain uncommitted here; a shared-object tx indicates a bug.
            if !outputs
                .transaction
                .transaction_data()
                .shared_input_objects()
                .is_empty()
            {
                debug_fatal!("transaction must be single writer");
            }
            info!(
                "clearing state for transaction {:?}",
                outputs.transaction.digest()
            );
            // Invalidate every cache entry the discarded outputs may have touched.
            for (object_id, object) in outputs.written.iter() {
                if object.is_package() {
                    info!("removing non-finalized package from cache: {:?}", object_id);
                    self.packages.invalidate(object_id);
                }
                self.object_by_id_cache.invalidate(object_id);
                self.cached.object_cache.invalidate(object_id);
            }

            for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
                self.object_by_id_cache.invalidate(object_id);
                self.cached.object_cache.invalidate(object_id);
            }
        }

        self.dirty.clear();

        info!("clearing old transaction locks");
        self.object_locks.clear();
        info!("clearing object per epoch marker table");
        self.store
            .clear_object_per_epoch_marker_table(execution_guard)
            .expect("db error");
    }
1357
1358    fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) {
1359        self.store
1360            .bulk_insert_genesis_objects(objects)
1361            .expect("db error");
1362        for obj in objects {
1363            self.cached.object_cache.invalidate(&obj.id());
1364            self.object_by_id_cache.invalidate(&obj.id());
1365        }
1366    }
1367
1368    fn insert_genesis_object_impl(&self, object: Object) {
1369        self.object_by_id_cache.invalidate(&object.id());
1370        self.cached.object_cache.invalidate(&object.id());
1371        self.store.insert_genesis_object(object).expect("db error");
1372    }
1373
1374    pub fn clear_caches_and_assert_empty(&self) {
1375        info!("clearing caches");
1376        self.cached.clear_and_assert_empty();
1377        self.object_by_id_cache.invalidate_all();
1378        assert!(&self.object_by_id_cache.is_empty());
1379        self.packages.invalidate_all();
1380        assert_empty(&self.packages);
1381    }
1382}
1383
impl AccountFundsRead for WritebackCache {
    /// Return the latest balance and version of an accumulator account object.
    ///
    /// A missing account object is only reported as balance 0 if the
    /// accumulator root object's version was unchanged across the read — the
    /// root version acts as a change detector; if it moved, we retry.
    fn get_latest_account_amount(&self, account_id: &AccumulatorObjId) -> (u128, SequenceNumber) {
        let mut pre_root_version =
            ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
                .unwrap()
                .version();
        let mut loop_iter = 0;
        loop {
            let account_obj = ObjectCacheRead::get_object(self, account_id.inner());
            if let Some(account_obj) = account_obj {
                let (_, AccumulatorValue::U128(value)) =
                    account_obj.data.try_as_move().unwrap().try_into().unwrap();
                return (value.value, account_obj.version());
            }
            // Account object absent: trust this only if the root did not move
            // underneath us while we were reading.
            let post_root_version =
                ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
                    .unwrap()
                    .version();
            if pre_root_version == post_root_version {
                return (0, pre_root_version);
            }
            debug!(
                "Root version changed from {} to {} while reading account amount, retrying",
                pre_root_version, post_root_version
            );
            pre_root_version = post_root_version;
            loop_iter += 1;
            if loop_iter >= 3 {
                // NOTE(review): if debug_fatal! does not abort in release
                // builds, the loop keeps retrying past 3 iterations — confirm
                // this is the intended behavior.
                debug_fatal!("Unable to get a stable version after 3 iterations");
            }
        }
    }

    /// Return the account balance at (or immediately before) `version`, or 0
    /// when no account object exists at that bound.
    fn get_account_amount_at_version(
        &self,
        account_id: &AccumulatorObjId,
        version: SequenceNumber,
    ) -> u128 {
        let account_obj = self.find_object_lt_or_eq_version(*account_id.inner(), version);
        if let Some(account_obj) = account_obj {
            let (_, AccumulatorValue::U128(value)) =
                account_obj.data.try_as_move().unwrap().try_into().unwrap();
            value.value
        } else {
            0
        }
    }
}
1432
// ExecutionCacheAPI appears to be an umbrella trait with no methods of its own
// (nothing is implemented here); the constituent traits are implemented separately.
impl ExecutionCacheAPI for WritebackCache {}
1434
// Thin trait-impl layer: every method delegates to WritebackCache. Where an
// inherent method with the same name exists, the fully-qualified call syntax
// makes the intended target explicit.
impl ExecutionCacheCommit for WritebackCache {
    /// Delegates to the inherent `build_db_batch` (inherent methods take
    /// precedence over trait methods in resolution).
    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
        self.build_db_batch(epoch, digests)
    }

    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        batch: Batch,
        digests: &[TransactionDigest],
    ) {
        WritebackCache::commit_transaction_outputs(self, epoch, batch, digests)
    }

    /// Persist a transaction directly to the store (no caching involved).
    fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) {
        self.store.persist_transaction(tx).expect("db error");
    }

    fn approximate_pending_transaction_count(&self) -> u64 {
        WritebackCache::approximate_pending_transaction_count(self)
    }
}
1457
1458impl ObjectCacheRead for WritebackCache {
    /// Fetch a package object, served from the dedicated package cache when
    /// possible, falling back to the general object lookup. Returns a user
    /// error if the id resolves to a non-package object.
    fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
        self.metrics
            .record_cache_request("package", "package_cache");
        if let Some(p) = self.packages.get(package_id) {
            // Debug-only: verify the package cache agrees with canonical state
            // (dirty set first, db as fallback).
            if cfg!(debug_assertions) {
                let canonical_package = self
                    .dirty
                    .objects
                    .get(package_id)
                    .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
                        Some(ObjectEntry::Object(object)) => Some(object),
                        _ => None,
                    })
                    .or_else(|| self.store.get_object(package_id));

                if let Some(canonical_package) = canonical_package {
                    assert_eq!(
                        canonical_package.digest(),
                        p.object().digest(),
                        "Package object cache is inconsistent for package {:?}",
                        package_id
                    );
                }
            }
            self.metrics.record_cache_hit("package", "package_cache");
            return Ok(Some(p));
        } else {
            self.metrics.record_cache_miss("package", "package_cache");
        }

        // We try the dirty objects cache as well before going to the database. This is necessary
        // because the package could be evicted from the package cache before it is committed
        // to the database.
        if let Some(p) = self.get_object_impl("package", package_id) {
            if p.is_package() {
                // Repopulate the package cache for future lookups.
                let p = PackageObject::new(p);
                tracing::trace!(
                    "caching package: {:?}",
                    p.object().compute_object_reference()
                );
                self.metrics.record_cache_write("package");
                self.packages.insert(*package_id, p.clone());
                Ok(Some(p))
            } else {
                // The id exists but is not a package — surface a user error.
                Err(SuiErrorKind::UserInputError {
                    error: UserInputError::MoveObjectAsPackage {
                        object_id: *package_id,
                    },
                }
                .into())
            }
        } else {
            Ok(None)
        }
    }
1514
    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
        // Intentionally a no-op: every write flows through this cache, so the package
        // cache can never become incoherent with the store and there is nothing to
        // reload.
    }
1519
1520    // get_object and variants.
1521
    fn get_object(&self, id: &ObjectID) -> Option<Object> {
        // Latest-version lookup; delegates to the shared implementation using the
        // "object_latest" request label for metrics.
        self.get_object_impl("object_latest", id)
    }
1525
1526    fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object> {
1527        match self.get_object_by_key_cache_only(object_id, version) {
1528            CacheResult::Hit(object) => Some(object),
1529            CacheResult::NegativeHit => None,
1530            CacheResult::Miss => self
1531                .record_db_get("object_by_version")
1532                .get_object_by_key(object_id, version),
1533        }
1534    }
1535
1536    fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
1537        do_fallback_lookup(
1538            object_keys,
1539            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1540                CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1541                CacheResult::NegativeHit => CacheResult::NegativeHit,
1542                CacheResult::Miss => CacheResult::Miss,
1543            },
1544            |remaining| {
1545                self.record_db_multi_get("object_by_version", remaining.len())
1546                    .multi_get_objects_by_key(remaining)
1547                    .expect("db error")
1548            },
1549        )
1550    }
1551
1552    fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool {
1553        match self.get_object_by_key_cache_only(object_id, version) {
1554            CacheResult::Hit(_) => true,
1555            CacheResult::NegativeHit => false,
1556            CacheResult::Miss => self
1557                .record_db_get("object_by_version")
1558                .object_exists_by_key(object_id, version)
1559                .expect("db error"),
1560        }
1561    }
1562
1563    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool> {
1564        do_fallback_lookup(
1565            object_keys,
1566            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1567                CacheResult::Hit(_) => CacheResult::Hit(true),
1568                CacheResult::NegativeHit => CacheResult::Hit(false),
1569                CacheResult::Miss => CacheResult::Miss,
1570            },
1571            |remaining| {
1572                self.record_db_multi_get("object_by_version", remaining.len())
1573                    .multi_object_exists_by_key(remaining)
1574                    .expect("db error")
1575            },
1576        )
1577    }
1578
1579    fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
1580        match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1581            CacheResult::Hit((version, entry)) => Some(match entry {
1582                ObjectEntry::Object(object) => object.compute_object_reference(),
1583                ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1584                ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1585            }),
1586            CacheResult::NegativeHit => None,
1587            CacheResult::Miss => self
1588                .record_db_get("latest_objref_or_tombstone")
1589                .get_latest_object_ref_or_tombstone(object_id)
1590                .expect("db error"),
1591        }
1592    }
1593
1594    fn get_latest_object_or_tombstone(
1595        &self,
1596        object_id: ObjectID,
1597    ) -> Option<(ObjectKey, ObjectOrTombstone)> {
1598        match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1599            CacheResult::Hit((version, entry)) => {
1600                let key = ObjectKey(object_id, version);
1601                Some(match entry {
1602                    ObjectEntry::Object(object) => (key, object.into()),
1603                    ObjectEntry::Deleted => (
1604                        key,
1605                        ObjectOrTombstone::Tombstone((
1606                            object_id,
1607                            version,
1608                            ObjectDigest::OBJECT_DIGEST_DELETED,
1609                        )),
1610                    ),
1611                    ObjectEntry::Wrapped => (
1612                        key,
1613                        ObjectOrTombstone::Tombstone((
1614                            object_id,
1615                            version,
1616                            ObjectDigest::OBJECT_DIGEST_WRAPPED,
1617                        )),
1618                    ),
1619                })
1620            }
1621            CacheResult::NegativeHit => None,
1622            CacheResult::Miss => self
1623                .record_db_get("latest_object_or_tombstone")
1624                .get_latest_object_or_tombstone(object_id)
1625                .expect("db error"),
1626        }
1627    }
1628
1629    fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool> {
1630        keys.iter()
1631            .map(|key| {
1632                if key.is_cancelled() {
1633                    true
1634                } else {
1635                    match key {
1636                        InputKey::VersionedObject { id, version } => {
1637                            matches!(
1638                                self.get_object_by_key_cache_only(&id.id(), *version),
1639                                CacheResult::Hit(_)
1640                            )
1641                        }
1642                        InputKey::Package { id } => self.packages.contains_key(id),
1643                    }
1644                }
1645            })
1646            .collect()
1647    }
1648
1649    #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1650    fn find_object_lt_or_eq_version(
1651        &self,
1652        object_id: ObjectID,
1653        version_bound: SequenceNumber,
1654    ) -> Option<Object> {
1655        macro_rules! check_cache_entry {
1656            ($level: expr, $objects: expr) => {
1657                self.metrics
1658                    .record_cache_request("object_lt_or_eq_version", $level);
1659                if let Some(objects) = $objects {
1660                    if let Some((_, object)) = objects
1661                        .all_versions_lt_or_eq_descending(&version_bound)
1662                        .next()
1663                    {
1664                        if let ObjectEntry::Object(object) = object {
1665                            self.metrics
1666                                .record_cache_hit("object_lt_or_eq_version", $level);
1667                            return Some(object.clone());
1668                        } else {
1669                            // if we find a tombstone, the object does not exist
1670                            self.metrics
1671                                .record_cache_negative_hit("object_lt_or_eq_version", $level);
1672                            return None;
1673                        }
1674                    } else {
1675                        self.metrics
1676                            .record_cache_miss("object_lt_or_eq_version", $level);
1677                    }
1678                }
1679            };
1680        }
1681
1682        // if we have the latest version cached, and it is within the bound, we are done
1683        self.metrics
1684            .record_cache_request("object_lt_or_eq_version", "object_by_id");
1685        let latest_cache_entry = self.object_by_id_cache.get(&object_id);
1686        if let Some(latest) = &latest_cache_entry {
1687            let latest = latest.lock();
1688            match &*latest {
1689                LatestObjectCacheEntry::Object(latest_version, object) => {
1690                    if *latest_version <= version_bound {
1691                        if let ObjectEntry::Object(object) = object {
1692                            self.metrics
1693                                .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1694                            return Some(object.clone());
1695                        } else {
1696                            // object is a tombstone, but is still within the version bound
1697                            self.metrics.record_cache_negative_hit(
1698                                "object_lt_or_eq_version",
1699                                "object_by_id",
1700                            );
1701                            return None;
1702                        }
1703                    }
1704                    // latest object is not within the version bound. fall through.
1705                }
1706                // No object by this ID exists at all
1707                LatestObjectCacheEntry::NonExistent => {
1708                    self.metrics
1709                        .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1710                    return None;
1711                }
1712            }
1713        }
1714        self.metrics
1715            .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1716
1717        Self::with_locked_cache_entries(
1718            &self.dirty.objects,
1719            &self.cached.object_cache,
1720            &object_id,
1721            |dirty_entry, cached_entry| {
1722                check_cache_entry!("committed", dirty_entry);
1723                check_cache_entry!("uncommitted", cached_entry);
1724
1725                // Much of the time, the query will be for the very latest object version, so
1726                // try that first. But we have to be careful:
1727                // 1. We must load the tombstone if it is present, because its version may exceed
1728                //    the version_bound, in which case we must do a scan.
1729                // 2. You might think we could just call `self.store.get_latest_object_or_tombstone` here.
1730                //    But we cannot, because there may be a more recent version in the dirty set, which
1731                //    we skipped over in check_cache_entry! because of the version bound. However, if we
1732                //    skipped it above, we will skip it here as well, again due to the version bound.
1733                // 3. Despite that, we really want to warm the cache here. Why? Because if the object is
1734                //    cold (not being written to), then we will very soon be able to start serving reads
1735                //    of it from the object_by_id cache, IF we can warm the cache. If we don't warm the
1736                //    the cache here, and no writes to the object occur, then we will always have to go
1737                //    to the db for the object.
1738                //
1739                // Lastly, it is important to understand the rationale for all this: If the object is
1740                // write-hot, we will serve almost all reads to it from the dirty set (or possibly the
1741                // cached set if it is only written to once every few checkpoints). If the object is
1742                // write-cold (or non-existent) and read-hot, then we will serve almost all reads to it
1743                // from the object_by_id cache check above.  Most of the apparently wasteful code here
1744                // exists only to ensure correctness in all the edge cases.
1745                let latest: Option<(SequenceNumber, ObjectEntry)> =
1746                    if let Some(dirty_set) = dirty_entry {
1747                        dirty_set
1748                            .get_highest()
1749                            .cloned()
1750                            .tap_none(|| panic!("dirty set cannot be empty"))
1751                    } else {
1752                        // TODO: we should try not to read from the db while holding the locks.
1753                        self.record_db_get("object_lt_or_eq_version_latest")
1754                            .get_latest_object_or_tombstone(object_id)
1755                            .expect("db error")
1756                            .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1757                                (version, ObjectEntry::from(obj_or_tombstone))
1758                            })
1759                    };
1760
1761                if let Some((obj_version, obj_entry)) = latest {
1762                    // we can always cache the latest object (or tombstone), even if it is not within the
1763                    // version_bound. This is done in order to warm the cache in the case where a sequence
1764                    // of transactions all read the same child object without writing to it.
1765
1766                    // Note: no need to call with_object_by_id_cache_update here, because we are holding
1767                    // the lock on the dirty cache entry, and `latest` cannot become out-of-date
1768                    // while we hold that lock.
1769                    self.cache_latest_object_by_id(
1770                        &object_id,
1771                        LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1772                        // We can get a ticket at the last second, because we are holding the lock
1773                        // on dirty, so there cannot be any concurrent writes.
1774                        self.object_by_id_cache.get_ticket_for_read(&object_id),
1775                    );
1776
1777                    if obj_version <= version_bound {
1778                        match obj_entry {
1779                            ObjectEntry::Object(object) => Some(object),
1780                            ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
1781                        }
1782                    } else {
1783                        // The latest object exceeded the bound, so now we have to do a scan
1784                        // But we already know there is no dirty entry within the bound,
1785                        // so we go to the db.
1786                        self.record_db_get("object_lt_or_eq_version_scan")
1787                            .find_object_lt_or_eq_version(object_id, version_bound)
1788                            .expect("db error")
1789                    }
1790
1791                // no object found in dirty set or db, object does not exist
1792                // When this is called from a read api (i.e. not the execution path) it is
1793                // possible that the object has been deleted and pruned. In this case,
1794                // there would be no entry at all on disk, but we may have a tombstone in the
1795                // cache
1796                } else if let Some(latest_cache_entry) = latest_cache_entry {
1797                    // If there is a latest cache entry, it had better not be a live object!
1798                    assert!(!latest_cache_entry.lock().is_alive());
1799                    None
1800                } else {
1801                    // If there is no latest cache entry, we can insert one.
1802                    let highest = cached_entry.and_then(|c| c.get_highest());
1803                    assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1804                    self.cache_object_not_found(
1805                        &object_id,
1806                        // okay to get ticket at last second - see above
1807                        self.object_by_id_cache.get_ticket_for_read(&object_id),
1808                    );
1809                    None
1810                }
1811            },
1812        )
1813    }
1814
    fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
        // Delegates to the free function `get_sui_system_state`, passing `self` as
        // the object reader. See the trait definition for the "unsafe" caveats —
        // TODO confirm against the trait docs.
        get_sui_system_state(self)
    }
1818
    fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge> {
        // Delegates to the free function `get_bridge`, passing `self` as the object
        // reader.
        get_bridge(self)
    }
1822
1823    fn get_marker_value(
1824        &self,
1825        object_key: FullObjectKey,
1826        epoch_id: EpochId,
1827    ) -> Option<MarkerValue> {
1828        match self.get_marker_value_cache_only(object_key, epoch_id) {
1829            CacheResult::Hit(marker) => Some(marker),
1830            CacheResult::NegativeHit => None,
1831            CacheResult::Miss => self
1832                .record_db_get("marker_by_version")
1833                .get_marker_value(object_key, epoch_id)
1834                .expect("db error"),
1835        }
1836    }
1837
1838    fn get_latest_marker(
1839        &self,
1840        object_id: FullObjectID,
1841        epoch_id: EpochId,
1842    ) -> Option<(SequenceNumber, MarkerValue)> {
1843        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1844            CacheResult::Hit((v, marker)) => Some((v, marker)),
1845            CacheResult::NegativeHit => {
1846                panic!("cannot have negative hit when getting latest marker")
1847            }
1848            CacheResult::Miss => self
1849                .record_db_get("marker_latest")
1850                .get_latest_marker(object_id, epoch_id)
1851                .expect("db error"),
1852        }
1853    }
1854
1855    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult {
1856        let cur_epoch = epoch_store.epoch();
1857        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
1858            CacheResult::Hit((_, obj)) => {
1859                let actual_objref = obj.compute_object_reference();
1860                if obj_ref != actual_objref {
1861                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
1862                        locked_ref: actual_objref,
1863                    })
1864                } else {
1865                    // requested object ref is live, check if there is a lock
1866                    Ok(
1867                        match self
1868                            .object_locks
1869                            .get_transaction_lock(&obj_ref, epoch_store)?
1870                        {
1871                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
1872                                locked_by_tx: LockDetailsDeprecated {
1873                                    epoch: cur_epoch,
1874                                    tx_digest,
1875                                },
1876                            },
1877                            None => ObjectLockStatus::Initialized,
1878                        },
1879                    )
1880                }
1881            }
1882            CacheResult::NegativeHit => {
1883                Err(SuiError::from(UserInputError::ObjectNotFound {
1884                    object_id: obj_ref.0,
1885                    // even though we know the requested version, we leave it as None to indicate
1886                    // that the object does not exist at any version
1887                    version: None,
1888                }))
1889            }
1890            CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1891        }
1892    }
1893
1894    fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef> {
1895        let obj = self.get_object_impl("live_objref", &object_id).ok_or(
1896            UserInputError::ObjectNotFound {
1897                object_id,
1898                version: None,
1899            },
1900        )?;
1901        Ok(obj.compute_object_reference())
1902    }
1903
1904    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1905        do_fallback_lookup_fallible(
1906            owned_object_refs,
1907            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
1908                CacheResult::Hit((version, obj)) => {
1909                    if obj.compute_object_reference() != *obj_ref {
1910                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
1911                            provided_obj_ref: *obj_ref,
1912                            current_version: version,
1913                        }
1914                        .into())
1915                    } else {
1916                        Ok(CacheResult::Hit(()))
1917                    }
1918                }
1919                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
1920                    object_id: obj_ref.0,
1921                    version: None,
1922                }
1923                .into()),
1924                CacheResult::Miss => Ok(CacheResult::Miss),
1925            },
1926            |remaining| {
1927                self.record_db_multi_get("object_is_live", remaining.len())
1928                    .check_owned_objects_are_live(remaining)?;
1929                Ok(vec![(); remaining.len()])
1930            },
1931        )?;
1932        Ok(())
1933    }
1934
    fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber> {
        // Reads straight from the perpetual tables — no cache layer is consulted for
        // pruning metadata.
        self.store
            .perpetual_tables
            .get_highest_pruned_checkpoint()
            .expect("db error")
    }
1941
1942    fn notify_read_input_objects<'a>(
1943        &'a self,
1944        input_and_receiving_keys: &'a [InputKey],
1945        receiving_keys: &'a HashSet<InputKey>,
1946        epoch: EpochId,
1947    ) -> BoxFuture<'a, ()> {
1948        self.object_notify_read
1949            .read(
1950                "notify_read_input_objects",
1951                input_and_receiving_keys,
1952                move |keys| {
1953                    self.multi_input_objects_available(keys, receiving_keys, epoch)
1954                        .into_iter()
1955                        .map(|available| if available { Some(()) } else { None })
1956                        .collect::<Vec<_>>()
1957                },
1958            )
1959            .map(|_| ())
1960            .boxed()
1961    }
1962}
1963
1964impl TransactionCacheRead for WritebackCache {
    fn multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<Arc<VerifiedTransaction>>> {
        // Batched transaction lookup: dirty (uncommitted) writes first, then the
        // committed point cache, then a single db multi-get for the leftovers.
        //
        // Read tickets are taken up front, before any cache probe, so that negative
        // results from the db can be inserted afterwards; an insert that fails
        // (presumably due to a stale ticket — confirm against the cache impl) is
        // simply ignored via `.ok()`.
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_block", "uncommitted");
                // Pending (not yet committed) transactions live in the dirty set.
                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_block", "uncommitted");
                    return CacheResult::Hit(Some(tx.transaction.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_block", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_block", "committed");

                match self
                    .cached
                    .transactions
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(tx)) => {
                        self.metrics
                            .record_cache_hit("transaction_block", "committed");
                        CacheResult::Hit(Some(tx))
                    }
                    // A cached `None` records that the db was previously consulted
                    // and had no entry for this digest.
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_block", "committed");

                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results: Vec<_> = self
                    .record_db_multi_get("transaction_block", remaining.len())
                    .multi_get_transaction_blocks(&remaining_digests)
                    .expect("db error")
                    .into_iter()
                    .map(|o| o.map(Arc::new))
                    .collect();
                // Negatively cache db misses so repeated lookups of absent digests
                // do not hit the db again.
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached.transactions.insert(digest, None, *ticket).ok();
                    }
                }
                results
            },
        )
    }
2027
    fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEffectsDigest>> {
        // Batched lookup of executed-effects digests: dirty (uncommitted) map first,
        // then the committed point cache, then one db multi-get for the remainder.
        //
        // Read tickets are taken before any cache probe so that db "not found"
        // results can be negatively cached afterwards; a failed insert is ignored
        // via `.ok()`.
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| {
                (
                    *d,
                    self.cached.executed_effects_digests.get_ticket_for_read(d),
                )
            })
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("executed_effects_digests", "uncommitted");
                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
                    self.metrics
                        .record_cache_hit("executed_effects_digests", "uncommitted");
                    return CacheResult::Hit(Some(*digest));
                }
                self.metrics
                    .record_cache_miss("executed_effects_digests", "uncommitted");

                self.metrics
                    .record_cache_request("executed_effects_digests", "committed");
                match self
                    .cached
                    .executed_effects_digests
                    .get(digest)
                    .map(|l| *l.lock())
                {
                    Some(PointCacheItem::Some(digest)) => {
                        self.metrics
                            .record_cache_hit("executed_effects_digests", "committed");
                        CacheResult::Hit(Some(digest))
                    }
                    // Cached `None` means the db was already checked with no entry.
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("executed_effects_digests", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("executed_effects_digests", remaining.len())
                    .multi_get_executed_effects_digests(&remaining_digests)
                    .expect("db error");
                // Negatively cache db misses to short-circuit future lookups.
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .executed_effects_digests
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }
2093
    fn multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> Vec<Option<TransactionEffects>> {
        // Batched effects lookup keyed by effects digest: dirty (uncommitted) map
        // first, then the committed point cache, then one db multi-get.
        //
        // Read tickets are taken before any cache probe so that db "not found"
        // results can be negatively cached afterwards; failed inserts are ignored
        // via `.ok()`.
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_effects", "uncommitted");
                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_effects", "uncommitted");
                    return CacheResult::Hit(Some(effects.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_effects", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_effects", "committed");
                match self
                    .cached
                    .transaction_effects
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(effects)) => {
                        self.metrics
                            .record_cache_hit("transaction_effects", "committed");
                        // The cached value is shared (behind a pointer); clone out an
                        // owned TransactionEffects for the caller.
                        CacheResult::Hit(Some((*effects).clone()))
                    }
                    // Cached `None` means the db was already checked with no entry.
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_effects", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("transaction_effects", remaining.len())
                    .multi_get_effects(remaining_digests.iter())
                    .expect("db error");
                // Negatively cache db misses to short-circuit future lookups.
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_effects
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }
2154
2155    fn transaction_executed_in_last_epoch(
2156        &self,
2157        digest: &TransactionDigest,
2158        current_epoch: EpochId,
2159    ) -> bool {
2160        if current_epoch == 0 {
2161            return false;
2162        }
2163        let last_epoch = current_epoch - 1;
2164        let cache_key = (last_epoch, *digest);
2165
2166        let ticket = self
2167            .cached
2168            .transaction_executed_in_last_epoch
2169            .get_ticket_for_read(&cache_key);
2170
2171        if let Some(cached) = self
2172            .cached
2173            .transaction_executed_in_last_epoch
2174            .get(&cache_key)
2175        {
2176            return cached.lock().is_some();
2177        }
2178
2179        let was_executed = self
2180            .store
2181            .perpetual_tables
2182            .was_transaction_executed_in_last_epoch(digest, current_epoch);
2183
2184        let value = if was_executed { Some(()) } else { None };
2185        self.cached
2186            .transaction_executed_in_last_epoch
2187            .insert(&cache_key, value, ticket)
2188            .ok();
2189
2190        was_executed
2191    }
2192
    fn notify_read_executed_effects_digests<'a>(
        &'a self,
        task_name: &'static str,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>> {
        // Returns a future that resolves once every digest has an executed-effects
        // digest, using multi_get_executed_effects_digests as the availability check
        // for the notify-read primitive.
        self.executed_effects_digests_notify_read
            .read(task_name, digests, |digests| {
                self.multi_get_executed_effects_digests(digests)
            })
            .boxed()
    }
2204
    /// Fetches the events for each transaction digest, consulting the dirty
    /// (uncommitted) set, then the bounded committed cache, then the db.
    /// Transactions with an empty event list are reported as `None`.
    fn multi_get_events(
        &self,
        event_digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEvents>> {
        // Normalize "no events" (empty data) to None.
        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
            if events.data.is_empty() {
                None
            } else {
                Some(events)
            }
        }

        // NOTE(review): tickets are taken before the cache reads — presumably so
        // a racing write invalidates the negative-cache insert in the fallback
        // closure below; confirm against the cache implementation.
        let digests_and_tickets: Vec<_> = event_digests
            .iter()
            .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            // Per-digest cache probe: dirty set first, then committed point cache.
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_events", "uncommitted");
                if let Some(events) = self.dirty.transaction_events.get(digest).map(|e| e.clone()) {
                    self.metrics
                        .record_cache_hit("transaction_events", "uncommitted");

                    return CacheResult::Hit(map_events(events));
                }
                self.metrics
                    .record_cache_miss("transaction_events", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_events", "committed");
                match self
                    .cached
                    .transaction_events
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(events)) => {
                        self.metrics
                            .record_cache_hit("transaction_events", "committed");
                        CacheResult::Hit(map_events((*events).clone()))
                    }
                    // NOTE(review): negative hits are not counted as hits in the
                    // metrics — confirm this asymmetry is intentional.
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_events", "committed");

                        CacheResult::Miss
                    }
                }
            },
            // Fallback for cache misses: batch-read from the db, and negatively
            // cache digests that are absent so future lookups skip the db.
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .store
                    .multi_get_events(&remaining_digests)
                    .expect("db error");
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_events
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }
2275
2276    fn get_unchanged_loaded_runtime_objects(
2277        &self,
2278        digest: &TransactionDigest,
2279    ) -> Option<Vec<ObjectKey>> {
2280        self.dirty
2281            .unchanged_loaded_runtime_objects
2282            .get(digest)
2283            .map(|b| b.clone())
2284            .or_else(|| {
2285                self.store
2286                    .get_unchanged_loaded_runtime_objects(digest)
2287                    .expect("db error")
2288            })
2289    }
2290
2291    fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
2292        self.dirty
2293            .pending_transaction_writes
2294            .get(digest)
2295            .map(|transaction_output| transaction_output.take_accumulator_events())
2296    }
2297}
2298
impl ExecutionCacheWrite for WritebackCache {
    // Delegates owned-object version validation to the lock table.
    fn validate_owned_object_versions(&self, owned_input_objects: &[ObjectRef]) -> SuiResult {
        ObjectLocks::validate_owned_object_versions(self, owned_input_objects)
    }

    // Delegates to the inherent `write_transaction_outputs` implementation.
    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs);
    }

    // Test-only hook that writes a single object entry directly into the cache.
    #[cfg(test)]
    fn write_object_entry_for_test(&self, object: Object) {
        self.write_object_entry(&object.id(), object.version(), object.into());
    }
}
2313
// Generates impls of the remaining cache traits that (per the macro name) pass
// straight through to the underlying store — see the macro definition.
implement_passthrough_traits!(WritebackCache);
2315
impl GlobalStateHashStore for WritebackCache {
    /// Returns the object ref of the newest version of `object_id` strictly prior
    /// to `version`, consulting the dirty set, the committed cache, and the db.
    fn get_object_ref_prior_to_key_deprecated(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<Option<ObjectRef>> {
        // There is probably a more efficient way to implement this, but since this is only used by
        // old protocol versions, it is better to do the simple thing that is obviously correct.
        // In this case we get the previous version from all sources and choose the highest.
        let mut candidates = Vec::new();

        // Maps the entry prior to `version` (if any) to an ObjectRef, using the
        // sentinel digests for deleted and wrapped tombstones.
        let check_versions =
            |versions: &CachedVersionMap<ObjectEntry>| match versions.get_prior_to(&version) {
                Some((version, object_entry)) => match object_entry {
                    ObjectEntry::Object(object) => {
                        assert_eq!(object.version(), version);
                        Some(object.compute_object_reference())
                    }
                    ObjectEntry::Deleted => {
                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED))
                    }
                    ObjectEntry::Wrapped => {
                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED))
                    }
                },
                None => None,
            };

        // first check dirty data
        if let Some(objects) = self.dirty.objects.get(object_id)
            && let Some(prior) = check_versions(&objects)
        {
            candidates.push(prior);
        }

        // then the committed (bounded) cache
        if let Some(objects) = self.cached.object_cache.get(object_id)
            && let Some(prior) = check_versions(&objects.lock())
        {
            candidates.push(prior);
        }

        // finally the db
        if let Some(prior) = self
            .store
            .get_object_ref_prior_to_key_deprecated(object_id, version)?
        {
            candidates.push(prior);
        }

        // sort candidates by version, and return the highest
        candidates.sort_by_key(|(_, version, _)| *version);
        Ok(candidates.pop())
    }

    // Passthrough to the store.
    fn get_root_state_hash_for_epoch(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
        self.store.get_root_state_hash_for_epoch(epoch)
    }

    // Passthrough to the store.
    fn get_root_state_hash_for_highest_epoch(
        &self,
    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
        self.store.get_root_state_hash_for_highest_epoch()
    }

    // Passthrough to the store.
    fn insert_state_hash_for_epoch(
        &self,
        epoch: EpochId,
        checkpoint_seq_num: &CheckpointSequenceNumber,
        acc: &GlobalStateHash,
    ) -> SuiResult {
        self.store
            .insert_state_hash_for_epoch(epoch, checkpoint_seq_num, acc)
    }

    fn iter_live_object_set(
        &self,
        include_wrapped_tombstone: bool,
    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        // The only time it is safe to iterate the live object set is at an epoch boundary,
        // at which point the db is consistent and the dirty cache is empty. So this does
        // not need to read the cache — it iterates the store directly.
        assert!(
            self.dirty.is_empty(),
            "cannot iterate live object set with dirty data"
        );
        self.store.iter_live_object_set(include_wrapped_tombstone)
    }

    // A version of iter_live_object_set that reads the cache. Only use for testing. If used
    // on a live validator, can cause the server to block for as long as it takes to iterate
    // the entire live object set.
    fn iter_cached_live_object_set_for_testing(
        &self,
        include_wrapped_tombstone: bool,
    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        // hold iter until we are finished to prevent any concurrent inserts/deletes
        let iter = self.dirty.objects.iter();
        let mut dirty_objects = BTreeMap::new();

        // add everything from the store
        for obj in self.store.iter_live_object_set(include_wrapped_tombstone) {
            dirty_objects.insert(obj.object_id(), obj);
        }

        // add everything from the cache, but also remove deletions
        // (the dirty set's highest version for an id overrides the store's view)
        for entry in iter {
            let id = *entry.key();
            let value = entry.value();
            match value.get_highest().unwrap() {
                (_, ObjectEntry::Object(object)) => {
                    dirty_objects.insert(id, LiveObject::Normal(object.clone()));
                }
                (version, ObjectEntry::Wrapped) => {
                    if include_wrapped_tombstone {
                        dirty_objects.insert(id, LiveObject::Wrapped(ObjectKey(id, *version)));
                    } else {
                        dirty_objects.remove(&id);
                    }
                }
                (_, ObjectEntry::Deleted) => {
                    dirty_objects.remove(&id);
                }
            }
        }

        Box::new(dirty_objects.into_values())
    }
}
2446
2447// TODO: For correctness, we must at least invalidate the cache when items are written through this
2448// trait (since they could be negatively cached as absent). But it may or may not be optimal to
2449// actually insert them into the cache. For instance if state sync is running ahead of execution,
2450// they might evict other items that are about to be read. This could be an area for tuning in the
2451// future.
2452impl StateSyncAPI for WritebackCache {
2453    fn insert_transaction_and_effects(
2454        &self,
2455        transaction: &VerifiedTransaction,
2456        transaction_effects: &TransactionEffects,
2457    ) {
2458        self.store
2459            .insert_transaction_and_effects(transaction, transaction_effects)
2460            .expect("db error");
2461        self.cached
2462            .transactions
2463            .insert(
2464                transaction.digest(),
2465                PointCacheItem::Some(Arc::new(transaction.clone())),
2466                Ticket::Write,
2467            )
2468            .ok();
2469        self.cached
2470            .transaction_effects
2471            .insert(
2472                &transaction_effects.digest(),
2473                PointCacheItem::Some(Arc::new(transaction_effects.clone())),
2474                Ticket::Write,
2475            )
2476            .ok();
2477    }
2478
2479    fn multi_insert_transaction_and_effects(
2480        &self,
2481        transactions_and_effects: &[VerifiedExecutionData],
2482    ) {
2483        self.store
2484            .multi_insert_transaction_and_effects(transactions_and_effects.iter())
2485            .expect("db error");
2486        for VerifiedExecutionData {
2487            transaction,
2488            effects,
2489        } in transactions_and_effects
2490        {
2491            self.cached
2492                .transactions
2493                .insert(
2494                    transaction.digest(),
2495                    PointCacheItem::Some(Arc::new(transaction.clone())),
2496                    Ticket::Write,
2497                )
2498                .ok();
2499            self.cached
2500                .transaction_effects
2501                .insert(
2502                    &effects.digest(),
2503                    PointCacheItem::Some(Arc::new(effects.clone())),
2504                    Ticket::Write,
2505                )
2506                .ok();
2507        }
2508    }
2509}