sui_core/execution_cache/
writeback_cache.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! MemoryCache is a cache for the transaction execution which delays writes to the database until
//! transaction results are certified (i.e. they appear in a certified checkpoint, or an effects cert
//! is observed by a fullnode). The cache also stores committed data in memory in order to serve
//! future reads without hitting the database.
//!
//! For storing uncommitted transaction outputs, we cannot evict the data at all until it is written
//! to disk. Committed data not only can be evicted, but it is also unbounded (imagine a stream of
//! transactions that keep splitting a coin into smaller coins).
//!
//! We also want to be able to support negative cache hits (i.e. the case where we can determine an
//! object does not exist without hitting the database).
//!
//! To achieve both of these goals, we split the cache data into two pieces, a dirty set and a cached
//! set. The dirty set has no automatic evictions, data is only removed after being committed. The
//! cached set is in a bounded-sized cache with automatic evictions. In order to support negative
//! cache hits, we treat the two halves of the cache as a single FIFO queue. Newly written (dirty)
//! versions are inserted at one end of the dirty queue. As versions are committed to disk, they are
//! removed from the other end of the dirty queue and inserted into the cache queue. The cache queue
//! is truncated if it exceeds its maximum size, by removing all but the N newest versions.
//!
//! This gives us the property that the sequence of versions in the dirty and cached queues are the
//! most recent versions of the object, i.e. there can be no "gaps". This allows for the following:
//!
//!   - Negative cache hits: If the queried version is not in memory, but is higher than the smallest
//!     version in the cached queue, it does not exist in the db either.
//!   - Bounded reads: When reading the most recent version that is <= some version bound, we can
//!     correctly satisfy this query from the cache, or determine that we must go to the db.
//!
//! Note that at any time, either or both of the dirty and cached queues may be non-existent.
//! There may be no dirty versions of the objects, in which case there will be no dirty queue. And,
//! the cached queue may be evicted from the cache, in which case there will be no cached queue.
//! Because only the cached queue can be evicted (the dirty queue can only become empty by moving
//! versions from it to the cached queue), the "highest versions" property still holds in all cases.
//!
//! The above design is used for both objects and markers.
39
40use crate::accumulators::funds_read::AccountFundsRead;
41use crate::authority::AuthorityStore;
42use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
43use crate::authority::authority_store::{
44    ExecutionLockWriteGuard, LockDetailsDeprecated, ObjectLockStatus, SuiLockResult,
45};
46use crate::authority::authority_store_tables::LiveObject;
47use crate::authority::backpressure::BackpressureManager;
48use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
49use crate::fallback_fetch::{do_fallback_lookup, do_fallback_lookup_fallible};
50use crate::global_state_hasher::GlobalStateHashStore;
51use crate::transaction_outputs::TransactionOutputs;
52
53use dashmap::DashMap;
54use dashmap::mapref::entry::Entry as DashMapEntry;
55use futures::{FutureExt, future::BoxFuture};
56use moka::sync::SegmentedCache as MokaCache;
57use mysten_common::debug_fatal;
58use mysten_common::random_util::randomize_cache_capacity_in_tests;
59use mysten_common::sync::notify_read::NotifyRead;
60use parking_lot::Mutex;
61use std::collections::{BTreeMap, HashSet};
62use std::hash::Hash;
63use std::sync::Arc;
64use std::sync::atomic::AtomicU64;
65use sui_config::ExecutionCacheConfig;
66use sui_macros::fail_point;
67use sui_protocol_config::ProtocolVersion;
68use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
69use sui_types::accumulator_event::AccumulatorEvent;
70use sui_types::accumulator_root::{AccumulatorObjId, AccumulatorValue};
71use sui_types::base_types::{
72    EpochId, FullObjectID, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData,
73};
74use sui_types::bridge::{Bridge, get_bridge};
75use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest};
76use sui_types::effects::{TransactionEffects, TransactionEvents};
77use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
78use sui_types::executable_transaction::VerifiedExecutableTransaction;
79use sui_types::global_state_hash::GlobalStateHash;
80use sui_types::message_envelope::Message;
81use sui_types::messages_checkpoint::CheckpointSequenceNumber;
82use sui_types::object::Object;
83use sui_types::storage::{
84    FullObjectKey, InputKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject,
85};
86use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
87use sui_types::transaction::{TransactionDataAPI, VerifiedTransaction};
88use tap::TapOptional;
89use tracing::{debug, info, instrument, trace, warn};
90
91use super::ExecutionCacheAPI;
92use super::cache_types::Ticket;
93use super::{
94    Batch, CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
95    ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
96    cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache},
97    implement_passthrough_traits,
98    object_locks::ObjectLocks,
99};
100
101#[cfg(test)]
102#[path = "unit_tests/writeback_cache_tests.rs"]
103pub mod writeback_cache_tests;
104
105#[cfg(test)]
106#[path = "unit_tests/notify_read_input_objects_tests.rs"]
107mod notify_read_input_objects_tests;
108
#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    /// A live object at some version.
    Object(Object),
    /// Tombstone: the object was deleted at this version.
    Deleted,
    /// Tombstone: the object was wrapped at this version.
    Wrapped,
}
115
116impl ObjectEntry {
117    #[cfg(test)]
118    fn unwrap_object(&self) -> &Object {
119        match self {
120            ObjectEntry::Object(o) => o,
121            _ => panic!("unwrap_object called on non-Object"),
122        }
123    }
124
125    fn is_tombstone(&self) -> bool {
126        match self {
127            ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
128            ObjectEntry::Object(_) => false,
129        }
130    }
131}
132
133impl std::fmt::Debug for ObjectEntry {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        match self {
136            ObjectEntry::Object(o) => {
137                write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
138            }
139            ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
140            ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
141        }
142    }
143}
144
145impl From<Object> for ObjectEntry {
146    fn from(object: Object) -> Self {
147        ObjectEntry::Object(object)
148    }
149}
150
151impl From<ObjectOrTombstone> for ObjectEntry {
152    fn from(object: ObjectOrTombstone) -> Self {
153        match object {
154            ObjectOrTombstone::Object(o) => o.into(),
155            ObjectOrTombstone::Tombstone(obj_ref) => {
156                if obj_ref.2.is_deleted() {
157                    ObjectEntry::Deleted
158                } else if obj_ref.2.is_wrapped() {
159                    ObjectEntry::Wrapped
160                } else {
161                    panic!("tombstone digest must either be deleted or wrapped");
162                }
163            }
164        }
165    }
166}
167
/// The latest known state of an object, as stored in `object_by_id_cache`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    /// The object's highest known version and its entry (which may be a tombstone).
    Object(SequenceNumber, ObjectEntry),
    /// The object is known not to exist.
    NonExistent,
}
173
174impl LatestObjectCacheEntry {
175    #[cfg(test)]
176    fn version(&self) -> Option<SequenceNumber> {
177        match self {
178            LatestObjectCacheEntry::Object(version, _) => Some(*version),
179            LatestObjectCacheEntry::NonExistent => None,
180        }
181    }
182
183    fn is_alive(&self) -> bool {
184        match self {
185            LatestObjectCacheEntry::Object(_, entry) => !entry.is_tombstone(),
186            LatestObjectCacheEntry::NonExistent => false,
187        }
188    }
189}
190
191impl IsNewer for LatestObjectCacheEntry {
192    fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
193        match (self, other) {
194            (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
195                v1 > v2
196            }
197            (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
198            _ => false,
199        }
200    }
201}
202
/// Key for marker tables: markers are tracked per epoch, per object.
type MarkerKey = (EpochId, FullObjectID);
204
/// UncommittedData stores execution outputs that are not yet written to the db. Entries in this
/// struct can only be purged after they are committed.
struct UncommittedData {
    /// The object dirty set. All writes go into this table first. After we flush the data to the
    /// db, the data is removed from this table and inserted into the object_cache.
    ///
    /// This table may contain both live and dead objects, since we flush both live and dead
    /// objects to the db in order to support past object queries on fullnodes.
    ///
    /// Further, we only remove objects in FIFO order, which ensures that the cached
    /// sequence of objects has no gaps. In other words, if we have versions 4, 8, 13 of
    /// an object, we can deduce that version 9 does not exist. This also makes child object
    /// reads efficient. `object_cache` cannot contain a more recent version of an object than
    /// `objects`, and neither can have any gaps. Therefore if there is any object <= the version
    /// bound for a child read in objects, it is the correct object to return.
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

    // Markers for received objects and deleted shared objects. This contains all of the dirty
    // marker state, which is committed to the db at the same time as other transaction data.
    // After markers are committed to the db we remove them from this table and insert them into
    // marker_cache.
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    // Uncommitted transaction effects, keyed by effects digest.
    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    // Uncommitted transaction events, keyed by the digest of the emitting transaction.
    transaction_events: DashMap<TransactionDigest, TransactionEvents>,

    // Object keys loaded by a transaction at runtime without being changed by it.
    // NOTE(review): semantics inferred from the field name — confirm against call sites.
    unchanged_loaded_runtime_objects: DashMap<TransactionDigest, Vec<ObjectKey>>,

    // For each executed-but-uncommitted transaction, the digest of its effects.
    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    // Transaction outputs that have not yet been written to the DB. Items are removed from this
    // table as they are flushed to the db.
    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,

    // Counters of transactions inserted into / committed from the dirty set; `is_empty`
    // asserts (in debug builds) that they match once `pending_transaction_writes` drains.
    total_transaction_inserts: AtomicU64,
    total_transaction_commits: AtomicU64,
}
243
244impl UncommittedData {
245    fn new() -> Self {
246        Self {
247            objects: DashMap::with_shard_amount(2048),
248            markers: DashMap::with_shard_amount(2048),
249            transaction_effects: DashMap::with_shard_amount(2048),
250            executed_effects_digests: DashMap::with_shard_amount(2048),
251            pending_transaction_writes: DashMap::with_shard_amount(2048),
252            transaction_events: DashMap::with_shard_amount(2048),
253            unchanged_loaded_runtime_objects: DashMap::with_shard_amount(2048),
254            total_transaction_inserts: AtomicU64::new(0),
255            total_transaction_commits: AtomicU64::new(0),
256        }
257    }
258
259    fn clear(&self) {
260        self.objects.clear();
261        self.markers.clear();
262        self.transaction_effects.clear();
263        self.executed_effects_digests.clear();
264        self.pending_transaction_writes.clear();
265        self.transaction_events.clear();
266        self.unchanged_loaded_runtime_objects.clear();
267        self.total_transaction_inserts
268            .store(0, std::sync::atomic::Ordering::Relaxed);
269        self.total_transaction_commits
270            .store(0, std::sync::atomic::Ordering::Relaxed);
271    }
272
273    fn is_empty(&self) -> bool {
274        let empty = self.pending_transaction_writes.is_empty();
275        if empty && cfg!(debug_assertions) {
276            assert!(
277                self.objects.is_empty()
278                    && self.markers.is_empty()
279                    && self.transaction_effects.is_empty()
280                    && self.executed_effects_digests.is_empty()
281                    && self.transaction_events.is_empty()
282                    && self.unchanged_loaded_runtime_objects.is_empty()
283                    && self
284                        .total_transaction_inserts
285                        .load(std::sync::atomic::Ordering::Relaxed)
286                        == self
287                            .total_transaction_commits
288                            .load(std::sync::atomic::Ordering::Relaxed),
289            );
290        }
291        empty
292    }
293}
294
295// Point items (anything without a version number) can be negatively cached as None
296type PointCacheItem<T> = Option<T>;
297
298// PointCacheItem can only be used for insert-only collections, so a Some entry
299// is always newer than a None entry.
300impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
301    fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
302        match (self, other) {
303            (Some(_), None) => true,
304
305            (Some(a), Some(b)) => {
306                // conflicting inserts should never happen
307                debug_assert_eq!(a, b);
308                false
309            }
310
311            _ => false,
312        }
313    }
314}
315
/// CachedCommittedData stores data that has been committed to the db, but is likely to be read soon.
struct CachedCommittedData {
    // See module level comment for an explanation of caching strategy.
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    // See module level comment for an explanation of caching strategy.
    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    // Committed transactions, keyed by digest (point cache: None means known-absent).
    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,

    // Committed transaction effects, keyed by effects digest.
    transaction_effects:
        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEvents>>>,

    // Committed transaction events, keyed by the digest of the emitting transaction.
    transaction_events: MonotonicCache<TransactionDigest, PointCacheItem<Arc<TransactionEvents>>>,

    // Maps transaction digest to the digest of its executed effects.
    executed_effects_digests:
        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,

    // Records that a transaction was executed in a given (earlier) epoch.
    // NOTE(review): semantics inferred from the name — confirm against call sites.
    transaction_executed_in_last_epoch:
        MonotonicCache<(EpochId, TransactionDigest), PointCacheItem<()>>,

    // Objects that were read at transaction signing time - allows us to access them again at
    // execution time with a single lock / hash lookup
    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}
341
342impl CachedCommittedData {
343    fn new(config: &ExecutionCacheConfig) -> Self {
344        let object_cache = MokaCache::builder(8)
345            .max_capacity(randomize_cache_capacity_in_tests(
346                config.object_cache_size(),
347            ))
348            .build();
349        let marker_cache = MokaCache::builder(8)
350            .max_capacity(randomize_cache_capacity_in_tests(
351                config.marker_cache_size(),
352            ))
353            .build();
354
355        let transactions = MonotonicCache::new(randomize_cache_capacity_in_tests(
356            config.transaction_cache_size(),
357        ));
358        let transaction_effects = MonotonicCache::new(randomize_cache_capacity_in_tests(
359            config.effect_cache_size(),
360        ));
361        let transaction_events = MonotonicCache::new(randomize_cache_capacity_in_tests(
362            config.events_cache_size(),
363        ));
364        let executed_effects_digests = MonotonicCache::new(randomize_cache_capacity_in_tests(
365            config.executed_effect_cache_size(),
366        ));
367
368        let transaction_objects = MokaCache::builder(8)
369            .max_capacity(randomize_cache_capacity_in_tests(
370                config.transaction_objects_cache_size(),
371            ))
372            .build();
373
374        let transaction_executed_in_last_epoch = MonotonicCache::new(
375            randomize_cache_capacity_in_tests(config.executed_effect_cache_size()),
376        );
377
378        Self {
379            object_cache,
380            marker_cache,
381            transactions,
382            transaction_effects,
383            transaction_events,
384            executed_effects_digests,
385            transaction_executed_in_last_epoch,
386            _transaction_objects: transaction_objects,
387        }
388    }
389
390    fn clear_and_assert_empty(&self) {
391        self.object_cache.invalidate_all();
392        self.marker_cache.invalidate_all();
393        self.transactions.invalidate_all();
394        self.transaction_effects.invalidate_all();
395        self.transaction_events.invalidate_all();
396        self.executed_effects_digests.invalidate_all();
397        self.transaction_executed_in_last_epoch.invalidate_all();
398        self._transaction_objects.invalidate_all();
399
400        assert_empty(&self.object_cache);
401        assert_empty(&self.marker_cache);
402        assert!(self.transactions.is_empty());
403        assert!(self.transaction_effects.is_empty());
404        assert!(self.transaction_events.is_empty());
405        assert!(self.executed_effects_digests.is_empty());
406        assert!(self.transaction_executed_in_last_epoch.is_empty());
407        assert_empty(&self._transaction_objects);
408    }
409}
410
411fn assert_empty<K, V>(cache: &MokaCache<K, V>)
412where
413    K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
414    V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
415{
416    if cache.iter().next().is_some() {
417        panic!("cache should be empty");
418    }
419}
420
pub struct WritebackCache {
    /// Execution outputs not yet committed to the db (see module doc).
    dirty: UncommittedData,
    /// Data already committed to the db but likely to be read again soon.
    cached: CachedCommittedData,

    // We separately cache the latest version of each object. Although this seems
    // redundant, it is the only way to support populating the cache after a read.
    // We cannot simply insert objects that we read off the disk into `object_cache`,
    // since that may violate the no-missing-versions property.
    // `object_by_id_cache` is also written to on writes so that it is always coherent.
    // Hence it contains both committed and dirty object data.
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    // The packages cache is treated separately from objects, because they are immutable and can be
    // used by any number of transactions. Additionally, many operations require loading large
    // numbers of packages (due to dependencies), so we want to try to keep all packages in memory.
    //
    // Also, this cache can contain packages that are dirty or committed, so it does not live in
    // UncommittedData or CachedCommittedData. The cache is populated in two ways:
    // - when packages are written (in which case they will also be present in the dirty set)
    // - after a cache miss. Because package IDs are unique (only one version exists for each ID)
    //   we do not need to worry about the contiguous version property.
    // - note that we removed any unfinalized packages from the cache during revert_state_update().
    packages: MokaCache<ObjectID, PackageObject>,

    // Per-object transaction lock state.
    // NOTE(review): semantics inferred from the type name — confirm in object_locks.rs.
    object_locks: ObjectLocks,

    // Wakes readers waiting for a transaction's executed-effects digest to appear.
    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    // Wakes readers waiting for an input object/package version to become available
    // (notified from write_object_entry / write_marker_value).
    object_notify_read: NotifyRead<InputKey, ()>,

    // The durable store backing this cache; consulted on cache misses.
    store: Arc<AuthorityStore>,
    // Threshold (from ExecutionCacheConfig::backpressure_threshold) used together with
    // backpressure_manager to throttle work.
    // NOTE(review): the exact trigger condition lives outside this excerpt — confirm.
    backpressure_threshold: u64,
    backpressure_manager: Arc<BackpressureManager>,
    metrics: Arc<ExecutionCacheMetrics>,
}
455
// Check one level ($level) of a versioned cache ($cache) for an exact $version.
// Outcomes:
// - early-returns CacheResult::Hit if $version is present;
// - early-returns CacheResult::NegativeHit if $version is absent but greater than the
//   smallest version held (by the no-gaps property in the module doc, the version then
//   cannot exist anywhere, including the db);
// - otherwise records a miss and falls through so the caller can consult the next level.
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    // If the version is greater than the least version in the cache, then we know
                    // that the object does not exist anywhere
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
477
// Check one level ($level) of a versioned cache ($cache) for the highest (latest)
// version of an entry, early-returning a Hit when the map is present. A present but
// empty CachedVersionMap violates the invariant that empty maps are removed, hence
// the panic. Falls through (recording a miss) when there is no map at this level.
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some((version, entry)) = cache.get_highest() {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit((*version, entry.clone()));
            } else {
                panic!("empty CachedVersionMap should have been removed");
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
492
493impl WritebackCache {
494    pub fn new(
495        config: &ExecutionCacheConfig,
496        store: Arc<AuthorityStore>,
497        metrics: Arc<ExecutionCacheMetrics>,
498        backpressure_manager: Arc<BackpressureManager>,
499    ) -> Self {
500        let packages = MokaCache::builder(8)
501            .max_capacity(randomize_cache_capacity_in_tests(
502                config.package_cache_size(),
503            ))
504            .build();
505        Self {
506            dirty: UncommittedData::new(),
507            cached: CachedCommittedData::new(config),
508            object_by_id_cache: MonotonicCache::new(randomize_cache_capacity_in_tests(
509                config.object_by_id_cache_size(),
510            )),
511            packages,
512            object_locks: ObjectLocks::new(),
513            executed_effects_digests_notify_read: NotifyRead::new(),
514            object_notify_read: NotifyRead::new(),
515            store,
516            backpressure_manager,
517            backpressure_threshold: config.backpressure_threshold(),
518            metrics,
519        }
520    }
521
522    pub fn new_for_tests(store: Arc<AuthorityStore>) -> Self {
523        Self::new(
524            &Default::default(),
525            store,
526            ExecutionCacheMetrics::new(&prometheus::Registry::new()).into(),
527            BackpressureManager::new_for_tests(),
528        )
529    }
530
531    #[cfg(test)]
532    pub fn reset_for_test(&mut self) {
533        let mut new = Self::new(
534            &Default::default(),
535            self.store.clone(),
536            self.metrics.clone(),
537            self.backpressure_manager.clone(),
538        );
539        std::mem::swap(self, &mut new);
540    }
541
542    pub fn evict_executed_effects_from_cache_for_testing(&self, tx_digest: &TransactionDigest) {
543        self.cached.executed_effects_digests.invalidate(tx_digest);
544        self.cached.transaction_events.invalidate(tx_digest);
545        self.cached.transactions.invalidate(tx_digest);
546    }
547
    /// Record a new dirty version of an object: updates both the dirty set and the
    /// latest-version (`object_by_id`) cache, and wakes any tasks waiting on this
    /// object as a transaction input.
    fn write_object_entry(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        object: ObjectEntry,
    ) {
        trace!(?object_id, ?version, ?object, "inserting object entry");
        self.metrics.record_cache_write("object");

        // We must hold the lock for the object entry while inserting to the
        // object_by_id_cache. Otherwise, a surprising bug can occur:
        //
        // 1. A thread executing TX1 can write object (O,1) to the dirty set and then pause.
        // 2. TX2, which reads (O,1) can begin executing, because ExecutionScheduler immediately
        //    schedules transactions if their inputs are available. It does not matter that TX1
        //    hasn't finished executing yet.
        // 3. TX2 can write (O,2) to both the dirty set and the object_by_id_cache.
        // 4. The thread executing TX1 can resume and write (O,1) to the object_by_id_cache.
        //
        // Now, any subsequent attempt to get the latest version of O will return (O,1) instead of
        // (O,2).
        //
        // This seems very unlikely, but it may be possible under the following circumstances:
        // - While a thread is unlikely to pause for so long, moka cache uses optimistic
        //   lock-free algorithms that have retry loops. Possibly, under high contention, this
        //   code might spin for a surprisingly long time.
        // - Additionally, many concurrent re-executions of the same tx could happen due to
        //   the tx finalizer, plus checkpoint executor, consensus, and RPCs from fullnodes.
        //
        // `entry` is a dashmap entry guard; it stays held (providing the mutual exclusion
        // described above) until it goes out of scope at the end of this function.
        let mut entry = self.dirty.objects.entry(*object_id).or_default();

        self.object_by_id_cache
            .insert(
                object_id,
                LatestObjectCacheEntry::Object(version, object.clone()),
                Ticket::Write,
            )
            // While Ticket::Write cannot expire, this insert may still fail.
            // See the comment in `MonotonicCache::insert`.
            .ok();

        entry.insert(version, object.clone());

        // Wake any transaction waiting on this object as an input. Packages are keyed by
        // id alone; other objects by (id, version).
        // NOTE(review): child objects are skipped — presumably they are never awaited as
        // scheduler inputs; confirm against ExecutionScheduler.
        if let ObjectEntry::Object(object) = &object {
            if object.is_package() {
                self.object_notify_read
                    .notify(&InputKey::Package { id: *object_id }, &());
            } else if !object.is_child_object() {
                self.object_notify_read.notify(
                    &InputKey::VersionedObject {
                        id: object.full_id(),
                        version: object.version(),
                    },
                    &(),
                );
            }
        }
    }
605
606    fn write_marker_value(
607        &self,
608        epoch_id: EpochId,
609        object_key: FullObjectKey,
610        marker_value: MarkerValue,
611    ) {
612        tracing::trace!("inserting marker value {object_key:?}: {marker_value:?}",);
613        self.metrics.record_cache_write("marker");
614        self.dirty
615            .markers
616            .entry((epoch_id, object_key.id()))
617            .or_default()
618            .value_mut()
619            .insert(object_key.version(), marker_value);
620        // It is possible for a transaction to use a consensus stream ended
621        // object in the input, hence we must notify that it is now available
622        // at the assigned version, so that any transaction waiting for this
623        // object version can start execution.
624        if matches!(marker_value, MarkerValue::ConsensusStreamEnded(_)) {
625            self.object_notify_read.notify(
626                &InputKey::VersionedObject {
627                    id: object_key.id(),
628                    version: object_key.version(),
629                },
630                &(),
631            );
632        }
633    }
634
    // lock both the dirty and committed sides of the cache, and then pass the entries to
    // the callback. Written with the `with` pattern because any other way of doing this
    // creates lifetime hell.
    //
    // Both guards (the dashmap entry and the mutex on the cached map) are held for the
    // full duration of `cb`, giving the callback a consistent view across both halves.
    fn with_locked_cache_entries<K, V, R>(
        dirty_map: &DashMap<K, CachedVersionMap<V>>,
        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: &K,
        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
    ) -> R
    where
        K: Copy + Eq + Hash + Send + Sync + 'static,
        V: Send + Sync + 'static,
    {
        // `entry()` (rather than `get()`) is used so a lock is taken even when the key is
        // absent. NOTE(review): dashmap's Entry guard keeps its shard locked while held —
        // confirm against dashmap documentation.
        let dirty_entry = dirty_map.entry(*key);
        let dirty_entry = match &dirty_entry {
            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
            DashMapEntry::Vacant(_) => None,
        };

        // The cached side stores Arc<Mutex<..>>; lock it (if present) for the callback.
        let cached_entry = cached_map.get(key);
        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
        let cached_entry = cached_lock.as_deref();

        cb(dirty_entry, cached_entry)
    }
660
661    // Attempt to get an object from the cache. The DB is not consulted.
662    // Can return Hit, Miss, or NegativeHit (if the object is known to not exist).
663    fn get_object_entry_by_key_cache_only(
664        &self,
665        object_id: &ObjectID,
666        version: SequenceNumber,
667    ) -> CacheResult<ObjectEntry> {
668        Self::with_locked_cache_entries(
669            &self.dirty.objects,
670            &self.cached.object_cache,
671            object_id,
672            |dirty_entry, cached_entry| {
673                check_cache_entry_by_version!(
674                    self,
675                    "object_by_version",
676                    "uncommitted",
677                    dirty_entry,
678                    version
679                );
680                check_cache_entry_by_version!(
681                    self,
682                    "object_by_version",
683                    "committed",
684                    cached_entry,
685                    version
686                );
687                CacheResult::Miss
688            },
689        )
690    }
691
692    fn get_object_by_key_cache_only(
693        &self,
694        object_id: &ObjectID,
695        version: SequenceNumber,
696    ) -> CacheResult<Object> {
697        match self.get_object_entry_by_key_cache_only(object_id, version) {
698            CacheResult::Hit(entry) => match entry {
699                ObjectEntry::Object(object) => CacheResult::Hit(object),
700                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
701            },
702            CacheResult::Miss => CacheResult::Miss,
703            CacheResult::NegativeHit => CacheResult::NegativeHit,
704        }
705    }
706
    /// Cache-only lookup of the *latest* entry for `object_id`.
    ///
    /// Lookup order: the point cache of latest entries (`object_by_id_cache`),
    /// then the dirty (uncommitted) version map, then the committed version
    /// cache. Returns `Miss` only when none of them can answer.
    ///
    /// In debug builds, additionally cross-checks the `object_by_id_cache`
    /// entry against the canonical source (dirty set, falling back to the db)
    /// and panics if they disagree.
    fn get_object_entry_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
        self.metrics
            .record_cache_request(request_type, "object_by_id");
        let entry = self.object_by_id_cache.get(object_id);

        if cfg!(debug_assertions)
            && let Some(entry) = &entry
        {
            // check that cache is coherent
            // Canonical "latest" value: highest dirty version if present,
            // otherwise whatever the db reports (object or tombstone).
            let highest: Option<ObjectEntry> = self
                .dirty
                .objects
                .get(object_id)
                .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
                .or_else(|| {
                    let obj: Option<ObjectEntry> = self
                        .store
                        .get_latest_object_or_tombstone(*object_id)
                        .unwrap()
                        .map(|(_, o)| o.into());
                    obj
                });

            let cache_entry = match &*entry.lock() {
                LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
                LatestObjectCacheEntry::NonExistent => None,
            };

            // If the cache entry is a tombstone, the db entry may be missing if it was pruned.
            let tombstone_possibly_pruned = highest.is_none()
                && cache_entry
                    .as_ref()
                    .map(|e| e.is_tombstone())
                    .unwrap_or(false);

            if highest != cache_entry && !tombstone_possibly_pruned {
                tracing::error!(
                    ?highest,
                    ?cache_entry,
                    ?tombstone_possibly_pruned,
                    "object_by_id cache is incoherent for {:?}",
                    object_id
                );
                panic!("object_by_id cache is incoherent for {:?}", object_id);
            }
        }

        // A populated point-cache entry is authoritative for the latest
        // version: it answers both positively and negatively.
        if let Some(entry) = entry {
            let entry = entry.lock();
            match &*entry {
                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
                    self.metrics.record_cache_hit(request_type, "object_by_id");
                    return CacheResult::Hit((*latest_version, latest_object.clone()));
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit(request_type, "object_by_id");
                    return CacheResult::NegativeHit;
                }
            }
        } else {
            self.metrics.record_cache_miss(request_type, "object_by_id");
        }

        // Point cache missed: consult the dirty and committed version maps,
        // locked together. The check macros resolve a hit or negative hit;
        // falling through both means a genuine miss.
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }
786
787    fn get_object_by_id_cache_only(
788        &self,
789        request_type: &'static str,
790        object_id: &ObjectID,
791    ) -> CacheResult<(SequenceNumber, Object)> {
792        match self.get_object_entry_by_id_cache_only(request_type, object_id) {
793            CacheResult::Hit((version, entry)) => match entry {
794                ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
795                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
796            },
797            CacheResult::NegativeHit => CacheResult::NegativeHit,
798            CacheResult::Miss => CacheResult::Miss,
799        }
800    }
801
    /// Cache-only lookup of the marker value for `object_key` within
    /// `epoch_id`. Checks the dirty (uncommitted) marker map first, then the
    /// committed marker cache; returns `Miss` if neither holds the version.
    fn get_marker_value_cache_only(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> CacheResult<MarkerValue> {
        // Markers are keyed by (epoch, object id). Both halves of the cache
        // are locked together so a version cannot transiently disappear
        // between the two checks. The check macros resolve a hit or negative
        // hit; falling through both means a miss.
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, object_key.id()),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "uncommitted",
                    dirty_entry,
                    object_key.version()
                );
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "committed",
                    cached_entry,
                    object_key.version()
                );
                CacheResult::Miss
            },
        )
    }
830
    /// Cache-only lookup of the most recent marker (version and value) for
    /// `object_id` within `epoch_id`. Checks the dirty (uncommitted) marker
    /// map before the committed marker cache; returns `Miss` on no entry.
    fn get_latest_marker_value_cache_only(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
        // Both halves are locked together; the check macros resolve a hit or
        // negative hit, and falling through both means a miss.
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }
847
848    fn get_object_impl(&self, request_type: &'static str, id: &ObjectID) -> Option<Object> {
849        let ticket = self.object_by_id_cache.get_ticket_for_read(id);
850        match self.get_object_entry_by_id_cache_only(request_type, id) {
851            CacheResult::Hit((_, entry)) => match entry {
852                ObjectEntry::Object(object) => Some(object),
853                ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
854            },
855            CacheResult::NegativeHit => None,
856            CacheResult::Miss => {
857                let obj = self
858                    .store
859                    .get_latest_object_or_tombstone(*id)
860                    .expect("db error");
861                match obj {
862                    Some((key, obj)) => {
863                        self.cache_latest_object_by_id(
864                            id,
865                            LatestObjectCacheEntry::Object(key.1, obj.clone().into()),
866                            ticket,
867                        );
868                        match obj {
869                            ObjectOrTombstone::Object(object) => Some(object),
870                            ObjectOrTombstone::Tombstone(_) => None,
871                        }
872                    }
873                    None => {
874                        self.cache_object_not_found(id, ticket);
875                        None
876                    }
877                }
878            }
879        }
880    }
881
882    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
883        self.metrics.record_cache_request(request_type, "db");
884        &self.store
885    }
886
887    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
888        self.metrics
889            .record_cache_multi_request(request_type, "db", count);
890        &self.store
891    }
892
    /// Writes all outputs of an executed transaction into the dirty
    /// (uncommitted) portion of the cache and notifies waiters on the
    /// effects digest. Nothing is written to the db here; that happens later
    /// in commit_transaction_outputs.
    ///
    /// Write ordering within this function is load-bearing — see the inline
    /// comments. Also updates the backpressure signal based on the number of
    /// inserted-but-not-yet-committed transactions.
    #[instrument(level = "debug", skip_all)]
    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
        let tx_digest = *tx_outputs.transaction.digest();
        trace!(?tx_digest, "writing transaction outputs to cache");

        // Re-executing a transaction from the previous epoch would be a bug.
        assert!(
            !self.transaction_executed_in_last_epoch(&tx_digest, epoch_id),
            "Transaction {:?} was already executed in epoch {}",
            tx_digest,
            epoch_id.saturating_sub(1)
        );

        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            unchanged_loaded_runtime_objects,
            ..
        } = &*tx_outputs;

        // Deletions and wraps must be written first. The reason is that one of the deletes
        // may be a child object, and if we write the parent object first, a reader may or may
        // not see the previous version of the child object, instead of the deleted/wrapped
        // tombstone, which would cause an execution fork
        for ObjectKey(id, version) in deleted.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Deleted);
        }

        for ObjectKey(id, version) in wrapped.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
        }

        // Update all markers
        for (object_key, marker_value) in markers.iter() {
            self.write_marker_value(epoch_id, *object_key, *marker_value);
        }

        // Write children before parents to ensure that readers do not observe a parent object
        // before its most recent children are visible.
        for (object_id, object) in written.iter() {
            if object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
            }
        }
        for (object_id, object) in written.iter() {
            if !object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
                if object.is_package() {
                    // Packages additionally go into the dedicated package cache.
                    debug!("caching package: {:?}", object.compute_object_reference());
                    self.packages
                        .insert(*object_id, PackageObject::new(object.clone()));
                }
            }
        }

        let tx_digest = *transaction.digest();
        debug!(
            ?tx_digest,
            "Writing transaction output objects to cache: {:?}",
            written
                .values()
                .map(|o| (o.id(), o.version()))
                .collect::<Vec<_>>(),
        );
        let effects_digest = effects.digest();

        self.metrics.record_cache_write("transaction_block");
        self.dirty
            .pending_transaction_writes
            .insert(tx_digest, tx_outputs.clone());

        // insert transaction effects before executed_effects_digests so that there
        // are never dangling entries in executed_effects_digests
        self.metrics.record_cache_write("transaction_effects");
        self.dirty
            .transaction_effects
            .insert(effects_digest, effects.clone());

        // note: if events.data.is_empty(), then there are no events for this transaction. We
        // store it anyway to avoid special cases in commit_transaction_outputs, and translate
        // an empty events structure to None when reading.
        self.metrics.record_cache_write("transaction_events");
        self.dirty
            .transaction_events
            .insert(tx_digest, events.clone());

        self.metrics
            .record_cache_write("unchanged_loaded_runtime_objects");
        self.dirty
            .unchanged_loaded_runtime_objects
            .insert(tx_digest, unchanged_loaded_runtime_objects.clone());

        self.metrics.record_cache_write("executed_effects_digests");
        self.dirty
            .executed_effects_digests
            .insert(tx_digest, effects_digest);

        // Wake anyone waiting for this transaction's effects digest.
        self.executed_effects_digests_notify_read
            .notify(&tx_digest, &effects_digest);

        self.metrics
            .pending_notify_read
            .set(self.executed_effects_digests_notify_read.num_pending() as i64);

        // Track inserts-minus-commits to drive the backpressure signal.
        let prev = self
            .dirty
            .total_transaction_inserts
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        // saturating_sub: the two counters are updated independently, so
        // commits can transiently exceed inserts as observed here.
        let pending_count = (prev + 1).saturating_sub(
            self.dirty
                .total_transaction_commits
                .load(std::sync::atomic::Ordering::Relaxed),
        );

        self.set_backpressure(pending_count);
    }
1014
1015    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1016        let _metrics_guard = mysten_metrics::monitored_scope("WritebackCache::build_db_batch");
1017        let mut all_outputs = Vec::with_capacity(digests.len());
1018        for tx in digests {
1019            let Some(outputs) = self
1020                .dirty
1021                .pending_transaction_writes
1022                .get(tx)
1023                .map(|o| o.clone())
1024            else {
1025                // This can happen in the following rare case:
1026                // All transactions in the checkpoint are committed to the db (by commit_transaction_outputs,
1027                // called in CheckpointExecutor::process_executed_transactions), but the process crashes before
1028                // the checkpoint water mark is bumped. We will then re-commit the checkpoint at startup,
1029                // despite that all transactions are already executed.
1030                warn!("Attempt to commit unknown transaction {:?}", tx);
1031                continue;
1032            };
1033            all_outputs.push(outputs);
1034        }
1035
1036        let batch = self
1037            .store
1038            .build_db_batch(epoch, &all_outputs)
1039            .expect("db error");
1040        (all_outputs, batch)
1041    }
1042
    // Commits dirty data for the given TransactionDigest to the db.
    //
    // Takes the Batch produced by build_db_batch, writes it to disk, and only
    // then migrates each transaction's data from the dirty set into the
    // bounded committed cache. The ordering is critical — see inline comment.
    // Finally re-computes the backpressure signal from inserts minus commits.
    #[instrument(level = "debug", skip_all)]
    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        (all_outputs, db_batch): Batch,
        digests: &[TransactionDigest],
    ) {
        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs");
        fail_point!("writeback-cache-commit");
        trace!(?digests);

        // Flush writes to disk before removing anything from dirty set. otherwise,
        // a cache eviction could cause a value to disappear briefly, even if we insert to the
        // cache before removing from the dirty set.
        db_batch.write().expect("db error");

        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs::flush");
        for outputs in all_outputs.iter() {
            let tx_digest = outputs.transaction.digest();
            // The pending entry must still be present: nothing else removes it.
            assert!(
                self.dirty
                    .pending_transaction_writes
                    .remove(tx_digest)
                    .is_some()
            );
            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
        }

        // Account for the committed transactions and refresh backpressure.
        let num_outputs = all_outputs.len() as u64;
        let num_commits = self
            .dirty
            .total_transaction_commits
            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
            + num_outputs;

        // saturating_sub: counters are updated independently, so commits can
        // transiently appear to exceed inserts.
        let pending_count = self
            .dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits);

        self.set_backpressure(pending_count);
    }
1089
1090    fn approximate_pending_transaction_count(&self) -> u64 {
1091        let num_commits = self
1092            .dirty
1093            .total_transaction_commits
1094            .load(std::sync::atomic::Ordering::Relaxed);
1095
1096        self.dirty
1097            .total_transaction_inserts
1098            .load(std::sync::atomic::Ordering::Relaxed)
1099            .saturating_sub(num_commits)
1100    }
1101
1102    fn set_backpressure(&self, pending_count: u64) {
1103        let backpressure = pending_count > self.backpressure_threshold;
1104        let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
1105        if backpressure_changed {
1106            self.metrics.backpressure_toggles.inc();
1107        }
1108        self.metrics
1109            .backpressure_status
1110            .set(if backpressure { 1 } else { 0 });
1111    }
1112
    /// Migrates one committed transaction's data out of the dirty set and into
    /// the bounded committed caches. Must only be called after the data has
    /// been durably written to the db (see commit_transaction_outputs).
    ///
    /// Ordering is load-bearing: every piece is inserted into the committed
    /// cache *before* being removed from the dirty set, so concurrent readers
    /// never observe a window where the value is in neither place.
    fn flush_transactions_from_dirty_to_cached(
        &self,
        epoch: EpochId,
        tx_digest: TransactionDigest,
        outputs: &TransactionOutputs,
    ) {
        // Now, remove each piece of committed data from the dirty state and insert it into the cache.
        // TODO: outputs should have a strong count of 1 so we should be able to move out of it
        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = outputs;

        let effects_digest = effects.digest();

        // Update cache before removing from self.dirty to avoid
        // unnecessary cache misses
        // .ok(): a failed insert (e.g. rejected write ticket) is harmless here —
        // the data is already durable in the db, so a later read can refill it.
        self.cached
            .transactions
            .insert(
                &tx_digest,
                PointCacheItem::Some(transaction.clone()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_effects
            .insert(
                &effects_digest,
                PointCacheItem::Some(effects.clone().into()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .executed_effects_digests
            .insert(
                &tx_digest,
                PointCacheItem::Some(effects_digest),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_events
            .insert(
                &tx_digest,
                PointCacheItem::Some(events.clone().into()),
                Ticket::Write,
            )
            .ok();

        // Dirty-set entries were inserted by write_transaction_outputs and
        // nothing else removes them, hence the expects.
        self.dirty
            .transaction_effects
            .remove(&effects_digest)
            .expect("effects must exist");

        self.dirty
            .transaction_events
            .remove(&tx_digest)
            .expect("events must exist");

        self.dirty
            .unchanged_loaded_runtime_objects
            .remove(&tx_digest)
            .expect("unchanged_loaded_runtime_objects must exist");

        self.dirty
            .executed_effects_digests
            .remove(&tx_digest)
            .expect("executed effects must exist");

        // Move dirty markers to cache
        for (object_key, marker_value) in markers.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.markers,
                &self.cached.marker_cache,
                (epoch, object_key.id()),
                object_key.version(),
                marker_value,
            );
        }

        // Move written objects, deletion tombstones, and wrap tombstones to
        // the committed object cache, one version at a time.
        for (object_id, object) in written.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                object.version(),
                &ObjectEntry::Object(object.clone()),
            );
        }

        for ObjectKey(object_id, version) in deleted.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Deleted,
            );
        }

        for ObjectKey(object_id, version) in wrapped.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Wrapped,
            );
        }
    }
1230
    // Move the oldest/least entry from the dirty queue to the cache queue.
    // This is called after the entry is committed to the db.
    //
    // The caller asserts (via pop_oldest + assert_eq) that `version`/`value`
    // is exactly the oldest dirty version; committing in order is what keeps
    // the dirty+cached queues gap-free (see module docs).
    fn move_version_from_dirty_to_cache<K, V>(
        dirty: &DashMap<K, CachedVersionMap<V>>,
        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: K,
        version: SequenceNumber,
        value: &V,
    ) where
        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
    {
        // Only the N newest committed versions are retained per key.
        static MAX_VERSIONS: usize = 3;

        // IMPORTANT: lock both the dirty set entry and the cache entry before modifying either.
        // this ensures that readers cannot see a value temporarily disappear.
        let dirty_entry = dirty.entry(key);
        let cache_entry = cache.entry(key).or_default();
        let mut cache_map = cache_entry.value().lock();

        // insert into cache and drop old versions.
        cache_map.insert(version, value.clone());
        // TODO: make this automatic by giving CachedVersionMap an optional max capacity
        cache_map.truncate_to(MAX_VERSIONS);

        // The dirty entry must exist: write_transaction_outputs put it there
        // and this is the only code that removes committed versions.
        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
            panic!("dirty map must exist");
        };

        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);

        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");

        // if there are no versions remaining, remove the map entry
        if occupied_dirty_entry.get().is_empty() {
            occupied_dirty_entry.remove();
        }
    }
1269
1270    // Updates the latest object id cache with an entry that was read from the db.
1271    fn cache_latest_object_by_id(
1272        &self,
1273        object_id: &ObjectID,
1274        object: LatestObjectCacheEntry,
1275        ticket: Ticket,
1276    ) {
1277        trace!("caching object by id: {:?} {:?}", object_id, object);
1278        if self
1279            .object_by_id_cache
1280            .insert(object_id, object, ticket)
1281            .is_ok()
1282        {
1283            self.metrics.record_cache_write("object_by_id");
1284        } else {
1285            trace!("discarded cache write due to expired ticket");
1286            self.metrics.record_ticket_expiry();
1287        }
1288    }
1289
1290    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
1291        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
1292    }
1293
    /// Discards all uncommitted (dirty) state at epoch boundary and purges any
    /// cached entries that could have been populated from it, so no
    /// non-finalized data from the old epoch survives. Requires the execution
    /// lock to be held for write (no concurrent execution), per the note below.
    fn clear_state_end_of_epoch_impl(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
        info!("clearing state at end of epoch");

        // Note: there cannot be any concurrent writes to self.dirty while we are in this function,
        // as all transaction execution is paused.
        for r in self.dirty.pending_transaction_writes.iter() {
            let outputs = r.value();
            // Only single-writer (owned-object) transactions should still be
            // pending here; anything with shared inputs indicates a bug.
            if !outputs
                .transaction
                .transaction_data()
                .shared_input_objects()
                .is_empty()
            {
                debug_fatal!("transaction must be single writer");
            }
            info!(
                "clearing state for transaction {:?}",
                outputs.transaction.digest()
            );
            // Invalidate every object this uncommitted transaction touched,
            // including packages it published.
            for (object_id, object) in outputs.written.iter() {
                if object.is_package() {
                    info!("removing non-finalized package from cache: {:?}", object_id);
                    self.packages.invalidate(object_id);
                }
                self.object_by_id_cache.invalidate(object_id);
                self.cached.object_cache.invalidate(object_id);
            }

            for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
                self.object_by_id_cache.invalidate(object_id);
                self.cached.object_cache.invalidate(object_id);
            }
        }

        // Drop the entire dirty set in one shot.
        self.dirty.clear();

        info!("clearing old transaction locks");
        self.object_locks.clear();
        info!("clearing object per epoch marker table");
        self.store
            .clear_object_per_epoch_marker_table(execution_guard)
            .expect("db error");
    }
1337
1338    fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) {
1339        self.store
1340            .bulk_insert_genesis_objects(objects)
1341            .expect("db error");
1342        for obj in objects {
1343            self.cached.object_cache.invalidate(&obj.id());
1344            self.object_by_id_cache.invalidate(&obj.id());
1345        }
1346    }
1347
1348    fn insert_genesis_object_impl(&self, object: Object) {
1349        self.object_by_id_cache.invalidate(&object.id());
1350        self.cached.object_cache.invalidate(&object.id());
1351        self.store.insert_genesis_object(object).expect("db error");
1352    }
1353
1354    pub fn clear_caches_and_assert_empty(&self) {
1355        info!("clearing caches");
1356        self.cached.clear_and_assert_empty();
1357        self.object_by_id_cache.invalidate_all();
1358        assert!(&self.object_by_id_cache.is_empty());
1359        self.packages.invalidate_all();
1360        assert_empty(&self.packages);
1361    }
1362}
1363
impl AccountFundsRead for WritebackCache {
    /// Returns the latest balance and version for an accumulator account.
    ///
    /// A missing account object is ambiguous: it may genuinely not exist, or
    /// it may have been read during a concurrent settlement. The accumulator
    /// root object's version is therefore sampled before and after the read;
    /// only if it is unchanged is the absence trusted (balance 0). Otherwise
    /// the read is retried with the new root version.
    fn get_latest_account_amount(&self, account_id: &AccumulatorObjId) -> (u128, SequenceNumber) {
        let mut pre_root_version =
            ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
                .unwrap()
                .version();
        let mut loop_iter = 0;
        loop {
            let account_obj = ObjectCacheRead::get_object(self, account_id.inner());
            if let Some(account_obj) = account_obj {
                // Account objects hold a single U128 accumulator value.
                let (_, AccumulatorValue::U128(value)) =
                    account_obj.data.try_as_move().unwrap().try_into().unwrap();
                return (value.value, account_obj.version());
            }
            let post_root_version =
                ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
                    .unwrap()
                    .version();
            if pre_root_version == post_root_version {
                // Root unchanged across the read: the account really is absent.
                return (0, pre_root_version);
            }
            debug!(
                "Root version changed from {} to {} while reading account amount, retrying",
                pre_root_version, post_root_version
            );
            pre_root_version = post_root_version;
            loop_iter += 1;
            if loop_iter >= 3 {
                // NOTE(review): debug_fatal! panics only in debug builds; in
                // release this logs and the loop keeps retrying — confirm that
                // an unbounded retry is the intended release behavior.
                debug_fatal!("Unable to get a stable version after 3 iterations");
            }
        }
    }

    /// Returns the balance of an accumulator account at the newest version
    /// that is `<= version`, or 0 if no such version of the account exists.
    fn get_account_amount_at_version(
        &self,
        account_id: &AccumulatorObjId,
        version: SequenceNumber,
    ) -> u128 {
        let account_obj = self.find_object_lt_or_eq_version(*account_id.inner(), version);
        if let Some(account_obj) = account_obj {
            let (_, AccumulatorValue::U128(value)) =
                account_obj.data.try_as_move().unwrap().try_into().unwrap();
            value.value
        } else {
            0
        }
    }
}
1412
// Marker impl: no methods are overridden here; the required behavior comes
// from the individual cache trait impls on WritebackCache.
impl ExecutionCacheAPI for WritebackCache {}
1414
1415impl ExecutionCacheCommit for WritebackCache {
1416    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1417        self.build_db_batch(epoch, digests)
1418    }
1419
1420    fn commit_transaction_outputs(
1421        &self,
1422        epoch: EpochId,
1423        batch: Batch,
1424        digests: &[TransactionDigest],
1425    ) {
1426        WritebackCache::commit_transaction_outputs(self, epoch, batch, digests)
1427    }
1428
1429    fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) {
1430        self.store.persist_transaction(tx).expect("db error");
1431    }
1432
1433    fn approximate_pending_transaction_count(&self) -> u64 {
1434        WritebackCache::approximate_pending_transaction_count(self)
1435    }
1436}
1437
1438impl ObjectCacheRead for WritebackCache {
    /// Fetches a package object, consulting the dedicated package cache first,
    /// then the general object path (dirty set / object cache / db).
    ///
    /// Returns `Ok(None)` if the id does not exist, and an error if the id
    /// resolves to a non-package object. In debug builds, a package-cache hit
    /// is cross-checked against the canonical copy (dirty set or db).
    fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
        self.metrics
            .record_cache_request("package", "package_cache");
        if let Some(p) = self.packages.get(package_id) {
            if cfg!(debug_assertions) {
                // Coherence check: the cached package must match the highest
                // dirty version, or failing that, the db copy.
                let canonical_package = self
                    .dirty
                    .objects
                    .get(package_id)
                    .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
                        Some(ObjectEntry::Object(object)) => Some(object),
                        _ => None,
                    })
                    .or_else(|| self.store.get_object(package_id));

                if let Some(canonical_package) = canonical_package {
                    assert_eq!(
                        canonical_package.digest(),
                        p.object().digest(),
                        "Package object cache is inconsistent for package {:?}",
                        package_id
                    );
                }
            }
            self.metrics.record_cache_hit("package", "package_cache");
            return Ok(Some(p));
        } else {
            self.metrics.record_cache_miss("package", "package_cache");
        }

        // We try the dirty objects cache as well before going to the database. This is necessary
        // because the package could be evicted from the package cache before it is committed
        // to the database.
        if let Some(p) = self.get_object_impl("package", package_id) {
            if p.is_package() {
                // Found via the general path: repopulate the package cache.
                let p = PackageObject::new(p);
                tracing::trace!(
                    "caching package: {:?}",
                    p.object().compute_object_reference()
                );
                self.metrics.record_cache_write("package");
                self.packages.insert(*package_id, p.clone());
                Ok(Some(p))
            } else {
                // The id exists but is not a package — surface as a user error.
                Err(SuiErrorKind::UserInputError {
                    error: UserInputError::MoveObjectAsPackage {
                        object_id: *package_id,
                    },
                }
                .into())
            }
        } else {
            Ok(None)
        }
    }
1494
    /// Intentionally a no-op for this cache implementation.
    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
        // This is a no-op because all writes go through the cache, therefore it can never
        // be incoherent
    }
1499
1500    // get_object and variants.
1501
    /// Returns the latest version of the object, or `None` if it does not
    /// exist or its latest entry is a deletion/wrap tombstone.
    fn get_object(&self, id: &ObjectID) -> Option<Object> {
        self.get_object_impl("object_latest", id)
    }
1505
1506    fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object> {
1507        match self.get_object_by_key_cache_only(object_id, version) {
1508            CacheResult::Hit(object) => Some(object),
1509            CacheResult::NegativeHit => None,
1510            CacheResult::Miss => self
1511                .record_db_get("object_by_version")
1512                .get_object_by_key(object_id, version),
1513        }
1514    }
1515
1516    fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
1517        do_fallback_lookup(
1518            object_keys,
1519            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1520                CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1521                CacheResult::NegativeHit => CacheResult::NegativeHit,
1522                CacheResult::Miss => CacheResult::Miss,
1523            },
1524            |remaining| {
1525                self.record_db_multi_get("object_by_version", remaining.len())
1526                    .multi_get_objects_by_key(remaining)
1527                    .expect("db error")
1528            },
1529        )
1530    }
1531
1532    fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool {
1533        match self.get_object_by_key_cache_only(object_id, version) {
1534            CacheResult::Hit(_) => true,
1535            CacheResult::NegativeHit => false,
1536            CacheResult::Miss => self
1537                .record_db_get("object_by_version")
1538                .object_exists_by_key(object_id, version)
1539                .expect("db error"),
1540        }
1541    }
1542
1543    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool> {
1544        do_fallback_lookup(
1545            object_keys,
1546            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1547                CacheResult::Hit(_) => CacheResult::Hit(true),
1548                CacheResult::NegativeHit => CacheResult::Hit(false),
1549                CacheResult::Miss => CacheResult::Miss,
1550            },
1551            |remaining| {
1552                self.record_db_multi_get("object_by_version", remaining.len())
1553                    .multi_object_exists_by_key(remaining)
1554                    .expect("db error")
1555            },
1556        )
1557    }
1558
1559    fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
1560        match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1561            CacheResult::Hit((version, entry)) => Some(match entry {
1562                ObjectEntry::Object(object) => object.compute_object_reference(),
1563                ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1564                ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1565            }),
1566            CacheResult::NegativeHit => None,
1567            CacheResult::Miss => self
1568                .record_db_get("latest_objref_or_tombstone")
1569                .get_latest_object_ref_or_tombstone(object_id)
1570                .expect("db error"),
1571        }
1572    }
1573
1574    fn get_latest_object_or_tombstone(
1575        &self,
1576        object_id: ObjectID,
1577    ) -> Option<(ObjectKey, ObjectOrTombstone)> {
1578        match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1579            CacheResult::Hit((version, entry)) => {
1580                let key = ObjectKey(object_id, version);
1581                Some(match entry {
1582                    ObjectEntry::Object(object) => (key, object.into()),
1583                    ObjectEntry::Deleted => (
1584                        key,
1585                        ObjectOrTombstone::Tombstone((
1586                            object_id,
1587                            version,
1588                            ObjectDigest::OBJECT_DIGEST_DELETED,
1589                        )),
1590                    ),
1591                    ObjectEntry::Wrapped => (
1592                        key,
1593                        ObjectOrTombstone::Tombstone((
1594                            object_id,
1595                            version,
1596                            ObjectDigest::OBJECT_DIGEST_WRAPPED,
1597                        )),
1598                    ),
1599                })
1600            }
1601            CacheResult::NegativeHit => None,
1602            CacheResult::Miss => self
1603                .record_db_get("latest_object_or_tombstone")
1604                .get_latest_object_or_tombstone(object_id)
1605                .expect("db error"),
1606        }
1607    }
1608
1609    fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool> {
1610        keys.iter()
1611            .map(|key| {
1612                if key.is_cancelled() {
1613                    true
1614                } else {
1615                    match key {
1616                        InputKey::VersionedObject { id, version } => {
1617                            matches!(
1618                                self.get_object_by_key_cache_only(&id.id(), *version),
1619                                CacheResult::Hit(_)
1620                            )
1621                        }
1622                        InputKey::Package { id } => self.packages.contains_key(id),
1623                    }
1624                }
1625            })
1626            .collect()
1627    }
1628
1629    #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1630    fn find_object_lt_or_eq_version(
1631        &self,
1632        object_id: ObjectID,
1633        version_bound: SequenceNumber,
1634    ) -> Option<Object> {
1635        macro_rules! check_cache_entry {
1636            ($level: expr, $objects: expr) => {
1637                self.metrics
1638                    .record_cache_request("object_lt_or_eq_version", $level);
1639                if let Some(objects) = $objects {
1640                    if let Some((_, object)) = objects
1641                        .all_versions_lt_or_eq_descending(&version_bound)
1642                        .next()
1643                    {
1644                        if let ObjectEntry::Object(object) = object {
1645                            self.metrics
1646                                .record_cache_hit("object_lt_or_eq_version", $level);
1647                            return Some(object.clone());
1648                        } else {
1649                            // if we find a tombstone, the object does not exist
1650                            self.metrics
1651                                .record_cache_negative_hit("object_lt_or_eq_version", $level);
1652                            return None;
1653                        }
1654                    } else {
1655                        self.metrics
1656                            .record_cache_miss("object_lt_or_eq_version", $level);
1657                    }
1658                }
1659            };
1660        }
1661
1662        // if we have the latest version cached, and it is within the bound, we are done
1663        self.metrics
1664            .record_cache_request("object_lt_or_eq_version", "object_by_id");
1665        let latest_cache_entry = self.object_by_id_cache.get(&object_id);
1666        if let Some(latest) = &latest_cache_entry {
1667            let latest = latest.lock();
1668            match &*latest {
1669                LatestObjectCacheEntry::Object(latest_version, object) => {
1670                    if *latest_version <= version_bound {
1671                        if let ObjectEntry::Object(object) = object {
1672                            self.metrics
1673                                .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1674                            return Some(object.clone());
1675                        } else {
1676                            // object is a tombstone, but is still within the version bound
1677                            self.metrics.record_cache_negative_hit(
1678                                "object_lt_or_eq_version",
1679                                "object_by_id",
1680                            );
1681                            return None;
1682                        }
1683                    }
1684                    // latest object is not within the version bound. fall through.
1685                }
1686                // No object by this ID exists at all
1687                LatestObjectCacheEntry::NonExistent => {
1688                    self.metrics
1689                        .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1690                    return None;
1691                }
1692            }
1693        }
1694        self.metrics
1695            .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1696
1697        Self::with_locked_cache_entries(
1698            &self.dirty.objects,
1699            &self.cached.object_cache,
1700            &object_id,
1701            |dirty_entry, cached_entry| {
1702                check_cache_entry!("committed", dirty_entry);
1703                check_cache_entry!("uncommitted", cached_entry);
1704
1705                // Much of the time, the query will be for the very latest object version, so
1706                // try that first. But we have to be careful:
1707                // 1. We must load the tombstone if it is present, because its version may exceed
1708                //    the version_bound, in which case we must do a scan.
1709                // 2. You might think we could just call `self.store.get_latest_object_or_tombstone` here.
1710                //    But we cannot, because there may be a more recent version in the dirty set, which
1711                //    we skipped over in check_cache_entry! because of the version bound. However, if we
1712                //    skipped it above, we will skip it here as well, again due to the version bound.
1713                // 3. Despite that, we really want to warm the cache here. Why? Because if the object is
1714                //    cold (not being written to), then we will very soon be able to start serving reads
1715                //    of it from the object_by_id cache, IF we can warm the cache. If we don't warm the
1716                //    the cache here, and no writes to the object occur, then we will always have to go
1717                //    to the db for the object.
1718                //
1719                // Lastly, it is important to understand the rationale for all this: If the object is
1720                // write-hot, we will serve almost all reads to it from the dirty set (or possibly the
1721                // cached set if it is only written to once every few checkpoints). If the object is
1722                // write-cold (or non-existent) and read-hot, then we will serve almost all reads to it
1723                // from the object_by_id cache check above.  Most of the apparently wasteful code here
1724                // exists only to ensure correctness in all the edge cases.
1725                let latest: Option<(SequenceNumber, ObjectEntry)> =
1726                    if let Some(dirty_set) = dirty_entry {
1727                        dirty_set
1728                            .get_highest()
1729                            .cloned()
1730                            .tap_none(|| panic!("dirty set cannot be empty"))
1731                    } else {
1732                        // TODO: we should try not to read from the db while holding the locks.
1733                        self.record_db_get("object_lt_or_eq_version_latest")
1734                            .get_latest_object_or_tombstone(object_id)
1735                            .expect("db error")
1736                            .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1737                                (version, ObjectEntry::from(obj_or_tombstone))
1738                            })
1739                    };
1740
1741                if let Some((obj_version, obj_entry)) = latest {
1742                    // we can always cache the latest object (or tombstone), even if it is not within the
1743                    // version_bound. This is done in order to warm the cache in the case where a sequence
1744                    // of transactions all read the same child object without writing to it.
1745
1746                    // Note: no need to call with_object_by_id_cache_update here, because we are holding
1747                    // the lock on the dirty cache entry, and `latest` cannot become out-of-date
1748                    // while we hold that lock.
1749                    self.cache_latest_object_by_id(
1750                        &object_id,
1751                        LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1752                        // We can get a ticket at the last second, because we are holding the lock
1753                        // on dirty, so there cannot be any concurrent writes.
1754                        self.object_by_id_cache.get_ticket_for_read(&object_id),
1755                    );
1756
1757                    if obj_version <= version_bound {
1758                        match obj_entry {
1759                            ObjectEntry::Object(object) => Some(object),
1760                            ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
1761                        }
1762                    } else {
1763                        // The latest object exceeded the bound, so now we have to do a scan
1764                        // But we already know there is no dirty entry within the bound,
1765                        // so we go to the db.
1766                        self.record_db_get("object_lt_or_eq_version_scan")
1767                            .find_object_lt_or_eq_version(object_id, version_bound)
1768                            .expect("db error")
1769                    }
1770
1771                // no object found in dirty set or db, object does not exist
1772                // When this is called from a read api (i.e. not the execution path) it is
1773                // possible that the object has been deleted and pruned. In this case,
1774                // there would be no entry at all on disk, but we may have a tombstone in the
1775                // cache
1776                } else if let Some(latest_cache_entry) = latest_cache_entry {
1777                    // If there is a latest cache entry, it had better not be a live object!
1778                    assert!(!latest_cache_entry.lock().is_alive());
1779                    None
1780                } else {
1781                    // If there is no latest cache entry, we can insert one.
1782                    let highest = cached_entry.and_then(|c| c.get_highest());
1783                    assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1784                    self.cache_object_not_found(
1785                        &object_id,
1786                        // okay to get ticket at last second - see above
1787                        self.object_by_id_cache.get_ticket_for_read(&object_id),
1788                    );
1789                    None
1790                }
1791            },
1792        )
1793    }
1794
    /// Load the `SuiSystemState` object via the free function `get_sui_system_state`,
    /// using this cache as the backing object store.
    fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
        get_sui_system_state(self)
    }
1798
    /// Load the `Bridge` object via the free function `get_bridge`, using this cache
    /// as the backing object store.
    fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge> {
        get_bridge(self)
    }
1802
1803    fn get_marker_value(
1804        &self,
1805        object_key: FullObjectKey,
1806        epoch_id: EpochId,
1807    ) -> Option<MarkerValue> {
1808        match self.get_marker_value_cache_only(object_key, epoch_id) {
1809            CacheResult::Hit(marker) => Some(marker),
1810            CacheResult::NegativeHit => None,
1811            CacheResult::Miss => self
1812                .record_db_get("marker_by_version")
1813                .get_marker_value(object_key, epoch_id)
1814                .expect("db error"),
1815        }
1816    }
1817
1818    fn get_latest_marker(
1819        &self,
1820        object_id: FullObjectID,
1821        epoch_id: EpochId,
1822    ) -> Option<(SequenceNumber, MarkerValue)> {
1823        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1824            CacheResult::Hit((v, marker)) => Some((v, marker)),
1825            CacheResult::NegativeHit => {
1826                panic!("cannot have negative hit when getting latest marker")
1827            }
1828            CacheResult::Miss => self
1829                .record_db_get("marker_latest")
1830                .get_latest_marker(object_id, epoch_id)
1831                .expect("db error"),
1832        }
1833    }
1834
1835    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult {
1836        let cur_epoch = epoch_store.epoch();
1837        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
1838            CacheResult::Hit((_, obj)) => {
1839                let actual_objref = obj.compute_object_reference();
1840                if obj_ref != actual_objref {
1841                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
1842                        locked_ref: actual_objref,
1843                    })
1844                } else {
1845                    // requested object ref is live, check if there is a lock
1846                    Ok(
1847                        match self
1848                            .object_locks
1849                            .get_transaction_lock(&obj_ref, epoch_store)?
1850                        {
1851                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
1852                                locked_by_tx: LockDetailsDeprecated {
1853                                    epoch: cur_epoch,
1854                                    tx_digest,
1855                                },
1856                            },
1857                            None => ObjectLockStatus::Initialized,
1858                        },
1859                    )
1860                }
1861            }
1862            CacheResult::NegativeHit => {
1863                Err(SuiError::from(UserInputError::ObjectNotFound {
1864                    object_id: obj_ref.0,
1865                    // even though we know the requested version, we leave it as None to indicate
1866                    // that the object does not exist at any version
1867                    version: None,
1868                }))
1869            }
1870            CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1871        }
1872    }
1873
1874    fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef> {
1875        let obj = self.get_object_impl("live_objref", &object_id).ok_or(
1876            UserInputError::ObjectNotFound {
1877                object_id,
1878                version: None,
1879            },
1880        )?;
1881        Ok(obj.compute_object_reference())
1882    }
1883
1884    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1885        do_fallback_lookup_fallible(
1886            owned_object_refs,
1887            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
1888                CacheResult::Hit((version, obj)) => {
1889                    if obj.compute_object_reference() != *obj_ref {
1890                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
1891                            provided_obj_ref: *obj_ref,
1892                            current_version: version,
1893                        }
1894                        .into())
1895                    } else {
1896                        Ok(CacheResult::Hit(()))
1897                    }
1898                }
1899                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
1900                    object_id: obj_ref.0,
1901                    version: None,
1902                }
1903                .into()),
1904                CacheResult::Miss => Ok(CacheResult::Miss),
1905            },
1906            |remaining| {
1907                self.record_db_multi_get("object_is_live", remaining.len())
1908                    .check_owned_objects_are_live(remaining)?;
1909                Ok(vec![(); remaining.len()])
1910            },
1911        )?;
1912        Ok(())
1913    }
1914
    /// Read the highest pruned checkpoint sequence number straight from the
    /// perpetual tables; this value is not cached by this struct.
    fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber> {
        self.store
            .perpetual_tables
            .get_highest_pruned_checkpoint()
            .expect("db error")
    }
1921
    /// Return a future that resolves once every key in `input_and_receiving_keys` is
    /// available, as determined by `multi_input_objects_available`.
    ///
    /// NOTE(review): availability is mapped to `Option<()>` below; presumably
    /// `object_notify_read.read` treats `None` as "not yet available" and parks the
    /// future until the key is written — confirm against the notify-read implementation.
    fn notify_read_input_objects<'a>(
        &'a self,
        input_and_receiving_keys: &'a [InputKey],
        receiving_keys: &'a HashSet<InputKey>,
        epoch: EpochId,
    ) -> BoxFuture<'a, ()> {
        self.object_notify_read
            .read(
                "notify_read_input_objects",
                input_and_receiving_keys,
                move |keys| {
                    // Convert each bool to Some(())/None for the notify-read machinery.
                    self.multi_input_objects_available(keys, receiving_keys, epoch)
                        .into_iter()
                        .map(|available| if available { Some(()) } else { None })
                        .collect::<Vec<_>>()
                },
            )
            // Discard the Vec<()> results; callers only care about completion.
            .map(|_| ())
            .boxed()
    }
1942}
1943
1944impl TransactionCacheRead for WritebackCache {
    /// Fetch transactions by digest: dirty (uncommitted) set first, then the committed
    /// cache, then the db. Digests confirmed absent from the db are negatively cached.
    fn multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<Arc<VerifiedTransaction>>> {
        // Acquire a read ticket per digest *before* probing the caches, so a negative
        // result can be inserted later. NOTE(review): presumably a ticket invalidated
        // by a concurrent write causes the insert below to be rejected — confirm
        // against the cache implementation.
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                // First: the dirty set of not-yet-committed transaction writes.
                self.metrics
                    .record_cache_request("transaction_block", "uncommitted");
                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_block", "uncommitted");
                    return CacheResult::Hit(Some(tx.transaction.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_block", "uncommitted");

                // Second: the committed cache, which may also hold a cached "absent".
                self.metrics
                    .record_cache_request("transaction_block", "committed");

                match self
                    .cached
                    .transactions
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(tx)) => {
                        self.metrics
                            .record_cache_hit("transaction_block", "committed");
                        CacheResult::Hit(Some(tx))
                    }
                    // A cached None means a prior lookup proved the digest is not in
                    // the db either.
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_block", "committed");

                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                // Batch all true misses into one db multi-get.
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results: Vec<_> = self
                    .record_db_multi_get("transaction_block", remaining.len())
                    .multi_get_transaction_blocks(&remaining_digests)
                    .expect("db error")
                    .into_iter()
                    .map(|o| o.map(Arc::new))
                    .collect();
                // Negatively cache absent digests; a failed insert (stale ticket) is
                // deliberately ignored via .ok().
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached.transactions.insert(digest, None, *ticket).ok();
                    }
                }
                results
            },
        )
    }
2007
    /// Fetch the executed-effects digest for each transaction digest: dirty set, then
    /// committed cache, then db; absent digests are negatively cached.
    fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEffectsDigest>> {
        // Tickets are taken before the cache probes so that negative db results can be
        // inserted afterwards (stale tickets presumably void the insert — see the
        // cache implementation).
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| {
                (
                    *d,
                    self.cached.executed_effects_digests.get_ticket_for_read(d),
                )
            })
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                // First: the dirty (uncommitted) set.
                self.metrics
                    .record_cache_request("executed_effects_digests", "uncommitted");
                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
                    self.metrics
                        .record_cache_hit("executed_effects_digests", "uncommitted");
                    return CacheResult::Hit(Some(*digest));
                }
                self.metrics
                    .record_cache_miss("executed_effects_digests", "uncommitted");

                // Second: the committed cache (which may hold a cached "absent").
                self.metrics
                    .record_cache_request("executed_effects_digests", "committed");
                match self
                    .cached
                    .executed_effects_digests
                    .get(digest)
                    .map(|l| *l.lock())
                {
                    Some(PointCacheItem::Some(digest)) => {
                        self.metrics
                            .record_cache_hit("executed_effects_digests", "committed");
                        CacheResult::Hit(Some(digest))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("executed_effects_digests", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                // One batched db read for all misses.
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("executed_effects_digests", remaining.len())
                    .multi_get_executed_effects_digests(&remaining_digests)
                    .expect("db error");
                // Negatively cache absent digests; failed inserts are ignored.
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .executed_effects_digests
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }
2073
    /// Fetch transaction effects by effects digest: dirty set, then committed cache,
    /// then db; absent digests are negatively cached.
    fn multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> Vec<Option<TransactionEffects>> {
        // Tickets are acquired before probing so negative db results can be inserted
        // below (a stale ticket presumably voids the insert — see cache impl).
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                // First: the dirty (uncommitted) set.
                self.metrics
                    .record_cache_request("transaction_effects", "uncommitted");
                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_effects", "uncommitted");
                    return CacheResult::Hit(Some(effects.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_effects", "uncommitted");

                // Second: the committed cache (may hold a cached "absent").
                self.metrics
                    .record_cache_request("transaction_effects", "committed");
                match self
                    .cached
                    .transaction_effects
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(effects)) => {
                        self.metrics
                            .record_cache_hit("transaction_effects", "committed");
                        // The cache stores a shared handle; clone out an owned value.
                        CacheResult::Hit(Some((*effects).clone()))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_effects", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                // One batched db read for all misses.
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("transaction_effects", remaining.len())
                    .multi_get_effects(remaining_digests.iter())
                    .expect("db error");
                // Negatively cache absent digests; failed inserts are ignored.
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_effects
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }
2134
    /// Returns true if `digest` was executed in the epoch immediately preceding
    /// `current_epoch`. Both positive and negative answers are memoized.
    fn transaction_executed_in_last_epoch(
        &self,
        digest: &TransactionDigest,
        current_epoch: EpochId,
    ) -> bool {
        // Epoch 0 has no predecessor epoch.
        if current_epoch == 0 {
            return false;
        }
        let last_epoch = current_epoch - 1;
        // Keying on the prior epoch keeps entries from older epochs from being reused.
        let cache_key = (last_epoch, *digest);

        // Ticket must be acquired before the cache read so the insert below is valid.
        let ticket = self
            .cached
            .transaction_executed_in_last_epoch
            .get_ticket_for_read(&cache_key);

        if let Some(cached) = self
            .cached
            .transaction_executed_in_last_epoch
            .get(&cache_key)
        {
            // Cached Some(()) means "was executed"; cached None means "was not".
            return cached.lock().is_some();
        }

        // NOTE(review): the db helper receives `current_epoch`, not `last_epoch`,
        // while the cache key uses `last_epoch` — presumably the helper subtracts one
        // internally; confirm against its definition.
        let was_executed = self
            .store
            .perpetual_tables
            .was_transaction_executed_in_last_epoch(digest, current_epoch);

        // Memoize the result; a failed insert (stale ticket) is deliberately ignored.
        let value = if was_executed { Some(()) } else { None };
        self.cached
            .transaction_executed_in_last_epoch
            .insert(&cache_key, value, ticket)
            .ok();

        was_executed
    }
2172
    /// Return a future resolving to the executed-effects digest of every transaction
    /// in `digests`, waiting (via the notify-read machinery) for any that have not
    /// executed yet. `multi_get_executed_effects_digests` supplies the ready values.
    fn notify_read_executed_effects_digests<'a>(
        &'a self,
        task_name: &'static str,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>> {
        self.executed_effects_digests_notify_read
            .read(task_name, digests, |digests| {
                self.multi_get_executed_effects_digests(digests)
            })
            .boxed()
    }
2184
    /// Looks up the events emitted by each transaction in `event_digests`.
    ///
    /// Per-digest lookup order: the dirty (uncommitted) set, then the committed
    /// point cache (which may hold a cached negative result), then the database.
    /// Transactions with an empty event list are reported as `None`.
    fn multi_get_events(
        &self,
        event_digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEvents>> {
        // Normalize empty event lists to `None` so callers see a single
        // representation for "no events".
        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
            if events.data.is_empty() {
                None
            } else {
                Some(events)
            }
        }

        // Take a read ticket per digest *before* any lookups: if a concurrent
        // write lands in between, the stale ticket causes the fallback
        // `insert` below to be rejected instead of caching stale data.
        let digests_and_tickets: Vec<_> = event_digests
            .iter()
            .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                // Dirty (uncommitted) outputs take precedence over committed data.
                self.metrics
                    .record_cache_request("transaction_events", "uncommitted");
                if let Some(events) = self.dirty.transaction_events.get(digest).map(|e| e.clone()) {
                    self.metrics
                        .record_cache_hit("transaction_events", "uncommitted");

                    return CacheResult::Hit(map_events(events));
                }
                self.metrics
                    .record_cache_miss("transaction_events", "uncommitted");

                // Committed point cache; `PointCacheItem::None` is a cached
                // negative result, so the db lookup can be skipped entirely.
                self.metrics
                    .record_cache_request("transaction_events", "committed");
                match self
                    .cached
                    .transaction_events
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(events)) => {
                        self.metrics
                            .record_cache_hit("transaction_events", "committed");
                        CacheResult::Hit(map_events((*events).clone()))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_events", "committed");

                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                // Fall back to the db for the digests that missed above.
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .store
                    .multi_get_events(&remaining_digests)
                    .expect("db error");
                // Cache negative results so repeated misses are served from
                // memory; the ticket rejects the insert if a write raced us.
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_events
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }
2255
2256    fn get_unchanged_loaded_runtime_objects(
2257        &self,
2258        digest: &TransactionDigest,
2259    ) -> Option<Vec<ObjectKey>> {
2260        self.dirty
2261            .unchanged_loaded_runtime_objects
2262            .get(digest)
2263            .map(|b| b.clone())
2264            .or_else(|| {
2265                self.store
2266                    .get_unchanged_loaded_runtime_objects(digest)
2267                    .expect("db error")
2268            })
2269    }
2270
2271    fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
2272        self.dirty
2273            .pending_transaction_writes
2274            .get(digest)
2275            .map(|transaction_output| transaction_output.take_accumulator_events())
2276    }
2277}
2278
impl ExecutionCacheWrite for WritebackCache {
    // Delegates owned-object version validation to the shared `ObjectLocks` logic.
    fn validate_owned_object_versions(&self, owned_input_objects: &[ObjectRef]) -> SuiResult {
        ObjectLocks::validate_owned_object_versions(self, owned_input_objects)
    }

    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
        // Fully-qualified call so it is unambiguous that the inherent method of
        // the same name is invoked, not this trait method.
        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs);
    }

    #[cfg(test)]
    fn write_object_entry_for_test(&self, object: Object) {
        // Test-only hook: writes a single object version straight into the cache.
        self.write_object_entry(&object.id(), object.version(), object.into());
    }
}
2293
2294implement_passthrough_traits!(WritebackCache);
2295
impl GlobalStateHashStore for WritebackCache {
    /// Returns the object ref of the newest version of `object_id` that is
    /// strictly prior to `version`, considering the dirty set, the committed
    /// cache, and the db, and choosing the highest candidate found.
    fn get_object_ref_prior_to_key_deprecated(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<Option<ObjectRef>> {
        // There is probably a more efficient way to implement this, but since this is only used by
        // old protocol versions, it is better to do the simple thing that is obviously correct.
        // In this case we fetch the previous version from all sources and choose the highest.
        let mut candidates = Vec::new();

        // Maps a cached version entry to an ObjectRef, synthesizing the special
        // deleted / wrapped digests for tombstone entries.
        let check_versions =
            |versions: &CachedVersionMap<ObjectEntry>| match versions.get_prior_to(&version) {
                Some((version, object_entry)) => match object_entry {
                    ObjectEntry::Object(object) => {
                        assert_eq!(object.version(), version);
                        Some(object.compute_object_reference())
                    }
                    ObjectEntry::Deleted => {
                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED))
                    }
                    ObjectEntry::Wrapped => {
                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED))
                    }
                },
                None => None,
            };

        // first check dirty data
        if let Some(objects) = self.dirty.objects.get(object_id)
            && let Some(prior) = check_versions(&objects)
        {
            candidates.push(prior);
        }

        // then the committed cache
        if let Some(objects) = self.cached.object_cache.get(object_id)
            && let Some(prior) = check_versions(&objects.lock())
        {
            candidates.push(prior);
        }

        // finally the db
        if let Some(prior) = self
            .store
            .get_object_ref_prior_to_key_deprecated(object_id, version)?
        {
            candidates.push(prior);
        }

        // sort candidates by version, and return the highest
        candidates.sort_by_key(|(_, version, _)| *version);
        Ok(candidates.pop())
    }

    // Root state hashes live only in the store; no caching layer is involved.
    fn get_root_state_hash_for_epoch(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
        self.store.get_root_state_hash_for_epoch(epoch)
    }

    fn get_root_state_hash_for_highest_epoch(
        &self,
    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
        self.store.get_root_state_hash_for_highest_epoch()
    }

    fn insert_state_hash_for_epoch(
        &self,
        epoch: EpochId,
        checkpoint_seq_num: &CheckpointSequenceNumber,
        acc: &GlobalStateHash,
    ) -> SuiResult {
        self.store
            .insert_state_hash_for_epoch(epoch, checkpoint_seq_num, acc)
    }

    fn iter_live_object_set(
        &self,
        include_wrapped_tombstone: bool,
    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        // The only time it is safe to iterate the live object set is at an epoch boundary,
        // at which point the db is consistent and the dirty cache is empty. So this does
        // not need to read the cache.
        assert!(
            self.dirty.is_empty(),
            "cannot iterate live object set with dirty data"
        );
        self.store.iter_live_object_set(include_wrapped_tombstone)
    }

    // A version of iter_live_object_set that reads the cache. Only use for testing. If used
    // on a live validator, can cause the server to block for as long as it takes to iterate
    // the entire live object set.
    fn iter_cached_live_object_set_for_testing(
        &self,
        include_wrapped_tombstone: bool,
    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        // hold iter until we are finished to prevent any concurrent inserts/deletes
        let iter = self.dirty.objects.iter();
        let mut dirty_objects = BTreeMap::new();

        // add everything from the store
        for obj in self.store.iter_live_object_set(include_wrapped_tombstone) {
            dirty_objects.insert(obj.object_id(), obj);
        }

        // add everything from the cache, but also remove deletions
        // (only the highest dirty version of each object is live)
        for entry in iter {
            let id = *entry.key();
            let value = entry.value();
            match value.get_highest().unwrap() {
                (_, ObjectEntry::Object(object)) => {
                    dirty_objects.insert(id, LiveObject::Normal(object.clone()));
                }
                (version, ObjectEntry::Wrapped) => {
                    if include_wrapped_tombstone {
                        dirty_objects.insert(id, LiveObject::Wrapped(ObjectKey(id, *version)));
                    } else {
                        dirty_objects.remove(&id);
                    }
                }
                (_, ObjectEntry::Deleted) => {
                    dirty_objects.remove(&id);
                }
            }
        }

        Box::new(dirty_objects.into_values())
    }
}
2426
2427// TODO: For correctness, we must at least invalidate the cache when items are written through this
2428// trait (since they could be negatively cached as absent). But it may or may not be optimal to
2429// actually insert them into the cache. For instance if state sync is running ahead of execution,
2430// they might evict other items that are about to be read. This could be an area for tuning in the
2431// future.
2432impl StateSyncAPI for WritebackCache {
2433    fn insert_transaction_and_effects(
2434        &self,
2435        transaction: &VerifiedTransaction,
2436        transaction_effects: &TransactionEffects,
2437    ) {
2438        self.store
2439            .insert_transaction_and_effects(transaction, transaction_effects)
2440            .expect("db error");
2441        self.cached
2442            .transactions
2443            .insert(
2444                transaction.digest(),
2445                PointCacheItem::Some(Arc::new(transaction.clone())),
2446                Ticket::Write,
2447            )
2448            .ok();
2449        self.cached
2450            .transaction_effects
2451            .insert(
2452                &transaction_effects.digest(),
2453                PointCacheItem::Some(Arc::new(transaction_effects.clone())),
2454                Ticket::Write,
2455            )
2456            .ok();
2457    }
2458
2459    fn multi_insert_transaction_and_effects(
2460        &self,
2461        transactions_and_effects: &[VerifiedExecutionData],
2462    ) {
2463        self.store
2464            .multi_insert_transaction_and_effects(transactions_and_effects.iter())
2465            .expect("db error");
2466        for VerifiedExecutionData {
2467            transaction,
2468            effects,
2469        } in transactions_and_effects
2470        {
2471            self.cached
2472                .transactions
2473                .insert(
2474                    transaction.digest(),
2475                    PointCacheItem::Some(Arc::new(transaction.clone())),
2476                    Ticket::Write,
2477                )
2478                .ok();
2479            self.cached
2480                .transaction_effects
2481                .insert(
2482                    &effects.digest(),
2483                    PointCacheItem::Some(Arc::new(effects.clone())),
2484                    Ticket::Write,
2485                )
2486                .ok();
2487        }
2488    }
2489}