sui_core/execution_cache/
writeback_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! WritebackCache is a cache for transaction execution which delays writes to the database until
5//! transaction results are certified (i.e. they appear in a certified checkpoint, or an effects cert
6//! is observed by a fullnode). The cache also stores committed data in memory in order to serve
7//! future reads without hitting the database.
8//!
9//! For storing uncommitted transaction outputs, we cannot evict the data at all until it is written
10//! to disk. Committed data not only can be evicted, but it is also unbounded (imagine a stream of
11//! transactions that keep splitting a coin into smaller coins).
12//!
13//! We also want to be able to support negative cache hits (i.e. the case where we can determine an
14//! object does not exist without hitting the database).
15//!
16//! To achieve both of these goals, we split the cache data into two pieces, a dirty set and a cached
17//! set. The dirty set has no automatic evictions, data is only removed after being committed. The
18//! cached set is in a bounded-sized cache with automatic evictions. In order to support negative
//! cache hits, we treat the two halves of the cache as a FIFO queue. Newly written (dirty) versions
//! are inserted to one end of the dirty queue. As versions are committed to disk, they are
21//! removed from the other end of the dirty queue and inserted into the cache queue. The cache queue
22//! is truncated if it exceeds its maximum size, by removing all but the N newest versions.
23//!
24//! This gives us the property that the sequence of versions in the dirty and cached queues are the
25//! most recent versions of the object, i.e. there can be no "gaps". This allows for the following:
26//!
27//!   - Negative cache hits: If the queried version is not in memory, but is higher than the smallest
28//!     version in the cached queue, it does not exist in the db either.
29//!   - Bounded reads: When reading the most recent version that is <= some version bound, we can
30//!     correctly satisfy this query from the cache, or determine that we must go to the db.
31//!
//! Note that at any time, either or both of the dirty and cached queues may be non-existent. There
//! may be no dirty versions of an object, in which case there will be no dirty queue. And, the
//! cached queue may be evicted from the cache, in which case there will be no cached queue. Because
//! only the cached queue can be evicted (the dirty queue can only become empty by moving versions
//! from it to the cached queue), the "highest versions" property still holds in all cases.
37//!
38//! The above design is used for both objects and markers.
39
40use crate::authority::AuthorityStore;
41use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
42use crate::authority::authority_store::{
43    ExecutionLockWriteGuard, LockDetailsDeprecated, ObjectLockStatus, SuiLockResult,
44};
45use crate::authority::authority_store_tables::LiveObject;
46use crate::authority::backpressure::BackpressureManager;
47use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
48use crate::fallback_fetch::{do_fallback_lookup, do_fallback_lookup_fallible};
49use crate::global_state_hasher::GlobalStateHashStore;
50use crate::transaction_outputs::TransactionOutputs;
51
52use dashmap::DashMap;
53use dashmap::mapref::entry::Entry as DashMapEntry;
54use futures::{FutureExt, future::BoxFuture};
55use moka::sync::SegmentedCache as MokaCache;
56use mysten_common::debug_fatal;
57use mysten_common::random_util::randomize_cache_capacity_in_tests;
58use mysten_common::sync::notify_read::NotifyRead;
59use parking_lot::Mutex;
60use std::collections::{BTreeMap, HashSet};
61use std::hash::Hash;
62use std::sync::Arc;
63use std::sync::atomic::AtomicU64;
64use sui_config::ExecutionCacheConfig;
65use sui_macros::fail_point;
66use sui_protocol_config::ProtocolVersion;
67use sui_types::accumulator_event::AccumulatorEvent;
68use sui_types::base_types::{
69    EpochId, FullObjectID, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData,
70};
71use sui_types::bridge::{Bridge, get_bridge};
72use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest};
73use sui_types::effects::{TransactionEffects, TransactionEvents};
74use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
75use sui_types::executable_transaction::VerifiedExecutableTransaction;
76use sui_types::global_state_hash::GlobalStateHash;
77use sui_types::message_envelope::Message;
78use sui_types::messages_checkpoint::CheckpointSequenceNumber;
79use sui_types::object::Object;
80use sui_types::storage::{
81    FullObjectKey, InputKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject,
82};
83use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
84use sui_types::transaction::{TransactionDataAPI, VerifiedSignedTransaction, VerifiedTransaction};
85use tap::TapOptional;
86use tracing::{debug, info, instrument, trace, warn};
87
88use super::ExecutionCacheAPI;
89use super::cache_types::Ticket;
90use super::{
91    Batch, CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
92    ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
93    cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache},
94    implement_passthrough_traits,
95    object_locks::ObjectLocks,
96};
97
98#[cfg(test)]
99#[path = "unit_tests/writeback_cache_tests.rs"]
100pub mod writeback_cache_tests;
101
102#[cfg(test)]
103#[path = "unit_tests/notify_read_input_objects_tests.rs"]
104mod notify_read_input_objects_tests;
105
/// A single versioned slot in the object caches: either the object itself, or a tombstone
/// recording that the object was deleted or wrapped at that version.
#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    /// A live object.
    Object(Object),
    /// Tombstone: the object was deleted.
    Deleted,
    /// Tombstone: the object was wrapped.
    Wrapped,
}
112
113impl ObjectEntry {
114    #[cfg(test)]
115    fn unwrap_object(&self) -> &Object {
116        match self {
117            ObjectEntry::Object(o) => o,
118            _ => panic!("unwrap_object called on non-Object"),
119        }
120    }
121
122    fn is_tombstone(&self) -> bool {
123        match self {
124            ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
125            ObjectEntry::Object(_) => false,
126        }
127    }
128}
129
130impl std::fmt::Debug for ObjectEntry {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        match self {
133            ObjectEntry::Object(o) => {
134                write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
135            }
136            ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
137            ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
138        }
139    }
140}
141
142impl From<Object> for ObjectEntry {
143    fn from(object: Object) -> Self {
144        ObjectEntry::Object(object)
145    }
146}
147
148impl From<ObjectOrTombstone> for ObjectEntry {
149    fn from(object: ObjectOrTombstone) -> Self {
150        match object {
151            ObjectOrTombstone::Object(o) => o.into(),
152            ObjectOrTombstone::Tombstone(obj_ref) => {
153                if obj_ref.2.is_deleted() {
154                    ObjectEntry::Deleted
155                } else if obj_ref.2.is_wrapped() {
156                    ObjectEntry::Wrapped
157                } else {
158                    panic!("tombstone digest must either be deleted or wrapped");
159                }
160            }
161        }
162    }
163}
164
/// Value type of `object_by_id_cache`: what is known about the latest version of an object.
#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    /// A version of the object together with the entry (live object or tombstone) at that
    /// version.
    Object(SequenceNumber, ObjectEntry),
    /// Negative entry. It is never "alive" and is considered older than any `Object` entry.
    /// NOTE(review): presumably recorded when a lookup found no such object — confirm at the
    /// insertion sites.
    NonExistent,
}
170
171impl LatestObjectCacheEntry {
172    #[cfg(test)]
173    fn version(&self) -> Option<SequenceNumber> {
174        match self {
175            LatestObjectCacheEntry::Object(version, _) => Some(*version),
176            LatestObjectCacheEntry::NonExistent => None,
177        }
178    }
179
180    fn is_alive(&self) -> bool {
181        match self {
182            LatestObjectCacheEntry::Object(_, entry) => !entry.is_tombstone(),
183            LatestObjectCacheEntry::NonExistent => false,
184        }
185    }
186}
187
188impl IsNewer for LatestObjectCacheEntry {
189    fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
190        match (self, other) {
191            (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
192                v1 > v2
193            }
194            (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
195            _ => false,
196        }
197    }
198}
199
/// Key of the marker tables: the epoch paired with the full ID of the object the marker is for.
type MarkerKey = (EpochId, FullObjectID);
201
/// UncommittedData stores execution outputs that are not yet written to the db. Entries in this
/// struct can only be purged after they are committed.
struct UncommittedData {
    /// The object dirty set. All writes go into this table first. After we flush the data to the
    /// db, the data is removed from this table and inserted into the object_cache.
    ///
    /// This table may contain both live and dead objects, since we flush both live and dead
    /// objects to the db in order to support past object queries on fullnodes.
    ///
    /// Further, we only remove objects in FIFO order, which ensures that the cached
    /// sequence of objects has no gaps. In other words, if we have versions 4, 8, 13 of
    /// an object, we can deduce that version 9 does not exist. This also makes child object
    /// reads efficient. `object_cache` cannot contain a more recent version of an object than
    /// `objects`, and neither can have any gaps. Therefore if there is any object <= the version
    /// bound for a child read in objects, it is the correct object to return.
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

    // Markers for received objects and deleted shared objects. This contains all of the dirty
    // marker state, which is committed to the db at the same time as other transaction data.
    // After markers are committed to the db we remove them from this table and insert them into
    // marker_cache.
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    // Uncommitted transaction effects, keyed by effects digest.
    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    // Uncommitted transaction events, keyed by transaction digest.
    transaction_events: DashMap<TransactionDigest, TransactionEvents>,

    // Per-transaction list of object keys.
    // NOTE(review): by its name, presumably the objects loaded at runtime that the transaction
    // did not modify — confirm against the code that populates it.
    unchanged_loaded_runtime_objects: DashMap<TransactionDigest, Vec<ObjectKey>>,

    // Maps a transaction digest to the digest of its effects, for uncommitted transactions.
    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    // Transaction outputs that have not yet been written to the DB. Items are removed from this
    // table as they are flushed to the db.
    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,

    // Transaction outputs from Mysticeti fastpath certified transaction executions.
    // These outputs are not written to pending_transaction_writes until we are sure
    // that they will not get rejected by consensus. This ensures that no dependent
    // transactions can sign using the outputs of a fastpath certified transaction.
    // Otherwise it will be challenging to revert them.
    // We use a cache because it is possible to have entries that are not finalized
    // due to data races, i.e. a transaction is first fastpath certified, then
    // rejected through consensus commit, but at the same time it was executed
    // and outputs are written here. We won't have a chance to remove them anymore.
    // So we rely on the cache to evict them eventually.
    // It is also safe to evict a transaction that will eventually be finalized,
    // as we will just re-execute it.
    fastpath_transaction_outputs: MokaCache<TransactionDigest, Arc<TransactionOutputs>>,

    // Running counters. `is_empty` asserts (in debug builds) that the two are equal whenever
    // there are no pending transaction writes.
    total_transaction_inserts: AtomicU64,
    total_transaction_commits: AtomicU64,
}
254
255impl UncommittedData {
256    fn new(config: &ExecutionCacheConfig) -> Self {
257        Self {
258            objects: DashMap::with_shard_amount(2048),
259            markers: DashMap::with_shard_amount(2048),
260            transaction_effects: DashMap::with_shard_amount(2048),
261            executed_effects_digests: DashMap::with_shard_amount(2048),
262            pending_transaction_writes: DashMap::with_shard_amount(2048),
263            fastpath_transaction_outputs: MokaCache::builder(8)
264                .max_capacity(randomize_cache_capacity_in_tests(
265                    config.fastpath_transaction_outputs_cache_size(),
266                ))
267                .build(),
268            transaction_events: DashMap::with_shard_amount(2048),
269            unchanged_loaded_runtime_objects: DashMap::with_shard_amount(2048),
270            total_transaction_inserts: AtomicU64::new(0),
271            total_transaction_commits: AtomicU64::new(0),
272        }
273    }
274
275    fn clear(&self) {
276        self.objects.clear();
277        self.markers.clear();
278        self.transaction_effects.clear();
279        self.executed_effects_digests.clear();
280        self.pending_transaction_writes.clear();
281        self.fastpath_transaction_outputs.invalidate_all();
282        self.transaction_events.clear();
283        self.unchanged_loaded_runtime_objects.clear();
284        self.total_transaction_inserts
285            .store(0, std::sync::atomic::Ordering::Relaxed);
286        self.total_transaction_commits
287            .store(0, std::sync::atomic::Ordering::Relaxed);
288    }
289
290    fn is_empty(&self) -> bool {
291        let empty = self.pending_transaction_writes.is_empty();
292        if empty && cfg!(debug_assertions) {
293            assert!(
294                self.objects.is_empty()
295                    && self.markers.is_empty()
296                    && self.transaction_effects.is_empty()
297                    && self.executed_effects_digests.is_empty()
298                    && self.transaction_events.is_empty()
299                    && self.unchanged_loaded_runtime_objects.is_empty()
300                    && self
301                        .total_transaction_inserts
302                        .load(std::sync::atomic::Ordering::Relaxed)
303                        == self
304                            .total_transaction_commits
305                            .load(std::sync::atomic::Ordering::Relaxed),
306            );
307        }
308        empty
309    }
310}
311
// Point items (anything without a version number) can be negatively cached as None,
// i.e. a cached `None` records that the item is known to be absent.
type PointCacheItem<T> = Option<T>;
314
315// PointCacheItem can only be used for insert-only collections, so a Some entry
316// is always newer than a None entry.
317impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
318    fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
319        match (self, other) {
320            (Some(_), None) => true,
321
322            (Some(a), Some(b)) => {
323                // conflicting inserts should never happen
324                debug_assert_eq!(a, b);
325                false
326            }
327
328            _ => false,
329        }
330    }
331}
332
/// CachedCommittedData stores data that has been committed to the db, but is likely to be read
/// soon.
struct CachedCommittedData {
    // See module level comment for an explanation of caching strategy.
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    // See module level comment for an explanation of caching strategy.
    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    // The caches below hold point items (no version number), so a `None` value can be stored
    // as a negative entry.
    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,

    transaction_effects:
        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,

    transaction_events: MonotonicCache<TransactionDigest, PointCacheItem<Arc<TransactionEvents>>>,

    executed_effects_digests:
        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,

    // Keyed by (epoch, transaction digest); the value carries no data beyond presence.
    transaction_executed_in_last_epoch:
        MonotonicCache<(EpochId, TransactionDigest), PointCacheItem<()>>,

    // Objects that were read at transaction signing time - allows us to access them again at
    // execution time with a single lock / hash lookup
    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}
358
359impl CachedCommittedData {
360    fn new(config: &ExecutionCacheConfig) -> Self {
361        let object_cache = MokaCache::builder(8)
362            .max_capacity(randomize_cache_capacity_in_tests(
363                config.object_cache_size(),
364            ))
365            .build();
366        let marker_cache = MokaCache::builder(8)
367            .max_capacity(randomize_cache_capacity_in_tests(
368                config.marker_cache_size(),
369            ))
370            .build();
371
372        let transactions = MonotonicCache::new(randomize_cache_capacity_in_tests(
373            config.transaction_cache_size(),
374        ));
375        let transaction_effects = MonotonicCache::new(randomize_cache_capacity_in_tests(
376            config.effect_cache_size(),
377        ));
378        let transaction_events = MonotonicCache::new(randomize_cache_capacity_in_tests(
379            config.events_cache_size(),
380        ));
381        let executed_effects_digests = MonotonicCache::new(randomize_cache_capacity_in_tests(
382            config.executed_effect_cache_size(),
383        ));
384
385        let transaction_objects = MokaCache::builder(8)
386            .max_capacity(randomize_cache_capacity_in_tests(
387                config.transaction_objects_cache_size(),
388            ))
389            .build();
390
391        let transaction_executed_in_last_epoch = MonotonicCache::new(
392            randomize_cache_capacity_in_tests(config.executed_effect_cache_size()),
393        );
394
395        Self {
396            object_cache,
397            marker_cache,
398            transactions,
399            transaction_effects,
400            transaction_events,
401            executed_effects_digests,
402            transaction_executed_in_last_epoch,
403            _transaction_objects: transaction_objects,
404        }
405    }
406
407    fn clear_and_assert_empty(&self) {
408        self.object_cache.invalidate_all();
409        self.marker_cache.invalidate_all();
410        self.transactions.invalidate_all();
411        self.transaction_effects.invalidate_all();
412        self.transaction_events.invalidate_all();
413        self.executed_effects_digests.invalidate_all();
414        self.transaction_executed_in_last_epoch.invalidate_all();
415        self._transaction_objects.invalidate_all();
416
417        assert_empty(&self.object_cache);
418        assert_empty(&self.marker_cache);
419        assert!(self.transactions.is_empty());
420        assert!(self.transaction_effects.is_empty());
421        assert!(self.transaction_events.is_empty());
422        assert!(self.executed_effects_digests.is_empty());
423        assert!(self.transaction_executed_in_last_epoch.is_empty());
424        assert_empty(&self._transaction_objects);
425    }
426}
427
428fn assert_empty<K, V>(cache: &MokaCache<K, V>)
429where
430    K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
431    V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
432{
433    if cache.iter().next().is_some() {
434        panic!("cache should be empty");
435    }
436}
437
/// Write-back execution cache. `dirty` holds execution outputs that are not yet written to the
/// db, `cached` holds data that has already been committed, and `store` is the backing
/// `AuthorityStore`.
pub struct WritebackCache {
    dirty: UncommittedData,
    cached: CachedCommittedData,

    // We separately cache the latest version of each object. Although this seems
    // redundant, it is the only way to support populating the cache after a read.
    // We cannot simply insert objects that we read off the disk into `object_cache`,
    // since that may violate the no-missing-versions property.
    // `object_by_id_cache` is also written to on writes so that it is always coherent.
    // Hence it contains both committed and dirty object data.
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    // The packages cache is treated separately from objects, because they are immutable and can be
    // used by any number of transactions. Additionally, many operations require loading large
    // numbers of packages (due to dependencies), so we want to try to keep all packages in memory.
    //
    // Also, this cache can contain packages that are dirty or committed, so it does not live in
    // UncommittedData or CachedCommittedData. The cache is populated in two ways:
    // - when packages are written (in which case they will also be present in the dirty set)
    // - after a cache miss. Because package IDs are unique (only one version exists for each ID)
    //   we do not need to worry about the contiguous version property.
    //
    // Note that we remove any unfinalized packages from the cache during revert_state_update().
    packages: MokaCache<ObjectID, PackageObject>,

    object_locks: ObjectLocks,

    // Notification channels. `object_notify_read` is notified by `write_object_entry` and
    // `write_marker_value` when an input key becomes available.
    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    object_notify_read: NotifyRead<InputKey, ()>,
    fastpath_transaction_outputs_notify_read:
        NotifyRead<TransactionDigest, Arc<TransactionOutputs>>,

    store: Arc<AuthorityStore>,
    // Taken from `ExecutionCacheConfig::backpressure_threshold()` at construction time.
    backpressure_threshold: u64,
    backpressure_manager: Arc<BackpressureManager>,
    metrics: Arc<ExecutionCacheMetrics>,
}
474
// Looks up `$version` in an optional version map (`$cache`) and records request / hit /
// negative-hit / miss metrics for `$table` at `$level`.
//
// NOTE: this macro expands to early `return`s. It must be used inside a function or closure
// that returns `CacheResult<_>`:
// - exact version present                         -> `return CacheResult::Hit(entry.clone())`
// - version absent but above the map's least key  -> `return CacheResult::NegativeHit`
// - otherwise (including `$cache` being `None`)   -> records a miss and falls through.
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    // If the version is greater than the least version in the cache, then we know
                    // that the object does not exist anywhere
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
496
// Fetches the highest version from an optional version map (`$cache`) and records request /
// hit / miss metrics for `$table` at `$level`.
//
// NOTE: this macro expands to an early `return CacheResult::Hit((version, entry))` when the
// map is present, so it must be used inside a function or closure returning `CacheResult<_>`.
// When `$cache` is `None` it records a miss and falls through.
//
// Panics if the map is present but empty: empty maps are expected to have been removed.
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some((version, entry)) = cache.get_highest() {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit((*version, entry.clone()));
            } else {
                panic!("empty CachedVersionMap should have been removed");
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
511
512impl WritebackCache {
513    pub fn new(
514        config: &ExecutionCacheConfig,
515        store: Arc<AuthorityStore>,
516        metrics: Arc<ExecutionCacheMetrics>,
517        backpressure_manager: Arc<BackpressureManager>,
518    ) -> Self {
519        let packages = MokaCache::builder(8)
520            .max_capacity(randomize_cache_capacity_in_tests(
521                config.package_cache_size(),
522            ))
523            .build();
524        Self {
525            dirty: UncommittedData::new(config),
526            cached: CachedCommittedData::new(config),
527            object_by_id_cache: MonotonicCache::new(randomize_cache_capacity_in_tests(
528                config.object_by_id_cache_size(),
529            )),
530            packages,
531            object_locks: ObjectLocks::new(),
532            executed_effects_digests_notify_read: NotifyRead::new(),
533            object_notify_read: NotifyRead::new(),
534            fastpath_transaction_outputs_notify_read: NotifyRead::new(),
535            store,
536            backpressure_manager,
537            backpressure_threshold: config.backpressure_threshold(),
538            metrics,
539        }
540    }
541
542    pub fn new_for_tests(store: Arc<AuthorityStore>) -> Self {
543        Self::new(
544            &Default::default(),
545            store,
546            ExecutionCacheMetrics::new(&prometheus::Registry::new()).into(),
547            BackpressureManager::new_for_tests(),
548        )
549    }
550
551    #[cfg(test)]
552    pub fn reset_for_test(&mut self) {
553        let mut new = Self::new(
554            &Default::default(),
555            self.store.clone(),
556            self.metrics.clone(),
557            self.backpressure_manager.clone(),
558        );
559        std::mem::swap(self, &mut new);
560    }
561
562    pub fn evict_executed_effects_from_cache_for_testing(&self, tx_digest: &TransactionDigest) {
563        self.cached.executed_effects_digests.invalidate(tx_digest);
564        self.cached.transaction_events.invalidate(tx_digest);
565        self.cached.transactions.invalidate(tx_digest);
566    }
567
    /// Records `object` as version `version` of `object_id` in the dirty set and in
    /// `object_by_id_cache`, then notifies waiters on `object_notify_read`.
    ///
    /// Notifications are only sent for live objects: packages notify `InputKey::Package`,
    /// non-child objects notify `InputKey::VersionedObject`; child objects and tombstones
    /// notify nothing.
    fn write_object_entry(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        object: ObjectEntry,
    ) {
        trace!(?object_id, ?version, ?object, "inserting object entry");
        self.metrics.record_cache_write("object");

        // We must hold the lock for the object entry while inserting to the
        // object_by_id_cache. Otherwise, a surprising bug can occur:
        //
        // 1. A thread executing TX1 can write object (O,1) to the dirty set and then pause.
        // 2. TX2, which reads (O,1) can begin executing, because ExecutionScheduler immediately
        //    schedules transactions if their inputs are available. It does not matter that TX1
        //    hasn't finished executing yet.
        // 3. TX2 can write (O,2) to both the dirty set and the object_by_id_cache.
        // 4. The thread executing TX1 can resume and write (O,1) to the object_by_id_cache.
        //
        // Now, any subsequent attempt to get the latest version of O will return (O,1) instead of
        // (O,2).
        //
        // This seems very unlikely, but it may be possible under the following circumstances:
        // - While a thread is unlikely to pause for so long, moka cache uses optimistic
        //   lock-free algorithms that have retry loops. Possibly, under high contention, this
        //   code might spin for a surprisingly long time.
        // - Additionally, many concurrent re-executions of the same tx could happen due to
        //   the tx finalizer, plus checkpoint executor, consensus, and RPCs from fullnodes.
        let mut entry = self.dirty.objects.entry(*object_id).or_default();

        // Updated while `entry` is still held, per the explanation above.
        self.object_by_id_cache
            .insert(
                object_id,
                LatestObjectCacheEntry::Object(version, object.clone()),
                Ticket::Write,
            )
            // While Ticket::Write cannot expire, this insert may still fail.
            // See the comment in `MonotonicCache::insert`.
            .ok();

        entry.insert(version, object.clone());

        // `entry` stays in scope until the end of the function, so the notifications below
        // are sent while it is still held.
        if let ObjectEntry::Object(object) = &object {
            if object.is_package() {
                self.object_notify_read
                    .notify(&InputKey::Package { id: *object_id }, &());
            } else if !object.is_child_object() {
                self.object_notify_read.notify(
                    &InputKey::VersionedObject {
                        id: object.full_id(),
                        version: object.version(),
                    },
                    &(),
                );
            }
        }
    }
625
626    fn write_marker_value(
627        &self,
628        epoch_id: EpochId,
629        object_key: FullObjectKey,
630        marker_value: MarkerValue,
631    ) {
632        tracing::trace!("inserting marker value {object_key:?}: {marker_value:?}",);
633        self.metrics.record_cache_write("marker");
634        self.dirty
635            .markers
636            .entry((epoch_id, object_key.id()))
637            .or_default()
638            .value_mut()
639            .insert(object_key.version(), marker_value);
640        // It is possible for a transaction to use a consensus stream ended
641        // object in the input, hence we must notify that it is now available
642        // at the assigned version, so that any transaction waiting for this
643        // object version can start execution.
644        if matches!(marker_value, MarkerValue::ConsensusStreamEnded(_)) {
645            self.object_notify_read.notify(
646                &InputKey::VersionedObject {
647                    id: object_key.id(),
648                    version: object_key.version(),
649                },
650                &(),
651            );
652        }
653    }
654
    // lock both the dirty and committed sides of the cache, and then pass the entries to
    // the callback. Written with the `with` pattern because any other way of doing this
    // creates lifetime hell.
    //
    // `cb` receives `(dirty, cached)`: the version map for `key` from `dirty_map` and from
    // `cached_map` respectively, each `None` when that side has no entry for `key`. The
    // return value of `cb` is returned unchanged.
    fn with_locked_cache_entries<K, V, R>(
        dirty_map: &DashMap<K, CachedVersionMap<V>>,
        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: &K,
        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
    ) -> R
    where
        K: Copy + Eq + Hash + Send + Sync + 'static,
        V: Send + Sync + 'static,
    {
        // The `Entry` is kept alive in its own binding until the end of the function; the
        // reference handed to `cb` borrows from it. A vacant entry is never filled in here.
        let dirty_entry = dirty_map.entry(*key);
        let dirty_entry = match &dirty_entry {
            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
            DashMapEntry::Vacant(_) => None,
        };

        // Three separate bindings: the `Arc` fetched from the cache, the mutex guard that
        // borrows from it, and the plain reference that borrows from the guard. Each must
        // outlive the next, which is why they are not chained into one expression.
        let cached_entry = cached_map.get(key);
        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
        let cached_entry = cached_lock.as_deref();

        cb(dirty_entry, cached_entry)
    }
680
    // Attempt to get an object from the cache. The DB is not consulted.
    // Can return Hit, Miss, or NegativeHit (if the object is known to not exist).
    //
    // The dirty (uncommitted) version map is checked first, then the committed one. Each
    // `check_cache_entry_by_version!` invocation returns from the closure on a hit or a
    // negative hit, so `CacheResult::Miss` is only reached when both checks fall through.
    fn get_object_entry_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<ObjectEntry> {
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }
711
712    fn get_object_by_key_cache_only(
713        &self,
714        object_id: &ObjectID,
715        version: SequenceNumber,
716    ) -> CacheResult<Object> {
717        match self.get_object_entry_by_key_cache_only(object_id, version) {
718            CacheResult::Hit(entry) => match entry {
719                ObjectEntry::Object(object) => CacheResult::Hit(object),
720                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
721            },
722            CacheResult::Miss => CacheResult::Miss,
723            CacheResult::NegativeHit => CacheResult::NegativeHit,
724        }
725    }
726
727    fn get_object_entry_by_id_cache_only(
728        &self,
729        request_type: &'static str,
730        object_id: &ObjectID,
731    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
732        self.metrics
733            .record_cache_request(request_type, "object_by_id");
734        let entry = self.object_by_id_cache.get(object_id);
735
736        if cfg!(debug_assertions)
737            && let Some(entry) = &entry
738        {
739            // check that cache is coherent
740            let highest: Option<ObjectEntry> = self
741                .dirty
742                .objects
743                .get(object_id)
744                .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
745                .or_else(|| {
746                    let obj: Option<ObjectEntry> = self
747                        .store
748                        .get_latest_object_or_tombstone(*object_id)
749                        .unwrap()
750                        .map(|(_, o)| o.into());
751                    obj
752                });
753
754            let cache_entry = match &*entry.lock() {
755                LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
756                LatestObjectCacheEntry::NonExistent => None,
757            };
758
759            // If the cache entry is a tombstone, the db entry may be missing if it was pruned.
760            let tombstone_possibly_pruned = highest.is_none()
761                && cache_entry
762                    .as_ref()
763                    .map(|e| e.is_tombstone())
764                    .unwrap_or(false);
765
766            if highest != cache_entry && !tombstone_possibly_pruned {
767                tracing::error!(
768                    ?highest,
769                    ?cache_entry,
770                    ?tombstone_possibly_pruned,
771                    "object_by_id cache is incoherent for {:?}",
772                    object_id
773                );
774                panic!("object_by_id cache is incoherent for {:?}", object_id);
775            }
776        }
777
778        if let Some(entry) = entry {
779            let entry = entry.lock();
780            match &*entry {
781                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
782                    self.metrics.record_cache_hit(request_type, "object_by_id");
783                    return CacheResult::Hit((*latest_version, latest_object.clone()));
784                }
785                LatestObjectCacheEntry::NonExistent => {
786                    self.metrics
787                        .record_cache_negative_hit(request_type, "object_by_id");
788                    return CacheResult::NegativeHit;
789                }
790            }
791        } else {
792            self.metrics.record_cache_miss(request_type, "object_by_id");
793        }
794
795        Self::with_locked_cache_entries(
796            &self.dirty.objects,
797            &self.cached.object_cache,
798            object_id,
799            |dirty_entry, cached_entry| {
800                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
801                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
802                CacheResult::Miss
803            },
804        )
805    }
806
807    fn get_object_by_id_cache_only(
808        &self,
809        request_type: &'static str,
810        object_id: &ObjectID,
811    ) -> CacheResult<(SequenceNumber, Object)> {
812        match self.get_object_entry_by_id_cache_only(request_type, object_id) {
813            CacheResult::Hit((version, entry)) => match entry {
814                ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
815                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
816            },
817            CacheResult::NegativeHit => CacheResult::NegativeHit,
818            CacheResult::Miss => CacheResult::Miss,
819        }
820    }
821
822    fn get_marker_value_cache_only(
823        &self,
824        object_key: FullObjectKey,
825        epoch_id: EpochId,
826    ) -> CacheResult<MarkerValue> {
827        Self::with_locked_cache_entries(
828            &self.dirty.markers,
829            &self.cached.marker_cache,
830            &(epoch_id, object_key.id()),
831            |dirty_entry, cached_entry| {
832                check_cache_entry_by_version!(
833                    self,
834                    "marker_by_version",
835                    "uncommitted",
836                    dirty_entry,
837                    object_key.version()
838                );
839                check_cache_entry_by_version!(
840                    self,
841                    "marker_by_version",
842                    "committed",
843                    cached_entry,
844                    object_key.version()
845                );
846                CacheResult::Miss
847            },
848        )
849    }
850
851    fn get_latest_marker_value_cache_only(
852        &self,
853        object_id: FullObjectID,
854        epoch_id: EpochId,
855    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
856        Self::with_locked_cache_entries(
857            &self.dirty.markers,
858            &self.cached.marker_cache,
859            &(epoch_id, object_id),
860            |dirty_entry, cached_entry| {
861                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
862                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
863                CacheResult::Miss
864            },
865        )
866    }
867
868    fn get_object_impl(&self, request_type: &'static str, id: &ObjectID) -> Option<Object> {
869        let ticket = self.object_by_id_cache.get_ticket_for_read(id);
870        match self.get_object_entry_by_id_cache_only(request_type, id) {
871            CacheResult::Hit((_, entry)) => match entry {
872                ObjectEntry::Object(object) => Some(object),
873                ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
874            },
875            CacheResult::NegativeHit => None,
876            CacheResult::Miss => {
877                let obj = self
878                    .store
879                    .get_latest_object_or_tombstone(*id)
880                    .expect("db error");
881                match obj {
882                    Some((key, obj)) => {
883                        self.cache_latest_object_by_id(
884                            id,
885                            LatestObjectCacheEntry::Object(key.1, obj.clone().into()),
886                            ticket,
887                        );
888                        match obj {
889                            ObjectOrTombstone::Object(object) => Some(object),
890                            ObjectOrTombstone::Tombstone(_) => None,
891                        }
892                    }
893                    None => {
894                        self.cache_object_not_found(id, ticket);
895                        None
896                    }
897                }
898            }
899        }
900    }
901
902    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
903        self.metrics.record_cache_request(request_type, "db");
904        &self.store
905    }
906
907    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
908        self.metrics
909            .record_cache_multi_request(request_type, "db", count);
910        &self.store
911    }
912
913    #[instrument(level = "debug", skip_all)]
914    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
915        let tx_digest = *tx_outputs.transaction.digest();
916        trace!(?tx_digest, "writing transaction outputs to cache");
917
918        assert!(
919            !self.transaction_executed_in_last_epoch(&tx_digest, epoch_id),
920            "Transaction {:?} was already executed in epoch {}",
921            tx_digest,
922            epoch_id.saturating_sub(1)
923        );
924
925        self.dirty.fastpath_transaction_outputs.remove(&tx_digest);
926
927        let TransactionOutputs {
928            transaction,
929            effects,
930            markers,
931            written,
932            deleted,
933            wrapped,
934            events,
935            unchanged_loaded_runtime_objects,
936            ..
937        } = &*tx_outputs;
938
939        // Deletions and wraps must be written first. The reason is that one of the deletes
940        // may be a child object, and if we write the parent object first, a reader may or may
941        // not see the previous version of the child object, instead of the deleted/wrapped
942        // tombstone, which would cause an execution fork
943        for ObjectKey(id, version) in deleted.iter() {
944            self.write_object_entry(id, *version, ObjectEntry::Deleted);
945        }
946
947        for ObjectKey(id, version) in wrapped.iter() {
948            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
949        }
950
951        // Update all markers
952        for (object_key, marker_value) in markers.iter() {
953            self.write_marker_value(epoch_id, *object_key, *marker_value);
954        }
955
956        // Write children before parents to ensure that readers do not observe a parent object
957        // before its most recent children are visible.
958        for (object_id, object) in written.iter() {
959            if object.is_child_object() {
960                self.write_object_entry(object_id, object.version(), object.clone().into());
961            }
962        }
963        for (object_id, object) in written.iter() {
964            if !object.is_child_object() {
965                self.write_object_entry(object_id, object.version(), object.clone().into());
966                if object.is_package() {
967                    debug!("caching package: {:?}", object.compute_object_reference());
968                    self.packages
969                        .insert(*object_id, PackageObject::new(object.clone()));
970                }
971            }
972        }
973
974        let tx_digest = *transaction.digest();
975        debug!(
976            ?tx_digest,
977            "Writing transaction output objects to cache: {:?}",
978            written
979                .values()
980                .map(|o| (o.id(), o.version()))
981                .collect::<Vec<_>>(),
982        );
983        let effects_digest = effects.digest();
984
985        self.metrics.record_cache_write("transaction_block");
986        self.dirty
987            .pending_transaction_writes
988            .insert(tx_digest, tx_outputs.clone());
989
990        // insert transaction effects before executed_effects_digests so that there
991        // are never dangling entries in executed_effects_digests
992        self.metrics.record_cache_write("transaction_effects");
993        self.dirty
994            .transaction_effects
995            .insert(effects_digest, effects.clone());
996
997        // note: if events.data.is_empty(), then there are no events for this transaction. We
998        // store it anyway to avoid special cases in commint_transaction_outputs, and translate
999        // an empty events structure to None when reading.
1000        self.metrics.record_cache_write("transaction_events");
1001        self.dirty
1002            .transaction_events
1003            .insert(tx_digest, events.clone());
1004
1005        self.metrics
1006            .record_cache_write("unchanged_loaded_runtime_objects");
1007        self.dirty
1008            .unchanged_loaded_runtime_objects
1009            .insert(tx_digest, unchanged_loaded_runtime_objects.clone());
1010
1011        self.metrics.record_cache_write("executed_effects_digests");
1012        self.dirty
1013            .executed_effects_digests
1014            .insert(tx_digest, effects_digest);
1015
1016        self.executed_effects_digests_notify_read
1017            .notify(&tx_digest, &effects_digest);
1018
1019        self.metrics
1020            .pending_notify_read
1021            .set(self.executed_effects_digests_notify_read.num_pending() as i64);
1022
1023        let prev = self
1024            .dirty
1025            .total_transaction_inserts
1026            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1027
1028        let pending_count = (prev + 1).saturating_sub(
1029            self.dirty
1030                .total_transaction_commits
1031                .load(std::sync::atomic::Ordering::Relaxed),
1032        );
1033
1034        self.set_backpressure(pending_count);
1035    }
1036
1037    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1038        let _metrics_guard = mysten_metrics::monitored_scope("WritebackCache::build_db_batch");
1039        let mut all_outputs = Vec::with_capacity(digests.len());
1040        for tx in digests {
1041            let Some(outputs) = self
1042                .dirty
1043                .pending_transaction_writes
1044                .get(tx)
1045                .map(|o| o.clone())
1046            else {
1047                // This can happen in the following rare case:
1048                // All transactions in the checkpoint are committed to the db (by commit_transaction_outputs,
1049                // called in CheckpointExecutor::process_executed_transactions), but the process crashes before
1050                // the checkpoint water mark is bumped. We will then re-commit the checkpoint at startup,
1051                // despite that all transactions are already executed.
1052                warn!("Attempt to commit unknown transaction {:?}", tx);
1053                continue;
1054            };
1055            all_outputs.push(outputs);
1056        }
1057
1058        let batch = self
1059            .store
1060            .build_db_batch(epoch, &all_outputs)
1061            .expect("db error");
1062        (all_outputs, batch)
1063    }
1064
1065    // Commits dirty data for the given TransactionDigest to the db.
1066    #[instrument(level = "debug", skip_all)]
1067    fn commit_transaction_outputs(
1068        &self,
1069        epoch: EpochId,
1070        (all_outputs, db_batch): Batch,
1071        digests: &[TransactionDigest],
1072    ) {
1073        let _metrics_guard =
1074            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs");
1075        fail_point!("writeback-cache-commit");
1076        trace!(?digests);
1077
1078        // Flush writes to disk before removing anything from dirty set. otherwise,
1079        // a cache eviction could cause a value to disappear briefly, even if we insert to the
1080        // cache before removing from the dirty set.
1081        db_batch.write().expect("db error");
1082
1083        let _metrics_guard =
1084            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs::flush");
1085        for outputs in all_outputs.iter() {
1086            let tx_digest = outputs.transaction.digest();
1087            assert!(
1088                self.dirty
1089                    .pending_transaction_writes
1090                    .remove(tx_digest)
1091                    .is_some()
1092            );
1093            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
1094        }
1095
1096        let num_outputs = all_outputs.len() as u64;
1097        let num_commits = self
1098            .dirty
1099            .total_transaction_commits
1100            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
1101            + num_outputs;
1102
1103        let pending_count = self
1104            .dirty
1105            .total_transaction_inserts
1106            .load(std::sync::atomic::Ordering::Relaxed)
1107            .saturating_sub(num_commits);
1108
1109        self.set_backpressure(pending_count);
1110    }
1111
1112    fn approximate_pending_transaction_count(&self) -> u64 {
1113        let num_commits = self
1114            .dirty
1115            .total_transaction_commits
1116            .load(std::sync::atomic::Ordering::Relaxed);
1117
1118        self.dirty
1119            .total_transaction_inserts
1120            .load(std::sync::atomic::Ordering::Relaxed)
1121            .saturating_sub(num_commits)
1122    }
1123
1124    fn set_backpressure(&self, pending_count: u64) {
1125        let backpressure = pending_count > self.backpressure_threshold;
1126        let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
1127        if backpressure_changed {
1128            self.metrics.backpressure_toggles.inc();
1129        }
1130        self.metrics
1131            .backpressure_status
1132            .set(if backpressure { 1 } else { 0 });
1133    }
1134
1135    fn flush_transactions_from_dirty_to_cached(
1136        &self,
1137        epoch: EpochId,
1138        tx_digest: TransactionDigest,
1139        outputs: &TransactionOutputs,
1140    ) {
1141        // Now, remove each piece of committed data from the dirty state and insert it into the cache.
1142        // TODO: outputs should have a strong count of 1 so we should be able to move out of it
1143        let TransactionOutputs {
1144            transaction,
1145            effects,
1146            markers,
1147            written,
1148            deleted,
1149            wrapped,
1150            events,
1151            ..
1152        } = outputs;
1153
1154        let effects_digest = effects.digest();
1155
1156        // Update cache before removing from self.dirty to avoid
1157        // unnecessary cache misses
1158        self.cached
1159            .transactions
1160            .insert(
1161                &tx_digest,
1162                PointCacheItem::Some(transaction.clone()),
1163                Ticket::Write,
1164            )
1165            .ok();
1166        self.cached
1167            .transaction_effects
1168            .insert(
1169                &effects_digest,
1170                PointCacheItem::Some(effects.clone().into()),
1171                Ticket::Write,
1172            )
1173            .ok();
1174        self.cached
1175            .executed_effects_digests
1176            .insert(
1177                &tx_digest,
1178                PointCacheItem::Some(effects_digest),
1179                Ticket::Write,
1180            )
1181            .ok();
1182        self.cached
1183            .transaction_events
1184            .insert(
1185                &tx_digest,
1186                PointCacheItem::Some(events.clone().into()),
1187                Ticket::Write,
1188            )
1189            .ok();
1190
1191        self.dirty
1192            .transaction_effects
1193            .remove(&effects_digest)
1194            .expect("effects must exist");
1195
1196        self.dirty
1197            .transaction_events
1198            .remove(&tx_digest)
1199            .expect("events must exist");
1200
1201        self.dirty
1202            .unchanged_loaded_runtime_objects
1203            .remove(&tx_digest)
1204            .expect("unchanged_loaded_runtime_objects must exist");
1205
1206        self.dirty
1207            .executed_effects_digests
1208            .remove(&tx_digest)
1209            .expect("executed effects must exist");
1210
1211        // Move dirty markers to cache
1212        for (object_key, marker_value) in markers.iter() {
1213            Self::move_version_from_dirty_to_cache(
1214                &self.dirty.markers,
1215                &self.cached.marker_cache,
1216                (epoch, object_key.id()),
1217                object_key.version(),
1218                marker_value,
1219            );
1220        }
1221
1222        for (object_id, object) in written.iter() {
1223            Self::move_version_from_dirty_to_cache(
1224                &self.dirty.objects,
1225                &self.cached.object_cache,
1226                *object_id,
1227                object.version(),
1228                &ObjectEntry::Object(object.clone()),
1229            );
1230        }
1231
1232        for ObjectKey(object_id, version) in deleted.iter() {
1233            Self::move_version_from_dirty_to_cache(
1234                &self.dirty.objects,
1235                &self.cached.object_cache,
1236                *object_id,
1237                *version,
1238                &ObjectEntry::Deleted,
1239            );
1240        }
1241
1242        for ObjectKey(object_id, version) in wrapped.iter() {
1243            Self::move_version_from_dirty_to_cache(
1244                &self.dirty.objects,
1245                &self.cached.object_cache,
1246                *object_id,
1247                *version,
1248                &ObjectEntry::Wrapped,
1249            );
1250        }
1251    }
1252
1253    // Move the oldest/least entry from the dirty queue to the cache queue.
1254    // This is called after the entry is committed to the db.
1255    fn move_version_from_dirty_to_cache<K, V>(
1256        dirty: &DashMap<K, CachedVersionMap<V>>,
1257        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
1258        key: K,
1259        version: SequenceNumber,
1260        value: &V,
1261    ) where
1262        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
1263        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
1264    {
1265        static MAX_VERSIONS: usize = 3;
1266
1267        // IMPORTANT: lock both the dirty set entry and the cache entry before modifying either.
1268        // this ensures that readers cannot see a value temporarily disappear.
1269        let dirty_entry = dirty.entry(key);
1270        let cache_entry = cache.entry(key).or_default();
1271        let mut cache_map = cache_entry.value().lock();
1272
1273        // insert into cache and drop old versions.
1274        cache_map.insert(version, value.clone());
1275        // TODO: make this automatic by giving CachedVersionMap an optional max capacity
1276        cache_map.truncate_to(MAX_VERSIONS);
1277
1278        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
1279            panic!("dirty map must exist");
1280        };
1281
1282        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);
1283
1284        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");
1285
1286        // if there are no versions remaining, remove the map entry
1287        if occupied_dirty_entry.get().is_empty() {
1288            occupied_dirty_entry.remove();
1289        }
1290    }
1291
1292    // Updates the latest object id cache with an entry that was read from the db.
1293    fn cache_latest_object_by_id(
1294        &self,
1295        object_id: &ObjectID,
1296        object: LatestObjectCacheEntry,
1297        ticket: Ticket,
1298    ) {
1299        trace!("caching object by id: {:?} {:?}", object_id, object);
1300        if self
1301            .object_by_id_cache
1302            .insert(object_id, object, ticket)
1303            .is_ok()
1304        {
1305            self.metrics.record_cache_write("object_by_id");
1306        } else {
1307            trace!("discarded cache write due to expired ticket");
1308            self.metrics.record_ticket_expiry();
1309        }
1310    }
1311
1312    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
1313        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
1314    }
1315
1316    fn clear_state_end_of_epoch_impl(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
1317        info!("clearing state at end of epoch");
1318
1319        // Note: there cannot be any concurrent writes to self.dirty while we are in this function,
1320        // as all transaction execution is paused.
1321        for r in self.dirty.pending_transaction_writes.iter() {
1322            let outputs = r.value();
1323            if !outputs
1324                .transaction
1325                .transaction_data()
1326                .shared_input_objects()
1327                .is_empty()
1328            {
1329                debug_fatal!("transaction must be single writer");
1330            }
1331            info!(
1332                "clearing state for transaction {:?}",
1333                outputs.transaction.digest()
1334            );
1335            for (object_id, object) in outputs.written.iter() {
1336                if object.is_package() {
1337                    info!("removing non-finalized package from cache: {:?}", object_id);
1338                    self.packages.invalidate(object_id);
1339                }
1340                self.object_by_id_cache.invalidate(object_id);
1341                self.cached.object_cache.invalidate(object_id);
1342            }
1343
1344            for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
1345                self.object_by_id_cache.invalidate(object_id);
1346                self.cached.object_cache.invalidate(object_id);
1347            }
1348        }
1349
1350        self.dirty.clear();
1351
1352        info!("clearing old transaction locks");
1353        self.object_locks.clear();
1354        info!("clearing object per epoch marker table");
1355        self.store
1356            .clear_object_per_epoch_marker_table(execution_guard)
1357            .expect("db error");
1358    }
1359
1360    fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) {
1361        self.store
1362            .bulk_insert_genesis_objects(objects)
1363            .expect("db error");
1364        for obj in objects {
1365            self.cached.object_cache.invalidate(&obj.id());
1366            self.object_by_id_cache.invalidate(&obj.id());
1367        }
1368    }
1369
1370    fn insert_genesis_object_impl(&self, object: Object) {
1371        self.object_by_id_cache.invalidate(&object.id());
1372        self.cached.object_cache.invalidate(&object.id());
1373        self.store.insert_genesis_object(object).expect("db error");
1374    }
1375
1376    pub fn clear_caches_and_assert_empty(&self) {
1377        info!("clearing caches");
1378        self.cached.clear_and_assert_empty();
1379        self.object_by_id_cache.invalidate_all();
1380        assert!(&self.object_by_id_cache.is_empty());
1381        self.packages.invalidate_all();
1382        assert_empty(&self.packages);
1383    }
1384}
1385
1386impl ExecutionCacheAPI for WritebackCache {}
1387
1388impl ExecutionCacheCommit for WritebackCache {
1389    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1390        self.build_db_batch(epoch, digests)
1391    }
1392
1393    fn commit_transaction_outputs(
1394        &self,
1395        epoch: EpochId,
1396        batch: Batch,
1397        digests: &[TransactionDigest],
1398    ) {
1399        WritebackCache::commit_transaction_outputs(self, epoch, batch, digests)
1400    }
1401
1402    fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) {
1403        self.store.persist_transaction(tx).expect("db error");
1404    }
1405
1406    fn approximate_pending_transaction_count(&self) -> u64 {
1407        WritebackCache::approximate_pending_transaction_count(self)
1408    }
1409}
1410
1411impl ObjectCacheRead for WritebackCache {
1412    fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
1413        self.metrics
1414            .record_cache_request("package", "package_cache");
1415        if let Some(p) = self.packages.get(package_id) {
1416            if cfg!(debug_assertions) {
1417                let canonical_package = self
1418                    .dirty
1419                    .objects
1420                    .get(package_id)
1421                    .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
1422                        Some(ObjectEntry::Object(object)) => Some(object),
1423                        _ => None,
1424                    })
1425                    .or_else(|| self.store.get_object(package_id));
1426
1427                if let Some(canonical_package) = canonical_package {
1428                    assert_eq!(
1429                        canonical_package.digest(),
1430                        p.object().digest(),
1431                        "Package object cache is inconsistent for package {:?}",
1432                        package_id
1433                    );
1434                }
1435            }
1436            self.metrics.record_cache_hit("package", "package_cache");
1437            return Ok(Some(p));
1438        } else {
1439            self.metrics.record_cache_miss("package", "package_cache");
1440        }
1441
1442        // We try the dirty objects cache as well before going to the database. This is necessary
1443        // because the package could be evicted from the package cache before it is committed
1444        // to the database.
1445        if let Some(p) = self.get_object_impl("package", package_id) {
1446            if p.is_package() {
1447                let p = PackageObject::new(p);
1448                tracing::trace!(
1449                    "caching package: {:?}",
1450                    p.object().compute_object_reference()
1451                );
1452                self.metrics.record_cache_write("package");
1453                self.packages.insert(*package_id, p.clone());
1454                Ok(Some(p))
1455            } else {
1456                Err(SuiErrorKind::UserInputError {
1457                    error: UserInputError::MoveObjectAsPackage {
1458                        object_id: *package_id,
1459                    },
1460                }
1461                .into())
1462            }
1463        } else {
1464            Ok(None)
1465        }
1466    }
1467
1468    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
1469        // This is a no-op because all writes go through the cache, therefore it can never
1470        // be incoherent
1471    }
1472
1473    // get_object and variants.
1474
1475    fn get_object(&self, id: &ObjectID) -> Option<Object> {
1476        self.get_object_impl("object_latest", id)
1477    }
1478
1479    fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object> {
1480        match self.get_object_by_key_cache_only(object_id, version) {
1481            CacheResult::Hit(object) => Some(object),
1482            CacheResult::NegativeHit => None,
1483            CacheResult::Miss => self
1484                .record_db_get("object_by_version")
1485                .get_object_by_key(object_id, version),
1486        }
1487    }
1488
1489    fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
1490        do_fallback_lookup(
1491            object_keys,
1492            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1493                CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1494                CacheResult::NegativeHit => CacheResult::NegativeHit,
1495                CacheResult::Miss => CacheResult::Miss,
1496            },
1497            |remaining| {
1498                self.record_db_multi_get("object_by_version", remaining.len())
1499                    .multi_get_objects_by_key(remaining)
1500                    .expect("db error")
1501            },
1502        )
1503    }
1504
1505    fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool {
1506        match self.get_object_by_key_cache_only(object_id, version) {
1507            CacheResult::Hit(_) => true,
1508            CacheResult::NegativeHit => false,
1509            CacheResult::Miss => self
1510                .record_db_get("object_by_version")
1511                .object_exists_by_key(object_id, version)
1512                .expect("db error"),
1513        }
1514    }
1515
1516    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool> {
1517        do_fallback_lookup(
1518            object_keys,
1519            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1520                CacheResult::Hit(_) => CacheResult::Hit(true),
1521                CacheResult::NegativeHit => CacheResult::Hit(false),
1522                CacheResult::Miss => CacheResult::Miss,
1523            },
1524            |remaining| {
1525                self.record_db_multi_get("object_by_version", remaining.len())
1526                    .multi_object_exists_by_key(remaining)
1527                    .expect("db error")
1528            },
1529        )
1530    }
1531
1532    fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
1533        match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1534            CacheResult::Hit((version, entry)) => Some(match entry {
1535                ObjectEntry::Object(object) => object.compute_object_reference(),
1536                ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1537                ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1538            }),
1539            CacheResult::NegativeHit => None,
1540            CacheResult::Miss => self
1541                .record_db_get("latest_objref_or_tombstone")
1542                .get_latest_object_ref_or_tombstone(object_id)
1543                .expect("db error"),
1544        }
1545    }
1546
1547    fn get_latest_object_or_tombstone(
1548        &self,
1549        object_id: ObjectID,
1550    ) -> Option<(ObjectKey, ObjectOrTombstone)> {
1551        match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1552            CacheResult::Hit((version, entry)) => {
1553                let key = ObjectKey(object_id, version);
1554                Some(match entry {
1555                    ObjectEntry::Object(object) => (key, object.into()),
1556                    ObjectEntry::Deleted => (
1557                        key,
1558                        ObjectOrTombstone::Tombstone((
1559                            object_id,
1560                            version,
1561                            ObjectDigest::OBJECT_DIGEST_DELETED,
1562                        )),
1563                    ),
1564                    ObjectEntry::Wrapped => (
1565                        key,
1566                        ObjectOrTombstone::Tombstone((
1567                            object_id,
1568                            version,
1569                            ObjectDigest::OBJECT_DIGEST_WRAPPED,
1570                        )),
1571                    ),
1572                })
1573            }
1574            CacheResult::NegativeHit => None,
1575            CacheResult::Miss => self
1576                .record_db_get("latest_object_or_tombstone")
1577                .get_latest_object_or_tombstone(object_id)
1578                .expect("db error"),
1579        }
1580    }
1581
1582    fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool> {
1583        keys.iter()
1584            .map(|key| {
1585                if key.is_cancelled() {
1586                    true
1587                } else {
1588                    match key {
1589                        InputKey::VersionedObject { id, version } => {
1590                            matches!(
1591                                self.get_object_by_key_cache_only(&id.id(), *version),
1592                                CacheResult::Hit(_)
1593                            )
1594                        }
1595                        InputKey::Package { id } => self.packages.contains_key(id),
1596                    }
1597                }
1598            })
1599            .collect()
1600    }
1601
1602    #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1603    fn find_object_lt_or_eq_version(
1604        &self,
1605        object_id: ObjectID,
1606        version_bound: SequenceNumber,
1607    ) -> Option<Object> {
1608        macro_rules! check_cache_entry {
1609            ($level: expr, $objects: expr) => {
1610                self.metrics
1611                    .record_cache_request("object_lt_or_eq_version", $level);
1612                if let Some(objects) = $objects {
1613                    if let Some((_, object)) = objects
1614                        .all_versions_lt_or_eq_descending(&version_bound)
1615                        .next()
1616                    {
1617                        if let ObjectEntry::Object(object) = object {
1618                            self.metrics
1619                                .record_cache_hit("object_lt_or_eq_version", $level);
1620                            return Some(object.clone());
1621                        } else {
1622                            // if we find a tombstone, the object does not exist
1623                            self.metrics
1624                                .record_cache_negative_hit("object_lt_or_eq_version", $level);
1625                            return None;
1626                        }
1627                    } else {
1628                        self.metrics
1629                            .record_cache_miss("object_lt_or_eq_version", $level);
1630                    }
1631                }
1632            };
1633        }
1634
1635        // if we have the latest version cached, and it is within the bound, we are done
1636        self.metrics
1637            .record_cache_request("object_lt_or_eq_version", "object_by_id");
1638        let latest_cache_entry = self.object_by_id_cache.get(&object_id);
1639        if let Some(latest) = &latest_cache_entry {
1640            let latest = latest.lock();
1641            match &*latest {
1642                LatestObjectCacheEntry::Object(latest_version, object) => {
1643                    if *latest_version <= version_bound {
1644                        if let ObjectEntry::Object(object) = object {
1645                            self.metrics
1646                                .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1647                            return Some(object.clone());
1648                        } else {
1649                            // object is a tombstone, but is still within the version bound
1650                            self.metrics.record_cache_negative_hit(
1651                                "object_lt_or_eq_version",
1652                                "object_by_id",
1653                            );
1654                            return None;
1655                        }
1656                    }
1657                    // latest object is not within the version bound. fall through.
1658                }
1659                // No object by this ID exists at all
1660                LatestObjectCacheEntry::NonExistent => {
1661                    self.metrics
1662                        .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1663                    return None;
1664                }
1665            }
1666        }
1667        self.metrics
1668            .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1669
1670        Self::with_locked_cache_entries(
1671            &self.dirty.objects,
1672            &self.cached.object_cache,
1673            &object_id,
1674            |dirty_entry, cached_entry| {
1675                check_cache_entry!("committed", dirty_entry);
1676                check_cache_entry!("uncommitted", cached_entry);
1677
1678                // Much of the time, the query will be for the very latest object version, so
1679                // try that first. But we have to be careful:
1680                // 1. We must load the tombstone if it is present, because its version may exceed
1681                //    the version_bound, in which case we must do a scan.
1682                // 2. You might think we could just call `self.store.get_latest_object_or_tombstone` here.
1683                //    But we cannot, because there may be a more recent version in the dirty set, which
1684                //    we skipped over in check_cache_entry! because of the version bound. However, if we
1685                //    skipped it above, we will skip it here as well, again due to the version bound.
1686                // 3. Despite that, we really want to warm the cache here. Why? Because if the object is
1687                //    cold (not being written to), then we will very soon be able to start serving reads
1688                //    of it from the object_by_id cache, IF we can warm the cache. If we don't warm the
1689                //    the cache here, and no writes to the object occur, then we will always have to go
1690                //    to the db for the object.
1691                //
1692                // Lastly, it is important to understand the rationale for all this: If the object is
1693                // write-hot, we will serve almost all reads to it from the dirty set (or possibly the
1694                // cached set if it is only written to once every few checkpoints). If the object is
1695                // write-cold (or non-existent) and read-hot, then we will serve almost all reads to it
1696                // from the object_by_id cache check above.  Most of the apparently wasteful code here
1697                // exists only to ensure correctness in all the edge cases.
1698                let latest: Option<(SequenceNumber, ObjectEntry)> =
1699                    if let Some(dirty_set) = dirty_entry {
1700                        dirty_set
1701                            .get_highest()
1702                            .cloned()
1703                            .tap_none(|| panic!("dirty set cannot be empty"))
1704                    } else {
1705                        // TODO: we should try not to read from the db while holding the locks.
1706                        self.record_db_get("object_lt_or_eq_version_latest")
1707                            .get_latest_object_or_tombstone(object_id)
1708                            .expect("db error")
1709                            .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1710                                (version, ObjectEntry::from(obj_or_tombstone))
1711                            })
1712                    };
1713
1714                if let Some((obj_version, obj_entry)) = latest {
1715                    // we can always cache the latest object (or tombstone), even if it is not within the
1716                    // version_bound. This is done in order to warm the cache in the case where a sequence
1717                    // of transactions all read the same child object without writing to it.
1718
1719                    // Note: no need to call with_object_by_id_cache_update here, because we are holding
1720                    // the lock on the dirty cache entry, and `latest` cannot become out-of-date
1721                    // while we hold that lock.
1722                    self.cache_latest_object_by_id(
1723                        &object_id,
1724                        LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1725                        // We can get a ticket at the last second, because we are holding the lock
1726                        // on dirty, so there cannot be any concurrent writes.
1727                        self.object_by_id_cache.get_ticket_for_read(&object_id),
1728                    );
1729
1730                    if obj_version <= version_bound {
1731                        match obj_entry {
1732                            ObjectEntry::Object(object) => Some(object),
1733                            ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
1734                        }
1735                    } else {
1736                        // The latest object exceeded the bound, so now we have to do a scan
1737                        // But we already know there is no dirty entry within the bound,
1738                        // so we go to the db.
1739                        self.record_db_get("object_lt_or_eq_version_scan")
1740                            .find_object_lt_or_eq_version(object_id, version_bound)
1741                            .expect("db error")
1742                    }
1743
1744                // no object found in dirty set or db, object does not exist
1745                // When this is called from a read api (i.e. not the execution path) it is
1746                // possible that the object has been deleted and pruned. In this case,
1747                // there would be no entry at all on disk, but we may have a tombstone in the
1748                // cache
1749                } else if let Some(latest_cache_entry) = latest_cache_entry {
1750                    // If there is a latest cache entry, it had better not be a live object!
1751                    assert!(!latest_cache_entry.lock().is_alive());
1752                    None
1753                } else {
1754                    // If there is no latest cache entry, we can insert one.
1755                    let highest = cached_entry.and_then(|c| c.get_highest());
1756                    assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1757                    self.cache_object_not_found(
1758                        &object_id,
1759                        // okay to get ticket at last second - see above
1760                        self.object_by_id_cache.get_ticket_for_read(&object_id),
1761                    );
1762                    None
1763                }
1764            },
1765        )
1766    }
1767
1768    fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
1769        get_sui_system_state(self)
1770    }
1771
1772    fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge> {
1773        get_bridge(self)
1774    }
1775
1776    fn get_marker_value(
1777        &self,
1778        object_key: FullObjectKey,
1779        epoch_id: EpochId,
1780    ) -> Option<MarkerValue> {
1781        match self.get_marker_value_cache_only(object_key, epoch_id) {
1782            CacheResult::Hit(marker) => Some(marker),
1783            CacheResult::NegativeHit => None,
1784            CacheResult::Miss => self
1785                .record_db_get("marker_by_version")
1786                .get_marker_value(object_key, epoch_id)
1787                .expect("db error"),
1788        }
1789    }
1790
1791    fn get_latest_marker(
1792        &self,
1793        object_id: FullObjectID,
1794        epoch_id: EpochId,
1795    ) -> Option<(SequenceNumber, MarkerValue)> {
1796        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1797            CacheResult::Hit((v, marker)) => Some((v, marker)),
1798            CacheResult::NegativeHit => {
1799                panic!("cannot have negative hit when getting latest marker")
1800            }
1801            CacheResult::Miss => self
1802                .record_db_get("marker_latest")
1803                .get_latest_marker(object_id, epoch_id)
1804                .expect("db error"),
1805        }
1806    }
1807
1808    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult {
1809        let cur_epoch = epoch_store.epoch();
1810        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
1811            CacheResult::Hit((_, obj)) => {
1812                let actual_objref = obj.compute_object_reference();
1813                if obj_ref != actual_objref {
1814                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
1815                        locked_ref: actual_objref,
1816                    })
1817                } else {
1818                    // requested object ref is live, check if there is a lock
1819                    Ok(
1820                        match self
1821                            .object_locks
1822                            .get_transaction_lock(&obj_ref, epoch_store)?
1823                        {
1824                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
1825                                locked_by_tx: LockDetailsDeprecated {
1826                                    epoch: cur_epoch,
1827                                    tx_digest,
1828                                },
1829                            },
1830                            None => ObjectLockStatus::Initialized,
1831                        },
1832                    )
1833                }
1834            }
1835            CacheResult::NegativeHit => {
1836                Err(SuiError::from(UserInputError::ObjectNotFound {
1837                    object_id: obj_ref.0,
1838                    // even though we know the requested version, we leave it as None to indicate
1839                    // that the object does not exist at any version
1840                    version: None,
1841                }))
1842            }
1843            CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1844        }
1845    }
1846
1847    fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef> {
1848        let obj = self.get_object_impl("live_objref", &object_id).ok_or(
1849            UserInputError::ObjectNotFound {
1850                object_id,
1851                version: None,
1852            },
1853        )?;
1854        Ok(obj.compute_object_reference())
1855    }
1856
1857    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1858        do_fallback_lookup_fallible(
1859            owned_object_refs,
1860            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
1861                CacheResult::Hit((version, obj)) => {
1862                    if obj.compute_object_reference() != *obj_ref {
1863                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
1864                            provided_obj_ref: *obj_ref,
1865                            current_version: version,
1866                        }
1867                        .into())
1868                    } else {
1869                        Ok(CacheResult::Hit(()))
1870                    }
1871                }
1872                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
1873                    object_id: obj_ref.0,
1874                    version: None,
1875                }
1876                .into()),
1877                CacheResult::Miss => Ok(CacheResult::Miss),
1878            },
1879            |remaining| {
1880                self.record_db_multi_get("object_is_live", remaining.len())
1881                    .check_owned_objects_are_live(remaining)?;
1882                Ok(vec![(); remaining.len()])
1883            },
1884        )?;
1885        Ok(())
1886    }
1887
1888    fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber> {
1889        self.store
1890            .perpetual_tables
1891            .get_highest_pruned_checkpoint()
1892            .expect("db error")
1893    }
1894
1895    fn notify_read_input_objects<'a>(
1896        &'a self,
1897        input_and_receiving_keys: &'a [InputKey],
1898        receiving_keys: &'a HashSet<InputKey>,
1899        epoch: EpochId,
1900    ) -> BoxFuture<'a, ()> {
1901        self.object_notify_read
1902            .read(
1903                "notify_read_input_objects",
1904                input_and_receiving_keys,
1905                move |keys| {
1906                    self.multi_input_objects_available(keys, receiving_keys, epoch)
1907                        .into_iter()
1908                        .map(|available| if available { Some(()) } else { None })
1909                        .collect::<Vec<_>>()
1910                },
1911            )
1912            .map(|_| ())
1913            .boxed()
1914    }
1915}
1916
1917impl TransactionCacheRead for WritebackCache {
1918    fn multi_get_transaction_blocks(
1919        &self,
1920        digests: &[TransactionDigest],
1921    ) -> Vec<Option<Arc<VerifiedTransaction>>> {
1922        let digests_and_tickets: Vec<_> = digests
1923            .iter()
1924            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
1925            .collect();
1926        do_fallback_lookup(
1927            &digests_and_tickets,
1928            |(digest, _)| {
1929                self.metrics
1930                    .record_cache_request("transaction_block", "uncommitted");
1931                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
1932                    self.metrics
1933                        .record_cache_hit("transaction_block", "uncommitted");
1934                    return CacheResult::Hit(Some(tx.transaction.clone()));
1935                }
1936                self.metrics
1937                    .record_cache_miss("transaction_block", "uncommitted");
1938
1939                self.metrics
1940                    .record_cache_request("transaction_block", "committed");
1941
1942                match self
1943                    .cached
1944                    .transactions
1945                    .get(digest)
1946                    .map(|l| l.lock().clone())
1947                {
1948                    Some(PointCacheItem::Some(tx)) => {
1949                        self.metrics
1950                            .record_cache_hit("transaction_block", "committed");
1951                        CacheResult::Hit(Some(tx))
1952                    }
1953                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
1954                    None => {
1955                        self.metrics
1956                            .record_cache_miss("transaction_block", "committed");
1957
1958                        CacheResult::Miss
1959                    }
1960                }
1961            },
1962            |remaining| {
1963                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
1964                let results: Vec<_> = self
1965                    .record_db_multi_get("transaction_block", remaining.len())
1966                    .multi_get_transaction_blocks(&remaining_digests)
1967                    .expect("db error")
1968                    .into_iter()
1969                    .map(|o| o.map(Arc::new))
1970                    .collect();
1971                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
1972                    if result.is_none() {
1973                        self.cached.transactions.insert(digest, None, *ticket).ok();
1974                    }
1975                }
1976                results
1977            },
1978        )
1979    }
1980
1981    fn multi_get_executed_effects_digests(
1982        &self,
1983        digests: &[TransactionDigest],
1984    ) -> Vec<Option<TransactionEffectsDigest>> {
1985        let digests_and_tickets: Vec<_> = digests
1986            .iter()
1987            .map(|d| {
1988                (
1989                    *d,
1990                    self.cached.executed_effects_digests.get_ticket_for_read(d),
1991                )
1992            })
1993            .collect();
1994        do_fallback_lookup(
1995            &digests_and_tickets,
1996            |(digest, _)| {
1997                self.metrics
1998                    .record_cache_request("executed_effects_digests", "uncommitted");
1999                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
2000                    self.metrics
2001                        .record_cache_hit("executed_effects_digests", "uncommitted");
2002                    return CacheResult::Hit(Some(*digest));
2003                }
2004                self.metrics
2005                    .record_cache_miss("executed_effects_digests", "uncommitted");
2006
2007                self.metrics
2008                    .record_cache_request("executed_effects_digests", "committed");
2009                match self
2010                    .cached
2011                    .executed_effects_digests
2012                    .get(digest)
2013                    .map(|l| *l.lock())
2014                {
2015                    Some(PointCacheItem::Some(digest)) => {
2016                        self.metrics
2017                            .record_cache_hit("executed_effects_digests", "committed");
2018                        CacheResult::Hit(Some(digest))
2019                    }
2020                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
2021                    None => {
2022                        self.metrics
2023                            .record_cache_miss("executed_effects_digests", "committed");
2024                        CacheResult::Miss
2025                    }
2026                }
2027            },
2028            |remaining| {
2029                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2030                let results = self
2031                    .record_db_multi_get("executed_effects_digests", remaining.len())
2032                    .multi_get_executed_effects_digests(&remaining_digests)
2033                    .expect("db error");
2034                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2035                    if result.is_none() {
2036                        self.cached
2037                            .executed_effects_digests
2038                            .insert(digest, None, *ticket)
2039                            .ok();
2040                    }
2041                }
2042                results
2043            },
2044        )
2045    }
2046
2047    fn multi_get_effects(
2048        &self,
2049        digests: &[TransactionEffectsDigest],
2050    ) -> Vec<Option<TransactionEffects>> {
2051        let digests_and_tickets: Vec<_> = digests
2052            .iter()
2053            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
2054            .collect();
2055        do_fallback_lookup(
2056            &digests_and_tickets,
2057            |(digest, _)| {
2058                self.metrics
2059                    .record_cache_request("transaction_effects", "uncommitted");
2060                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
2061                    self.metrics
2062                        .record_cache_hit("transaction_effects", "uncommitted");
2063                    return CacheResult::Hit(Some(effects.clone()));
2064                }
2065                self.metrics
2066                    .record_cache_miss("transaction_effects", "uncommitted");
2067
2068                self.metrics
2069                    .record_cache_request("transaction_effects", "committed");
2070                match self
2071                    .cached
2072                    .transaction_effects
2073                    .get(digest)
2074                    .map(|l| l.lock().clone())
2075                {
2076                    Some(PointCacheItem::Some(effects)) => {
2077                        self.metrics
2078                            .record_cache_hit("transaction_effects", "committed");
2079                        CacheResult::Hit(Some((*effects).clone()))
2080                    }
2081                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
2082                    None => {
2083                        self.metrics
2084                            .record_cache_miss("transaction_effects", "committed");
2085                        CacheResult::Miss
2086                    }
2087                }
2088            },
2089            |remaining| {
2090                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2091                let results = self
2092                    .record_db_multi_get("transaction_effects", remaining.len())
2093                    .multi_get_effects(remaining_digests.iter())
2094                    .expect("db error");
2095                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2096                    if result.is_none() {
2097                        self.cached
2098                            .transaction_effects
2099                            .insert(digest, None, *ticket)
2100                            .ok();
2101                    }
2102                }
2103                results
2104            },
2105        )
2106    }
2107
2108    fn transaction_executed_in_last_epoch(
2109        &self,
2110        digest: &TransactionDigest,
2111        current_epoch: EpochId,
2112    ) -> bool {
2113        if current_epoch == 0 {
2114            return false;
2115        }
2116        let last_epoch = current_epoch - 1;
2117        let cache_key = (last_epoch, *digest);
2118
2119        let ticket = self
2120            .cached
2121            .transaction_executed_in_last_epoch
2122            .get_ticket_for_read(&cache_key);
2123
2124        if let Some(cached) = self
2125            .cached
2126            .transaction_executed_in_last_epoch
2127            .get(&cache_key)
2128        {
2129            return cached.lock().is_some();
2130        }
2131
2132        let was_executed = self
2133            .store
2134            .perpetual_tables
2135            .was_transaction_executed_in_last_epoch(digest, current_epoch);
2136
2137        let value = if was_executed { Some(()) } else { None };
2138        self.cached
2139            .transaction_executed_in_last_epoch
2140            .insert(&cache_key, value, ticket)
2141            .ok();
2142
2143        was_executed
2144    }
2145
2146    fn notify_read_executed_effects_digests<'a>(
2147        &'a self,
2148        task_name: &'static str,
2149        digests: &'a [TransactionDigest],
2150    ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>> {
2151        self.executed_effects_digests_notify_read
2152            .read(task_name, digests, |digests| {
2153                self.multi_get_executed_effects_digests(digests)
2154            })
2155            .boxed()
2156    }
2157
2158    fn multi_get_events(
2159        &self,
2160        event_digests: &[TransactionDigest],
2161    ) -> Vec<Option<TransactionEvents>> {
2162        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
2163            if events.data.is_empty() {
2164                None
2165            } else {
2166                Some(events)
2167            }
2168        }
2169
2170        let digests_and_tickets: Vec<_> = event_digests
2171            .iter()
2172            .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
2173            .collect();
2174        do_fallback_lookup(
2175            &digests_and_tickets,
2176            |(digest, _)| {
2177                self.metrics
2178                    .record_cache_request("transaction_events", "uncommitted");
2179                if let Some(events) = self.dirty.transaction_events.get(digest).map(|e| e.clone()) {
2180                    self.metrics
2181                        .record_cache_hit("transaction_events", "uncommitted");
2182
2183                    return CacheResult::Hit(map_events(events));
2184                }
2185                self.metrics
2186                    .record_cache_miss("transaction_events", "uncommitted");
2187
2188                self.metrics
2189                    .record_cache_request("transaction_events", "committed");
2190                match self
2191                    .cached
2192                    .transaction_events
2193                    .get(digest)
2194                    .map(|l| l.lock().clone())
2195                {
2196                    Some(PointCacheItem::Some(events)) => {
2197                        self.metrics
2198                            .record_cache_hit("transaction_events", "committed");
2199                        CacheResult::Hit(map_events((*events).clone()))
2200                    }
2201                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
2202                    None => {
2203                        self.metrics
2204                            .record_cache_miss("transaction_events", "committed");
2205
2206                        CacheResult::Miss
2207                    }
2208                }
2209            },
2210            |remaining| {
2211                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2212                let results = self
2213                    .store
2214                    .multi_get_events(&remaining_digests)
2215                    .expect("db error");
2216                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2217                    if result.is_none() {
2218                        self.cached
2219                            .transaction_events
2220                            .insert(digest, None, *ticket)
2221                            .ok();
2222                    }
2223                }
2224                results
2225            },
2226        )
2227    }
2228
2229    fn get_unchanged_loaded_runtime_objects(
2230        &self,
2231        digest: &TransactionDigest,
2232    ) -> Option<Vec<ObjectKey>> {
2233        self.dirty
2234            .unchanged_loaded_runtime_objects
2235            .get(digest)
2236            .map(|b| b.clone())
2237            .or_else(|| {
2238                self.store
2239                    .get_unchanged_loaded_runtime_objects(digest)
2240                    .expect("db error")
2241            })
2242    }
2243
2244    fn get_mysticeti_fastpath_outputs(
2245        &self,
2246        tx_digest: &TransactionDigest,
2247    ) -> Option<Arc<TransactionOutputs>> {
2248        self.dirty.fastpath_transaction_outputs.get(tx_digest)
2249    }
2250
2251    fn notify_read_fastpath_transaction_outputs<'a>(
2252        &'a self,
2253        tx_digests: &'a [TransactionDigest],
2254    ) -> BoxFuture<'a, Vec<Arc<TransactionOutputs>>> {
2255        self.fastpath_transaction_outputs_notify_read
2256            .read(
2257                "notify_read_fastpath_transaction_outputs",
2258                tx_digests,
2259                |tx_digests| {
2260                    tx_digests
2261                        .iter()
2262                        .map(|tx_digest| self.get_mysticeti_fastpath_outputs(tx_digest))
2263                        .collect()
2264                },
2265            )
2266            .boxed()
2267    }
2268
2269    fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
2270        self.dirty
2271            .pending_transaction_writes
2272            .get(digest)
2273            .map(|transaction_output| transaction_output.take_accumulator_events())
2274    }
2275}
2276
2277impl ExecutionCacheWrite for WritebackCache {
2278    fn acquire_transaction_locks(
2279        &self,
2280        epoch_store: &AuthorityPerEpochStore,
2281        owned_input_objects: &[ObjectRef],
2282        tx_digest: TransactionDigest,
2283        signed_transaction: Option<VerifiedSignedTransaction>,
2284    ) -> SuiResult {
2285        self.object_locks.acquire_transaction_locks(
2286            self,
2287            epoch_store,
2288            owned_input_objects,
2289            tx_digest,
2290            signed_transaction,
2291        )
2292    }
2293
2294    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
2295        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs);
2296    }
2297
2298    fn write_fastpath_transaction_outputs(&self, tx_outputs: Arc<TransactionOutputs>) {
2299        let tx_digest = *tx_outputs.transaction.digest();
2300        debug!(
2301            ?tx_digest,
2302            "writing mysticeti fastpath certified transaction outputs"
2303        );
2304        self.dirty
2305            .fastpath_transaction_outputs
2306            .insert(tx_digest, tx_outputs.clone());
2307        self.fastpath_transaction_outputs_notify_read
2308            .notify(&tx_digest, &tx_outputs);
2309    }
2310
2311    #[cfg(test)]
2312    fn write_object_entry_for_test(&self, object: Object) {
2313        self.write_object_entry(&object.id(), object.version(), object.into());
2314    }
2315}
2316
// Expands `implement_passthrough_traits!` for `WritebackCache`. The macro is defined outside
// this section; presumably it generates trait impls that forward to the backing store
// (TODO confirm against the macro definition).
implement_passthrough_traits!(WritebackCache);
2318
2319impl GlobalStateHashStore for WritebackCache {
2320    fn get_object_ref_prior_to_key_deprecated(
2321        &self,
2322        object_id: &ObjectID,
2323        version: SequenceNumber,
2324    ) -> SuiResult<Option<ObjectRef>> {
2325        // There is probably a more efficient way to implement this, but since this is only used by
2326        // old protocol versions, it is better to do the simple thing that is obviously correct.
2327        // In this case we previous version from all sources and choose the highest
2328        let mut candidates = Vec::new();
2329
2330        let check_versions =
2331            |versions: &CachedVersionMap<ObjectEntry>| match versions.get_prior_to(&version) {
2332                Some((version, object_entry)) => match object_entry {
2333                    ObjectEntry::Object(object) => {
2334                        assert_eq!(object.version(), version);
2335                        Some(object.compute_object_reference())
2336                    }
2337                    ObjectEntry::Deleted => {
2338                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED))
2339                    }
2340                    ObjectEntry::Wrapped => {
2341                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED))
2342                    }
2343                },
2344                None => None,
2345            };
2346
2347        // first check dirty data
2348        if let Some(objects) = self.dirty.objects.get(object_id)
2349            && let Some(prior) = check_versions(&objects)
2350        {
2351            candidates.push(prior);
2352        }
2353
2354        if let Some(objects) = self.cached.object_cache.get(object_id)
2355            && let Some(prior) = check_versions(&objects.lock())
2356        {
2357            candidates.push(prior);
2358        }
2359
2360        if let Some(prior) = self
2361            .store
2362            .get_object_ref_prior_to_key_deprecated(object_id, version)?
2363        {
2364            candidates.push(prior);
2365        }
2366
2367        // sort candidates by version, and return the highest
2368        candidates.sort_by_key(|(_, version, _)| *version);
2369        Ok(candidates.pop())
2370    }
2371
2372    fn get_root_state_hash_for_epoch(
2373        &self,
2374        epoch: EpochId,
2375    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
2376        self.store.get_root_state_hash_for_epoch(epoch)
2377    }
2378
2379    fn get_root_state_hash_for_highest_epoch(
2380        &self,
2381    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
2382        self.store.get_root_state_hash_for_highest_epoch()
2383    }
2384
2385    fn insert_state_hash_for_epoch(
2386        &self,
2387        epoch: EpochId,
2388        checkpoint_seq_num: &CheckpointSequenceNumber,
2389        acc: &GlobalStateHash,
2390    ) -> SuiResult {
2391        self.store
2392            .insert_state_hash_for_epoch(epoch, checkpoint_seq_num, acc)
2393    }
2394
2395    fn iter_live_object_set(
2396        &self,
2397        include_wrapped_tombstone: bool,
2398    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2399        // The only time it is safe to iterate the live object set is at an epoch boundary,
2400        // at which point the db is consistent and the dirty cache is empty. So this does
2401        // read the cache
2402        assert!(
2403            self.dirty.is_empty(),
2404            "cannot iterate live object set with dirty data"
2405        );
2406        self.store.iter_live_object_set(include_wrapped_tombstone)
2407    }
2408
2409    // A version of iter_live_object_set that reads the cache. Only use for testing. If used
2410    // on a live validator, can cause the server to block for as long as it takes to iterate
2411    // the entire live object set.
2412    fn iter_cached_live_object_set_for_testing(
2413        &self,
2414        include_wrapped_tombstone: bool,
2415    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2416        // hold iter until we are finished to prevent any concurrent inserts/deletes
2417        let iter = self.dirty.objects.iter();
2418        let mut dirty_objects = BTreeMap::new();
2419
2420        // add everything from the store
2421        for obj in self.store.iter_live_object_set(include_wrapped_tombstone) {
2422            dirty_objects.insert(obj.object_id(), obj);
2423        }
2424
2425        // add everything from the cache, but also remove deletions
2426        for entry in iter {
2427            let id = *entry.key();
2428            let value = entry.value();
2429            match value.get_highest().unwrap() {
2430                (_, ObjectEntry::Object(object)) => {
2431                    dirty_objects.insert(id, LiveObject::Normal(object.clone()));
2432                }
2433                (version, ObjectEntry::Wrapped) => {
2434                    if include_wrapped_tombstone {
2435                        dirty_objects.insert(id, LiveObject::Wrapped(ObjectKey(id, *version)));
2436                    } else {
2437                        dirty_objects.remove(&id);
2438                    }
2439                }
2440                (_, ObjectEntry::Deleted) => {
2441                    dirty_objects.remove(&id);
2442                }
2443            }
2444        }
2445
2446        Box::new(dirty_objects.into_values())
2447    }
2448}
2449
2450// TODO: For correctness, we must at least invalidate the cache when items are written through this
2451// trait (since they could be negatively cached as absent). But it may or may not be optimal to
2452// actually insert them into the cache. For instance if state sync is running ahead of execution,
2453// they might evict other items that are about to be read. This could be an area for tuning in the
2454// future.
2455impl StateSyncAPI for WritebackCache {
2456    fn insert_transaction_and_effects(
2457        &self,
2458        transaction: &VerifiedTransaction,
2459        transaction_effects: &TransactionEffects,
2460    ) {
2461        self.store
2462            .insert_transaction_and_effects(transaction, transaction_effects)
2463            .expect("db error");
2464        self.cached
2465            .transactions
2466            .insert(
2467                transaction.digest(),
2468                PointCacheItem::Some(Arc::new(transaction.clone())),
2469                Ticket::Write,
2470            )
2471            .ok();
2472        self.cached
2473            .transaction_effects
2474            .insert(
2475                &transaction_effects.digest(),
2476                PointCacheItem::Some(Arc::new(transaction_effects.clone())),
2477                Ticket::Write,
2478            )
2479            .ok();
2480    }
2481
2482    fn multi_insert_transaction_and_effects(
2483        &self,
2484        transactions_and_effects: &[VerifiedExecutionData],
2485    ) {
2486        self.store
2487            .multi_insert_transaction_and_effects(transactions_and_effects.iter())
2488            .expect("db error");
2489        for VerifiedExecutionData {
2490            transaction,
2491            effects,
2492        } in transactions_and_effects
2493        {
2494            self.cached
2495                .transactions
2496                .insert(
2497                    transaction.digest(),
2498                    PointCacheItem::Some(Arc::new(transaction.clone())),
2499                    Ticket::Write,
2500                )
2501                .ok();
2502            self.cached
2503                .transaction_effects
2504                .insert(
2505                    &effects.digest(),
2506                    PointCacheItem::Some(Arc::new(effects.clone())),
2507                    Ticket::Write,
2508                )
2509                .ok();
2510        }
2511    }
2512}