sui_core/
execution_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::AuthorityStore;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::authority::authority_store::{ExecutionLockWriteGuard, SuiLockResult};
7use crate::authority::backpressure::BackpressureManager;
8use crate::authority::epoch_start_configuration::EpochFlag;
9use crate::authority::epoch_start_configuration::EpochStartConfiguration;
10use crate::global_state_hasher::GlobalStateHashStore;
11use crate::transaction_outputs::TransactionOutputs;
12use either::Either;
13use itertools::Itertools;
14use mysten_common::fatal;
15use sui_types::accumulator_event::AccumulatorEvent;
16use sui_types::bridge::Bridge;
17
18use futures::{FutureExt, future::BoxFuture};
19use prometheus::Registry;
20use std::collections::HashSet;
21use std::path::Path;
22use std::sync::Arc;
23use sui_config::ExecutionCacheConfig;
24use sui_protocol_config::ProtocolVersion;
25use sui_types::base_types::{FullObjectID, VerifiedExecutionData};
26use sui_types::digests::{TransactionDigest, TransactionEffectsDigest};
27use sui_types::effects::{TransactionEffects, TransactionEvents};
28use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
29use sui_types::executable_transaction::VerifiedExecutableTransaction;
30use sui_types::messages_checkpoint::CheckpointSequenceNumber;
31use sui_types::object::Object;
32use sui_types::storage::{
33    BackingPackageStore, BackingStore, ChildObjectResolver, FullObjectKey, MarkerValue, ObjectKey,
34    ObjectOrTombstone, ObjectStore, PackageObject, ParentSync,
35};
36use sui_types::sui_system_state::SuiSystemState;
37use sui_types::transaction::{VerifiedSignedTransaction, VerifiedTransaction};
38use sui_types::{
39    base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber},
40    object::Owner,
41    storage::InputKey,
42};
43use tracing::instrument;
44use typed_store::rocks::DBBatch;
45
46pub(crate) mod cache_types;
47pub mod metrics;
48mod object_locks;
49pub mod writeback_cache;
50
51pub use writeback_cache::WritebackCache;
52
53use metrics::ExecutionCacheMetrics;
54
55// If you have Arc<ExecutionCache>, you cannot return a reference to it as
56// an &Arc<dyn ExecutionCacheRead> (for example), because the trait object is a fat pointer.
57// So, in order to be able to return &Arc<dyn T>, we create all the converted trait objects
58// (aka fat pointers) up front and return references to them.
59#[derive(Clone)]
60pub struct ExecutionCacheTraitPointers {
61    pub object_cache_reader: Arc<dyn ObjectCacheRead>,
62    pub transaction_cache_reader: Arc<dyn TransactionCacheRead>,
63    pub cache_writer: Arc<dyn ExecutionCacheWrite>,
64    pub backing_store: Arc<dyn BackingStore + Send + Sync>,
65    pub child_object_resolver: Arc<dyn ChildObjectResolver + Send + Sync>,
66    pub backing_package_store: Arc<dyn BackingPackageStore + Send + Sync>,
67    pub object_store: Arc<dyn ObjectStore + Send + Sync>,
68    pub reconfig_api: Arc<dyn ExecutionCacheReconfigAPI>,
69    pub global_state_hash_store: Arc<dyn GlobalStateHashStore>,
70    pub checkpoint_cache: Arc<dyn CheckpointCache>,
71    pub state_sync_store: Arc<dyn StateSyncAPI>,
72    pub cache_commit: Arc<dyn ExecutionCacheCommit>,
73    pub testing_api: Arc<dyn TestingAPI>,
74}
75
76impl ExecutionCacheTraitPointers {
77    pub fn new<T>(cache: Arc<T>) -> Self
78    where
79        T: ObjectCacheRead
80            + TransactionCacheRead
81            + ExecutionCacheWrite
82            + BackingStore
83            + BackingPackageStore
84            + ObjectStore
85            + ExecutionCacheReconfigAPI
86            + GlobalStateHashStore
87            + CheckpointCache
88            + StateSyncAPI
89            + ExecutionCacheCommit
90            + TestingAPI
91            + 'static,
92    {
93        Self {
94            object_cache_reader: cache.clone(),
95            transaction_cache_reader: cache.clone(),
96            cache_writer: cache.clone(),
97            backing_store: cache.clone(),
98            child_object_resolver: cache.clone(),
99            backing_package_store: cache.clone(),
100            object_store: cache.clone(),
101            reconfig_api: cache.clone(),
102            global_state_hash_store: cache.clone(),
103            checkpoint_cache: cache.clone(),
104            state_sync_store: cache.clone(),
105            cache_commit: cache.clone(),
106            testing_api: cache.clone(),
107        }
108    }
109}
110
111pub fn build_execution_cache(
112    cache_config: &ExecutionCacheConfig,
113    prometheus_registry: &Registry,
114    store: &Arc<AuthorityStore>,
115    backpressure_manager: Arc<BackpressureManager>,
116) -> ExecutionCacheTraitPointers {
117    let execution_cache_metrics = Arc::new(ExecutionCacheMetrics::new(prometheus_registry));
118
119    ExecutionCacheTraitPointers::new(
120        WritebackCache::new(
121            cache_config,
122            store.clone(),
123            execution_cache_metrics,
124            backpressure_manager,
125        )
126        .into(),
127    )
128}
129
130/// Should only be used for sui-tool or tests. Nodes must use build_execution_cache which
131/// uses the epoch_start_config to prevent cache impl from switching except at epoch boundaries.
132pub fn build_execution_cache_from_env(
133    prometheus_registry: &Registry,
134    store: &Arc<AuthorityStore>,
135) -> ExecutionCacheTraitPointers {
136    let execution_cache_metrics = Arc::new(ExecutionCacheMetrics::new(prometheus_registry));
137
138    ExecutionCacheTraitPointers::new(
139        WritebackCache::new(
140            &Default::default(),
141            store.clone(),
142            execution_cache_metrics,
143            BackpressureManager::new_for_tests(),
144        )
145        .into(),
146    )
147}
148
149pub type Batch = (Vec<Arc<TransactionOutputs>>, DBBatch);
150
151pub trait ExecutionCacheCommit: Send + Sync {
152    /// Build a DBBatch containing the given transaction outputs.
153    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch;
154
155    /// Durably commit the outputs of the given transactions to the database.
156    /// Will be called by CheckpointExecutor to ensure that transaction outputs are
157    /// written durably before marking a checkpoint as finalized.
158    fn commit_transaction_outputs(
159        &self,
160        epoch: EpochId,
161        batch: Batch,
162        digests: &[TransactionDigest],
163    );
164
165    /// Durably commit a transaction to the database. Used to store any transactions
166    /// that cannot be reconstructed at start-up by consensus replay. Currently the only
167    /// case of this is RandomnessStateUpdate.
168    fn persist_transaction(&self, transaction: &VerifiedExecutableTransaction);
169
170    // Number of pending uncommitted transactions
171    fn approximate_pending_transaction_count(&self) -> u64;
172}
173
174pub trait ObjectCacheRead: Send + Sync {
175    fn get_package_object(&self, id: &ObjectID) -> SuiResult<Option<PackageObject>>;
176    fn force_reload_system_packages(&self, system_package_ids: &[ObjectID]);
177
178    fn get_object(&self, id: &ObjectID) -> Option<Object>;
179
180    fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
181        let mut ret = Vec::with_capacity(objects.len());
182        for object_id in objects {
183            ret.push(self.get_object(object_id));
184        }
185        ret
186    }
187
188    fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef>;
189
190    fn get_latest_object_or_tombstone(
191        &self,
192        object_id: ObjectID,
193    ) -> Option<(ObjectKey, ObjectOrTombstone)>;
194
195    fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object>;
196
197    fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>>;
198
199    fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool;
200
201    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool>;
202
203    /// Load a list of objects from the store by object reference.
204    /// If they exist in the store, they are returned directly.
205    /// If any object missing, we try to figure out the best error to return.
206    /// If the object we are asking is currently locked at a future version, we know this
207    /// transaction is out-of-date and we return a ObjectVersionUnavailableForConsumption,
208    /// which indicates this is not retriable.
209    /// Otherwise, we return a ObjectNotFound error, which indicates this is retriable.
210    fn multi_get_objects_with_more_accurate_error_return(
211        &self,
212        object_refs: &[ObjectRef],
213    ) -> Result<Vec<Object>, SuiError> {
214        let objects = self
215            .multi_get_objects_by_key(&object_refs.iter().map(ObjectKey::from).collect::<Vec<_>>());
216        let mut result = Vec::new();
217        for (object_opt, object_ref) in objects.into_iter().zip(object_refs) {
218            match object_opt {
219                None => {
220                    let live_objref = self._get_live_objref(object_ref.0)?;
221                    let error = if live_objref.1 >= object_ref.1 {
222                        UserInputError::ObjectVersionUnavailableForConsumption {
223                            provided_obj_ref: *object_ref,
224                            current_version: live_objref.1,
225                        }
226                    } else {
227                        UserInputError::ObjectNotFound {
228                            object_id: object_ref.0,
229                            version: Some(object_ref.1),
230                        }
231                    };
232                    return Err(SuiErrorKind::UserInputError { error }.into());
233                }
234                Some(object) => {
235                    result.push(object);
236                }
237            }
238        }
239        assert_eq!(result.len(), object_refs.len());
240        Ok(result)
241    }
242
243    /// Used by execution scheduler to determine if input objects are ready. Distinct from multi_get_object_by_key
244    /// because it also consults markers to handle the case where an object will never become available (e.g.
245    /// because it has been received by some other transaction already).
246    fn multi_input_objects_available(
247        &self,
248        keys: &[InputKey],
249        receiving_objects: &HashSet<InputKey>,
250        epoch: EpochId,
251    ) -> Vec<bool> {
252        let mut results = vec![false; keys.len()];
253        let non_canceled_keys = keys.iter().enumerate().filter(|(idx, key)| {
254            if key.is_cancelled() {
255                // Shared objects in canceled transactions are always available.
256                results[*idx] = true;
257                false
258            } else {
259                true
260            }
261        });
262        let (move_object_keys, package_object_keys): (Vec<_>, Vec<_>) = non_canceled_keys
263            .partition_map(|(idx, key)| match key {
264                InputKey::VersionedObject { id, version } => Either::Left((idx, (id, version))),
265                InputKey::Package { id } => Either::Right((idx, id)),
266            });
267
268        for ((idx, (id, version)), has_key) in move_object_keys.iter().zip(
269            self.multi_object_exists_by_key(
270                &move_object_keys
271                    .iter()
272                    .map(|(_, k)| ObjectKey(k.0.id(), *k.1))
273                    .collect::<Vec<_>>(),
274            )
275            .into_iter(),
276        ) {
277            // If the key exists at the specified version, then the object is available.
278            if has_key {
279                results[*idx] = true;
280            } else if receiving_objects.contains(&InputKey::VersionedObject {
281                id: **id,
282                version: **version,
283            }) {
284                // There could be a more recent version of this object, and the object at the
285                // specified version could have already been pruned. In such a case `has_key` will
286                // be false, but since this is a receiving object we should mark it as available if
287                // we can determine that an object with a version greater than or equal to the
288                // specified version exists or was deleted. We will then let mark it as available
289                // to let the transaction through so it can fail at execution.
290                let is_available = self
291                    .get_object(&id.id())
292                    .map(|obj| obj.version() >= **version)
293                    .unwrap_or(false)
294                    || self.fastpath_stream_ended_at_version_or_after(id.id(), **version, epoch);
295                results[*idx] = is_available;
296            } else {
297                // If the object is an already-removed consensus object, mark it as available if the
298                // version for that object is in the marker table.
299                let is_consensus_stream_ended = self
300                    .get_consensus_stream_end_tx_digest(FullObjectKey::new(**id, **version), epoch)
301                    .is_some();
302                results[*idx] = is_consensus_stream_ended;
303            }
304        }
305
306        package_object_keys.into_iter().for_each(|(idx, id)| {
307            // get_package_object() only errors when the object is not a package, so returning false on error.
308            // Error is possible when this gets called on uncertified transactions.
309            results[idx] = self.get_package_object(id).is_ok_and(|p| p.is_some());
310        });
311
312        results
313    }
314
315    fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool>;
316
317    /// Return the object with version less then or eq to the provided seq number.
318    /// This is used by indexer to find the correct version of dynamic field child object.
319    /// We do not store the version of the child object, but because of lamport timestamp,
320    /// we know the child must have version number less then or eq to the parent.
321    fn find_object_lt_or_eq_version(
322        &self,
323        object_id: ObjectID,
324        version: SequenceNumber,
325    ) -> Option<Object>;
326
327    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult;
328
329    // This method is considered "private" - only used by multi_get_objects_with_more_accurate_error_return
330    fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef>;
331
332    // Check that the given set of objects are live at the given version. This is used as a
333    // safety check before execution, and could potentially be deleted or changed to a debug_assert
334    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult;
335
336    fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState>;
337
338    fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge>;
339
340    // Marker methods
341
342    /// Get the marker at a specific version
343    fn get_marker_value(&self, object_key: FullObjectKey, epoch_id: EpochId)
344    -> Option<MarkerValue>;
345
346    /// Get the latest marker for a given object.
347    fn get_latest_marker(
348        &self,
349        object_id: FullObjectID,
350        epoch_id: EpochId,
351    ) -> Option<(SequenceNumber, MarkerValue)>;
352
353    /// If the given consensus object stream was ended, return related
354    /// version and transaction digest.
355    fn get_last_consensus_stream_end_info(
356        &self,
357        object_id: FullObjectID,
358        epoch_id: EpochId,
359    ) -> Option<(SequenceNumber, TransactionDigest)> {
360        match self.get_latest_marker(object_id, epoch_id) {
361            Some((version, MarkerValue::ConsensusStreamEnded(digest))) => Some((version, digest)),
362            _ => None,
363        }
364    }
365
366    /// If the given consensus object stream was ended at the specified version,
367    /// return related transaction digest.
368    fn get_consensus_stream_end_tx_digest(
369        &self,
370        object_key: FullObjectKey,
371        epoch_id: EpochId,
372    ) -> Option<TransactionDigest> {
373        match self.get_marker_value(object_key, epoch_id) {
374            Some(MarkerValue::ConsensusStreamEnded(digest)) => Some(digest),
375            _ => None,
376        }
377    }
378
379    fn have_received_object_at_version(
380        &self,
381        object_key: FullObjectKey,
382        epoch_id: EpochId,
383    ) -> bool {
384        matches!(
385            self.get_marker_value(object_key, epoch_id),
386            Some(MarkerValue::Received)
387        )
388    }
389
390    fn fastpath_stream_ended_at_version_or_after(
391        &self,
392        object_id: ObjectID,
393        version: SequenceNumber,
394        epoch_id: EpochId,
395    ) -> bool {
396        let full_id = FullObjectID::Fastpath(object_id); // function explicitly assumes "fastpath"
397        matches!(
398            self.get_latest_marker(full_id, epoch_id),
399            Some((marker_version, MarkerValue::FastpathStreamEnded)) if marker_version >= version
400        )
401    }
402
403    /// Return the watermark for the highest checkpoint for which we've pruned objects.
404    fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber>;
405
406    /// Given a list of input and receiving objects for a transaction,
407    /// wait until all of them become available, so that the transaction
408    /// can start execution.
409    /// `input_and_receiving_keys` contains both input objects and receiving
410    /// input objects, including canceled objects.
411    /// TODO: Eventually this can return the objects read results,
412    /// so that execution does not need to load them again.
413    fn notify_read_input_objects<'a>(
414        &'a self,
415        input_and_receiving_keys: &'a [InputKey],
416        receiving_keys: &'a HashSet<InputKey>,
417        epoch: EpochId,
418    ) -> BoxFuture<'a, ()>;
419}
420
421pub trait TransactionCacheRead: Send + Sync {
422    fn multi_get_transaction_blocks(
423        &self,
424        digests: &[TransactionDigest],
425    ) -> Vec<Option<Arc<VerifiedTransaction>>>;
426
427    fn get_transaction_block(
428        &self,
429        digest: &TransactionDigest,
430    ) -> Option<Arc<VerifiedTransaction>> {
431        self.multi_get_transaction_blocks(&[*digest])
432            .pop()
433            .expect("multi-get must return correct number of items")
434    }
435
436    #[instrument(level = "trace", skip_all)]
437    fn get_transactions_and_serialized_sizes(
438        &self,
439        digests: &[TransactionDigest],
440    ) -> SuiResult<Vec<Option<(VerifiedTransaction, usize)>>> {
441        let txns = self.multi_get_transaction_blocks(digests);
442        txns.into_iter()
443            .map(|txn| {
444                txn.map(|txn| {
445                    // Note: if the transaction is read from the db, we are wasting some
446                    // effort relative to reading the raw bytes from the db instead of
447                    // calling serialized_size. However, transactions should usually be
448                    // fetched from cache.
449                    match txn.serialized_size() {
450                        Ok(size) => Ok(((*txn).clone(), size)),
451                        Err(e) => Err(e),
452                    }
453                })
454                .transpose()
455            })
456            .collect::<Result<Vec<_>, _>>()
457    }
458
459    fn multi_get_executed_effects_digests(
460        &self,
461        digests: &[TransactionDigest],
462    ) -> Vec<Option<TransactionEffectsDigest>>;
463
464    fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
465        self.multi_get_executed_effects_digests(&[*digest])
466            .pop()
467            .expect("multi-get must return correct number of items")
468            .is_some()
469    }
470
471    fn multi_get_executed_effects(
472        &self,
473        digests: &[TransactionDigest],
474    ) -> Vec<Option<TransactionEffects>> {
475        let effects_digests = self.multi_get_executed_effects_digests(digests);
476        assert_eq!(effects_digests.len(), digests.len());
477
478        let mut results = vec![None; digests.len()];
479        let mut fetch_digests = Vec::with_capacity(digests.len());
480        let mut fetch_indices = Vec::with_capacity(digests.len());
481
482        for (i, digest) in effects_digests.into_iter().enumerate() {
483            if let Some(digest) = digest {
484                fetch_digests.push(digest);
485                fetch_indices.push(i);
486            }
487        }
488
489        let effects = self.multi_get_effects(&fetch_digests);
490        for (i, effects) in fetch_indices.into_iter().zip(effects.into_iter()) {
491            results[i] = effects;
492        }
493
494        results
495    }
496
497    fn get_executed_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
498        self.multi_get_executed_effects(&[*digest])
499            .pop()
500            .expect("multi-get must return correct number of items")
501    }
502
503    fn multi_get_effects(
504        &self,
505        digests: &[TransactionEffectsDigest],
506    ) -> Vec<Option<TransactionEffects>>;
507
508    fn get_effects(&self, digest: &TransactionEffectsDigest) -> Option<TransactionEffects> {
509        self.multi_get_effects(&[*digest])
510            .pop()
511            .expect("multi-get must return correct number of items")
512    }
513
514    fn multi_get_events(&self, digests: &[TransactionDigest]) -> Vec<Option<TransactionEvents>>;
515
516    fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
517        self.multi_get_events(&[*digest])
518            .pop()
519            .expect("multi-get must return correct number of items")
520    }
521
522    fn get_unchanged_loaded_runtime_objects(
523        &self,
524        digest: &TransactionDigest,
525    ) -> Option<Vec<ObjectKey>>;
526
527    fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>>;
528
529    fn notify_read_executed_effects_digests<'a>(
530        &'a self,
531        task_name: &'static str,
532        digests: &'a [TransactionDigest],
533    ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>>;
534
535    /// Wait until the effects of the given transactions are available and return them.
536    /// WARNING: If calling this on a transaction that could be reverted, you must be
537    /// sure that this function cannot be called during reconfiguration. The best way to
538    /// do this is to wrap your future in EpochStore::within_alive_epoch. Holding an
539    /// ExecutionLockReadGuard would also prevent reconfig from happening while waiting,
540    /// but this is very dangerous, as it could prevent reconfiguration from ever
541    /// occurring!
542    fn notify_read_executed_effects<'a>(
543        &'a self,
544        task_name: &'static str,
545        digests: &'a [TransactionDigest],
546    ) -> BoxFuture<'a, Vec<TransactionEffects>> {
547        async move {
548            let digests = self
549                .notify_read_executed_effects_digests(task_name, digests)
550                .await;
551            // once digests are available, effects must be present as well
552            self.multi_get_effects(&digests)
553                .into_iter()
554                .map(|e| e.unwrap_or_else(|| fatal!("digests must exist")))
555                .collect()
556        }
557        .boxed()
558    }
559
560    /// Get the execution outputs of a mysticeti fastpath certified transaction, if it exists.
561    fn get_mysticeti_fastpath_outputs(
562        &self,
563        tx_digest: &TransactionDigest,
564    ) -> Option<Arc<TransactionOutputs>>;
565
566    /// Wait until the outputs of the given transactions are available
567    /// in the temporary buffer holding mysticeti fastpath outputs.
568    fn notify_read_fastpath_transaction_outputs<'a>(
569        &'a self,
570        tx_digests: &'a [TransactionDigest],
571    ) -> BoxFuture<'a, Vec<Arc<TransactionOutputs>>>;
572}
573
574pub trait ExecutionCacheWrite: Send + Sync {
575    /// Write the output of a transaction.
576    ///
577    /// Because of the child object consistency rule (readers that observe parents must observe all
578    /// children of that parent, up to the parent's version bound), implementations of this method
579    /// must not write any top-level (address-owned or shared) objects before they have written all
580    /// of the object-owned objects (i.e. child objects) in the `objects` list.
581    ///
582    /// In the future, we may modify this method to expose finer-grained information about
583    /// parent/child relationships. (This may be especially necessary for distributed object
584    /// storage, but is unlikely to be an issue before we tackle that problem).
585    ///
586    /// This function may evict the mutable input objects (and successfully received objects) of
587    /// transaction from the cache, since they cannot be read by any other transaction.
588    ///
589    /// Any write performed by this method immediately notifies any waiter that has previously
590    /// called notify_read_objects_for_execution or notify_read_objects_for_signing for the object
591    /// in question.
592    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>);
593
594    /// Write the output of a Mysticeti fastpath certified transaction.
595    /// Such output cannot be written to the dirty cache right away because
596    /// the transaction may end up rejected by consensus later. We need to make sure
597    /// that it is not visible to any subsequent transaction until we observe it
598    /// from consensus or checkpoints.
599    fn write_fastpath_transaction_outputs(&self, tx_outputs: Arc<TransactionOutputs>);
600
601    /// Attempt to acquire object locks for all of the owned input locks.
602    fn acquire_transaction_locks(
603        &self,
604        epoch_store: &AuthorityPerEpochStore,
605        owned_input_objects: &[ObjectRef],
606        tx_digest: TransactionDigest,
607        signed_transaction: Option<VerifiedSignedTransaction>,
608    ) -> SuiResult;
609
610    /// Write an object entry directly to the cache for testing.
611    /// This allows us to write an object without constructing the entire
612    /// transaction outputs.
613    #[cfg(test)]
614    fn write_object_entry_for_test(&self, object: Object);
615}
616
617pub trait CheckpointCache: Send + Sync {
618    // TODO: In addition to the deprecated methods below, this will eventually include access
619    // to the CheckpointStore
620
621    // DEPRECATED METHODS
622    fn deprecated_get_transaction_checkpoint(
623        &self,
624        digest: &TransactionDigest,
625    ) -> Option<(EpochId, CheckpointSequenceNumber)>;
626
627    fn deprecated_multi_get_transaction_checkpoint(
628        &self,
629        digests: &[TransactionDigest],
630    ) -> Vec<Option<(EpochId, CheckpointSequenceNumber)>>;
631
632    fn deprecated_insert_finalized_transactions(
633        &self,
634        digests: &[TransactionDigest],
635        epoch: EpochId,
636        sequence: CheckpointSequenceNumber,
637    );
638}
639
640pub trait ExecutionCacheReconfigAPI: Send + Sync {
641    fn insert_genesis_object(&self, object: Object);
642    fn bulk_insert_genesis_objects(&self, objects: &[Object]);
643
644    fn set_epoch_start_configuration(&self, epoch_start_config: &EpochStartConfiguration);
645
646    fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]);
647
648    fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>);
649
650    fn expensive_check_sui_conservation(
651        &self,
652        old_epoch_store: &AuthorityPerEpochStore,
653    ) -> SuiResult;
654
655    fn checkpoint_db(&self, path: &Path) -> SuiResult;
656
657    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
658    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
659    fn maybe_reaccumulate_state_hash(
660        &self,
661        cur_epoch_store: &AuthorityPerEpochStore,
662        new_protocol_version: ProtocolVersion,
663    );
664
665    /// Reconfigure the cache itself.
666    /// TODO: this is only needed for ProxyCache to switch between cache impls. It can be removed
667    /// once WritebackCache is the sole cache impl.
668    fn reconfigure_cache<'a>(
669        &'a self,
670        epoch_start_config: &'a EpochStartConfiguration,
671    ) -> BoxFuture<'a, ()>;
672}
673
674// StateSyncAPI is for writing any data that was not the result of transaction execution,
675// but that arrived via state sync. The fact that it came via state sync implies that it
676// is certified output, and can be immediately persisted to the store.
677pub trait StateSyncAPI: Send + Sync {
678    fn insert_transaction_and_effects(
679        &self,
680        transaction: &VerifiedTransaction,
681        transaction_effects: &TransactionEffects,
682    );
683
684    fn multi_insert_transaction_and_effects(
685        &self,
686        transactions_and_effects: &[VerifiedExecutionData],
687    );
688}
689
690pub trait TestingAPI: Send + Sync {
691    fn database_for_testing(&self) -> Arc<AuthorityStore>;
692}
693
/// Implements `ObjectStore`, `ChildObjectResolver`, `BackingPackageStore` and `ParentSync`
/// for `$implementor` on top of its `ObjectCacheRead` methods.
macro_rules! implement_storage_traits {
    ($implementor: ident) => {
        // Plain delegation. The calls are fully qualified because `ObjectStore` and
        // `ObjectCacheRead` declare methods with the same names.
        impl ObjectStore for $implementor {
            fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
                ObjectCacheRead::get_object(self, object_id)
            }

            fn get_object_by_key(
                &self,
                object_id: &ObjectID,
                version: sui_types::base_types::VersionNumber,
            ) -> Option<Object> {
                ObjectCacheRead::get_object_by_key(self, object_id, version)
            }
        }

        impl ChildObjectResolver for $implementor {
            /// Finds `child` at a version no greater than `child_version_upper_bound` and
            /// returns it only when it is object-owned by `parent`. A child owned by
            /// anything else yields `InvalidChildObjectAccess`; no child yields `Ok(None)`.
            fn read_child_object(
                &self,
                parent: &ObjectID,
                child: &ObjectID,
                child_version_upper_bound: SequenceNumber,
            ) -> SuiResult<Option<Object>> {
                let lookup = self.find_object_lt_or_eq_version(*child, child_version_upper_bound);
                let Some(child_object) = lookup else {
                    return Ok(None);
                };

                let parent = *parent;
                if child_object.owner == Owner::ObjectOwner(parent.into()) {
                    Ok(Some(child_object))
                } else {
                    Err(SuiErrorKind::InvalidChildObjectAccess {
                        object: *child,
                        given_parent: parent,
                        actual_owner: child_object.owner.clone(),
                    }
                    .into())
                }
            }

            /// Returns the object to be received at exactly `receive_object_at_version`,
            /// provided it is address-owned by `owner` and has no `Received` marker yet at
            /// that version. Every other case yields `Ok(None)`.
            fn get_object_received_at_version(
                &self,
                owner: &ObjectID,
                receiving_object_id: &ObjectID,
                receive_object_at_version: SequenceNumber,
                epoch_id: EpochId,
            ) -> SuiResult<Option<Object>> {
                let lookup = ObjectCacheRead::get_object_by_key(
                    self,
                    receiving_object_id,
                    receive_object_at_version,
                );
                let Some(recv_object) = lookup else {
                    return Ok(None);
                };

                // Two cases are treated as though the object does not exist:
                // * Invalid access (the object is not address-owned by `owner`). Or;
                // * The object has already been received at this version.
                // These two cases must remain indistinguishable to the caller, otherwise we
                // risk forks in transaction replay due to possible reordering of
                // transactions during replay.
                //
                // The marker is consulted only after the ownership check has passed.
                let receivable = recv_object.owner == Owner::AddressOwner((*owner).into())
                    && !self.have_received_object_at_version(
                        // TODO: Add support for receiving consensus objects. For now this
                        // assumes fastpath.
                        FullObjectKey::new(
                            FullObjectID::new(*receiving_object_id, None),
                            receive_object_at_version,
                        ),
                        epoch_id,
                    );

                if receivable {
                    Ok(Some(recv_object))
                } else {
                    Ok(None)
                }
            }
        }

        impl BackingPackageStore for $implementor {
            fn get_package_object(
                &self,
                package_id: &ObjectID,
            ) -> SuiResult<Option<PackageObject>> {
                ObjectCacheRead::get_package_object(self, package_id)
            }
        }

        impl ParentSync for $implementor {
            fn get_latest_parent_entry_ref_deprecated(
                &self,
                object_id: ObjectID,
            ) -> Option<ObjectRef> {
                ObjectCacheRead::get_latest_object_ref_or_tombstone(self, object_id)
            }
        }
    };
}
791
// Implements, for a cache implementation, the traits whose methods always go directly to the
// store. The generated code expects `$implementor` to have a `store` field and to provide the
// `insert_genesis_object_impl`, `bulk_insert_genesis_objects_impl` and
// `clear_state_end_of_epoch_impl` methods used below.
macro_rules! implement_passthrough_traits {
    ($implementor: ident) => {
        impl CheckpointCache for $implementor {
            /// Reads the checkpoint entry for `digest` straight from the store.
            /// Panics with "db error" if the store call does not succeed.
            fn deprecated_get_transaction_checkpoint(
                &self,
                digest: &TransactionDigest,
            ) -> Option<(EpochId, CheckpointSequenceNumber)> {
                let lookup = self.store.deprecated_get_transaction_checkpoint(digest);
                lookup.expect("db error")
            }

            /// Batch variant of `deprecated_get_transaction_checkpoint`.
            /// Panics with "db error" if the store call does not succeed.
            fn deprecated_multi_get_transaction_checkpoint(
                &self,
                digests: &[TransactionDigest],
            ) -> Vec<Option<(EpochId, CheckpointSequenceNumber)>> {
                let lookups = self
                    .store
                    .deprecated_multi_get_transaction_checkpoint(digests);
                lookups.expect("db error")
            }

            /// Passes `digests`, `epoch` and `sequence` on to the store.
            /// Panics with "db error" if the store call does not succeed.
            fn deprecated_insert_finalized_transactions(
                &self,
                digests: &[TransactionDigest],
                epoch: EpochId,
                sequence: CheckpointSequenceNumber,
            ) {
                let outcome = self
                    .store
                    .deprecated_insert_finalized_transactions(digests, epoch, sequence);
                outcome.expect("db error");
            }
        }

        impl ExecutionCacheReconfigAPI for $implementor {
            /// Delegates to the implementor's `insert_genesis_object_impl`.
            fn insert_genesis_object(&self, object: Object) {
                self.insert_genesis_object_impl(object)
            }

            /// Delegates to the implementor's `bulk_insert_genesis_objects_impl`.
            fn bulk_insert_genesis_objects(&self, objects: &[Object]) {
                self.bulk_insert_genesis_objects_impl(objects)
            }

            /// Hands the epoch start configuration to the store.
            /// Panics with "db error" if the store call does not succeed.
            fn set_epoch_start_configuration(&self, epoch_start_config: &EpochStartConfiguration) {
                let outcome = self.store.set_epoch_start_configuration(epoch_start_config);
                outcome.expect("db error");
            }

            /// Forwards the old and new epoch flags to the store's metrics update.
            fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
                self.store.update_epoch_flags_metrics(old, new)
            }

            /// Delegates to the implementor's `clear_state_end_of_epoch_impl`.
            fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
                self.clear_state_end_of_epoch_impl(execution_guard)
            }

            /// Runs the store's conservation check, passing this cache along with the old
            /// epoch store.
            fn expensive_check_sui_conservation(
                &self,
                old_epoch_store: &AuthorityPerEpochStore,
            ) -> SuiResult {
                let store = &self.store;
                store.expensive_check_sui_conservation(self, old_epoch_store)
            }

            /// Calls `checkpoint_db` on the store's perpetual tables with `path`.
            fn checkpoint_db(&self, path: &std::path::Path) -> SuiResult {
                let tables = &self.store.perpetual_tables;
                tables.checkpoint_db(path)
            }

            /// Forwards to the store's `maybe_reaccumulate_state_hash`.
            fn maybe_reaccumulate_state_hash(
                &self,
                cur_epoch_store: &AuthorityPerEpochStore,
                new_protocol_version: ProtocolVersion,
            ) {
                let store = &self.store;
                store.maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version)
            }

            /// No-op: returns a future that is already complete.
            fn reconfigure_cache<'a>(
                &'a self,
                _epoch_start_config: &'a EpochStartConfiguration,
            ) -> BoxFuture<'a, ()> {
                // Since we now use WritebackCache directly at startup (if the epoch flag is
                // set), this can be called at reconfiguration time. It is a no-op.
                // TODO: remove this once we completely remove ProxyCache.
                let already_done = std::future::ready(());
                already_done.boxed()
            }
        }

        impl TestingAPI for $implementor {
            /// Returns another handle to the underlying `AuthorityStore`.
            fn database_for_testing(&self) -> Arc<AuthorityStore> {
                Arc::clone(&self.store)
            }
        }
    };
}
888
889use implement_passthrough_traits;
890
// Generate the trait impls defined by `implement_storage_traits!` for `WritebackCache`.
implement_storage_traits!(WritebackCache);
892
893pub trait ExecutionCacheAPI:
894    ObjectCacheRead
895    + ExecutionCacheWrite
896    + ExecutionCacheCommit
897    + ExecutionCacheReconfigAPI
898    + CheckpointCache
899    + StateSyncAPI
900{
901}