sui_core/execution_cache/
writeback_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! MemoryCache is a cache for the transaction execution which delays writes to the database until
5//! transaction results are certified (i.e. they appear in a certified checkpoint, or an effects cert
6//! is observed by a fullnode). The cache also stores committed data in memory in order to serve
7//! future reads without hitting the database.
8//!
9//! For storing uncommitted transaction outputs, we cannot evict the data at all until it is written
10//! to disk. Committed data not only can be evicted, but it is also unbounded (imagine a stream of
11//! transactions that keep splitting a coin into smaller coins).
12//!
13//! We also want to be able to support negative cache hits (i.e. the case where we can determine an
14//! object does not exist without hitting the database).
15//!
16//! To achieve both of these goals, we split the cache data into two pieces, a dirty set and a cached
17//! set. The dirty set has no automatic evictions, data is only removed after being committed. The
18//! cached set is in a bounded-sized cache with automatic evictions. In order to support negative
//! cache hits, we treat the two halves of the cache as a single FIFO queue. Newly written (dirty) versions are
20//! inserted to one end of the dirty queue. As versions are committed to disk, they are
21//! removed from the other end of the dirty queue and inserted into the cache queue. The cache queue
22//! is truncated if it exceeds its maximum size, by removing all but the N newest versions.
23//!
24//! This gives us the property that the sequence of versions in the dirty and cached queues are the
25//! most recent versions of the object, i.e. there can be no "gaps". This allows for the following:
26//!
27//!   - Negative cache hits: If the queried version is not in memory, but is higher than the smallest
28//!     version in the cached queue, it does not exist in the db either.
29//!   - Bounded reads: When reading the most recent version that is <= some version bound, we can
30//!     correctly satisfy this query from the cache, or determine that we must go to the db.
31//!
//! Note that at any time, either or both of the dirty and cached queues may be non-existent. There may be no
33//! dirty versions of the objects, in which case there will be no dirty queue. And, the cached queue
34//! may be evicted from the cache, in which case there will be no cached queue. Because only the cached
35//! queue can be evicted (the dirty queue can only become empty by moving versions from it to the cached
36//! queue), the "highest versions" property still holds in all cases.
37//!
38//! The above design is used for both objects and markers.
39
40use crate::accumulators::funds_read::AccountFundsRead;
41use crate::authority::AuthorityStore;
42use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
43use crate::authority::authority_store::{
44    ExecutionLockWriteGuard, LockDetailsDeprecated, ObjectLockStatus, SuiLockResult,
45};
46use crate::authority::authority_store_tables::LiveObject;
47use crate::authority::backpressure::BackpressureManager;
48use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
49use crate::fallback_fetch::{do_fallback_lookup, do_fallback_lookup_fallible};
50use crate::global_state_hasher::GlobalStateHashStore;
51use crate::transaction_outputs::TransactionOutputs;
52
53use dashmap::DashMap;
54use dashmap::mapref::entry::Entry as DashMapEntry;
55use futures::{FutureExt, future::BoxFuture};
56use moka::sync::SegmentedCache as MokaCache;
57use mysten_common::debug_fatal;
58use mysten_common::random_util::randomize_cache_capacity_in_tests;
59use mysten_common::sync::notify_read::NotifyRead;
60use parking_lot::Mutex;
61use std::collections::{BTreeMap, HashSet};
62use std::hash::Hash;
63use std::sync::Arc;
64use std::sync::atomic::AtomicU64;
65use sui_config::ExecutionCacheConfig;
66use sui_macros::fail_point;
67use sui_protocol_config::ProtocolVersion;
68use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
69use sui_types::accumulator_event::AccumulatorEvent;
70use sui_types::accumulator_root::{AccumulatorObjId, AccumulatorValue};
71use sui_types::base_types::{
72    EpochId, FullObjectID, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData,
73};
74use sui_types::bridge::{Bridge, get_bridge};
75use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest};
76use sui_types::effects::{TransactionEffects, TransactionEvents};
77use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
78use sui_types::executable_transaction::VerifiedExecutableTransaction;
79use sui_types::global_state_hash::GlobalStateHash;
80use sui_types::message_envelope::Message;
81use sui_types::messages_checkpoint::CheckpointSequenceNumber;
82use sui_types::object::Object;
83use sui_types::storage::{
84    FullObjectKey, InputKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject,
85};
86use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
87use sui_types::transaction::{TransactionDataAPI, VerifiedSignedTransaction, VerifiedTransaction};
88use tap::TapOptional;
89use tracing::{debug, info, instrument, trace, warn};
90
91use super::ExecutionCacheAPI;
92use super::cache_types::Ticket;
93use super::{
94    Batch, CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
95    ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
96    cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache},
97    implement_passthrough_traits,
98    object_locks::ObjectLocks,
99};
100
101#[cfg(test)]
102#[path = "unit_tests/writeback_cache_tests.rs"]
103pub mod writeback_cache_tests;
104
105#[cfg(test)]
106#[path = "unit_tests/notify_read_input_objects_tests.rs"]
107mod notify_read_input_objects_tests;
108
/// The state of a single object version in the cache: either a live object,
/// or one of the two tombstone kinds.
#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    /// A live object at some version.
    Object(Object),
    /// Tombstone: the object was deleted at this version.
    Deleted,
    /// Tombstone: the object was wrapped (stored inside another object) at this version.
    Wrapped,
}
115
116impl ObjectEntry {
117    #[cfg(test)]
118    fn unwrap_object(&self) -> &Object {
119        match self {
120            ObjectEntry::Object(o) => o,
121            _ => panic!("unwrap_object called on non-Object"),
122        }
123    }
124
125    fn is_tombstone(&self) -> bool {
126        match self {
127            ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
128            ObjectEntry::Object(_) => false,
129        }
130    }
131}
132
133impl std::fmt::Debug for ObjectEntry {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        match self {
136            ObjectEntry::Object(o) => {
137                write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
138            }
139            ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
140            ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
141        }
142    }
143}
144
145impl From<Object> for ObjectEntry {
146    fn from(object: Object) -> Self {
147        ObjectEntry::Object(object)
148    }
149}
150
151impl From<ObjectOrTombstone> for ObjectEntry {
152    fn from(object: ObjectOrTombstone) -> Self {
153        match object {
154            ObjectOrTombstone::Object(o) => o.into(),
155            ObjectOrTombstone::Tombstone(obj_ref) => {
156                if obj_ref.2.is_deleted() {
157                    ObjectEntry::Deleted
158                } else if obj_ref.2.is_wrapped() {
159                    ObjectEntry::Wrapped
160                } else {
161                    panic!("tombstone digest must either be deleted or wrapped");
162                }
163            }
164        }
165    }
166}
167
/// The latest known state of an object id: either its highest version (which
/// may be a tombstone), or a definitive "does not exist".
#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    /// Highest known version of the object, plus its entry (live or tombstone).
    Object(SequenceNumber, ObjectEntry),
    /// The object is known not to exist at all (negative cache entry).
    NonExistent,
}
173
174impl LatestObjectCacheEntry {
175    #[cfg(test)]
176    fn version(&self) -> Option<SequenceNumber> {
177        match self {
178            LatestObjectCacheEntry::Object(version, _) => Some(*version),
179            LatestObjectCacheEntry::NonExistent => None,
180        }
181    }
182
183    fn is_alive(&self) -> bool {
184        match self {
185            LatestObjectCacheEntry::Object(_, entry) => !entry.is_tombstone(),
186            LatestObjectCacheEntry::NonExistent => false,
187        }
188    }
189}
190
191impl IsNewer for LatestObjectCacheEntry {
192    fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
193        match (self, other) {
194            (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
195                v1 > v2
196            }
197            (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
198            _ => false,
199        }
200    }
201}
202
/// Key for the marker tables: markers are scoped per epoch and per full object id.
type MarkerKey = (EpochId, FullObjectID);
204
/// UncommittedData stores execution outputs that are not yet written to the db. Entries in this
/// struct can only be purged after they are committed.
struct UncommittedData {
    /// The object dirty set. All writes go into this table first. After we flush the data to the
    /// db, the data is removed from this table and inserted into the object_cache.
    ///
    /// This table may contain both live and dead objects, since we flush both live and dead
    /// objects to the db in order to support past object queries on fullnodes.
    ///
    /// Further, we only remove objects in FIFO order, which ensures that the cached
    /// sequence of objects has no gaps. In other words, if we have versions 4, 8, 13 of
    /// an object, we can deduce that version 9 does not exist. This also makes child object
    /// reads efficient. `object_cache` cannot contain a more recent version of an object than
    /// `objects`, and neither can have any gaps. Therefore if there is any object <= the version
    /// bound for a child read in objects, it is the correct object to return.
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

    // Markers for received objects and deleted shared objects. This contains all of the dirty
    // marker state, which is committed to the db at the same time as other transaction data.
    // After markers are committed to the db we remove them from this table and insert them into
    // marker_cache.
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    // Effects of executed-but-uncommitted transactions, keyed by effects digest.
    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    // Events emitted by executed-but-uncommitted transactions, keyed by transaction digest.
    transaction_events: DashMap<TransactionDigest, TransactionEvents>,

    // Per-transaction object keys loaded at runtime that were left unchanged.
    // NOTE(review): semantics inferred from the field name — confirm against writers.
    unchanged_loaded_runtime_objects: DashMap<TransactionDigest, Vec<ObjectKey>>,

    // Maps transaction digest -> digest of its executed effects, for
    // transactions that have not yet been committed.
    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    // Transaction outputs that have not yet been written to the DB. Items are removed from this
    // table as they are flushed to the db.
    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,

    // Transactions outputs from Mysticeti fastpath certified transaction executions.
    // These outputs are not written to pending_transaction_writes until we are sure
    // that they will not get rejected by consensus. This ensures that no dependent
    // transactions can sign using the outputs of a fastpath certified transaction.
    // Otherwise it will be challenging to revert them.
    // We use a cache because it is possible to have entries that are not finalized
    // due to data races, i.e. a transaction is first fastpath certified, then
    // rejected through consensus commit, but at the same time it was executed
    // and outputs are written here. We won't have a chance to remove them anymore.
    // So we rely on the cache to evict them eventually.
    // It is also safe to evict a transaction that will eventually be finalized,
    // as we will just re-execute it.
    fastpath_transaction_outputs: MokaCache<TransactionDigest, Arc<TransactionOutputs>>,

    // Counters of transactions inserted into / committed out of the dirty set.
    // The two are equal exactly when the dirty set has been fully flushed
    // (asserted in debug builds by `is_empty`).
    total_transaction_inserts: AtomicU64,
    total_transaction_commits: AtomicU64,
}
257
258impl UncommittedData {
259    fn new(config: &ExecutionCacheConfig) -> Self {
260        Self {
261            objects: DashMap::with_shard_amount(2048),
262            markers: DashMap::with_shard_amount(2048),
263            transaction_effects: DashMap::with_shard_amount(2048),
264            executed_effects_digests: DashMap::with_shard_amount(2048),
265            pending_transaction_writes: DashMap::with_shard_amount(2048),
266            fastpath_transaction_outputs: MokaCache::builder(8)
267                .max_capacity(randomize_cache_capacity_in_tests(
268                    config.fastpath_transaction_outputs_cache_size(),
269                ))
270                .build(),
271            transaction_events: DashMap::with_shard_amount(2048),
272            unchanged_loaded_runtime_objects: DashMap::with_shard_amount(2048),
273            total_transaction_inserts: AtomicU64::new(0),
274            total_transaction_commits: AtomicU64::new(0),
275        }
276    }
277
278    fn clear(&self) {
279        self.objects.clear();
280        self.markers.clear();
281        self.transaction_effects.clear();
282        self.executed_effects_digests.clear();
283        self.pending_transaction_writes.clear();
284        self.fastpath_transaction_outputs.invalidate_all();
285        self.transaction_events.clear();
286        self.unchanged_loaded_runtime_objects.clear();
287        self.total_transaction_inserts
288            .store(0, std::sync::atomic::Ordering::Relaxed);
289        self.total_transaction_commits
290            .store(0, std::sync::atomic::Ordering::Relaxed);
291    }
292
293    fn is_empty(&self) -> bool {
294        let empty = self.pending_transaction_writes.is_empty();
295        if empty && cfg!(debug_assertions) {
296            assert!(
297                self.objects.is_empty()
298                    && self.markers.is_empty()
299                    && self.transaction_effects.is_empty()
300                    && self.executed_effects_digests.is_empty()
301                    && self.transaction_events.is_empty()
302                    && self.unchanged_loaded_runtime_objects.is_empty()
303                    && self
304                        .total_transaction_inserts
305                        .load(std::sync::atomic::Ordering::Relaxed)
306                        == self
307                            .total_transaction_commits
308                            .load(std::sync::atomic::Ordering::Relaxed),
309            );
310        }
311        empty
312    }
313}
314
// Point items (anything without a version number) can be negatively cached as None,
// i.e. we can cache the fact that the item does not exist.
type PointCacheItem<T> = Option<T>;
317
318// PointCacheItem can only be used for insert-only collections, so a Some entry
319// is always newer than a None entry.
320impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
321    fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
322        match (self, other) {
323            (Some(_), None) => true,
324
325            (Some(a), Some(b)) => {
326                // conflicting inserts should never happen
327                debug_assert_eq!(a, b);
328                false
329            }
330
331            _ => false,
332        }
333    }
334}
335
/// CachedCommittedData stores data that has been committed to the db, but is likely to be
/// read soon.
struct CachedCommittedData {
    // See module level comment for an explanation of caching strategy.
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    // See module level comment for an explanation of caching strategy.
    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    // Committed transactions, keyed by transaction digest.
    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,

    // Committed transaction effects, keyed by effects digest.
    transaction_effects:
        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,

    // Committed transaction events, keyed by transaction digest.
    transaction_events: MonotonicCache<TransactionDigest, PointCacheItem<Arc<TransactionEvents>>>,

    // Maps transaction digest -> digest of its executed effects.
    executed_effects_digests:
        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,

    // Keyed by (epoch, digest); records that a transaction executed in that
    // epoch. NOTE(review): semantics inferred from the field name — confirm
    // against the readers of this cache.
    transaction_executed_in_last_epoch:
        MonotonicCache<(EpochId, TransactionDigest), PointCacheItem<()>>,

    // Objects that were read at transaction signing time - allows us to access them again at
    // execution time with a single lock / hash lookup
    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}
361
362impl CachedCommittedData {
363    fn new(config: &ExecutionCacheConfig) -> Self {
364        let object_cache = MokaCache::builder(8)
365            .max_capacity(randomize_cache_capacity_in_tests(
366                config.object_cache_size(),
367            ))
368            .build();
369        let marker_cache = MokaCache::builder(8)
370            .max_capacity(randomize_cache_capacity_in_tests(
371                config.marker_cache_size(),
372            ))
373            .build();
374
375        let transactions = MonotonicCache::new(randomize_cache_capacity_in_tests(
376            config.transaction_cache_size(),
377        ));
378        let transaction_effects = MonotonicCache::new(randomize_cache_capacity_in_tests(
379            config.effect_cache_size(),
380        ));
381        let transaction_events = MonotonicCache::new(randomize_cache_capacity_in_tests(
382            config.events_cache_size(),
383        ));
384        let executed_effects_digests = MonotonicCache::new(randomize_cache_capacity_in_tests(
385            config.executed_effect_cache_size(),
386        ));
387
388        let transaction_objects = MokaCache::builder(8)
389            .max_capacity(randomize_cache_capacity_in_tests(
390                config.transaction_objects_cache_size(),
391            ))
392            .build();
393
394        let transaction_executed_in_last_epoch = MonotonicCache::new(
395            randomize_cache_capacity_in_tests(config.executed_effect_cache_size()),
396        );
397
398        Self {
399            object_cache,
400            marker_cache,
401            transactions,
402            transaction_effects,
403            transaction_events,
404            executed_effects_digests,
405            transaction_executed_in_last_epoch,
406            _transaction_objects: transaction_objects,
407        }
408    }
409
410    fn clear_and_assert_empty(&self) {
411        self.object_cache.invalidate_all();
412        self.marker_cache.invalidate_all();
413        self.transactions.invalidate_all();
414        self.transaction_effects.invalidate_all();
415        self.transaction_events.invalidate_all();
416        self.executed_effects_digests.invalidate_all();
417        self.transaction_executed_in_last_epoch.invalidate_all();
418        self._transaction_objects.invalidate_all();
419
420        assert_empty(&self.object_cache);
421        assert_empty(&self.marker_cache);
422        assert!(self.transactions.is_empty());
423        assert!(self.transaction_effects.is_empty());
424        assert!(self.transaction_events.is_empty());
425        assert!(self.executed_effects_digests.is_empty());
426        assert!(self.transaction_executed_in_last_epoch.is_empty());
427        assert_empty(&self._transaction_objects);
428    }
429}
430
431fn assert_empty<K, V>(cache: &MokaCache<K, V>)
432where
433    K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
434    V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
435{
436    if cache.iter().next().is_some() {
437        panic!("cache should be empty");
438    }
439}
440
/// The writeback execution cache: all transaction outputs are buffered in the
/// dirty set and flushed to the db only after certification. See the module
/// level comment for the overall caching strategy.
pub struct WritebackCache {
    // Uncommitted (dirty) data; cannot be evicted until flushed to the db.
    dirty: UncommittedData,
    // Committed data, cached in bounded structures for fast re-reads.
    cached: CachedCommittedData,

    // We separately cache the latest version of each object. Although this seems
    // redundant, it is the only way to support populating the cache after a read.
    // We cannot simply insert objects that we read off the disk into `object_cache`,
    // since that may violate the no-missing-versions property.
    // `object_by_id_cache` is also written to on writes so that it is always coherent.
    // Hence it contains both committed and dirty object data.
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    // The packages cache is treated separately from objects, because they are immutable and can be
    // used by any number of transactions. Additionally, many operations require loading large
    // numbers of packages (due to dependencies), so we want to try to keep all packages in memory.
    //
    // Also, this cache can contain packages that are dirty or committed, so it does not live in
    // UncommittedData or CachedCommittedData. The cache is populated in two ways:
    // - when packages are written (in which case they will also be present in the dirty set)
    // - after a cache miss. Because package IDs are unique (only one version exists for each ID)
    //   we do not need to worry about the contiguous version property.
    // - note that we removed any unfinalized packages from the cache during revert_state_update().
    packages: MokaCache<ObjectID, PackageObject>,

    // Per-object lock state for transaction signing (see `ObjectLocks`).
    object_locks: ObjectLocks,

    // Notify-read channels used to wake tasks waiting for effects digests,
    // input-object availability, and fastpath transaction outputs.
    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    object_notify_read: NotifyRead<InputKey, ()>,
    fastpath_transaction_outputs_notify_read:
        NotifyRead<TransactionDigest, Arc<TransactionOutputs>>,

    // The underlying persistent store that dirty data is flushed to and cache
    // misses fall back to.
    store: Arc<AuthorityStore>,
    // Backpressure configuration/state; threshold comes from the cache config.
    backpressure_threshold: u64,
    backpressure_manager: Arc<BackpressureManager>,
    metrics: Arc<ExecutionCacheMetrics>,
}
477
// Checks one side of the cache (a dirty or committed `CachedVersionMap`) for
// an exact version. Expands inline in functions returning `CacheResult`:
// returns early on a hit or a provable negative hit; otherwise records a miss
// and falls through to the next check.
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    // If the version is greater than the least version in the cache, then we know
                    // that the object does not exist anywhere
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
499
// Checks one side of the cache for the highest version of an object.
// Expands inline in functions returning `CacheResult`: returns early with
// the highest (version, entry) pair on a hit, otherwise records a miss and
// falls through. An empty map is a bug: empty `CachedVersionMap`s are
// expected to be removed from the containing table.
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some((version, entry)) = cache.get_highest() {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit((*version, entry.clone()));
            } else {
                panic!("empty CachedVersionMap should have been removed");
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
514
515impl WritebackCache {
516    pub fn new(
517        config: &ExecutionCacheConfig,
518        store: Arc<AuthorityStore>,
519        metrics: Arc<ExecutionCacheMetrics>,
520        backpressure_manager: Arc<BackpressureManager>,
521    ) -> Self {
522        let packages = MokaCache::builder(8)
523            .max_capacity(randomize_cache_capacity_in_tests(
524                config.package_cache_size(),
525            ))
526            .build();
527        Self {
528            dirty: UncommittedData::new(config),
529            cached: CachedCommittedData::new(config),
530            object_by_id_cache: MonotonicCache::new(randomize_cache_capacity_in_tests(
531                config.object_by_id_cache_size(),
532            )),
533            packages,
534            object_locks: ObjectLocks::new(),
535            executed_effects_digests_notify_read: NotifyRead::new(),
536            object_notify_read: NotifyRead::new(),
537            fastpath_transaction_outputs_notify_read: NotifyRead::new(),
538            store,
539            backpressure_manager,
540            backpressure_threshold: config.backpressure_threshold(),
541            metrics,
542        }
543    }
544
545    pub fn new_for_tests(store: Arc<AuthorityStore>) -> Self {
546        Self::new(
547            &Default::default(),
548            store,
549            ExecutionCacheMetrics::new(&prometheus::Registry::new()).into(),
550            BackpressureManager::new_for_tests(),
551        )
552    }
553
554    #[cfg(test)]
555    pub fn reset_for_test(&mut self) {
556        let mut new = Self::new(
557            &Default::default(),
558            self.store.clone(),
559            self.metrics.clone(),
560            self.backpressure_manager.clone(),
561        );
562        std::mem::swap(self, &mut new);
563    }
564
565    pub fn evict_executed_effects_from_cache_for_testing(&self, tx_digest: &TransactionDigest) {
566        self.cached.executed_effects_digests.invalidate(tx_digest);
567        self.cached.transaction_events.invalidate(tx_digest);
568        self.cached.transactions.invalidate(tx_digest);
569    }
570
    /// Writes a single object version into the dirty set and the
    /// `object_by_id_cache`, then notifies any task waiting on this object's
    /// input key. Lock ordering here is correctness-critical; see the comment
    /// below.
    fn write_object_entry(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        object: ObjectEntry,
    ) {
        trace!(?object_id, ?version, ?object, "inserting object entry");
        self.metrics.record_cache_write("object");

        // We must hold the lock for the object entry while inserting to the
        // object_by_id_cache. Otherwise, a surprising bug can occur:
        //
        // 1. A thread executing TX1 can write object (O,1) to the dirty set and then pause.
        // 2. TX2, which reads (O,1) can begin executing, because ExecutionScheduler immediately
        //    schedules transactions if their inputs are available. It does not matter that TX1
        //    hasn't finished executing yet.
        // 3. TX2 can write (O,2) to both the dirty set and the object_by_id_cache.
        // 4. The thread executing TX1 can resume and write (O,1) to the object_by_id_cache.
        //
        // Now, any subsequent attempt to get the latest version of O will return (O,1) instead of
        // (O,2).
        //
        // This seems very unlikely, but it may be possible under the following circumstances:
        // - While a thread is unlikely to pause for so long, moka cache uses optimistic
        //   lock-free algorithms that have retry loops. Possibly, under high contention, this
        //   code might spin for a surprisingly long time.
        // - Additionally, many concurrent re-executions of the same tx could happen due to
        //   the tx finalizer, plus checkpoint executor, consensus, and RPCs from fullnodes.
        let mut entry = self.dirty.objects.entry(*object_id).or_default();

        self.object_by_id_cache
            .insert(
                object_id,
                LatestObjectCacheEntry::Object(version, object.clone()),
                Ticket::Write,
            )
            // While Ticket::Write cannot expire, this insert may still fail.
            // See the comment in `MonotonicCache::insert`.
            .ok();

        entry.insert(version, object.clone());

        // Wake waiters: packages are keyed by id only; non-child objects are
        // keyed by (full id, version). Child objects are not notified here.
        if let ObjectEntry::Object(object) = &object {
            if object.is_package() {
                self.object_notify_read
                    .notify(&InputKey::Package { id: *object_id }, &());
            } else if !object.is_child_object() {
                self.object_notify_read.notify(
                    &InputKey::VersionedObject {
                        id: object.full_id(),
                        version: object.version(),
                    },
                    &(),
                );
            }
        }
    }
628
629    fn write_marker_value(
630        &self,
631        epoch_id: EpochId,
632        object_key: FullObjectKey,
633        marker_value: MarkerValue,
634    ) {
635        tracing::trace!("inserting marker value {object_key:?}: {marker_value:?}",);
636        self.metrics.record_cache_write("marker");
637        self.dirty
638            .markers
639            .entry((epoch_id, object_key.id()))
640            .or_default()
641            .value_mut()
642            .insert(object_key.version(), marker_value);
643        // It is possible for a transaction to use a consensus stream ended
644        // object in the input, hence we must notify that it is now available
645        // at the assigned version, so that any transaction waiting for this
646        // object version can start execution.
647        if matches!(marker_value, MarkerValue::ConsensusStreamEnded(_)) {
648            self.object_notify_read.notify(
649                &InputKey::VersionedObject {
650                    id: object_key.id(),
651                    version: object_key.version(),
652                },
653                &(),
654            );
655        }
656    }
657
    // lock both the dirty and committed sides of the cache, and then pass the entries to
    // the callback. Written with the `with` pattern because any other way of doing this
    // creates lifetime hell.
    fn with_locked_cache_entries<K, V, R>(
        dirty_map: &DashMap<K, CachedVersionMap<V>>,
        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: &K,
        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
    ) -> R
    where
        K: Copy + Eq + Hash + Send + Sync + 'static,
        V: Send + Sync + 'static,
    {
        // Using `entry` (rather than `get`) keeps the dashmap entry guard
        // alive for the duration of the callback, so the dirty entry cannot
        // change underneath `cb`.
        let dirty_entry = dirty_map.entry(*key);
        let dirty_entry = match &dirty_entry {
            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
            DashMapEntry::Vacant(_) => None,
        };

        // The committed side is guarded by a per-entry mutex. `cached_lock`
        // holds the guard so the borrowed contents stay valid while `cb` runs.
        let cached_entry = cached_map.get(key);
        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
        let cached_entry = cached_lock.as_deref();

        cb(dirty_entry, cached_entry)
    }
683
    // Attempt to get an object from the cache. The DB is not consulted.
    // Can return Hit, Miss, or NegativeHit (if the object is known to not exist).
    fn get_object_entry_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<ObjectEntry> {
        // Check the dirty set first, then the committed cache, while holding
        // both locks; each macro invocation returns early on a (negative) hit.
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }
714
715    fn get_object_by_key_cache_only(
716        &self,
717        object_id: &ObjectID,
718        version: SequenceNumber,
719    ) -> CacheResult<Object> {
720        match self.get_object_entry_by_key_cache_only(object_id, version) {
721            CacheResult::Hit(entry) => match entry {
722                ObjectEntry::Object(object) => CacheResult::Hit(object),
723                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
724            },
725            CacheResult::Miss => CacheResult::Miss,
726            CacheResult::NegativeHit => CacheResult::NegativeHit,
727        }
728    }
729
    /// Cache-only lookup of the latest version of an object (and its entry).
    /// The DB is not consulted for serving the result; in debug builds only,
    /// the DB/dirty set are read to verify the cache is coherent.
    ///
    /// Lookup order: the point cache of latest entries (`object_by_id_cache`)
    /// first, then the versioned dirty/cached queues via
    /// `with_locked_cache_entries`.
    fn get_object_entry_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
        self.metrics
            .record_cache_request(request_type, "object_by_id");
        let entry = self.object_by_id_cache.get(object_id);

        if cfg!(debug_assertions)
            && let Some(entry) = &entry
        {
            // check that cache is coherent: the cached "latest" entry must
            // match the highest dirty version, or, failing that, the latest
            // object/tombstone recorded in the db.
            let highest: Option<ObjectEntry> = self
                .dirty
                .objects
                .get(object_id)
                .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
                .or_else(|| {
                    let obj: Option<ObjectEntry> = self
                        .store
                        .get_latest_object_or_tombstone(*object_id)
                        .unwrap()
                        .map(|(_, o)| o.into());
                    obj
                });

            let cache_entry = match &*entry.lock() {
                LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
                LatestObjectCacheEntry::NonExistent => None,
            };

            // If the cache entry is a tombstone, the db entry may be missing if it was pruned.
            let tombstone_possibly_pruned = highest.is_none()
                && cache_entry
                    .as_ref()
                    .map(|e| e.is_tombstone())
                    .unwrap_or(false);

            if highest != cache_entry && !tombstone_possibly_pruned {
                tracing::error!(
                    ?highest,
                    ?cache_entry,
                    ?tombstone_possibly_pruned,
                    "object_by_id cache is incoherent for {:?}",
                    object_id
                );
                panic!("object_by_id cache is incoherent for {:?}", object_id);
            }
        }

        // Point-cache answer: either the latest version (hit) or a recorded
        // non-existence (negative hit).
        if let Some(entry) = entry {
            let entry = entry.lock();
            match &*entry {
                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
                    self.metrics.record_cache_hit(request_type, "object_by_id");
                    return CacheResult::Hit((*latest_version, latest_object.clone()));
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit(request_type, "object_by_id");
                    return CacheResult::NegativeHit;
                }
            }
        } else {
            self.metrics.record_cache_miss(request_type, "object_by_id");
        }

        // Point-cache miss: fall back to the versioned queues (dirty first,
        // then committed). The macros early-return on a hit.
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }
809
810    fn get_object_by_id_cache_only(
811        &self,
812        request_type: &'static str,
813        object_id: &ObjectID,
814    ) -> CacheResult<(SequenceNumber, Object)> {
815        match self.get_object_entry_by_id_cache_only(request_type, object_id) {
816            CacheResult::Hit((version, entry)) => match entry {
817                ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
818                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
819            },
820            CacheResult::NegativeHit => CacheResult::NegativeHit,
821            CacheResult::Miss => CacheResult::Miss,
822        }
823    }
824
    /// Cache-only lookup of the marker value for an exact (object, version)
    /// within an epoch. The DB is not consulted. Markers are keyed by
    /// `(epoch_id, object_id)`; dirty (uncommitted) markers are checked
    /// before committed ones, and the macros early-return on a hit.
    fn get_marker_value_cache_only(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> CacheResult<MarkerValue> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, object_key.id()),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "uncommitted",
                    dirty_entry,
                    object_key.version()
                );
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "committed",
                    cached_entry,
                    object_key.version()
                );
                CacheResult::Miss
            },
        )
    }
853
    /// Cache-only lookup of the most recent marker (version and value) for an
    /// object within an epoch. The DB is not consulted. Dirty markers take
    /// precedence over committed ones; the macros early-return on a hit.
    fn get_latest_marker_value_cache_only(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }
870
    /// Get the latest live object for `id`, consulting the cache first and
    /// falling back to the DB on a miss. Whatever the DB returns (object,
    /// tombstone, or nothing) is written back into the latest-object cache.
    ///
    /// Returns `None` for nonexistent objects and for tombstones
    /// (deleted/wrapped).
    fn get_object_impl(&self, request_type: &'static str, id: &ObjectID) -> Option<Object> {
        // The ticket is obtained *before* the cache read; the later
        // cache_latest_object_by_id insert is rejected if the ticket expired,
        // which (per that function) prevents a stale DB read from clobbering
        // a concurrently written newer cache entry.
        let ticket = self.object_by_id_cache.get_ticket_for_read(id);
        match self.get_object_entry_by_id_cache_only(request_type, id) {
            CacheResult::Hit((_, entry)) => match entry {
                ObjectEntry::Object(object) => Some(object),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
            },
            CacheResult::NegativeHit => None,
            CacheResult::Miss => {
                let obj = self
                    .store
                    .get_latest_object_or_tombstone(*id)
                    .expect("db error");
                match obj {
                    Some((key, obj)) => {
                        // Cache the latest version (key.1) regardless of
                        // whether it is a live object or a tombstone.
                        self.cache_latest_object_by_id(
                            id,
                            LatestObjectCacheEntry::Object(key.1, obj.clone().into()),
                            ticket,
                        );
                        match obj {
                            ObjectOrTombstone::Object(object) => Some(object),
                            ObjectOrTombstone::Tombstone(_) => None,
                        }
                    }
                    None => {
                        // Record the negative result so future reads can
                        // answer "does not exist" without hitting the DB.
                        self.cache_object_not_found(id, ticket);
                        None
                    }
                }
            }
        }
    }
904
905    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
906        self.metrics.record_cache_request(request_type, "db");
907        &self.store
908    }
909
910    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
911        self.metrics
912            .record_cache_multi_request(request_type, "db", count);
913        &self.store
914    }
915
    /// Write the outputs of an executed transaction into the dirty
    /// (uncommitted) portion of the cache. Nothing is written to the DB here;
    /// commit happens later in commit_transaction_outputs.
    ///
    /// Write ordering within this function is significant — see the inline
    /// comments. After all data is inserted, waiters on the effects digest are
    /// notified and the backpressure signal is refreshed.
    #[instrument(level = "debug", skip_all)]
    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
        let tx_digest = *tx_outputs.transaction.digest();
        trace!(?tx_digest, "writing transaction outputs to cache");

        // Re-executing a transaction from a previous epoch would be a bug.
        assert!(
            !self.transaction_executed_in_last_epoch(&tx_digest, epoch_id),
            "Transaction {:?} was already executed in epoch {}",
            tx_digest,
            epoch_id.saturating_sub(1)
        );

        self.dirty.fastpath_transaction_outputs.remove(&tx_digest);

        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            unchanged_loaded_runtime_objects,
            ..
        } = &*tx_outputs;

        // Deletions and wraps must be written first. The reason is that one of the deletes
        // may be a child object, and if we write the parent object first, a reader may or may
        // not see the previous version of the child object, instead of the deleted/wrapped
        // tombstone, which would cause an execution fork
        for ObjectKey(id, version) in deleted.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Deleted);
        }

        for ObjectKey(id, version) in wrapped.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
        }

        // Update all markers
        for (object_key, marker_value) in markers.iter() {
            self.write_marker_value(epoch_id, *object_key, *marker_value);
        }

        // Write children before parents to ensure that readers do not observe a parent object
        // before its most recent children are visible.
        for (object_id, object) in written.iter() {
            if object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
            }
        }
        for (object_id, object) in written.iter() {
            if !object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
                // Newly written packages also go into the dedicated package cache.
                if object.is_package() {
                    debug!("caching package: {:?}", object.compute_object_reference());
                    self.packages
                        .insert(*object_id, PackageObject::new(object.clone()));
                }
            }
        }

        let tx_digest = *transaction.digest();
        debug!(
            ?tx_digest,
            "Writing transaction output objects to cache: {:?}",
            written
                .values()
                .map(|o| (o.id(), o.version()))
                .collect::<Vec<_>>(),
        );
        let effects_digest = effects.digest();

        self.metrics.record_cache_write("transaction_block");
        self.dirty
            .pending_transaction_writes
            .insert(tx_digest, tx_outputs.clone());

        // insert transaction effects before executed_effects_digests so that there
        // are never dangling entries in executed_effects_digests
        self.metrics.record_cache_write("transaction_effects");
        self.dirty
            .transaction_effects
            .insert(effects_digest, effects.clone());

        // note: if events.data.is_empty(), then there are no events for this transaction. We
        // store it anyway to avoid special cases in commit_transaction_outputs, and translate
        // an empty events structure to None when reading.
        self.metrics.record_cache_write("transaction_events");
        self.dirty
            .transaction_events
            .insert(tx_digest, events.clone());

        self.metrics
            .record_cache_write("unchanged_loaded_runtime_objects");
        self.dirty
            .unchanged_loaded_runtime_objects
            .insert(tx_digest, unchanged_loaded_runtime_objects.clone());

        self.metrics.record_cache_write("executed_effects_digests");
        self.dirty
            .executed_effects_digests
            .insert(tx_digest, effects_digest);

        // Wake any tasks waiting for this transaction's effects digest.
        self.executed_effects_digests_notify_read
            .notify(&tx_digest, &effects_digest);

        self.metrics
            .pending_notify_read
            .set(self.executed_effects_digests_notify_read.num_pending() as i64);

        // Track inserts-minus-commits to drive backpressure when the dirty
        // set grows too large (the counters are only approximately in sync,
        // hence the saturating_sub).
        let prev = self
            .dirty
            .total_transaction_inserts
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let pending_count = (prev + 1).saturating_sub(
            self.dirty
                .total_transaction_commits
                .load(std::sync::atomic::Ordering::Relaxed),
        );

        self.set_backpressure(pending_count);
    }
1039
1040    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1041        let _metrics_guard = mysten_metrics::monitored_scope("WritebackCache::build_db_batch");
1042        let mut all_outputs = Vec::with_capacity(digests.len());
1043        for tx in digests {
1044            let Some(outputs) = self
1045                .dirty
1046                .pending_transaction_writes
1047                .get(tx)
1048                .map(|o| o.clone())
1049            else {
1050                // This can happen in the following rare case:
1051                // All transactions in the checkpoint are committed to the db (by commit_transaction_outputs,
1052                // called in CheckpointExecutor::process_executed_transactions), but the process crashes before
1053                // the checkpoint water mark is bumped. We will then re-commit the checkpoint at startup,
1054                // despite that all transactions are already executed.
1055                warn!("Attempt to commit unknown transaction {:?}", tx);
1056                continue;
1057            };
1058            all_outputs.push(outputs);
1059        }
1060
1061        let batch = self
1062            .store
1063            .build_db_batch(epoch, &all_outputs)
1064            .expect("db error");
1065        (all_outputs, batch)
1066    }
1067
    // Commits dirty data for the given TransactionDigest to the db.
    //
    // The db batch is written *before* anything is removed from the dirty set
    // (see the comment below); each transaction's data is then migrated from
    // the dirty maps into the bounded committed caches, and the backpressure
    // signal is refreshed from the updated insert/commit counters.
    #[instrument(level = "debug", skip_all)]
    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        (all_outputs, db_batch): Batch,
        digests: &[TransactionDigest],
    ) {
        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs");
        fail_point!("writeback-cache-commit");
        trace!(?digests);

        // Flush writes to disk before removing anything from dirty set. otherwise,
        // a cache eviction could cause a value to disappear briefly, even if we insert to the
        // cache before removing from the dirty set.
        db_batch.write().expect("db error");

        // Shadowing the previous guard: from here on, time is attributed to
        // the "flush" sub-scope.
        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs::flush");
        for outputs in all_outputs.iter() {
            let tx_digest = outputs.transaction.digest();
            // Every output in the batch must still be pending; build_db_batch
            // already filtered out unknown digests.
            assert!(
                self.dirty
                    .pending_transaction_writes
                    .remove(tx_digest)
                    .is_some()
            );
            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
        }

        // Update the commit counter and recompute the pending (uncommitted)
        // count to possibly release backpressure.
        let num_outputs = all_outputs.len() as u64;
        let num_commits = self
            .dirty
            .total_transaction_commits
            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
            + num_outputs;

        let pending_count = self
            .dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits);

        self.set_backpressure(pending_count);
    }
1114
1115    fn approximate_pending_transaction_count(&self) -> u64 {
1116        let num_commits = self
1117            .dirty
1118            .total_transaction_commits
1119            .load(std::sync::atomic::Ordering::Relaxed);
1120
1121        self.dirty
1122            .total_transaction_inserts
1123            .load(std::sync::atomic::Ordering::Relaxed)
1124            .saturating_sub(num_commits)
1125    }
1126
1127    fn set_backpressure(&self, pending_count: u64) {
1128        let backpressure = pending_count > self.backpressure_threshold;
1129        let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
1130        if backpressure_changed {
1131            self.metrics.backpressure_toggles.inc();
1132        }
1133        self.metrics
1134            .backpressure_status
1135            .set(if backpressure { 1 } else { 0 });
1136    }
1137
    /// After a transaction's outputs have been durably written to the db,
    /// migrate its data from the dirty (uncommitted) maps into the bounded
    /// committed caches.
    ///
    /// Ordering matters throughout: each committed cache is populated before
    /// the corresponding dirty entry is removed, so concurrent readers never
    /// see the data missing from both places.
    fn flush_transactions_from_dirty_to_cached(
        &self,
        epoch: EpochId,
        tx_digest: TransactionDigest,
        outputs: &TransactionOutputs,
    ) {
        // Now, remove each piece of committed data from the dirty state and insert it into the cache.
        // TODO: outputs should have a strong count of 1 so we should be able to move out of it
        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = outputs;

        let effects_digest = effects.digest();

        // Update cache before removing from self.dirty to avoid
        // unnecessary cache misses
        self.cached
            .transactions
            .insert(
                &tx_digest,
                PointCacheItem::Some(transaction.clone()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_effects
            .insert(
                &effects_digest,
                PointCacheItem::Some(effects.clone().into()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .executed_effects_digests
            .insert(
                &tx_digest,
                PointCacheItem::Some(effects_digest),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_events
            .insert(
                &tx_digest,
                PointCacheItem::Some(events.clone().into()),
                Ticket::Write,
            )
            .ok();

        // All of these dirty entries were inserted by write_transaction_outputs,
        // so their absence here would indicate a double-commit or lost write.
        self.dirty
            .transaction_effects
            .remove(&effects_digest)
            .expect("effects must exist");

        self.dirty
            .transaction_events
            .remove(&tx_digest)
            .expect("events must exist");

        self.dirty
            .unchanged_loaded_runtime_objects
            .remove(&tx_digest)
            .expect("unchanged_loaded_runtime_objects must exist");

        self.dirty
            .executed_effects_digests
            .remove(&tx_digest)
            .expect("executed effects must exist");

        // Move dirty markers to cache
        for (object_key, marker_value) in markers.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.markers,
                &self.cached.marker_cache,
                (epoch, object_key.id()),
                object_key.version(),
                marker_value,
            );
        }

        // Move written objects, deletions and wraps from the dirty queues into
        // the bounded committed object cache.
        for (object_id, object) in written.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                object.version(),
                &ObjectEntry::Object(object.clone()),
            );
        }

        for ObjectKey(object_id, version) in deleted.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Deleted,
            );
        }

        for ObjectKey(object_id, version) in wrapped.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Wrapped,
            );
        }
    }
1255
    // Move the oldest/least entry from the dirty queue to the cache queue.
    // This is called after the entry is committed to the db.
    //
    // Panics if the (key, version, value) being moved is not present in the
    // dirty map — committed data must always have come through the dirty set.
    fn move_version_from_dirty_to_cache<K, V>(
        dirty: &DashMap<K, CachedVersionMap<V>>,
        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: K,
        version: SequenceNumber,
        value: &V,
    ) where
        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
    {
        // Keep only the newest few versions in the committed cache; older
        // versions can always be re-read from the db.
        static MAX_VERSIONS: usize = 3;

        // IMPORTANT: lock both the dirty set entry and the cache entry before modifying either.
        // this ensures that readers cannot see a value temporarily disappear.
        let dirty_entry = dirty.entry(key);
        let cache_entry = cache.entry(key).or_default();
        let mut cache_map = cache_entry.value().lock();

        // insert into cache and drop old versions.
        cache_map.insert(version, value.clone());
        // TODO: make this automatic by giving CachedVersionMap an optional max capacity
        cache_map.truncate_to(MAX_VERSIONS);

        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
            panic!("dirty map must exist");
        };

        // The committed version must be the oldest one in the dirty queue
        // (versions are committed in order), and must match the value written.
        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);

        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");

        // if there are no versions remaining, remove the map entry
        if occupied_dirty_entry.get().is_empty() {
            occupied_dirty_entry.remove();
        }
    }
1294
1295    // Updates the latest object id cache with an entry that was read from the db.
1296    fn cache_latest_object_by_id(
1297        &self,
1298        object_id: &ObjectID,
1299        object: LatestObjectCacheEntry,
1300        ticket: Ticket,
1301    ) {
1302        trace!("caching object by id: {:?} {:?}", object_id, object);
1303        if self
1304            .object_by_id_cache
1305            .insert(object_id, object, ticket)
1306            .is_ok()
1307        {
1308            self.metrics.record_cache_write("object_by_id");
1309        } else {
1310            trace!("discarded cache write due to expired ticket");
1311            self.metrics.record_ticket_expiry();
1312        }
1313    }
1314
    /// Record a negative result ("no object with this id exists") in the
    /// latest-object cache so future reads can answer without hitting the db.
    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
    }
1318
    /// Discard all uncommitted (dirty) state at the end of an epoch.
    ///
    /// Caller holds the execution write lock, so no transactions are executing
    /// and nothing can be writing to `self.dirty` concurrently. Cached entries
    /// for objects touched by the discarded transactions are invalidated so
    /// readers cannot observe never-finalized data.
    fn clear_state_end_of_epoch_impl(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
        info!("clearing state at end of epoch");

        // Note: there cannot be any concurrent writes to self.dirty while we are in this function,
        // as all transaction execution is paused.
        for r in self.dirty.pending_transaction_writes.iter() {
            let outputs = r.value();
            // Transactions with shared inputs should not still be pending at
            // epoch end; flag (but don't crash in release) if one is found.
            if !outputs
                .transaction
                .transaction_data()
                .shared_input_objects()
                .is_empty()
            {
                debug_fatal!("transaction must be single writer");
            }
            info!(
                "clearing state for transaction {:?}",
                outputs.transaction.digest()
            );
            for (object_id, object) in outputs.written.iter() {
                if object.is_package() {
                    info!("removing non-finalized package from cache: {:?}", object_id);
                    self.packages.invalidate(object_id);
                }
                self.object_by_id_cache.invalidate(object_id);
                self.cached.object_cache.invalidate(object_id);
            }

            for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
                self.object_by_id_cache.invalidate(object_id);
                self.cached.object_cache.invalidate(object_id);
            }
        }

        // Drop the dirty maps wholesale; their contents were never committed.
        self.dirty.clear();

        info!("clearing old transaction locks");
        self.object_locks.clear();
        info!("clearing object per epoch marker table");
        self.store
            .clear_object_per_epoch_marker_table(execution_guard)
            .expect("db error");
    }
1362
1363    fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) {
1364        self.store
1365            .bulk_insert_genesis_objects(objects)
1366            .expect("db error");
1367        for obj in objects {
1368            self.cached.object_cache.invalidate(&obj.id());
1369            self.object_by_id_cache.invalidate(&obj.id());
1370        }
1371    }
1372
1373    fn insert_genesis_object_impl(&self, object: Object) {
1374        self.object_by_id_cache.invalidate(&object.id());
1375        self.cached.object_cache.invalidate(&object.id());
1376        self.store.insert_genesis_object(object).expect("db error");
1377    }
1378
1379    pub fn clear_caches_and_assert_empty(&self) {
1380        info!("clearing caches");
1381        self.cached.clear_and_assert_empty();
1382        self.object_by_id_cache.invalidate_all();
1383        assert!(&self.object_by_id_cache.is_empty());
1384        self.packages.invalidate_all();
1385        assert_empty(&self.packages);
1386    }
1387}
1388
impl AccountFundsRead for WritebackCache {
    /// Read the latest balance for an accumulator account object.
    ///
    /// A missing account object is ambiguous: it may genuinely hold zero, or
    /// we may have raced with a balance change. To disambiguate, the
    /// accumulator root object's version is sampled before and after the
    /// account read; if it did not change, the zero result is consistent.
    /// Otherwise the read is retried with the new root version.
    fn get_latest_account_amount(&self, account_id: &AccumulatorObjId) -> (u128, SequenceNumber) {
        let mut pre_root_version =
            ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
                .unwrap()
                .version();
        let mut loop_iter = 0;
        loop {
            let account_obj = ObjectCacheRead::get_object(self, account_id.inner());
            if let Some(account_obj) = account_obj {
                let (_, AccumulatorValue::U128(value)) =
                    account_obj.data.try_as_move().unwrap().try_into().unwrap();
                return (value.value, account_obj.version());
            }
            let post_root_version =
                ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
                    .unwrap()
                    .version();
            if pre_root_version == post_root_version {
                // Root unchanged across the account read: the account really
                // does not exist, so its balance is zero.
                return (0, pre_root_version);
            }
            debug!(
                "Root version changed from {} to {} while reading account amount, retrying",
                pre_root_version, post_root_version
            );
            pre_root_version = post_root_version;
            loop_iter += 1;
            if loop_iter >= 3 {
                // NOTE(review): if debug_fatal! only logs (rather than panics)
                // in release builds, the loop keeps retrying past 3 iterations
                // — confirm this cannot spin indefinitely under contention.
                debug_fatal!("Unable to get a stable version after 3 iterations");
            }
        }
    }

    /// Read the account balance at the newest version <= `version`.
    /// Returns 0 when no such version of the account object exists.
    fn get_account_amount_at_version(
        &self,
        account_id: &AccumulatorObjId,
        version: SequenceNumber,
    ) -> u128 {
        let account_obj = self.find_object_lt_or_eq_version(*account_id.inner(), version);
        if let Some(account_obj) = account_obj {
            let (_, AccumulatorValue::U128(value)) =
                account_obj.data.try_as_move().unwrap().try_into().unwrap();
            value.value
        } else {
            0
        }
    }
}
1437
1438impl ExecutionCacheAPI for WritebackCache {}
1439
impl ExecutionCacheCommit for WritebackCache {
    // Delegates to the inherent method of the same name.
    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
        self.build_db_batch(epoch, digests)
    }

    // Fully-qualified call avoids recursing into this trait method.
    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        batch: Batch,
        digests: &[TransactionDigest],
    ) {
        WritebackCache::commit_transaction_outputs(self, epoch, batch, digests)
    }

    // Persist a single transaction directly through the store.
    fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) {
        self.store.persist_transaction(tx).expect("db error");
    }

    fn approximate_pending_transaction_count(&self) -> u64 {
        WritebackCache::approximate_pending_transaction_count(self)
    }
}
1462
1463impl ObjectCacheRead for WritebackCache {
    /// Fetch a package object, preferring the dedicated package cache.
    ///
    /// On a package-cache miss, falls back to the general object path
    /// (dirty set, object caches, then db) — necessary because a package can
    /// be evicted from the package cache before it is committed. Returns an
    /// error if the id resolves to a non-package object.
    fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
        self.metrics
            .record_cache_request("package", "package_cache");
        if let Some(p) = self.packages.get(package_id) {
            // Debug-only coherence check: the cached package must match the
            // canonical copy (highest dirty version, or else the db).
            if cfg!(debug_assertions) {
                let canonical_package = self
                    .dirty
                    .objects
                    .get(package_id)
                    .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
                        Some(ObjectEntry::Object(object)) => Some(object),
                        _ => None,
                    })
                    .or_else(|| self.store.get_object(package_id));

                if let Some(canonical_package) = canonical_package {
                    assert_eq!(
                        canonical_package.digest(),
                        p.object().digest(),
                        "Package object cache is inconsistent for package {:?}",
                        package_id
                    );
                }
            }
            self.metrics.record_cache_hit("package", "package_cache");
            return Ok(Some(p));
        } else {
            self.metrics.record_cache_miss("package", "package_cache");
        }

        // We try the dirty objects cache as well before going to the database. This is necessary
        // because the package could be evicted from the package cache before it is committed
        // to the database.
        if let Some(p) = self.get_object_impl("package", package_id) {
            if p.is_package() {
                // Re-populate the package cache for future lookups.
                let p = PackageObject::new(p);
                tracing::trace!(
                    "caching package: {:?}",
                    p.object().compute_object_reference()
                );
                self.metrics.record_cache_write("package");
                self.packages.insert(*package_id, p.clone());
                Ok(Some(p))
            } else {
                // The id exists but is a Move object, not a package.
                Err(SuiErrorKind::UserInputError {
                    error: UserInputError::MoveObjectAsPackage {
                        object_id: *package_id,
                    },
                }
                .into())
            }
        } else {
            Ok(None)
        }
    }
1519
    // Intentionally a no-op for this cache implementation: since every write
    // flows through the cache, the package cache cannot become incoherent with
    // the store, so there is nothing to reload.
    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
        // This is a no-op because all writes go through the cache, therefore it can never
        // be incoherent
    }
1524
1525    // get_object and variants.
1526
    // Returns the latest live version of the object with this id, if any,
    // via the shared lookup helper (cache first, then db).
    fn get_object(&self, id: &ObjectID) -> Option<Object> {
        self.get_object_impl("object_latest", id)
    }
1530
1531    fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object> {
1532        match self.get_object_by_key_cache_only(object_id, version) {
1533            CacheResult::Hit(object) => Some(object),
1534            CacheResult::NegativeHit => None,
1535            CacheResult::Miss => self
1536                .record_db_get("object_by_version")
1537                .get_object_by_key(object_id, version),
1538        }
1539    }
1540
1541    fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
1542        do_fallback_lookup(
1543            object_keys,
1544            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1545                CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1546                CacheResult::NegativeHit => CacheResult::NegativeHit,
1547                CacheResult::Miss => CacheResult::Miss,
1548            },
1549            |remaining| {
1550                self.record_db_multi_get("object_by_version", remaining.len())
1551                    .multi_get_objects_by_key(remaining)
1552                    .expect("db error")
1553            },
1554        )
1555    }
1556
1557    fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool {
1558        match self.get_object_by_key_cache_only(object_id, version) {
1559            CacheResult::Hit(_) => true,
1560            CacheResult::NegativeHit => false,
1561            CacheResult::Miss => self
1562                .record_db_get("object_by_version")
1563                .object_exists_by_key(object_id, version)
1564                .expect("db error"),
1565        }
1566    }
1567
1568    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool> {
1569        do_fallback_lookup(
1570            object_keys,
1571            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1572                CacheResult::Hit(_) => CacheResult::Hit(true),
1573                CacheResult::NegativeHit => CacheResult::Hit(false),
1574                CacheResult::Miss => CacheResult::Miss,
1575            },
1576            |remaining| {
1577                self.record_db_multi_get("object_by_version", remaining.len())
1578                    .multi_object_exists_by_key(remaining)
1579                    .expect("db error")
1580            },
1581        )
1582    }
1583
1584    fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
1585        match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1586            CacheResult::Hit((version, entry)) => Some(match entry {
1587                ObjectEntry::Object(object) => object.compute_object_reference(),
1588                ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1589                ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1590            }),
1591            CacheResult::NegativeHit => None,
1592            CacheResult::Miss => self
1593                .record_db_get("latest_objref_or_tombstone")
1594                .get_latest_object_ref_or_tombstone(object_id)
1595                .expect("db error"),
1596        }
1597    }
1598
1599    fn get_latest_object_or_tombstone(
1600        &self,
1601        object_id: ObjectID,
1602    ) -> Option<(ObjectKey, ObjectOrTombstone)> {
1603        match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1604            CacheResult::Hit((version, entry)) => {
1605                let key = ObjectKey(object_id, version);
1606                Some(match entry {
1607                    ObjectEntry::Object(object) => (key, object.into()),
1608                    ObjectEntry::Deleted => (
1609                        key,
1610                        ObjectOrTombstone::Tombstone((
1611                            object_id,
1612                            version,
1613                            ObjectDigest::OBJECT_DIGEST_DELETED,
1614                        )),
1615                    ),
1616                    ObjectEntry::Wrapped => (
1617                        key,
1618                        ObjectOrTombstone::Tombstone((
1619                            object_id,
1620                            version,
1621                            ObjectDigest::OBJECT_DIGEST_WRAPPED,
1622                        )),
1623                    ),
1624                })
1625            }
1626            CacheResult::NegativeHit => None,
1627            CacheResult::Miss => self
1628                .record_db_get("latest_object_or_tombstone")
1629                .get_latest_object_or_tombstone(object_id)
1630                .expect("db error"),
1631        }
1632    }
1633
1634    fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool> {
1635        keys.iter()
1636            .map(|key| {
1637                if key.is_cancelled() {
1638                    true
1639                } else {
1640                    match key {
1641                        InputKey::VersionedObject { id, version } => {
1642                            matches!(
1643                                self.get_object_by_key_cache_only(&id.id(), *version),
1644                                CacheResult::Hit(_)
1645                            )
1646                        }
1647                        InputKey::Package { id } => self.packages.contains_key(id),
1648                    }
1649                }
1650            })
1651            .collect()
1652    }
1653
1654    #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1655    fn find_object_lt_or_eq_version(
1656        &self,
1657        object_id: ObjectID,
1658        version_bound: SequenceNumber,
1659    ) -> Option<Object> {
1660        macro_rules! check_cache_entry {
1661            ($level: expr, $objects: expr) => {
1662                self.metrics
1663                    .record_cache_request("object_lt_or_eq_version", $level);
1664                if let Some(objects) = $objects {
1665                    if let Some((_, object)) = objects
1666                        .all_versions_lt_or_eq_descending(&version_bound)
1667                        .next()
1668                    {
1669                        if let ObjectEntry::Object(object) = object {
1670                            self.metrics
1671                                .record_cache_hit("object_lt_or_eq_version", $level);
1672                            return Some(object.clone());
1673                        } else {
1674                            // if we find a tombstone, the object does not exist
1675                            self.metrics
1676                                .record_cache_negative_hit("object_lt_or_eq_version", $level);
1677                            return None;
1678                        }
1679                    } else {
1680                        self.metrics
1681                            .record_cache_miss("object_lt_or_eq_version", $level);
1682                    }
1683                }
1684            };
1685        }
1686
1687        // if we have the latest version cached, and it is within the bound, we are done
1688        self.metrics
1689            .record_cache_request("object_lt_or_eq_version", "object_by_id");
1690        let latest_cache_entry = self.object_by_id_cache.get(&object_id);
1691        if let Some(latest) = &latest_cache_entry {
1692            let latest = latest.lock();
1693            match &*latest {
1694                LatestObjectCacheEntry::Object(latest_version, object) => {
1695                    if *latest_version <= version_bound {
1696                        if let ObjectEntry::Object(object) = object {
1697                            self.metrics
1698                                .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1699                            return Some(object.clone());
1700                        } else {
1701                            // object is a tombstone, but is still within the version bound
1702                            self.metrics.record_cache_negative_hit(
1703                                "object_lt_or_eq_version",
1704                                "object_by_id",
1705                            );
1706                            return None;
1707                        }
1708                    }
1709                    // latest object is not within the version bound. fall through.
1710                }
1711                // No object by this ID exists at all
1712                LatestObjectCacheEntry::NonExistent => {
1713                    self.metrics
1714                        .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1715                    return None;
1716                }
1717            }
1718        }
1719        self.metrics
1720            .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1721
1722        Self::with_locked_cache_entries(
1723            &self.dirty.objects,
1724            &self.cached.object_cache,
1725            &object_id,
1726            |dirty_entry, cached_entry| {
1727                check_cache_entry!("committed", dirty_entry);
1728                check_cache_entry!("uncommitted", cached_entry);
1729
1730                // Much of the time, the query will be for the very latest object version, so
1731                // try that first. But we have to be careful:
1732                // 1. We must load the tombstone if it is present, because its version may exceed
1733                //    the version_bound, in which case we must do a scan.
1734                // 2. You might think we could just call `self.store.get_latest_object_or_tombstone` here.
1735                //    But we cannot, because there may be a more recent version in the dirty set, which
1736                //    we skipped over in check_cache_entry! because of the version bound. However, if we
1737                //    skipped it above, we will skip it here as well, again due to the version bound.
1738                // 3. Despite that, we really want to warm the cache here. Why? Because if the object is
1739                //    cold (not being written to), then we will very soon be able to start serving reads
1740                //    of it from the object_by_id cache, IF we can warm the cache. If we don't warm the
1741                //    the cache here, and no writes to the object occur, then we will always have to go
1742                //    to the db for the object.
1743                //
1744                // Lastly, it is important to understand the rationale for all this: If the object is
1745                // write-hot, we will serve almost all reads to it from the dirty set (or possibly the
1746                // cached set if it is only written to once every few checkpoints). If the object is
1747                // write-cold (or non-existent) and read-hot, then we will serve almost all reads to it
1748                // from the object_by_id cache check above.  Most of the apparently wasteful code here
1749                // exists only to ensure correctness in all the edge cases.
1750                let latest: Option<(SequenceNumber, ObjectEntry)> =
1751                    if let Some(dirty_set) = dirty_entry {
1752                        dirty_set
1753                            .get_highest()
1754                            .cloned()
1755                            .tap_none(|| panic!("dirty set cannot be empty"))
1756                    } else {
1757                        // TODO: we should try not to read from the db while holding the locks.
1758                        self.record_db_get("object_lt_or_eq_version_latest")
1759                            .get_latest_object_or_tombstone(object_id)
1760                            .expect("db error")
1761                            .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1762                                (version, ObjectEntry::from(obj_or_tombstone))
1763                            })
1764                    };
1765
1766                if let Some((obj_version, obj_entry)) = latest {
1767                    // we can always cache the latest object (or tombstone), even if it is not within the
1768                    // version_bound. This is done in order to warm the cache in the case where a sequence
1769                    // of transactions all read the same child object without writing to it.
1770
1771                    // Note: no need to call with_object_by_id_cache_update here, because we are holding
1772                    // the lock on the dirty cache entry, and `latest` cannot become out-of-date
1773                    // while we hold that lock.
1774                    self.cache_latest_object_by_id(
1775                        &object_id,
1776                        LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1777                        // We can get a ticket at the last second, because we are holding the lock
1778                        // on dirty, so there cannot be any concurrent writes.
1779                        self.object_by_id_cache.get_ticket_for_read(&object_id),
1780                    );
1781
1782                    if obj_version <= version_bound {
1783                        match obj_entry {
1784                            ObjectEntry::Object(object) => Some(object),
1785                            ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
1786                        }
1787                    } else {
1788                        // The latest object exceeded the bound, so now we have to do a scan
1789                        // But we already know there is no dirty entry within the bound,
1790                        // so we go to the db.
1791                        self.record_db_get("object_lt_or_eq_version_scan")
1792                            .find_object_lt_or_eq_version(object_id, version_bound)
1793                            .expect("db error")
1794                    }
1795
1796                // no object found in dirty set or db, object does not exist
1797                // When this is called from a read api (i.e. not the execution path) it is
1798                // possible that the object has been deleted and pruned. In this case,
1799                // there would be no entry at all on disk, but we may have a tombstone in the
1800                // cache
1801                } else if let Some(latest_cache_entry) = latest_cache_entry {
1802                    // If there is a latest cache entry, it had better not be a live object!
1803                    assert!(!latest_cache_entry.lock().is_alive());
1804                    None
1805                } else {
1806                    // If there is no latest cache entry, we can insert one.
1807                    let highest = cached_entry.and_then(|c| c.get_highest());
1808                    assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1809                    self.cache_object_not_found(
1810                        &object_id,
1811                        // okay to get ticket at last second - see above
1812                        self.object_by_id_cache.get_ticket_for_read(&object_id),
1813                    );
1814                    None
1815                }
1816            },
1817        )
1818    }
1819
    // Delegates to the `get_sui_system_state` helper, which reads the system
    // state object through this cache. The "unsafe" caveat comes from the
    // trait's contract — NOTE(review): see the trait definition for details.
    fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
        get_sui_system_state(self)
    }
1823
    // Delegates to the `get_bridge` helper, which reads the bridge object
    // through this cache. The "unsafe" caveat comes from the trait's contract
    // — NOTE(review): see the trait definition for details.
    fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge> {
        get_bridge(self)
    }
1827
1828    fn get_marker_value(
1829        &self,
1830        object_key: FullObjectKey,
1831        epoch_id: EpochId,
1832    ) -> Option<MarkerValue> {
1833        match self.get_marker_value_cache_only(object_key, epoch_id) {
1834            CacheResult::Hit(marker) => Some(marker),
1835            CacheResult::NegativeHit => None,
1836            CacheResult::Miss => self
1837                .record_db_get("marker_by_version")
1838                .get_marker_value(object_key, epoch_id)
1839                .expect("db error"),
1840        }
1841    }
1842
1843    fn get_latest_marker(
1844        &self,
1845        object_id: FullObjectID,
1846        epoch_id: EpochId,
1847    ) -> Option<(SequenceNumber, MarkerValue)> {
1848        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1849            CacheResult::Hit((v, marker)) => Some((v, marker)),
1850            CacheResult::NegativeHit => {
1851                panic!("cannot have negative hit when getting latest marker")
1852            }
1853            CacheResult::Miss => self
1854                .record_db_get("marker_latest")
1855                .get_latest_marker(object_id, epoch_id)
1856                .expect("db error"),
1857        }
1858    }
1859
    // Returns the lock status of `obj_ref` for the current epoch.
    //
    // If the live version of the object differs from the requested ref, the
    // result is `LockedAtDifferentVersion` carrying the actual live ref;
    // otherwise we report whether a transaction lock is held on it.
    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult {
        let cur_epoch = epoch_store.epoch();
        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
            CacheResult::Hit((_, obj)) => {
                let actual_objref = obj.compute_object_reference();
                if obj_ref != actual_objref {
                    // Object is live, but at a version other than the one requested.
                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
                        locked_ref: actual_objref,
                    })
                } else {
                    // requested object ref is live, check if there is a lock
                    Ok(
                        match self
                            .object_locks
                            .get_transaction_lock(&obj_ref, epoch_store)?
                        {
                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
                                locked_by_tx: LockDetailsDeprecated {
                                    epoch: cur_epoch,
                                    tx_digest,
                                },
                            },
                            None => ObjectLockStatus::Initialized,
                        },
                    )
                }
            }
            CacheResult::NegativeHit => {
                Err(SuiError::from(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
                    // even though we know the requested version, we leave it as None to indicate
                    // that the object does not exist at any version
                    version: None,
                }))
            }
            CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
        }
    }
1898
1899    fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef> {
1900        let obj = self.get_object_impl("live_objref", &object_id).ok_or(
1901            UserInputError::ObjectNotFound {
1902                object_id,
1903                version: None,
1904            },
1905        )?;
1906        Ok(obj.compute_object_reference())
1907    }
1908
    // Verifies that every ref in `owned_object_refs` is the current live
    // version of its object. Errors identify the first ref that is at a stale
    // version or refers to a nonexistent object.
    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
        do_fallback_lookup_fallible(
            owned_object_refs,
            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
                CacheResult::Hit((version, obj)) => {
                    // Object is live — but possibly at a different version
                    // than the caller provided.
                    if obj.compute_object_reference() != *obj_ref {
                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: version,
                        }
                        .into())
                    } else {
                        Ok(CacheResult::Hit(()))
                    }
                }
                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
                    version: None,
                }
                .into()),
                CacheResult::Miss => Ok(CacheResult::Miss),
            },
            |remaining| {
                // The db-level check errors on the first dead ref; success
                // means every remaining ref is live.
                self.record_db_multi_get("object_is_live", remaining.len())
                    .check_owned_objects_are_live(remaining)?;
                Ok(vec![(); remaining.len()])
            },
        )?;
        Ok(())
    }
1939
1940    fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber> {
1941        self.store
1942            .perpetual_tables
1943            .get_highest_pruned_checkpoint()
1944            .expect("db error")
1945    }
1946
    // Returns a future that resolves once every key in
    // `input_and_receiving_keys` is available, re-evaluating availability via
    // `multi_input_objects_available` each time the notify-read is woken.
    fn notify_read_input_objects<'a>(
        &'a self,
        input_and_receiving_keys: &'a [InputKey],
        receiving_keys: &'a HashSet<InputKey>,
        epoch: EpochId,
    ) -> BoxFuture<'a, ()> {
        self.object_notify_read
            .read(
                "notify_read_input_objects",
                input_and_receiving_keys,
                move |keys| {
                    // Map availability to Option<()>: `None` keeps the
                    // notify-read waiting on that key.
                    self.multi_input_objects_available(keys, receiving_keys, epoch)
                        .into_iter()
                        .map(|available| if available { Some(()) } else { None })
                        .collect::<Vec<_>>()
                },
            )
            .map(|_| ())
            .boxed()
    }
1967}
1968
1969impl TransactionCacheRead for WritebackCache {
    // Batched transaction lookup: dirty (uncommitted) writes first, then the
    // committed point cache, then the db. Read tickets are acquired up front
    // so that negative db results can be cached without racing concurrent
    // writers.
    fn multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<Arc<VerifiedTransaction>>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_block", "uncommitted");
                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_block", "uncommitted");
                    return CacheResult::Hit(Some(tx.transaction.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_block", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_block", "committed");

                match self
                    .cached
                    .transactions
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(tx)) => {
                        self.metrics
                            .record_cache_hit("transaction_block", "committed");
                        CacheResult::Hit(Some(tx))
                    }
                    // Cached knowledge that the transaction does not exist.
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_block", "committed");

                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results: Vec<_> = self
                    .record_db_multi_get("transaction_block", remaining.len())
                    .multi_get_transaction_blocks(&remaining_digests)
                    .expect("db error")
                    .into_iter()
                    .map(|o| o.map(Arc::new))
                    .collect();
                // Negatively cache misses using the pre-acquired tickets; the
                // insert may be rejected if a ticket went stale (`.ok()`).
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached.transactions.insert(digest, None, *ticket).ok();
                    }
                }
                results
            },
        )
    }
2032
    // Batched lookup of executed-effects digests: dirty (uncommitted) map
    // first, then the committed point cache, then the db. Read tickets are
    // acquired up front so negative db results can be cached safely.
    fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEffectsDigest>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| {
                (
                    *d,
                    self.cached.executed_effects_digests.get_ticket_for_read(d),
                )
            })
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("executed_effects_digests", "uncommitted");
                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
                    self.metrics
                        .record_cache_hit("executed_effects_digests", "uncommitted");
                    return CacheResult::Hit(Some(*digest));
                }
                self.metrics
                    .record_cache_miss("executed_effects_digests", "uncommitted");

                self.metrics
                    .record_cache_request("executed_effects_digests", "committed");
                match self
                    .cached
                    .executed_effects_digests
                    .get(digest)
                    .map(|l| *l.lock())
                {
                    Some(PointCacheItem::Some(digest)) => {
                        self.metrics
                            .record_cache_hit("executed_effects_digests", "committed");
                        CacheResult::Hit(Some(digest))
                    }
                    // Cached knowledge that no executed effects exist.
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("executed_effects_digests", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("executed_effects_digests", remaining.len())
                    .multi_get_executed_effects_digests(&remaining_digests)
                    .expect("db error");
                // Negatively cache misses; a stale ticket causes the insert to
                // be rejected, which is harmless (`.ok()`).
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .executed_effects_digests
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }
2098
    // Batched effects lookup by effects digest: dirty (uncommitted) map first,
    // then the committed point cache, then the db. Read tickets are acquired
    // up front so negative db results can be cached safely.
    fn multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> Vec<Option<TransactionEffects>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_effects", "uncommitted");
                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_effects", "uncommitted");
                    return CacheResult::Hit(Some(effects.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_effects", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_effects", "committed");
                match self
                    .cached
                    .transaction_effects
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(effects)) => {
                        self.metrics
                            .record_cache_hit("transaction_effects", "committed");
                        // Deref-clone: the cache holds the effects behind a
                        // pointer, but the return type is an owned value.
                        CacheResult::Hit(Some((*effects).clone()))
                    }
                    // Cached knowledge that no effects exist for this digest.
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_effects", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("transaction_effects", remaining.len())
                    .multi_get_effects(remaining_digests.iter())
                    .expect("db error");
                // Negatively cache misses; a stale ticket causes the insert to
                // be rejected, which is harmless (`.ok()`).
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_effects
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }
2159
    /// Returns whether `digest` was executed in the epoch immediately before
    /// `current_epoch`, caching both positive and negative answers keyed by
    /// `(last_epoch, digest)`.
    fn transaction_executed_in_last_epoch(
        &self,
        digest: &TransactionDigest,
        current_epoch: EpochId,
    ) -> bool {
        // Epoch 0 has no predecessor, so nothing can have executed "last epoch".
        if current_epoch == 0 {
            return false;
        }
        let last_epoch = current_epoch - 1;
        let cache_key = (last_epoch, *digest);

        // Take the read ticket before the cache lookup; it guards the insert
        // below (a racing write causes the insert to fail, which is ignored
        // via `.ok()`).
        let ticket = self
            .cached
            .transaction_executed_in_last_epoch
            .get_ticket_for_read(&cache_key);

        if let Some(cached) = self
            .cached
            .transaction_executed_in_last_epoch
            .get(&cache_key)
        {
            // Cached `Some(())` means executed; cached `None` is a negative hit.
            return cached.lock().is_some();
        }

        // Cache miss: consult the perpetual tables.
        // NOTE(review): `current_epoch` (not `last_epoch`) is passed here —
        // presumably the store helper derives the previous epoch itself;
        // confirm against `was_transaction_executed_in_last_epoch`.
        let was_executed = self
            .store
            .perpetual_tables
            .was_transaction_executed_in_last_epoch(digest, current_epoch);

        // Record the answer (positive or negative) for future lookups.
        let value = if was_executed { Some(()) } else { None };
        self.cached
            .transaction_executed_in_last_epoch
            .insert(&cache_key, value, ticket)
            .ok();

        was_executed
    }
2197
2198    fn notify_read_executed_effects_digests<'a>(
2199        &'a self,
2200        task_name: &'static str,
2201        digests: &'a [TransactionDigest],
2202    ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>> {
2203        self.executed_effects_digests_notify_read
2204            .read(task_name, digests, |digests| {
2205                self.multi_get_executed_effects_digests(digests)
2206            })
2207            .boxed()
2208    }
2209
2210    fn multi_get_events(
2211        &self,
2212        event_digests: &[TransactionDigest],
2213    ) -> Vec<Option<TransactionEvents>> {
2214        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
2215            if events.data.is_empty() {
2216                None
2217            } else {
2218                Some(events)
2219            }
2220        }
2221
2222        let digests_and_tickets: Vec<_> = event_digests
2223            .iter()
2224            .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
2225            .collect();
2226        do_fallback_lookup(
2227            &digests_and_tickets,
2228            |(digest, _)| {
2229                self.metrics
2230                    .record_cache_request("transaction_events", "uncommitted");
2231                if let Some(events) = self.dirty.transaction_events.get(digest).map(|e| e.clone()) {
2232                    self.metrics
2233                        .record_cache_hit("transaction_events", "uncommitted");
2234
2235                    return CacheResult::Hit(map_events(events));
2236                }
2237                self.metrics
2238                    .record_cache_miss("transaction_events", "uncommitted");
2239
2240                self.metrics
2241                    .record_cache_request("transaction_events", "committed");
2242                match self
2243                    .cached
2244                    .transaction_events
2245                    .get(digest)
2246                    .map(|l| l.lock().clone())
2247                {
2248                    Some(PointCacheItem::Some(events)) => {
2249                        self.metrics
2250                            .record_cache_hit("transaction_events", "committed");
2251                        CacheResult::Hit(map_events((*events).clone()))
2252                    }
2253                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
2254                    None => {
2255                        self.metrics
2256                            .record_cache_miss("transaction_events", "committed");
2257
2258                        CacheResult::Miss
2259                    }
2260                }
2261            },
2262            |remaining| {
2263                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2264                let results = self
2265                    .store
2266                    .multi_get_events(&remaining_digests)
2267                    .expect("db error");
2268                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2269                    if result.is_none() {
2270                        self.cached
2271                            .transaction_events
2272                            .insert(digest, None, *ticket)
2273                            .ok();
2274                    }
2275                }
2276                results
2277            },
2278        )
2279    }
2280
2281    fn get_unchanged_loaded_runtime_objects(
2282        &self,
2283        digest: &TransactionDigest,
2284    ) -> Option<Vec<ObjectKey>> {
2285        self.dirty
2286            .unchanged_loaded_runtime_objects
2287            .get(digest)
2288            .map(|b| b.clone())
2289            .or_else(|| {
2290                self.store
2291                    .get_unchanged_loaded_runtime_objects(digest)
2292                    .expect("db error")
2293            })
2294    }
2295
2296    fn get_mysticeti_fastpath_outputs(
2297        &self,
2298        tx_digest: &TransactionDigest,
2299    ) -> Option<Arc<TransactionOutputs>> {
2300        self.dirty.fastpath_transaction_outputs.get(tx_digest)
2301    }
2302
2303    fn notify_read_fastpath_transaction_outputs<'a>(
2304        &'a self,
2305        tx_digests: &'a [TransactionDigest],
2306    ) -> BoxFuture<'a, Vec<Arc<TransactionOutputs>>> {
2307        self.fastpath_transaction_outputs_notify_read
2308            .read(
2309                "notify_read_fastpath_transaction_outputs",
2310                tx_digests,
2311                |tx_digests| {
2312                    tx_digests
2313                        .iter()
2314                        .map(|tx_digest| self.get_mysticeti_fastpath_outputs(tx_digest))
2315                        .collect()
2316                },
2317            )
2318            .boxed()
2319    }
2320
2321    fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
2322        self.dirty
2323            .pending_transaction_writes
2324            .get(digest)
2325            .map(|transaction_output| transaction_output.take_accumulator_events())
2326    }
2327}
2328
impl ExecutionCacheWrite for WritebackCache {
    // Delegates lock acquisition to `ObjectLocks`; `self` is passed through so
    // the lock manager can consult this cache for object state.
    fn acquire_transaction_locks(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        owned_input_objects: &[ObjectRef],
        tx_digest: TransactionDigest,
        signed_transaction: Option<VerifiedSignedTransaction>,
    ) -> SuiResult {
        self.object_locks.acquire_transaction_locks(
            self,
            epoch_store,
            owned_input_objects,
            tx_digest,
            signed_transaction,
        )
    }

    // Validates owned-object versions against this cache without taking locks.
    fn validate_owned_object_versions(&self, owned_input_objects: &[ObjectRef]) -> SuiResult {
        ObjectLocks::validate_owned_object_versions(self, owned_input_objects)
    }

    // Passthrough to the inherent `WritebackCache::write_transaction_outputs`.
    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs);
    }

    /// Stores Mysticeti fastpath-certified outputs in the dirty set and wakes
    /// any tasks waiting on them. The insert happens before the notification,
    /// so a waiter woken here can find the outputs in the dirty set.
    fn write_fastpath_transaction_outputs(&self, tx_outputs: Arc<TransactionOutputs>) {
        let tx_digest = *tx_outputs.transaction.digest();
        debug!(
            ?tx_digest,
            "writing mysticeti fastpath certified transaction outputs"
        );
        self.dirty
            .fastpath_transaction_outputs
            .insert(tx_digest, tx_outputs.clone());
        self.fastpath_transaction_outputs_notify_read
            .notify(&tx_digest, &tx_outputs);
    }

    // Test-only hook for inserting a single object entry directly.
    #[cfg(test)]
    fn write_object_entry_for_test(&self, object: Object) {
        self.write_object_entry(&object.id(), object.version(), object.into());
    }
}
2372
// Generates the remaining trait impls for `WritebackCache` (macro defined
// elsewhere in this crate).
implement_passthrough_traits!(WritebackCache);
2374
impl GlobalStateHashStore for WritebackCache {
    /// Finds the object reference with the highest version strictly prior to
    /// `version`, consulting the dirty set, the committed cache, and the store.
    fn get_object_ref_prior_to_key_deprecated(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<Option<ObjectRef>> {
        // There is probably a more efficient way to implement this, but since this is only used by
        // old protocol versions, it is better to do the simple thing that is obviously correct.
        // In this case we get the previous version from all sources and choose the highest.
        let mut candidates = Vec::new();

        // Maps the entry immediately prior to `version` (if any) to an
        // ObjectRef, synthesizing the deleted/wrapped digests for tombstones.
        let check_versions =
            |versions: &CachedVersionMap<ObjectEntry>| match versions.get_prior_to(&version) {
                Some((version, object_entry)) => match object_entry {
                    ObjectEntry::Object(object) => {
                        assert_eq!(object.version(), version);
                        Some(object.compute_object_reference())
                    }
                    ObjectEntry::Deleted => {
                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED))
                    }
                    ObjectEntry::Wrapped => {
                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED))
                    }
                },
                None => None,
            };

        // first check dirty data
        if let Some(objects) = self.dirty.objects.get(object_id)
            && let Some(prior) = check_versions(&objects)
        {
            candidates.push(prior);
        }

        // then the committed object cache
        if let Some(objects) = self.cached.object_cache.get(object_id)
            && let Some(prior) = check_versions(&objects.lock())
        {
            candidates.push(prior);
        }

        // finally the backing store
        if let Some(prior) = self
            .store
            .get_object_ref_prior_to_key_deprecated(object_id, version)?
        {
            candidates.push(prior);
        }

        // sort candidates by version, and return the highest
        candidates.sort_by_key(|(_, version, _)| *version);
        Ok(candidates.pop())
    }

    // Passthrough to the store.
    fn get_root_state_hash_for_epoch(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
        self.store.get_root_state_hash_for_epoch(epoch)
    }

    // Passthrough to the store.
    fn get_root_state_hash_for_highest_epoch(
        &self,
    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
        self.store.get_root_state_hash_for_highest_epoch()
    }

    // Passthrough to the store.
    fn insert_state_hash_for_epoch(
        &self,
        epoch: EpochId,
        checkpoint_seq_num: &CheckpointSequenceNumber,
        acc: &GlobalStateHash,
    ) -> SuiResult {
        self.store
            .insert_state_hash_for_epoch(epoch, checkpoint_seq_num, acc)
    }

    fn iter_live_object_set(
        &self,
        include_wrapped_tombstone: bool,
    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        // The only time it is safe to iterate the live object set is at an epoch boundary,
        // at which point the db is consistent and the dirty cache is empty. So this does
        // not read the cache.
        assert!(
            self.dirty.is_empty(),
            "cannot iterate live object set with dirty data"
        );
        self.store.iter_live_object_set(include_wrapped_tombstone)
    }

    // A version of iter_live_object_set that reads the cache. Only use for testing. If used
    // on a live validator, can cause the server to block for as long as it takes to iterate
    // the entire live object set.
    fn iter_cached_live_object_set_for_testing(
        &self,
        include_wrapped_tombstone: bool,
    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        // hold iter until we are finished to prevent any concurrent inserts/deletes
        let iter = self.dirty.objects.iter();
        let mut dirty_objects = BTreeMap::new();

        // add everything from the store
        for obj in self.store.iter_live_object_set(include_wrapped_tombstone) {
            dirty_objects.insert(obj.object_id(), obj);
        }

        // add everything from the cache, but also remove deletions
        for entry in iter {
            let id = *entry.key();
            let value = entry.value();
            // Only the highest dirty version of each object determines its
            // liveness; lower versions are superseded.
            match value.get_highest().unwrap() {
                (_, ObjectEntry::Object(object)) => {
                    dirty_objects.insert(id, LiveObject::Normal(object.clone()));
                }
                (version, ObjectEntry::Wrapped) => {
                    if include_wrapped_tombstone {
                        dirty_objects.insert(id, LiveObject::Wrapped(ObjectKey(id, *version)));
                    } else {
                        dirty_objects.remove(&id);
                    }
                }
                (_, ObjectEntry::Deleted) => {
                    dirty_objects.remove(&id);
                }
            }
        }

        Box::new(dirty_objects.into_values())
    }
}
2505
2506// TODO: For correctness, we must at least invalidate the cache when items are written through this
2507// trait (since they could be negatively cached as absent). But it may or may not be optimal to
2508// actually insert them into the cache. For instance if state sync is running ahead of execution,
2509// they might evict other items that are about to be read. This could be an area for tuning in the
2510// future.
2511impl StateSyncAPI for WritebackCache {
2512    fn insert_transaction_and_effects(
2513        &self,
2514        transaction: &VerifiedTransaction,
2515        transaction_effects: &TransactionEffects,
2516    ) {
2517        self.store
2518            .insert_transaction_and_effects(transaction, transaction_effects)
2519            .expect("db error");
2520        self.cached
2521            .transactions
2522            .insert(
2523                transaction.digest(),
2524                PointCacheItem::Some(Arc::new(transaction.clone())),
2525                Ticket::Write,
2526            )
2527            .ok();
2528        self.cached
2529            .transaction_effects
2530            .insert(
2531                &transaction_effects.digest(),
2532                PointCacheItem::Some(Arc::new(transaction_effects.clone())),
2533                Ticket::Write,
2534            )
2535            .ok();
2536    }
2537
2538    fn multi_insert_transaction_and_effects(
2539        &self,
2540        transactions_and_effects: &[VerifiedExecutionData],
2541    ) {
2542        self.store
2543            .multi_insert_transaction_and_effects(transactions_and_effects.iter())
2544            .expect("db error");
2545        for VerifiedExecutionData {
2546            transaction,
2547            effects,
2548        } in transactions_and_effects
2549        {
2550            self.cached
2551                .transactions
2552                .insert(
2553                    transaction.digest(),
2554                    PointCacheItem::Some(Arc::new(transaction.clone())),
2555                    Ticket::Write,
2556                )
2557                .ok();
2558            self.cached
2559                .transaction_effects
2560                .insert(
2561                    &effects.digest(),
2562                    PointCacheItem::Some(Arc::new(effects.clone())),
2563                    Ticket::Write,
2564                )
2565                .ok();
2566        }
2567    }
2568}