1use crate::authority::AuthorityStore;
41use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
42use crate::authority::authority_store::{
43    ExecutionLockWriteGuard, LockDetailsDeprecated, ObjectLockStatus, SuiLockResult,
44};
45use crate::authority::authority_store_tables::LiveObject;
46use crate::authority::backpressure::BackpressureManager;
47use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
48use crate::fallback_fetch::{do_fallback_lookup, do_fallback_lookup_fallible};
49use crate::global_state_hasher::GlobalStateHashStore;
50use crate::transaction_outputs::TransactionOutputs;
51
52use dashmap::DashMap;
53use dashmap::mapref::entry::Entry as DashMapEntry;
54use futures::{FutureExt, future::BoxFuture};
55use moka::sync::SegmentedCache as MokaCache;
56use mysten_common::debug_fatal;
57use mysten_common::random_util::randomize_cache_capacity_in_tests;
58use mysten_common::sync::notify_read::NotifyRead;
59use parking_lot::Mutex;
60use std::collections::{BTreeMap, HashSet};
61use std::hash::Hash;
62use std::sync::Arc;
63use std::sync::atomic::AtomicU64;
64use sui_config::ExecutionCacheConfig;
65use sui_macros::fail_point;
66use sui_protocol_config::ProtocolVersion;
67use sui_types::accumulator_event::AccumulatorEvent;
68use sui_types::base_types::{
69    EpochId, FullObjectID, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData,
70};
71use sui_types::bridge::{Bridge, get_bridge};
72use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest};
73use sui_types::effects::{TransactionEffects, TransactionEvents};
74use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
75use sui_types::executable_transaction::VerifiedExecutableTransaction;
76use sui_types::global_state_hash::GlobalStateHash;
77use sui_types::message_envelope::Message;
78use sui_types::messages_checkpoint::CheckpointSequenceNumber;
79use sui_types::object::Object;
80use sui_types::storage::{
81    FullObjectKey, InputKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject,
82};
83use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
84use sui_types::transaction::{TransactionDataAPI, VerifiedSignedTransaction, VerifiedTransaction};
85use tap::TapOptional;
86use tracing::{debug, info, instrument, trace, warn};
87
88use super::ExecutionCacheAPI;
89use super::cache_types::Ticket;
90use super::{
91    Batch, CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
92    ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
93    cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache},
94    implement_passthrough_traits,
95    object_locks::ObjectLocks,
96};
97
98#[cfg(test)]
99#[path = "unit_tests/writeback_cache_tests.rs"]
100pub mod writeback_cache_tests;
101
102#[cfg(test)]
103#[path = "unit_tests/notify_read_input_objects_tests.rs"]
104mod notify_read_input_objects_tests;
105
106#[derive(Clone, PartialEq, Eq)]
107enum ObjectEntry {
108    Object(Object),
109    Deleted,
110    Wrapped,
111}
112
113impl ObjectEntry {
114    #[cfg(test)]
115    fn unwrap_object(&self) -> &Object {
116        match self {
117            ObjectEntry::Object(o) => o,
118            _ => panic!("unwrap_object called on non-Object"),
119        }
120    }
121
122    fn is_tombstone(&self) -> bool {
123        match self {
124            ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
125            ObjectEntry::Object(_) => false,
126        }
127    }
128}
129
130impl std::fmt::Debug for ObjectEntry {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        match self {
133            ObjectEntry::Object(o) => {
134                write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
135            }
136            ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
137            ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
138        }
139    }
140}
141
142impl From<Object> for ObjectEntry {
143    fn from(object: Object) -> Self {
144        ObjectEntry::Object(object)
145    }
146}
147
148impl From<ObjectOrTombstone> for ObjectEntry {
149    fn from(object: ObjectOrTombstone) -> Self {
150        match object {
151            ObjectOrTombstone::Object(o) => o.into(),
152            ObjectOrTombstone::Tombstone(obj_ref) => {
153                if obj_ref.2.is_deleted() {
154                    ObjectEntry::Deleted
155                } else if obj_ref.2.is_wrapped() {
156                    ObjectEntry::Wrapped
157                } else {
158                    panic!("tombstone digest must either be deleted or wrapped");
159                }
160            }
161        }
162    }
163}
164
165#[derive(Debug, Clone, PartialEq, Eq)]
166enum LatestObjectCacheEntry {
167    Object(SequenceNumber, ObjectEntry),
168    NonExistent,
169}
170
171impl LatestObjectCacheEntry {
172    #[cfg(test)]
173    fn version(&self) -> Option<SequenceNumber> {
174        match self {
175            LatestObjectCacheEntry::Object(version, _) => Some(*version),
176            LatestObjectCacheEntry::NonExistent => None,
177        }
178    }
179
180    fn is_alive(&self) -> bool {
181        match self {
182            LatestObjectCacheEntry::Object(_, entry) => !entry.is_tombstone(),
183            LatestObjectCacheEntry::NonExistent => false,
184        }
185    }
186}
187
188impl IsNewer for LatestObjectCacheEntry {
189    fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
190        match (self, other) {
191            (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
192                v1 > v2
193            }
194            (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
195            _ => false,
196        }
197    }
198}
199
200type MarkerKey = (EpochId, FullObjectID);
201
202struct UncommittedData {
205    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,
218
219    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,
224
225    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,
226
227    transaction_events: DashMap<TransactionDigest, TransactionEvents>,
228
229    unchanged_loaded_runtime_objects: DashMap<TransactionDigest, Vec<ObjectKey>>,
230
231    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,
232
233    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,
236
237    fastpath_transaction_outputs: MokaCache<TransactionDigest, Arc<TransactionOutputs>>,
250
251    total_transaction_inserts: AtomicU64,
252    total_transaction_commits: AtomicU64,
253}
254
255impl UncommittedData {
256    fn new(config: &ExecutionCacheConfig) -> Self {
257        Self {
258            objects: DashMap::with_shard_amount(2048),
259            markers: DashMap::with_shard_amount(2048),
260            transaction_effects: DashMap::with_shard_amount(2048),
261            executed_effects_digests: DashMap::with_shard_amount(2048),
262            pending_transaction_writes: DashMap::with_shard_amount(2048),
263            fastpath_transaction_outputs: MokaCache::builder(8)
264                .max_capacity(randomize_cache_capacity_in_tests(
265                    config.fastpath_transaction_outputs_cache_size(),
266                ))
267                .build(),
268            transaction_events: DashMap::with_shard_amount(2048),
269            unchanged_loaded_runtime_objects: DashMap::with_shard_amount(2048),
270            total_transaction_inserts: AtomicU64::new(0),
271            total_transaction_commits: AtomicU64::new(0),
272        }
273    }
274
275    fn clear(&self) {
276        self.objects.clear();
277        self.markers.clear();
278        self.transaction_effects.clear();
279        self.executed_effects_digests.clear();
280        self.pending_transaction_writes.clear();
281        self.fastpath_transaction_outputs.invalidate_all();
282        self.transaction_events.clear();
283        self.unchanged_loaded_runtime_objects.clear();
284        self.total_transaction_inserts
285            .store(0, std::sync::atomic::Ordering::Relaxed);
286        self.total_transaction_commits
287            .store(0, std::sync::atomic::Ordering::Relaxed);
288    }
289
290    fn is_empty(&self) -> bool {
291        let empty = self.pending_transaction_writes.is_empty();
292        if empty && cfg!(debug_assertions) {
293            assert!(
294                self.objects.is_empty()
295                    && self.markers.is_empty()
296                    && self.transaction_effects.is_empty()
297                    && self.executed_effects_digests.is_empty()
298                    && self.transaction_events.is_empty()
299                    && self.unchanged_loaded_runtime_objects.is_empty()
300                    && self
301                        .total_transaction_inserts
302                        .load(std::sync::atomic::Ordering::Relaxed)
303                        == self
304                            .total_transaction_commits
305                            .load(std::sync::atomic::Ordering::Relaxed),
306            );
307        }
308        empty
309    }
310}
311
312type PointCacheItem<T> = Option<T>;
314
315impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
318    fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
319        match (self, other) {
320            (Some(_), None) => true,
321
322            (Some(a), Some(b)) => {
323                debug_assert_eq!(a, b);
325                false
326            }
327
328            _ => false,
329        }
330    }
331}
332
333struct CachedCommittedData {
335    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,
337
338    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,
340
341    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,
342
343    transaction_effects:
344        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,
345
346    transaction_events: MonotonicCache<TransactionDigest, PointCacheItem<Arc<TransactionEvents>>>,
347
348    executed_effects_digests:
349        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,
350
351    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
354}
355
356impl CachedCommittedData {
357    fn new(config: &ExecutionCacheConfig) -> Self {
358        let object_cache = MokaCache::builder(8)
359            .max_capacity(randomize_cache_capacity_in_tests(
360                config.object_cache_size(),
361            ))
362            .build();
363        let marker_cache = MokaCache::builder(8)
364            .max_capacity(randomize_cache_capacity_in_tests(
365                config.marker_cache_size(),
366            ))
367            .build();
368
369        let transactions = MonotonicCache::new(randomize_cache_capacity_in_tests(
370            config.transaction_cache_size(),
371        ));
372        let transaction_effects = MonotonicCache::new(randomize_cache_capacity_in_tests(
373            config.effect_cache_size(),
374        ));
375        let transaction_events = MonotonicCache::new(randomize_cache_capacity_in_tests(
376            config.events_cache_size(),
377        ));
378        let executed_effects_digests = MonotonicCache::new(randomize_cache_capacity_in_tests(
379            config.executed_effect_cache_size(),
380        ));
381
382        let transaction_objects = MokaCache::builder(8)
383            .max_capacity(randomize_cache_capacity_in_tests(
384                config.transaction_objects_cache_size(),
385            ))
386            .build();
387
388        Self {
389            object_cache,
390            marker_cache,
391            transactions,
392            transaction_effects,
393            transaction_events,
394            executed_effects_digests,
395            _transaction_objects: transaction_objects,
396        }
397    }
398
399    fn clear_and_assert_empty(&self) {
400        self.object_cache.invalidate_all();
401        self.marker_cache.invalidate_all();
402        self.transactions.invalidate_all();
403        self.transaction_effects.invalidate_all();
404        self.transaction_events.invalidate_all();
405        self.executed_effects_digests.invalidate_all();
406        self._transaction_objects.invalidate_all();
407
408        assert_empty(&self.object_cache);
409        assert_empty(&self.marker_cache);
410        assert!(self.transactions.is_empty());
411        assert!(self.transaction_effects.is_empty());
412        assert!(self.transaction_events.is_empty());
413        assert!(self.executed_effects_digests.is_empty());
414        assert_empty(&self._transaction_objects);
415    }
416}
417
418fn assert_empty<K, V>(cache: &MokaCache<K, V>)
419where
420    K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
421    V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
422{
423    if cache.iter().next().is_some() {
424        panic!("cache should be empty");
425    }
426}
427
428pub struct WritebackCache {
429    dirty: UncommittedData,
430    cached: CachedCommittedData,
431
432    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,
439
440    packages: MokaCache<ObjectID, PackageObject>,
451
452    object_locks: ObjectLocks,
453
454    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
455    object_notify_read: NotifyRead<InputKey, ()>,
456    fastpath_transaction_outputs_notify_read:
457        NotifyRead<TransactionDigest, Arc<TransactionOutputs>>,
458
459    store: Arc<AuthorityStore>,
460    backpressure_threshold: u64,
461    backpressure_manager: Arc<BackpressureManager>,
462    metrics: Arc<ExecutionCacheMetrics>,
463}
464
465macro_rules! check_cache_entry_by_version {
466    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
467        $self.metrics.record_cache_request($table, $level);
468        if let Some(cache) = $cache {
469            if let Some(entry) = cache.get(&$version) {
470                $self.metrics.record_cache_hit($table, $level);
471                return CacheResult::Hit(entry.clone());
472            }
473
474            if let Some(least_version) = cache.get_least() {
475                if least_version.0 < $version {
476                    $self.metrics.record_cache_negative_hit($table, $level);
479                    return CacheResult::NegativeHit;
480                }
481            }
482        }
483        $self.metrics.record_cache_miss($table, $level);
484    };
485}
486
487macro_rules! check_cache_entry_by_latest {
488    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
489        $self.metrics.record_cache_request($table, $level);
490        if let Some(cache) = $cache {
491            if let Some((version, entry)) = cache.get_highest() {
492                $self.metrics.record_cache_hit($table, $level);
493                return CacheResult::Hit((*version, entry.clone()));
494            } else {
495                panic!("empty CachedVersionMap should have been removed");
496            }
497        }
498        $self.metrics.record_cache_miss($table, $level);
499    };
500}
501
502impl WritebackCache {
503    pub fn new(
504        config: &ExecutionCacheConfig,
505        store: Arc<AuthorityStore>,
506        metrics: Arc<ExecutionCacheMetrics>,
507        backpressure_manager: Arc<BackpressureManager>,
508    ) -> Self {
509        let packages = MokaCache::builder(8)
510            .max_capacity(randomize_cache_capacity_in_tests(
511                config.package_cache_size(),
512            ))
513            .build();
514        Self {
515            dirty: UncommittedData::new(config),
516            cached: CachedCommittedData::new(config),
517            object_by_id_cache: MonotonicCache::new(randomize_cache_capacity_in_tests(
518                config.object_by_id_cache_size(),
519            )),
520            packages,
521            object_locks: ObjectLocks::new(),
522            executed_effects_digests_notify_read: NotifyRead::new(),
523            object_notify_read: NotifyRead::new(),
524            fastpath_transaction_outputs_notify_read: NotifyRead::new(),
525            store,
526            backpressure_manager,
527            backpressure_threshold: config.backpressure_threshold(),
528            metrics,
529        }
530    }
531
532    pub fn new_for_tests(store: Arc<AuthorityStore>) -> Self {
533        Self::new(
534            &Default::default(),
535            store,
536            ExecutionCacheMetrics::new(&prometheus::Registry::new()).into(),
537            BackpressureManager::new_for_tests(),
538        )
539    }
540
541    #[cfg(test)]
542    pub fn reset_for_test(&mut self) {
543        let mut new = Self::new(
544            &Default::default(),
545            self.store.clone(),
546            self.metrics.clone(),
547            self.backpressure_manager.clone(),
548        );
549        std::mem::swap(self, &mut new);
550    }
551
552    fn write_object_entry(
553        &self,
554        object_id: &ObjectID,
555        version: SequenceNumber,
556        object: ObjectEntry,
557    ) {
558        trace!(?object_id, ?version, ?object, "inserting object entry");
559        self.metrics.record_cache_write("object");
560
561        let mut entry = self.dirty.objects.entry(*object_id).or_default();
581
582        self.object_by_id_cache
583            .insert(
584                object_id,
585                LatestObjectCacheEntry::Object(version, object.clone()),
586                Ticket::Write,
587            )
588            .ok();
591
592        entry.insert(version, object.clone());
593
594        if let ObjectEntry::Object(object) = &object {
595            if object.is_package() {
596                self.object_notify_read
597                    .notify(&InputKey::Package { id: *object_id }, &());
598            } else if !object.is_child_object() {
599                self.object_notify_read.notify(
600                    &InputKey::VersionedObject {
601                        id: object.full_id(),
602                        version: object.version(),
603                    },
604                    &(),
605                );
606            }
607        }
608    }
609
610    fn write_marker_value(
611        &self,
612        epoch_id: EpochId,
613        object_key: FullObjectKey,
614        marker_value: MarkerValue,
615    ) {
616        tracing::trace!("inserting marker value {object_key:?}: {marker_value:?}",);
617        self.metrics.record_cache_write("marker");
618        self.dirty
619            .markers
620            .entry((epoch_id, object_key.id()))
621            .or_default()
622            .value_mut()
623            .insert(object_key.version(), marker_value);
624        if matches!(marker_value, MarkerValue::ConsensusStreamEnded(_)) {
629            self.object_notify_read.notify(
630                &InputKey::VersionedObject {
631                    id: object_key.id(),
632                    version: object_key.version(),
633                },
634                &(),
635            );
636        }
637    }
638
639    fn with_locked_cache_entries<K, V, R>(
643        dirty_map: &DashMap<K, CachedVersionMap<V>>,
644        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
645        key: &K,
646        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
647    ) -> R
648    where
649        K: Copy + Eq + Hash + Send + Sync + 'static,
650        V: Send + Sync + 'static,
651    {
652        let dirty_entry = dirty_map.entry(*key);
653        let dirty_entry = match &dirty_entry {
654            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
655            DashMapEntry::Vacant(_) => None,
656        };
657
658        let cached_entry = cached_map.get(key);
659        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
660        let cached_entry = cached_lock.as_deref();
661
662        cb(dirty_entry, cached_entry)
663    }
664
665    fn get_object_entry_by_key_cache_only(
668        &self,
669        object_id: &ObjectID,
670        version: SequenceNumber,
671    ) -> CacheResult<ObjectEntry> {
672        Self::with_locked_cache_entries(
673            &self.dirty.objects,
674            &self.cached.object_cache,
675            object_id,
676            |dirty_entry, cached_entry| {
677                check_cache_entry_by_version!(
678                    self,
679                    "object_by_version",
680                    "uncommitted",
681                    dirty_entry,
682                    version
683                );
684                check_cache_entry_by_version!(
685                    self,
686                    "object_by_version",
687                    "committed",
688                    cached_entry,
689                    version
690                );
691                CacheResult::Miss
692            },
693        )
694    }
695
696    fn get_object_by_key_cache_only(
697        &self,
698        object_id: &ObjectID,
699        version: SequenceNumber,
700    ) -> CacheResult<Object> {
701        match self.get_object_entry_by_key_cache_only(object_id, version) {
702            CacheResult::Hit(entry) => match entry {
703                ObjectEntry::Object(object) => CacheResult::Hit(object),
704                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
705            },
706            CacheResult::Miss => CacheResult::Miss,
707            CacheResult::NegativeHit => CacheResult::NegativeHit,
708        }
709    }
710
711    fn get_object_entry_by_id_cache_only(
712        &self,
713        request_type: &'static str,
714        object_id: &ObjectID,
715    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
716        self.metrics
717            .record_cache_request(request_type, "object_by_id");
718        let entry = self.object_by_id_cache.get(object_id);
719
720        if cfg!(debug_assertions)
721            && let Some(entry) = &entry
722        {
723            let highest: Option<ObjectEntry> = self
725                .dirty
726                .objects
727                .get(object_id)
728                .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
729                .or_else(|| {
730                    let obj: Option<ObjectEntry> = self
731                        .store
732                        .get_latest_object_or_tombstone(*object_id)
733                        .unwrap()
734                        .map(|(_, o)| o.into());
735                    obj
736                });
737
738            let cache_entry = match &*entry.lock() {
739                LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
740                LatestObjectCacheEntry::NonExistent => None,
741            };
742
743            let tombstone_possibly_pruned = highest.is_none()
745                && cache_entry
746                    .as_ref()
747                    .map(|e| e.is_tombstone())
748                    .unwrap_or(false);
749
750            if highest != cache_entry && !tombstone_possibly_pruned {
751                tracing::error!(
752                    ?highest,
753                    ?cache_entry,
754                    ?tombstone_possibly_pruned,
755                    "object_by_id cache is incoherent for {:?}",
756                    object_id
757                );
758                panic!("object_by_id cache is incoherent for {:?}", object_id);
759            }
760        }
761
762        if let Some(entry) = entry {
763            let entry = entry.lock();
764            match &*entry {
765                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
766                    self.metrics.record_cache_hit(request_type, "object_by_id");
767                    return CacheResult::Hit((*latest_version, latest_object.clone()));
768                }
769                LatestObjectCacheEntry::NonExistent => {
770                    self.metrics
771                        .record_cache_negative_hit(request_type, "object_by_id");
772                    return CacheResult::NegativeHit;
773                }
774            }
775        } else {
776            self.metrics.record_cache_miss(request_type, "object_by_id");
777        }
778
779        Self::with_locked_cache_entries(
780            &self.dirty.objects,
781            &self.cached.object_cache,
782            object_id,
783            |dirty_entry, cached_entry| {
784                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
785                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
786                CacheResult::Miss
787            },
788        )
789    }
790
791    fn get_object_by_id_cache_only(
792        &self,
793        request_type: &'static str,
794        object_id: &ObjectID,
795    ) -> CacheResult<(SequenceNumber, Object)> {
796        match self.get_object_entry_by_id_cache_only(request_type, object_id) {
797            CacheResult::Hit((version, entry)) => match entry {
798                ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
799                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
800            },
801            CacheResult::NegativeHit => CacheResult::NegativeHit,
802            CacheResult::Miss => CacheResult::Miss,
803        }
804    }
805
806    fn get_marker_value_cache_only(
807        &self,
808        object_key: FullObjectKey,
809        epoch_id: EpochId,
810    ) -> CacheResult<MarkerValue> {
811        Self::with_locked_cache_entries(
812            &self.dirty.markers,
813            &self.cached.marker_cache,
814            &(epoch_id, object_key.id()),
815            |dirty_entry, cached_entry| {
816                check_cache_entry_by_version!(
817                    self,
818                    "marker_by_version",
819                    "uncommitted",
820                    dirty_entry,
821                    object_key.version()
822                );
823                check_cache_entry_by_version!(
824                    self,
825                    "marker_by_version",
826                    "committed",
827                    cached_entry,
828                    object_key.version()
829                );
830                CacheResult::Miss
831            },
832        )
833    }
834
835    fn get_latest_marker_value_cache_only(
836        &self,
837        object_id: FullObjectID,
838        epoch_id: EpochId,
839    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
840        Self::with_locked_cache_entries(
841            &self.dirty.markers,
842            &self.cached.marker_cache,
843            &(epoch_id, object_id),
844            |dirty_entry, cached_entry| {
845                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
846                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
847                CacheResult::Miss
848            },
849        )
850    }
851
852    fn get_object_impl(&self, request_type: &'static str, id: &ObjectID) -> Option<Object> {
853        let ticket = self.object_by_id_cache.get_ticket_for_read(id);
854        match self.get_object_entry_by_id_cache_only(request_type, id) {
855            CacheResult::Hit((_, entry)) => match entry {
856                ObjectEntry::Object(object) => Some(object),
857                ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
858            },
859            CacheResult::NegativeHit => None,
860            CacheResult::Miss => {
861                let obj = self
862                    .store
863                    .get_latest_object_or_tombstone(*id)
864                    .expect("db error");
865                match obj {
866                    Some((key, obj)) => {
867                        self.cache_latest_object_by_id(
868                            id,
869                            LatestObjectCacheEntry::Object(key.1, obj.clone().into()),
870                            ticket,
871                        );
872                        match obj {
873                            ObjectOrTombstone::Object(object) => Some(object),
874                            ObjectOrTombstone::Tombstone(_) => None,
875                        }
876                    }
877                    None => {
878                        self.cache_object_not_found(id, ticket);
879                        None
880                    }
881                }
882            }
883        }
884    }
885
886    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
887        self.metrics.record_cache_request(request_type, "db");
888        &self.store
889    }
890
891    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
892        self.metrics
893            .record_cache_multi_request(request_type, "db", count);
894        &self.store
895    }
896
897    #[instrument(level = "debug", skip_all)]
898    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
899        let tx_digest = *tx_outputs.transaction.digest();
900        trace!(?tx_digest, "writing transaction outputs to cache");
901
902        self.dirty.fastpath_transaction_outputs.remove(&tx_digest);
903
904        let TransactionOutputs {
905            transaction,
906            effects,
907            markers,
908            written,
909            deleted,
910            wrapped,
911            events,
912            unchanged_loaded_runtime_objects,
913            ..
914        } = &*tx_outputs;
915
916        for ObjectKey(id, version) in deleted.iter() {
921            self.write_object_entry(id, *version, ObjectEntry::Deleted);
922        }
923
924        for ObjectKey(id, version) in wrapped.iter() {
925            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
926        }
927
928        for (object_key, marker_value) in markers.iter() {
930            self.write_marker_value(epoch_id, *object_key, *marker_value);
931        }
932
933        for (object_id, object) in written.iter() {
936            if object.is_child_object() {
937                self.write_object_entry(object_id, object.version(), object.clone().into());
938            }
939        }
940        for (object_id, object) in written.iter() {
941            if !object.is_child_object() {
942                self.write_object_entry(object_id, object.version(), object.clone().into());
943                if object.is_package() {
944                    debug!("caching package: {:?}", object.compute_object_reference());
945                    self.packages
946                        .insert(*object_id, PackageObject::new(object.clone()));
947                }
948            }
949        }
950
951        let tx_digest = *transaction.digest();
952        debug!(
953            ?tx_digest,
954            "Writing transaction output objects to cache: {:?}",
955            written
956                .values()
957                .map(|o| (o.id(), o.version()))
958                .collect::<Vec<_>>(),
959        );
960        let effects_digest = effects.digest();
961
962        self.metrics.record_cache_write("transaction_block");
963        self.dirty
964            .pending_transaction_writes
965            .insert(tx_digest, tx_outputs.clone());
966
967        self.metrics.record_cache_write("transaction_effects");
970        self.dirty
971            .transaction_effects
972            .insert(effects_digest, effects.clone());
973
974        self.metrics.record_cache_write("transaction_events");
978        self.dirty
979            .transaction_events
980            .insert(tx_digest, events.clone());
981
982        self.metrics
983            .record_cache_write("unchanged_loaded_runtime_objects");
984        self.dirty
985            .unchanged_loaded_runtime_objects
986            .insert(tx_digest, unchanged_loaded_runtime_objects.clone());
987
988        self.metrics.record_cache_write("executed_effects_digests");
989        self.dirty
990            .executed_effects_digests
991            .insert(tx_digest, effects_digest);
992
993        self.executed_effects_digests_notify_read
994            .notify(&tx_digest, &effects_digest);
995
996        self.metrics
997            .pending_notify_read
998            .set(self.executed_effects_digests_notify_read.num_pending() as i64);
999
1000        let prev = self
1001            .dirty
1002            .total_transaction_inserts
1003            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1004
1005        let pending_count = (prev + 1).saturating_sub(
1006            self.dirty
1007                .total_transaction_commits
1008                .load(std::sync::atomic::Ordering::Relaxed),
1009        );
1010
1011        self.set_backpressure(pending_count);
1012    }
1013
1014    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1015        let _metrics_guard = mysten_metrics::monitored_scope("WritebackCache::build_db_batch");
1016        let mut all_outputs = Vec::with_capacity(digests.len());
1017        for tx in digests {
1018            let Some(outputs) = self
1019                .dirty
1020                .pending_transaction_writes
1021                .get(tx)
1022                .map(|o| o.clone())
1023            else {
1024                warn!("Attempt to commit unknown transaction {:?}", tx);
1030                continue;
1031            };
1032            all_outputs.push(outputs);
1033        }
1034
1035        let batch = self
1036            .store
1037            .build_db_batch(epoch, &all_outputs)
1038            .expect("db error");
1039        (all_outputs, batch)
1040    }
1041
1042    #[instrument(level = "debug", skip_all)]
1044    fn commit_transaction_outputs(
1045        &self,
1046        epoch: EpochId,
1047        (all_outputs, db_batch): Batch,
1048        digests: &[TransactionDigest],
1049    ) {
1050        let _metrics_guard =
1051            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs");
1052        fail_point!("writeback-cache-commit");
1053        trace!(?digests);
1054
1055        db_batch.write().expect("db error");
1059
1060        let _metrics_guard =
1061            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs::flush");
1062        for outputs in all_outputs.iter() {
1063            let tx_digest = outputs.transaction.digest();
1064            assert!(
1065                self.dirty
1066                    .pending_transaction_writes
1067                    .remove(tx_digest)
1068                    .is_some()
1069            );
1070            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
1071        }
1072
1073        let num_outputs = all_outputs.len() as u64;
1074        let num_commits = self
1075            .dirty
1076            .total_transaction_commits
1077            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
1078            + num_outputs;
1079
1080        let pending_count = self
1081            .dirty
1082            .total_transaction_inserts
1083            .load(std::sync::atomic::Ordering::Relaxed)
1084            .saturating_sub(num_commits);
1085
1086        self.set_backpressure(pending_count);
1087    }
1088
1089    fn approximate_pending_transaction_count(&self) -> u64 {
1090        let num_commits = self
1091            .dirty
1092            .total_transaction_commits
1093            .load(std::sync::atomic::Ordering::Relaxed);
1094
1095        self.dirty
1096            .total_transaction_inserts
1097            .load(std::sync::atomic::Ordering::Relaxed)
1098            .saturating_sub(num_commits)
1099    }
1100
1101    fn set_backpressure(&self, pending_count: u64) {
1102        let backpressure = pending_count > self.backpressure_threshold;
1103        let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
1104        if backpressure_changed {
1105            self.metrics.backpressure_toggles.inc();
1106        }
1107        self.metrics
1108            .backpressure_status
1109            .set(if backpressure { 1 } else { 0 });
1110    }
1111
1112    fn flush_transactions_from_dirty_to_cached(
1113        &self,
1114        epoch: EpochId,
1115        tx_digest: TransactionDigest,
1116        outputs: &TransactionOutputs,
1117    ) {
1118        let TransactionOutputs {
1121            transaction,
1122            effects,
1123            markers,
1124            written,
1125            deleted,
1126            wrapped,
1127            events,
1128            ..
1129        } = outputs;
1130
1131        let effects_digest = effects.digest();
1132
1133        self.cached
1136            .transactions
1137            .insert(
1138                &tx_digest,
1139                PointCacheItem::Some(transaction.clone()),
1140                Ticket::Write,
1141            )
1142            .ok();
1143        self.cached
1144            .transaction_effects
1145            .insert(
1146                &effects_digest,
1147                PointCacheItem::Some(effects.clone().into()),
1148                Ticket::Write,
1149            )
1150            .ok();
1151        self.cached
1152            .executed_effects_digests
1153            .insert(
1154                &tx_digest,
1155                PointCacheItem::Some(effects_digest),
1156                Ticket::Write,
1157            )
1158            .ok();
1159        self.cached
1160            .transaction_events
1161            .insert(
1162                &tx_digest,
1163                PointCacheItem::Some(events.clone().into()),
1164                Ticket::Write,
1165            )
1166            .ok();
1167
1168        self.dirty
1169            .transaction_effects
1170            .remove(&effects_digest)
1171            .expect("effects must exist");
1172
1173        self.dirty
1174            .transaction_events
1175            .remove(&tx_digest)
1176            .expect("events must exist");
1177
1178        self.dirty
1179            .unchanged_loaded_runtime_objects
1180            .remove(&tx_digest)
1181            .expect("unchanged_loaded_runtime_objects must exist");
1182
1183        self.dirty
1184            .executed_effects_digests
1185            .remove(&tx_digest)
1186            .expect("executed effects must exist");
1187
1188        for (object_key, marker_value) in markers.iter() {
1190            Self::move_version_from_dirty_to_cache(
1191                &self.dirty.markers,
1192                &self.cached.marker_cache,
1193                (epoch, object_key.id()),
1194                object_key.version(),
1195                marker_value,
1196            );
1197        }
1198
1199        for (object_id, object) in written.iter() {
1200            Self::move_version_from_dirty_to_cache(
1201                &self.dirty.objects,
1202                &self.cached.object_cache,
1203                *object_id,
1204                object.version(),
1205                &ObjectEntry::Object(object.clone()),
1206            );
1207        }
1208
1209        for ObjectKey(object_id, version) in deleted.iter() {
1210            Self::move_version_from_dirty_to_cache(
1211                &self.dirty.objects,
1212                &self.cached.object_cache,
1213                *object_id,
1214                *version,
1215                &ObjectEntry::Deleted,
1216            );
1217        }
1218
1219        for ObjectKey(object_id, version) in wrapped.iter() {
1220            Self::move_version_from_dirty_to_cache(
1221                &self.dirty.objects,
1222                &self.cached.object_cache,
1223                *object_id,
1224                *version,
1225                &ObjectEntry::Wrapped,
1226            );
1227        }
1228    }
1229
1230    fn move_version_from_dirty_to_cache<K, V>(
1233        dirty: &DashMap<K, CachedVersionMap<V>>,
1234        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
1235        key: K,
1236        version: SequenceNumber,
1237        value: &V,
1238    ) where
1239        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
1240        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
1241    {
1242        static MAX_VERSIONS: usize = 3;
1243
1244        let dirty_entry = dirty.entry(key);
1247        let cache_entry = cache.entry(key).or_default();
1248        let mut cache_map = cache_entry.value().lock();
1249
1250        cache_map.insert(version, value.clone());
1252        cache_map.truncate_to(MAX_VERSIONS);
1254
1255        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
1256            panic!("dirty map must exist");
1257        };
1258
1259        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);
1260
1261        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");
1262
1263        if occupied_dirty_entry.get().is_empty() {
1265            occupied_dirty_entry.remove();
1266        }
1267    }
1268
1269    fn cache_latest_object_by_id(
1271        &self,
1272        object_id: &ObjectID,
1273        object: LatestObjectCacheEntry,
1274        ticket: Ticket,
1275    ) {
1276        trace!("caching object by id: {:?} {:?}", object_id, object);
1277        if self
1278            .object_by_id_cache
1279            .insert(object_id, object, ticket)
1280            .is_ok()
1281        {
1282            self.metrics.record_cache_write("object_by_id");
1283        } else {
1284            trace!("discarded cache write due to expired ticket");
1285            self.metrics.record_ticket_expiry();
1286        }
1287    }
1288
1289    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
1290        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
1291    }
1292
1293    fn clear_state_end_of_epoch_impl(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
1294        info!("clearing state at end of epoch");
1295
1296        for r in self.dirty.pending_transaction_writes.iter() {
1299            let outputs = r.value();
1300            if !outputs
1301                .transaction
1302                .transaction_data()
1303                .shared_input_objects()
1304                .is_empty()
1305            {
1306                debug_fatal!("transaction must be single writer");
1307            }
1308            info!(
1309                "clearing state for transaction {:?}",
1310                outputs.transaction.digest()
1311            );
1312            for (object_id, object) in outputs.written.iter() {
1313                if object.is_package() {
1314                    info!("removing non-finalized package from cache: {:?}", object_id);
1315                    self.packages.invalidate(object_id);
1316                }
1317                self.object_by_id_cache.invalidate(object_id);
1318                self.cached.object_cache.invalidate(object_id);
1319            }
1320
1321            for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
1322                self.object_by_id_cache.invalidate(object_id);
1323                self.cached.object_cache.invalidate(object_id);
1324            }
1325        }
1326
1327        self.dirty.clear();
1328
1329        info!("clearing old transaction locks");
1330        self.object_locks.clear();
1331        info!("clearing object per epoch marker table");
1332        self.store
1333            .clear_object_per_epoch_marker_table(execution_guard)
1334            .expect("db error");
1335    }
1336
1337    fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) {
1338        self.store
1339            .bulk_insert_genesis_objects(objects)
1340            .expect("db error");
1341        for obj in objects {
1342            self.cached.object_cache.invalidate(&obj.id());
1343            self.object_by_id_cache.invalidate(&obj.id());
1344        }
1345    }
1346
1347    fn insert_genesis_object_impl(&self, object: Object) {
1348        self.object_by_id_cache.invalidate(&object.id());
1349        self.cached.object_cache.invalidate(&object.id());
1350        self.store.insert_genesis_object(object).expect("db error");
1351    }
1352
1353    pub fn clear_caches_and_assert_empty(&self) {
1354        info!("clearing caches");
1355        self.cached.clear_and_assert_empty();
1356        self.object_by_id_cache.invalidate_all();
1357        assert!(&self.object_by_id_cache.is_empty());
1358        self.packages.invalidate_all();
1359        assert_empty(&self.packages);
1360    }
1361}
1362
1363impl ExecutionCacheAPI for WritebackCache {}
1364
1365impl ExecutionCacheCommit for WritebackCache {
1366    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1367        self.build_db_batch(epoch, digests)
1368    }
1369
1370    fn commit_transaction_outputs(
1371        &self,
1372        epoch: EpochId,
1373        batch: Batch,
1374        digests: &[TransactionDigest],
1375    ) {
1376        WritebackCache::commit_transaction_outputs(self, epoch, batch, digests)
1377    }
1378
1379    fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) {
1380        self.store.persist_transaction(tx).expect("db error");
1381    }
1382
1383    fn approximate_pending_transaction_count(&self) -> u64 {
1384        WritebackCache::approximate_pending_transaction_count(self)
1385    }
1386}
1387
1388impl ObjectCacheRead for WritebackCache {
1389    fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
1390        self.metrics
1391            .record_cache_request("package", "package_cache");
1392        if let Some(p) = self.packages.get(package_id) {
1393            if cfg!(debug_assertions) {
1394                let canonical_package = self
1395                    .dirty
1396                    .objects
1397                    .get(package_id)
1398                    .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
1399                        Some(ObjectEntry::Object(object)) => Some(object),
1400                        _ => None,
1401                    })
1402                    .or_else(|| self.store.get_object(package_id));
1403
1404                if let Some(canonical_package) = canonical_package {
1405                    assert_eq!(
1406                        canonical_package.digest(),
1407                        p.object().digest(),
1408                        "Package object cache is inconsistent for package {:?}",
1409                        package_id
1410                    );
1411                }
1412            }
1413            self.metrics.record_cache_hit("package", "package_cache");
1414            return Ok(Some(p));
1415        } else {
1416            self.metrics.record_cache_miss("package", "package_cache");
1417        }
1418
1419        if let Some(p) = self.get_object_impl("package", package_id) {
1423            if p.is_package() {
1424                let p = PackageObject::new(p);
1425                tracing::trace!(
1426                    "caching package: {:?}",
1427                    p.object().compute_object_reference()
1428                );
1429                self.metrics.record_cache_write("package");
1430                self.packages.insert(*package_id, p.clone());
1431                Ok(Some(p))
1432            } else {
1433                Err(SuiErrorKind::UserInputError {
1434                    error: UserInputError::MoveObjectAsPackage {
1435                        object_id: *package_id,
1436                    },
1437                }
1438                .into())
1439            }
1440        } else {
1441            Ok(None)
1442        }
1443    }
1444
1445    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
1446        }
1449
1450    fn get_object(&self, id: &ObjectID) -> Option<Object> {
1453        self.get_object_impl("object_latest", id)
1454    }
1455
1456    fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object> {
1457        match self.get_object_by_key_cache_only(object_id, version) {
1458            CacheResult::Hit(object) => Some(object),
1459            CacheResult::NegativeHit => None,
1460            CacheResult::Miss => self
1461                .record_db_get("object_by_version")
1462                .get_object_by_key(object_id, version),
1463        }
1464    }
1465
1466    fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
1467        do_fallback_lookup(
1468            object_keys,
1469            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1470                CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1471                CacheResult::NegativeHit => CacheResult::NegativeHit,
1472                CacheResult::Miss => CacheResult::Miss,
1473            },
1474            |remaining| {
1475                self.record_db_multi_get("object_by_version", remaining.len())
1476                    .multi_get_objects_by_key(remaining)
1477                    .expect("db error")
1478            },
1479        )
1480    }
1481
1482    fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool {
1483        match self.get_object_by_key_cache_only(object_id, version) {
1484            CacheResult::Hit(_) => true,
1485            CacheResult::NegativeHit => false,
1486            CacheResult::Miss => self
1487                .record_db_get("object_by_version")
1488                .object_exists_by_key(object_id, version)
1489                .expect("db error"),
1490        }
1491    }
1492
1493    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool> {
1494        do_fallback_lookup(
1495            object_keys,
1496            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1497                CacheResult::Hit(_) => CacheResult::Hit(true),
1498                CacheResult::NegativeHit => CacheResult::Hit(false),
1499                CacheResult::Miss => CacheResult::Miss,
1500            },
1501            |remaining| {
1502                self.record_db_multi_get("object_by_version", remaining.len())
1503                    .multi_object_exists_by_key(remaining)
1504                    .expect("db error")
1505            },
1506        )
1507    }
1508
1509    fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
1510        match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1511            CacheResult::Hit((version, entry)) => Some(match entry {
1512                ObjectEntry::Object(object) => object.compute_object_reference(),
1513                ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1514                ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1515            }),
1516            CacheResult::NegativeHit => None,
1517            CacheResult::Miss => self
1518                .record_db_get("latest_objref_or_tombstone")
1519                .get_latest_object_ref_or_tombstone(object_id)
1520                .expect("db error"),
1521        }
1522    }
1523
1524    fn get_latest_object_or_tombstone(
1525        &self,
1526        object_id: ObjectID,
1527    ) -> Option<(ObjectKey, ObjectOrTombstone)> {
1528        match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1529            CacheResult::Hit((version, entry)) => {
1530                let key = ObjectKey(object_id, version);
1531                Some(match entry {
1532                    ObjectEntry::Object(object) => (key, object.into()),
1533                    ObjectEntry::Deleted => (
1534                        key,
1535                        ObjectOrTombstone::Tombstone((
1536                            object_id,
1537                            version,
1538                            ObjectDigest::OBJECT_DIGEST_DELETED,
1539                        )),
1540                    ),
1541                    ObjectEntry::Wrapped => (
1542                        key,
1543                        ObjectOrTombstone::Tombstone((
1544                            object_id,
1545                            version,
1546                            ObjectDigest::OBJECT_DIGEST_WRAPPED,
1547                        )),
1548                    ),
1549                })
1550            }
1551            CacheResult::NegativeHit => None,
1552            CacheResult::Miss => self
1553                .record_db_get("latest_object_or_tombstone")
1554                .get_latest_object_or_tombstone(object_id)
1555                .expect("db error"),
1556        }
1557    }
1558
1559    fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool> {
1560        keys.iter()
1561            .map(|key| {
1562                if key.is_cancelled() {
1563                    true
1564                } else {
1565                    match key {
1566                        InputKey::VersionedObject { id, version } => {
1567                            matches!(
1568                                self.get_object_by_key_cache_only(&id.id(), *version),
1569                                CacheResult::Hit(_)
1570                            )
1571                        }
1572                        InputKey::Package { id } => self.packages.contains_key(id),
1573                    }
1574                }
1575            })
1576            .collect()
1577    }
1578
1579    #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1580    fn find_object_lt_or_eq_version(
1581        &self,
1582        object_id: ObjectID,
1583        version_bound: SequenceNumber,
1584    ) -> Option<Object> {
1585        macro_rules! check_cache_entry {
1586            ($level: expr, $objects: expr) => {
1587                self.metrics
1588                    .record_cache_request("object_lt_or_eq_version", $level);
1589                if let Some(objects) = $objects {
1590                    if let Some((_, object)) = objects
1591                        .all_versions_lt_or_eq_descending(&version_bound)
1592                        .next()
1593                    {
1594                        if let ObjectEntry::Object(object) = object {
1595                            self.metrics
1596                                .record_cache_hit("object_lt_or_eq_version", $level);
1597                            return Some(object.clone());
1598                        } else {
1599                            self.metrics
1601                                .record_cache_negative_hit("object_lt_or_eq_version", $level);
1602                            return None;
1603                        }
1604                    } else {
1605                        self.metrics
1606                            .record_cache_miss("object_lt_or_eq_version", $level);
1607                    }
1608                }
1609            };
1610        }
1611
1612        self.metrics
1614            .record_cache_request("object_lt_or_eq_version", "object_by_id");
1615        let latest_cache_entry = self.object_by_id_cache.get(&object_id);
1616        if let Some(latest) = &latest_cache_entry {
1617            let latest = latest.lock();
1618            match &*latest {
1619                LatestObjectCacheEntry::Object(latest_version, object) => {
1620                    if *latest_version <= version_bound {
1621                        if let ObjectEntry::Object(object) = object {
1622                            self.metrics
1623                                .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1624                            return Some(object.clone());
1625                        } else {
1626                            self.metrics.record_cache_negative_hit(
1628                                "object_lt_or_eq_version",
1629                                "object_by_id",
1630                            );
1631                            return None;
1632                        }
1633                    }
1634                    }
1636                LatestObjectCacheEntry::NonExistent => {
1638                    self.metrics
1639                        .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1640                    return None;
1641                }
1642            }
1643        }
1644        self.metrics
1645            .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1646
1647        Self::with_locked_cache_entries(
1648            &self.dirty.objects,
1649            &self.cached.object_cache,
1650            &object_id,
1651            |dirty_entry, cached_entry| {
1652                check_cache_entry!("committed", dirty_entry);
1653                check_cache_entry!("uncommitted", cached_entry);
1654
1655                let latest: Option<(SequenceNumber, ObjectEntry)> =
1676                    if let Some(dirty_set) = dirty_entry {
1677                        dirty_set
1678                            .get_highest()
1679                            .cloned()
1680                            .tap_none(|| panic!("dirty set cannot be empty"))
1681                    } else {
1682                        self.record_db_get("object_lt_or_eq_version_latest")
1684                            .get_latest_object_or_tombstone(object_id)
1685                            .expect("db error")
1686                            .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1687                                (version, ObjectEntry::from(obj_or_tombstone))
1688                            })
1689                    };
1690
1691                if let Some((obj_version, obj_entry)) = latest {
1692                    self.cache_latest_object_by_id(
1700                        &object_id,
1701                        LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1702                        self.object_by_id_cache.get_ticket_for_read(&object_id),
1705                    );
1706
1707                    if obj_version <= version_bound {
1708                        match obj_entry {
1709                            ObjectEntry::Object(object) => Some(object),
1710                            ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
1711                        }
1712                    } else {
1713                        self.record_db_get("object_lt_or_eq_version_scan")
1717                            .find_object_lt_or_eq_version(object_id, version_bound)
1718                            .expect("db error")
1719                    }
1720
1721                } else if let Some(latest_cache_entry) = latest_cache_entry {
1727                    assert!(!latest_cache_entry.lock().is_alive());
1729                    None
1730                } else {
1731                    let highest = cached_entry.and_then(|c| c.get_highest());
1733                    assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1734                    self.cache_object_not_found(
1735                        &object_id,
1736                        self.object_by_id_cache.get_ticket_for_read(&object_id),
1738                    );
1739                    None
1740                }
1741            },
1742        )
1743    }
1744
1745    fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
1746        get_sui_system_state(self)
1747    }
1748
1749    fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge> {
1750        get_bridge(self)
1751    }
1752
1753    fn get_marker_value(
1754        &self,
1755        object_key: FullObjectKey,
1756        epoch_id: EpochId,
1757    ) -> Option<MarkerValue> {
1758        match self.get_marker_value_cache_only(object_key, epoch_id) {
1759            CacheResult::Hit(marker) => Some(marker),
1760            CacheResult::NegativeHit => None,
1761            CacheResult::Miss => self
1762                .record_db_get("marker_by_version")
1763                .get_marker_value(object_key, epoch_id)
1764                .expect("db error"),
1765        }
1766    }
1767
1768    fn get_latest_marker(
1769        &self,
1770        object_id: FullObjectID,
1771        epoch_id: EpochId,
1772    ) -> Option<(SequenceNumber, MarkerValue)> {
1773        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1774            CacheResult::Hit((v, marker)) => Some((v, marker)),
1775            CacheResult::NegativeHit => {
1776                panic!("cannot have negative hit when getting latest marker")
1777            }
1778            CacheResult::Miss => self
1779                .record_db_get("marker_latest")
1780                .get_latest_marker(object_id, epoch_id)
1781                .expect("db error"),
1782        }
1783    }
1784
1785    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult {
1786        let cur_epoch = epoch_store.epoch();
1787        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
1788            CacheResult::Hit((_, obj)) => {
1789                let actual_objref = obj.compute_object_reference();
1790                if obj_ref != actual_objref {
1791                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
1792                        locked_ref: actual_objref,
1793                    })
1794                } else {
1795                    Ok(
1797                        match self
1798                            .object_locks
1799                            .get_transaction_lock(&obj_ref, epoch_store)?
1800                        {
1801                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
1802                                locked_by_tx: LockDetailsDeprecated {
1803                                    epoch: cur_epoch,
1804                                    tx_digest,
1805                                },
1806                            },
1807                            None => ObjectLockStatus::Initialized,
1808                        },
1809                    )
1810                }
1811            }
1812            CacheResult::NegativeHit => {
1813                Err(SuiError::from(UserInputError::ObjectNotFound {
1814                    object_id: obj_ref.0,
1815                    version: None,
1818                }))
1819            }
1820            CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1821        }
1822    }
1823
1824    fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef> {
1825        let obj = self.get_object_impl("live_objref", &object_id).ok_or(
1826            UserInputError::ObjectNotFound {
1827                object_id,
1828                version: None,
1829            },
1830        )?;
1831        Ok(obj.compute_object_reference())
1832    }
1833
1834    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1835        do_fallback_lookup_fallible(
1836            owned_object_refs,
1837            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
1838                CacheResult::Hit((version, obj)) => {
1839                    if obj.compute_object_reference() != *obj_ref {
1840                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
1841                            provided_obj_ref: *obj_ref,
1842                            current_version: version,
1843                        }
1844                        .into())
1845                    } else {
1846                        Ok(CacheResult::Hit(()))
1847                    }
1848                }
1849                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
1850                    object_id: obj_ref.0,
1851                    version: None,
1852                }
1853                .into()),
1854                CacheResult::Miss => Ok(CacheResult::Miss),
1855            },
1856            |remaining| {
1857                self.record_db_multi_get("object_is_live", remaining.len())
1858                    .check_owned_objects_are_live(remaining)?;
1859                Ok(vec![(); remaining.len()])
1860            },
1861        )?;
1862        Ok(())
1863    }
1864
1865    fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber> {
1866        self.store
1867            .perpetual_tables
1868            .get_highest_pruned_checkpoint()
1869            .expect("db error")
1870    }
1871
1872    fn notify_read_input_objects<'a>(
1873        &'a self,
1874        input_and_receiving_keys: &'a [InputKey],
1875        receiving_keys: &'a HashSet<InputKey>,
1876        epoch: EpochId,
1877    ) -> BoxFuture<'a, ()> {
1878        self.object_notify_read
1879            .read(
1880                "notify_read_input_objects",
1881                input_and_receiving_keys,
1882                move |keys| {
1883                    self.multi_input_objects_available(keys, receiving_keys, epoch)
1884                        .into_iter()
1885                        .map(|available| if available { Some(()) } else { None })
1886                        .collect::<Vec<_>>()
1887                },
1888            )
1889            .map(|_| ())
1890            .boxed()
1891    }
1892}
1893
1894impl TransactionCacheRead for WritebackCache {
1895    fn multi_get_transaction_blocks(
1896        &self,
1897        digests: &[TransactionDigest],
1898    ) -> Vec<Option<Arc<VerifiedTransaction>>> {
1899        let digests_and_tickets: Vec<_> = digests
1900            .iter()
1901            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
1902            .collect();
1903        do_fallback_lookup(
1904            &digests_and_tickets,
1905            |(digest, _)| {
1906                self.metrics
1907                    .record_cache_request("transaction_block", "uncommitted");
1908                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
1909                    self.metrics
1910                        .record_cache_hit("transaction_block", "uncommitted");
1911                    return CacheResult::Hit(Some(tx.transaction.clone()));
1912                }
1913                self.metrics
1914                    .record_cache_miss("transaction_block", "uncommitted");
1915
1916                self.metrics
1917                    .record_cache_request("transaction_block", "committed");
1918
1919                match self
1920                    .cached
1921                    .transactions
1922                    .get(digest)
1923                    .map(|l| l.lock().clone())
1924                {
1925                    Some(PointCacheItem::Some(tx)) => {
1926                        self.metrics
1927                            .record_cache_hit("transaction_block", "committed");
1928                        CacheResult::Hit(Some(tx))
1929                    }
1930                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
1931                    None => {
1932                        self.metrics
1933                            .record_cache_miss("transaction_block", "committed");
1934
1935                        CacheResult::Miss
1936                    }
1937                }
1938            },
1939            |remaining| {
1940                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
1941                let results: Vec<_> = self
1942                    .record_db_multi_get("transaction_block", remaining.len())
1943                    .multi_get_transaction_blocks(&remaining_digests)
1944                    .expect("db error")
1945                    .into_iter()
1946                    .map(|o| o.map(Arc::new))
1947                    .collect();
1948                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
1949                    if result.is_none() {
1950                        self.cached.transactions.insert(digest, None, *ticket).ok();
1951                    }
1952                }
1953                results
1954            },
1955        )
1956    }
1957
1958    fn multi_get_executed_effects_digests(
1959        &self,
1960        digests: &[TransactionDigest],
1961    ) -> Vec<Option<TransactionEffectsDigest>> {
1962        let digests_and_tickets: Vec<_> = digests
1963            .iter()
1964            .map(|d| {
1965                (
1966                    *d,
1967                    self.cached.executed_effects_digests.get_ticket_for_read(d),
1968                )
1969            })
1970            .collect();
1971        do_fallback_lookup(
1972            &digests_and_tickets,
1973            |(digest, _)| {
1974                self.metrics
1975                    .record_cache_request("executed_effects_digests", "uncommitted");
1976                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
1977                    self.metrics
1978                        .record_cache_hit("executed_effects_digests", "uncommitted");
1979                    return CacheResult::Hit(Some(*digest));
1980                }
1981                self.metrics
1982                    .record_cache_miss("executed_effects_digests", "uncommitted");
1983
1984                self.metrics
1985                    .record_cache_request("executed_effects_digests", "committed");
1986                match self
1987                    .cached
1988                    .executed_effects_digests
1989                    .get(digest)
1990                    .map(|l| *l.lock())
1991                {
1992                    Some(PointCacheItem::Some(digest)) => {
1993                        self.metrics
1994                            .record_cache_hit("executed_effects_digests", "committed");
1995                        CacheResult::Hit(Some(digest))
1996                    }
1997                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
1998                    None => {
1999                        self.metrics
2000                            .record_cache_miss("executed_effects_digests", "committed");
2001                        CacheResult::Miss
2002                    }
2003                }
2004            },
2005            |remaining| {
2006                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2007                let results = self
2008                    .record_db_multi_get("executed_effects_digests", remaining.len())
2009                    .multi_get_executed_effects_digests(&remaining_digests)
2010                    .expect("db error");
2011                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2012                    if result.is_none() {
2013                        self.cached
2014                            .executed_effects_digests
2015                            .insert(digest, None, *ticket)
2016                            .ok();
2017                    }
2018                }
2019                results
2020            },
2021        )
2022    }
2023
2024    fn multi_get_effects(
2025        &self,
2026        digests: &[TransactionEffectsDigest],
2027    ) -> Vec<Option<TransactionEffects>> {
2028        let digests_and_tickets: Vec<_> = digests
2029            .iter()
2030            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
2031            .collect();
2032        do_fallback_lookup(
2033            &digests_and_tickets,
2034            |(digest, _)| {
2035                self.metrics
2036                    .record_cache_request("transaction_effects", "uncommitted");
2037                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
2038                    self.metrics
2039                        .record_cache_hit("transaction_effects", "uncommitted");
2040                    return CacheResult::Hit(Some(effects.clone()));
2041                }
2042                self.metrics
2043                    .record_cache_miss("transaction_effects", "uncommitted");
2044
2045                self.metrics
2046                    .record_cache_request("transaction_effects", "committed");
2047                match self
2048                    .cached
2049                    .transaction_effects
2050                    .get(digest)
2051                    .map(|l| l.lock().clone())
2052                {
2053                    Some(PointCacheItem::Some(effects)) => {
2054                        self.metrics
2055                            .record_cache_hit("transaction_effects", "committed");
2056                        CacheResult::Hit(Some((*effects).clone()))
2057                    }
2058                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
2059                    None => {
2060                        self.metrics
2061                            .record_cache_miss("transaction_effects", "committed");
2062                        CacheResult::Miss
2063                    }
2064                }
2065            },
2066            |remaining| {
2067                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2068                let results = self
2069                    .record_db_multi_get("transaction_effects", remaining.len())
2070                    .multi_get_effects(remaining_digests.iter())
2071                    .expect("db error");
2072                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2073                    if result.is_none() {
2074                        self.cached
2075                            .transaction_effects
2076                            .insert(digest, None, *ticket)
2077                            .ok();
2078                    }
2079                }
2080                results
2081            },
2082        )
2083    }
2084
2085    fn notify_read_executed_effects_digests<'a>(
2086        &'a self,
2087        task_name: &'static str,
2088        digests: &'a [TransactionDigest],
2089    ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>> {
2090        self.executed_effects_digests_notify_read
2091            .read(task_name, digests, |digests| {
2092                self.multi_get_executed_effects_digests(digests)
2093            })
2094            .boxed()
2095    }
2096
2097    fn multi_get_events(
2098        &self,
2099        event_digests: &[TransactionDigest],
2100    ) -> Vec<Option<TransactionEvents>> {
2101        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
2102            if events.data.is_empty() {
2103                None
2104            } else {
2105                Some(events)
2106            }
2107        }
2108
2109        let digests_and_tickets: Vec<_> = event_digests
2110            .iter()
2111            .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
2112            .collect();
2113        do_fallback_lookup(
2114            &digests_and_tickets,
2115            |(digest, _)| {
2116                self.metrics
2117                    .record_cache_request("transaction_events", "uncommitted");
2118                if let Some(events) = self.dirty.transaction_events.get(digest).map(|e| e.clone()) {
2119                    self.metrics
2120                        .record_cache_hit("transaction_events", "uncommitted");
2121
2122                    return CacheResult::Hit(map_events(events));
2123                }
2124                self.metrics
2125                    .record_cache_miss("transaction_events", "uncommitted");
2126
2127                self.metrics
2128                    .record_cache_request("transaction_events", "committed");
2129                match self
2130                    .cached
2131                    .transaction_events
2132                    .get(digest)
2133                    .map(|l| l.lock().clone())
2134                {
2135                    Some(PointCacheItem::Some(events)) => {
2136                        self.metrics
2137                            .record_cache_hit("transaction_events", "committed");
2138                        CacheResult::Hit(map_events((*events).clone()))
2139                    }
2140                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
2141                    None => {
2142                        self.metrics
2143                            .record_cache_miss("transaction_events", "committed");
2144
2145                        CacheResult::Miss
2146                    }
2147                }
2148            },
2149            |remaining| {
2150                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
2151                let results = self
2152                    .store
2153                    .multi_get_events(&remaining_digests)
2154                    .expect("db error");
2155                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
2156                    if result.is_none() {
2157                        self.cached
2158                            .transaction_events
2159                            .insert(digest, None, *ticket)
2160                            .ok();
2161                    }
2162                }
2163                results
2164            },
2165        )
2166    }
2167
2168    fn get_unchanged_loaded_runtime_objects(
2169        &self,
2170        digest: &TransactionDigest,
2171    ) -> Option<Vec<ObjectKey>> {
2172        self.dirty
2173            .unchanged_loaded_runtime_objects
2174            .get(digest)
2175            .map(|b| b.clone())
2176            .or_else(|| {
2177                self.store
2178                    .get_unchanged_loaded_runtime_objects(digest)
2179                    .expect("db error")
2180            })
2181    }
2182
2183    fn get_mysticeti_fastpath_outputs(
2184        &self,
2185        tx_digest: &TransactionDigest,
2186    ) -> Option<Arc<TransactionOutputs>> {
2187        self.dirty.fastpath_transaction_outputs.get(tx_digest)
2188    }
2189
2190    fn notify_read_fastpath_transaction_outputs<'a>(
2191        &'a self,
2192        tx_digests: &'a [TransactionDigest],
2193    ) -> BoxFuture<'a, Vec<Arc<TransactionOutputs>>> {
2194        self.fastpath_transaction_outputs_notify_read
2195            .read(
2196                "notify_read_fastpath_transaction_outputs",
2197                tx_digests,
2198                |tx_digests| {
2199                    tx_digests
2200                        .iter()
2201                        .map(|tx_digest| self.get_mysticeti_fastpath_outputs(tx_digest))
2202                        .collect()
2203                },
2204            )
2205            .boxed()
2206    }
2207
2208    fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
2209        self.dirty
2210            .pending_transaction_writes
2211            .get(digest)
2212            .map(|transaction_output| transaction_output.take_accumulator_events())
2213    }
2214}
2215
2216impl ExecutionCacheWrite for WritebackCache {
2217    fn acquire_transaction_locks(
2218        &self,
2219        epoch_store: &AuthorityPerEpochStore,
2220        owned_input_objects: &[ObjectRef],
2221        tx_digest: TransactionDigest,
2222        signed_transaction: Option<VerifiedSignedTransaction>,
2223    ) -> SuiResult {
2224        self.object_locks.acquire_transaction_locks(
2225            self,
2226            epoch_store,
2227            owned_input_objects,
2228            tx_digest,
2229            signed_transaction,
2230        )
2231    }
2232
2233    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
2234        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs);
2235    }
2236
2237    fn write_fastpath_transaction_outputs(&self, tx_outputs: Arc<TransactionOutputs>) {
2238        let tx_digest = *tx_outputs.transaction.digest();
2239        debug!(
2240            ?tx_digest,
2241            "writing mysticeti fastpath certified transaction outputs"
2242        );
2243        self.dirty
2244            .fastpath_transaction_outputs
2245            .insert(tx_digest, tx_outputs.clone());
2246        self.fastpath_transaction_outputs_notify_read
2247            .notify(&tx_digest, &tx_outputs);
2248    }
2249
2250    #[cfg(test)]
2251    fn write_object_entry_for_test(&self, object: Object) {
2252        self.write_object_entry(&object.id(), object.version(), object.into());
2253    }
2254}
2255
2256implement_passthrough_traits!(WritebackCache);
2257
2258impl GlobalStateHashStore for WritebackCache {
2259    fn get_object_ref_prior_to_key_deprecated(
2260        &self,
2261        object_id: &ObjectID,
2262        version: SequenceNumber,
2263    ) -> SuiResult<Option<ObjectRef>> {
2264        let mut candidates = Vec::new();
2268
2269        let check_versions =
2270            |versions: &CachedVersionMap<ObjectEntry>| match versions.get_prior_to(&version) {
2271                Some((version, object_entry)) => match object_entry {
2272                    ObjectEntry::Object(object) => {
2273                        assert_eq!(object.version(), version);
2274                        Some(object.compute_object_reference())
2275                    }
2276                    ObjectEntry::Deleted => {
2277                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED))
2278                    }
2279                    ObjectEntry::Wrapped => {
2280                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED))
2281                    }
2282                },
2283                None => None,
2284            };
2285
2286        if let Some(objects) = self.dirty.objects.get(object_id)
2288            && let Some(prior) = check_versions(&objects)
2289        {
2290            candidates.push(prior);
2291        }
2292
2293        if let Some(objects) = self.cached.object_cache.get(object_id)
2294            && let Some(prior) = check_versions(&objects.lock())
2295        {
2296            candidates.push(prior);
2297        }
2298
2299        if let Some(prior) = self
2300            .store
2301            .get_object_ref_prior_to_key_deprecated(object_id, version)?
2302        {
2303            candidates.push(prior);
2304        }
2305
2306        candidates.sort_by_key(|(_, version, _)| *version);
2308        Ok(candidates.pop())
2309    }
2310
2311    fn get_root_state_hash_for_epoch(
2312        &self,
2313        epoch: EpochId,
2314    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
2315        self.store.get_root_state_hash_for_epoch(epoch)
2316    }
2317
2318    fn get_root_state_hash_for_highest_epoch(
2319        &self,
2320    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
2321        self.store.get_root_state_hash_for_highest_epoch()
2322    }
2323
2324    fn insert_state_hash_for_epoch(
2325        &self,
2326        epoch: EpochId,
2327        checkpoint_seq_num: &CheckpointSequenceNumber,
2328        acc: &GlobalStateHash,
2329    ) -> SuiResult {
2330        self.store
2331            .insert_state_hash_for_epoch(epoch, checkpoint_seq_num, acc)
2332    }
2333
2334    fn iter_live_object_set(
2335        &self,
2336        include_wrapped_tombstone: bool,
2337    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2338        assert!(
2342            self.dirty.is_empty(),
2343            "cannot iterate live object set with dirty data"
2344        );
2345        self.store.iter_live_object_set(include_wrapped_tombstone)
2346    }
2347
2348    fn iter_cached_live_object_set_for_testing(
2352        &self,
2353        include_wrapped_tombstone: bool,
2354    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
2355        let iter = self.dirty.objects.iter();
2357        let mut dirty_objects = BTreeMap::new();
2358
2359        for obj in self.store.iter_live_object_set(include_wrapped_tombstone) {
2361            dirty_objects.insert(obj.object_id(), obj);
2362        }
2363
2364        for entry in iter {
2366            let id = *entry.key();
2367            let value = entry.value();
2368            match value.get_highest().unwrap() {
2369                (_, ObjectEntry::Object(object)) => {
2370                    dirty_objects.insert(id, LiveObject::Normal(object.clone()));
2371                }
2372                (version, ObjectEntry::Wrapped) => {
2373                    if include_wrapped_tombstone {
2374                        dirty_objects.insert(id, LiveObject::Wrapped(ObjectKey(id, *version)));
2375                    } else {
2376                        dirty_objects.remove(&id);
2377                    }
2378                }
2379                (_, ObjectEntry::Deleted) => {
2380                    dirty_objects.remove(&id);
2381                }
2382            }
2383        }
2384
2385        Box::new(dirty_objects.into_values())
2386    }
2387}
2388
2389impl StateSyncAPI for WritebackCache {
2395    fn insert_transaction_and_effects(
2396        &self,
2397        transaction: &VerifiedTransaction,
2398        transaction_effects: &TransactionEffects,
2399    ) {
2400        self.store
2401            .insert_transaction_and_effects(transaction, transaction_effects)
2402            .expect("db error");
2403        self.cached
2404            .transactions
2405            .insert(
2406                transaction.digest(),
2407                PointCacheItem::Some(Arc::new(transaction.clone())),
2408                Ticket::Write,
2409            )
2410            .ok();
2411        self.cached
2412            .transaction_effects
2413            .insert(
2414                &transaction_effects.digest(),
2415                PointCacheItem::Some(Arc::new(transaction_effects.clone())),
2416                Ticket::Write,
2417            )
2418            .ok();
2419    }
2420
2421    fn multi_insert_transaction_and_effects(
2422        &self,
2423        transactions_and_effects: &[VerifiedExecutionData],
2424    ) {
2425        self.store
2426            .multi_insert_transaction_and_effects(transactions_and_effects.iter())
2427            .expect("db error");
2428        for VerifiedExecutionData {
2429            transaction,
2430            effects,
2431        } in transactions_and_effects
2432        {
2433            self.cached
2434                .transactions
2435                .insert(
2436                    transaction.digest(),
2437                    PointCacheItem::Some(Arc::new(transaction.clone())),
2438                    Ticket::Write,
2439                )
2440                .ok();
2441            self.cached
2442                .transaction_effects
2443                .insert(
2444                    &effects.digest(),
2445                    PointCacheItem::Some(Arc::new(effects.clone())),
2446                    Ticket::Write,
2447                )
2448                .ok();
2449        }
2450    }
2451}