sui_core/
execution_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::AuthorityStore;
5use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
6use crate::authority::authority_store::{ExecutionLockWriteGuard, SuiLockResult};
7use crate::authority::backpressure::BackpressureManager;
8use crate::authority::epoch_start_configuration::EpochFlag;
9use crate::authority::epoch_start_configuration::EpochStartConfiguration;
10use crate::global_state_hasher::GlobalStateHashStore;
11use crate::transaction_outputs::TransactionOutputs;
12use either::Either;
13use itertools::Itertools;
14use mysten_common::fatal;
15use sui_types::accumulator_event::AccumulatorEvent;
16use sui_types::bridge::Bridge;
17
18use futures::{FutureExt, future::BoxFuture};
19use prometheus::Registry;
20use std::collections::HashSet;
21use std::path::Path;
22use std::sync::Arc;
23use sui_config::ExecutionCacheConfig;
24use sui_protocol_config::ProtocolVersion;
25use sui_types::base_types::{FullObjectID, VerifiedExecutionData};
26use sui_types::digests::{TransactionDigest, TransactionEffectsDigest};
27use sui_types::effects::{TransactionEffects, TransactionEvents};
28use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
29use sui_types::executable_transaction::VerifiedExecutableTransaction;
30use sui_types::messages_checkpoint::CheckpointSequenceNumber;
31use sui_types::object::Object;
32use sui_types::storage::{
33    BackingPackageStore, BackingStore, ChildObjectResolver, FullObjectKey, MarkerValue, ObjectKey,
34    ObjectOrTombstone, ObjectStore, PackageObject, ParentSync,
35};
36use sui_types::sui_system_state::SuiSystemState;
37use sui_types::transaction::{VerifiedSignedTransaction, VerifiedTransaction};
38use sui_types::{
39    base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber},
40    object::Owner,
41    storage::InputKey,
42};
43use tracing::instrument;
44use typed_store::rocks::DBBatch;
45
46pub(crate) mod cache_types;
47pub mod metrics;
48mod object_locks;
49pub mod writeback_cache;
50
51pub use writeback_cache::WritebackCache;
52
53use metrics::ExecutionCacheMetrics;
54
55// If you have Arc<ExecutionCache>, you cannot return a reference to it as
56// an &Arc<dyn ExecutionCacheRead> (for example), because the trait object is a fat pointer.
57// So, in order to be able to return &Arc<dyn T>, we create all the converted trait objects
58// (aka fat pointers) up front and return references to them.
59#[derive(Clone)]
60pub struct ExecutionCacheTraitPointers {
61    pub object_cache_reader: Arc<dyn ObjectCacheRead>,
62    pub transaction_cache_reader: Arc<dyn TransactionCacheRead>,
63    pub cache_writer: Arc<dyn ExecutionCacheWrite>,
64    pub backing_store: Arc<dyn BackingStore + Send + Sync>,
65    pub child_object_resolver: Arc<dyn ChildObjectResolver + Send + Sync>,
66    pub backing_package_store: Arc<dyn BackingPackageStore + Send + Sync>,
67    pub object_store: Arc<dyn ObjectStore + Send + Sync>,
68    pub reconfig_api: Arc<dyn ExecutionCacheReconfigAPI>,
69    pub global_state_hash_store: Arc<dyn GlobalStateHashStore>,
70    pub checkpoint_cache: Arc<dyn CheckpointCache>,
71    pub state_sync_store: Arc<dyn StateSyncAPI>,
72    pub cache_commit: Arc<dyn ExecutionCacheCommit>,
73    pub testing_api: Arc<dyn TestingAPI>,
74}
75
76impl ExecutionCacheTraitPointers {
77    pub fn new<T>(cache: Arc<T>) -> Self
78    where
79        T: ObjectCacheRead
80            + TransactionCacheRead
81            + ExecutionCacheWrite
82            + BackingStore
83            + BackingPackageStore
84            + ObjectStore
85            + ExecutionCacheReconfigAPI
86            + GlobalStateHashStore
87            + CheckpointCache
88            + StateSyncAPI
89            + ExecutionCacheCommit
90            + TestingAPI
91            + 'static,
92    {
93        Self {
94            object_cache_reader: cache.clone(),
95            transaction_cache_reader: cache.clone(),
96            cache_writer: cache.clone(),
97            backing_store: cache.clone(),
98            child_object_resolver: cache.clone(),
99            backing_package_store: cache.clone(),
100            object_store: cache.clone(),
101            reconfig_api: cache.clone(),
102            global_state_hash_store: cache.clone(),
103            checkpoint_cache: cache.clone(),
104            state_sync_store: cache.clone(),
105            cache_commit: cache.clone(),
106            testing_api: cache.clone(),
107        }
108    }
109}
110
111pub fn build_execution_cache(
112    cache_config: &ExecutionCacheConfig,
113    prometheus_registry: &Registry,
114    store: &Arc<AuthorityStore>,
115    backpressure_manager: Arc<BackpressureManager>,
116) -> ExecutionCacheTraitPointers {
117    let execution_cache_metrics = Arc::new(ExecutionCacheMetrics::new(prometheus_registry));
118
119    ExecutionCacheTraitPointers::new(
120        WritebackCache::new(
121            cache_config,
122            store.clone(),
123            execution_cache_metrics,
124            backpressure_manager,
125        )
126        .into(),
127    )
128}
129
130/// Should only be used for sui-tool or tests. Nodes must use build_execution_cache which
131/// uses the epoch_start_config to prevent cache impl from switching except at epoch boundaries.
132pub fn build_execution_cache_from_env(
133    prometheus_registry: &Registry,
134    store: &Arc<AuthorityStore>,
135) -> ExecutionCacheTraitPointers {
136    let execution_cache_metrics = Arc::new(ExecutionCacheMetrics::new(prometheus_registry));
137
138    ExecutionCacheTraitPointers::new(
139        WritebackCache::new(
140            &Default::default(),
141            store.clone(),
142            execution_cache_metrics,
143            BackpressureManager::new_for_tests(),
144        )
145        .into(),
146    )
147}
148
149pub type Batch = (Vec<Arc<TransactionOutputs>>, DBBatch);
150
151pub trait ExecutionCacheCommit: Send + Sync {
152    /// Build a DBBatch containing the given transaction outputs.
153    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch;
154
155    /// Durably commit the outputs of the given transactions to the database.
156    /// Will be called by CheckpointExecutor to ensure that transaction outputs are
157    /// written durably before marking a checkpoint as finalized.
158    fn commit_transaction_outputs(
159        &self,
160        epoch: EpochId,
161        batch: Batch,
162        digests: &[TransactionDigest],
163    );
164
165    /// Durably commit a transaction to the database. Used to store any transactions
166    /// that cannot be reconstructed at start-up by consensus replay. Currently the only
167    /// case of this is RandomnessStateUpdate.
168    fn persist_transaction(&self, transaction: &VerifiedExecutableTransaction);
169
170    // Number of pending uncommitted transactions
171    fn approximate_pending_transaction_count(&self) -> u64;
172}
173
174pub trait ObjectCacheRead: Send + Sync {
175    fn get_package_object(&self, id: &ObjectID) -> SuiResult<Option<PackageObject>>;
176    fn force_reload_system_packages(&self, system_package_ids: &[ObjectID]);
177
178    fn get_object(&self, id: &ObjectID) -> Option<Object>;
179
180    fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
181        let mut ret = Vec::with_capacity(objects.len());
182        for object_id in objects {
183            ret.push(self.get_object(object_id));
184        }
185        ret
186    }
187
188    fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef>;
189
190    fn get_latest_object_or_tombstone(
191        &self,
192        object_id: ObjectID,
193    ) -> Option<(ObjectKey, ObjectOrTombstone)>;
194
195    fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object>;
196
197    fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>>;
198
199    fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool;
200
201    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool>;
202
203    /// Load a list of objects from the store by object reference.
204    /// If they exist in the store, they are returned directly.
205    /// If any object missing, we try to figure out the best error to return.
206    /// If the object we are asking is currently locked at a future version, we know this
207    /// transaction is out-of-date and we return a ObjectVersionUnavailableForConsumption,
208    /// which indicates this is not retriable.
209    /// Otherwise, we return a ObjectNotFound error, which indicates this is retriable.
210    fn multi_get_objects_with_more_accurate_error_return(
211        &self,
212        object_refs: &[ObjectRef],
213    ) -> Result<Vec<Object>, SuiError> {
214        let objects = self
215            .multi_get_objects_by_key(&object_refs.iter().map(ObjectKey::from).collect::<Vec<_>>());
216        let mut result = Vec::new();
217        for (object_opt, object_ref) in objects.into_iter().zip(object_refs) {
218            match object_opt {
219                None => {
220                    let live_objref = self._get_live_objref(object_ref.0)?;
221                    let error = if live_objref.1 >= object_ref.1 {
222                        UserInputError::ObjectVersionUnavailableForConsumption {
223                            provided_obj_ref: *object_ref,
224                            current_version: live_objref.1,
225                        }
226                    } else {
227                        UserInputError::ObjectNotFound {
228                            object_id: object_ref.0,
229                            version: Some(object_ref.1),
230                        }
231                    };
232                    return Err(SuiErrorKind::UserInputError { error }.into());
233                }
234                Some(object) => {
235                    result.push(object);
236                }
237            }
238        }
239        assert_eq!(result.len(), object_refs.len());
240        Ok(result)
241    }
242
243    /// Used by execution scheduler to determine if input objects are ready. Distinct from multi_get_object_by_key
244    /// because it also consults markers to handle the case where an object will never become available (e.g.
245    /// because it has been received by some other transaction already).
246    fn multi_input_objects_available(
247        &self,
248        keys: &[InputKey],
249        receiving_objects: &HashSet<InputKey>,
250        epoch: EpochId,
251    ) -> Vec<bool> {
252        let mut results = vec![false; keys.len()];
253        let non_canceled_keys = keys.iter().enumerate().filter(|(idx, key)| {
254            if key.is_cancelled() {
255                // Shared objects in canceled transactions are always available.
256                results[*idx] = true;
257                false
258            } else {
259                true
260            }
261        });
262        let (move_object_keys, package_object_keys): (Vec<_>, Vec<_>) = non_canceled_keys
263            .partition_map(|(idx, key)| match key {
264                InputKey::VersionedObject { id, version } => Either::Left((idx, (id, version))),
265                InputKey::Package { id } => Either::Right((idx, id)),
266            });
267
268        for ((idx, (id, version)), has_key) in move_object_keys.iter().zip(
269            self.multi_object_exists_by_key(
270                &move_object_keys
271                    .iter()
272                    .map(|(_, k)| ObjectKey(k.0.id(), *k.1))
273                    .collect::<Vec<_>>(),
274            )
275            .into_iter(),
276        ) {
277            // If the key exists at the specified version, then the object is available.
278            if has_key {
279                results[*idx] = true;
280            } else if receiving_objects.contains(&InputKey::VersionedObject {
281                id: **id,
282                version: **version,
283            }) {
284                // There could be a more recent version of this object, and the object at the
285                // specified version could have already been pruned. In such a case `has_key` will
286                // be false, but since this is a receiving object we should mark it as available if
287                // we can determine that an object with a version greater than or equal to the
288                // specified version exists or was deleted. We will then let mark it as available
289                // to let the transaction through so it can fail at execution.
290                let is_available = self
291                    .get_object(&id.id())
292                    .map(|obj| obj.version() >= **version)
293                    .unwrap_or(false)
294                    || self.fastpath_stream_ended_at_version_or_after(id.id(), **version, epoch);
295                results[*idx] = is_available;
296            } else {
297                // If the object is an already-removed consensus object, mark it as available if the
298                // version for that object is in the marker table.
299                let is_consensus_stream_ended = self
300                    .get_consensus_stream_end_tx_digest(FullObjectKey::new(**id, **version), epoch)
301                    .is_some();
302                results[*idx] = is_consensus_stream_ended;
303            }
304        }
305
306        package_object_keys.into_iter().for_each(|(idx, id)| {
307            // get_package_object() only errors when the object is not a package, so returning false on error.
308            // Error is possible when this gets called on uncertified transactions.
309            results[idx] = self.get_package_object(id).is_ok_and(|p| p.is_some());
310        });
311
312        results
313    }
314
315    fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool>;
316
317    /// Return the object with version less then or eq to the provided seq number.
318    /// This is used by indexer to find the correct version of dynamic field child object.
319    /// We do not store the version of the child object, but because of lamport timestamp,
320    /// we know the child must have version number less then or eq to the parent.
321    fn find_object_lt_or_eq_version(
322        &self,
323        object_id: ObjectID,
324        version: SequenceNumber,
325    ) -> Option<Object>;
326
327    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult;
328
329    // This method is considered "private" - only used by multi_get_objects_with_more_accurate_error_return
330    fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef>;
331
332    // Check that the given set of objects are live at the given version. This is used as a
333    // safety check before execution, and could potentially be deleted or changed to a debug_assert
334    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult;
335
336    fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState>;
337
338    fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge>;
339
340    // Marker methods
341
342    /// Get the marker at a specific version
343    fn get_marker_value(&self, object_key: FullObjectKey, epoch_id: EpochId)
344    -> Option<MarkerValue>;
345
346    /// Get the latest marker for a given object.
347    fn get_latest_marker(
348        &self,
349        object_id: FullObjectID,
350        epoch_id: EpochId,
351    ) -> Option<(SequenceNumber, MarkerValue)>;
352
353    /// If the given consensus object stream was ended, return related
354    /// version and transaction digest.
355    fn get_last_consensus_stream_end_info(
356        &self,
357        object_id: FullObjectID,
358        epoch_id: EpochId,
359    ) -> Option<(SequenceNumber, TransactionDigest)> {
360        match self.get_latest_marker(object_id, epoch_id) {
361            Some((version, MarkerValue::ConsensusStreamEnded(digest))) => Some((version, digest)),
362            _ => None,
363        }
364    }
365
366    /// If the given consensus object stream was ended at the specified version,
367    /// return related transaction digest.
368    fn get_consensus_stream_end_tx_digest(
369        &self,
370        object_key: FullObjectKey,
371        epoch_id: EpochId,
372    ) -> Option<TransactionDigest> {
373        match self.get_marker_value(object_key, epoch_id) {
374            Some(MarkerValue::ConsensusStreamEnded(digest)) => Some(digest),
375            _ => None,
376        }
377    }
378
379    fn have_received_object_at_version(
380        &self,
381        object_key: FullObjectKey,
382        epoch_id: EpochId,
383    ) -> bool {
384        matches!(
385            self.get_marker_value(object_key, epoch_id),
386            Some(MarkerValue::Received)
387        )
388    }
389
390    fn fastpath_stream_ended_at_version_or_after(
391        &self,
392        object_id: ObjectID,
393        version: SequenceNumber,
394        epoch_id: EpochId,
395    ) -> bool {
396        let full_id = FullObjectID::Fastpath(object_id); // function explicitly assumes "fastpath"
397        matches!(
398            self.get_latest_marker(full_id, epoch_id),
399            Some((marker_version, MarkerValue::FastpathStreamEnded)) if marker_version >= version
400        )
401    }
402
403    /// Return the watermark for the highest checkpoint for which we've pruned objects.
404    fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber>;
405
406    /// Given a list of input and receiving objects for a transaction,
407    /// wait until all of them become available, so that the transaction
408    /// can start execution.
409    /// `input_and_receiving_keys` contains both input objects and receiving
410    /// input objects, including canceled objects.
411    /// TODO: Eventually this can return the objects read results,
412    /// so that execution does not need to load them again.
413    fn notify_read_input_objects<'a>(
414        &'a self,
415        input_and_receiving_keys: &'a [InputKey],
416        receiving_keys: &'a HashSet<InputKey>,
417        epoch: EpochId,
418    ) -> BoxFuture<'a, ()>;
419}
420
421pub trait TransactionCacheRead: Send + Sync {
422    fn multi_get_transaction_blocks(
423        &self,
424        digests: &[TransactionDigest],
425    ) -> Vec<Option<Arc<VerifiedTransaction>>>;
426
427    fn get_transaction_block(
428        &self,
429        digest: &TransactionDigest,
430    ) -> Option<Arc<VerifiedTransaction>> {
431        self.multi_get_transaction_blocks(&[*digest])
432            .pop()
433            .expect("multi-get must return correct number of items")
434    }
435
436    #[instrument(level = "trace", skip_all)]
437    fn get_transactions_and_serialized_sizes(
438        &self,
439        digests: &[TransactionDigest],
440    ) -> SuiResult<Vec<Option<(VerifiedTransaction, usize)>>> {
441        let txns = self.multi_get_transaction_blocks(digests);
442        txns.into_iter()
443            .map(|txn| {
444                txn.map(|txn| {
445                    // Note: if the transaction is read from the db, we are wasting some
446                    // effort relative to reading the raw bytes from the db instead of
447                    // calling serialized_size. However, transactions should usually be
448                    // fetched from cache.
449                    match txn.serialized_size() {
450                        Ok(size) => Ok(((*txn).clone(), size)),
451                        Err(e) => Err(e),
452                    }
453                })
454                .transpose()
455            })
456            .collect::<Result<Vec<_>, _>>()
457    }
458
459    fn multi_get_executed_effects_digests(
460        &self,
461        digests: &[TransactionDigest],
462    ) -> Vec<Option<TransactionEffectsDigest>>;
463
464    fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
465        self.multi_get_executed_effects_digests(&[*digest])
466            .pop()
467            .expect("multi-get must return correct number of items")
468            .is_some()
469    }
470
471    fn multi_get_executed_effects(
472        &self,
473        digests: &[TransactionDigest],
474    ) -> Vec<Option<TransactionEffects>> {
475        let effects_digests = self.multi_get_executed_effects_digests(digests);
476        assert_eq!(effects_digests.len(), digests.len());
477
478        let mut results = vec![None; digests.len()];
479        let mut fetch_digests = Vec::with_capacity(digests.len());
480        let mut fetch_indices = Vec::with_capacity(digests.len());
481
482        for (i, digest) in effects_digests.into_iter().enumerate() {
483            if let Some(digest) = digest {
484                fetch_digests.push(digest);
485                fetch_indices.push(i);
486            }
487        }
488
489        let effects = self.multi_get_effects(&fetch_digests);
490        for (i, effects) in fetch_indices.into_iter().zip(effects.into_iter()) {
491            results[i] = effects;
492        }
493
494        results
495    }
496
497    fn get_executed_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
498        self.multi_get_executed_effects(&[*digest])
499            .pop()
500            .expect("multi-get must return correct number of items")
501    }
502
503    fn transaction_executed_in_last_epoch(
504        &self,
505        digest: &TransactionDigest,
506        current_epoch: EpochId,
507    ) -> bool;
508
509    fn multi_get_effects(
510        &self,
511        digests: &[TransactionEffectsDigest],
512    ) -> Vec<Option<TransactionEffects>>;
513
514    fn get_effects(&self, digest: &TransactionEffectsDigest) -> Option<TransactionEffects> {
515        self.multi_get_effects(&[*digest])
516            .pop()
517            .expect("multi-get must return correct number of items")
518    }
519
520    fn multi_get_events(&self, digests: &[TransactionDigest]) -> Vec<Option<TransactionEvents>>;
521
522    fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
523        self.multi_get_events(&[*digest])
524            .pop()
525            .expect("multi-get must return correct number of items")
526    }
527
528    fn get_unchanged_loaded_runtime_objects(
529        &self,
530        digest: &TransactionDigest,
531    ) -> Option<Vec<ObjectKey>>;
532
533    fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>>;
534
535    fn notify_read_executed_effects_digests<'a>(
536        &'a self,
537        task_name: &'static str,
538        digests: &'a [TransactionDigest],
539    ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>>;
540
541    /// Wait until the effects of the given transactions are available and return them.
542    /// WARNING: If calling this on a transaction that could be reverted, you must be
543    /// sure that this function cannot be called during reconfiguration. The best way to
544    /// do this is to wrap your future in EpochStore::within_alive_epoch. Holding an
545    /// ExecutionLockReadGuard would also prevent reconfig from happening while waiting,
546    /// but this is very dangerous, as it could prevent reconfiguration from ever
547    /// occurring!
548    fn notify_read_executed_effects<'a>(
549        &'a self,
550        task_name: &'static str,
551        digests: &'a [TransactionDigest],
552    ) -> BoxFuture<'a, Vec<TransactionEffects>> {
553        async move {
554            let digests = self
555                .notify_read_executed_effects_digests(task_name, digests)
556                .await;
557            // once digests are available, effects must be present as well
558            self.multi_get_effects(&digests)
559                .into_iter()
560                .map(|e| e.unwrap_or_else(|| fatal!("digests must exist")))
561                .collect()
562        }
563        .boxed()
564    }
565
566    /// Get the execution outputs of a mysticeti fastpath certified transaction, if it exists.
567    fn get_mysticeti_fastpath_outputs(
568        &self,
569        tx_digest: &TransactionDigest,
570    ) -> Option<Arc<TransactionOutputs>>;
571
572    /// Wait until the outputs of the given transactions are available
573    /// in the temporary buffer holding mysticeti fastpath outputs.
574    fn notify_read_fastpath_transaction_outputs<'a>(
575        &'a self,
576        tx_digests: &'a [TransactionDigest],
577    ) -> BoxFuture<'a, Vec<Arc<TransactionOutputs>>>;
578}
579
580pub trait ExecutionCacheWrite: Send + Sync {
581    /// Write the output of a transaction.
582    ///
583    /// Because of the child object consistency rule (readers that observe parents must observe all
584    /// children of that parent, up to the parent's version bound), implementations of this method
585    /// must not write any top-level (address-owned or shared) objects before they have written all
586    /// of the object-owned objects (i.e. child objects) in the `objects` list.
587    ///
588    /// In the future, we may modify this method to expose finer-grained information about
589    /// parent/child relationships. (This may be especially necessary for distributed object
590    /// storage, but is unlikely to be an issue before we tackle that problem).
591    ///
592    /// This function may evict the mutable input objects (and successfully received objects) of
593    /// transaction from the cache, since they cannot be read by any other transaction.
594    ///
595    /// Any write performed by this method immediately notifies any waiter that has previously
596    /// called notify_read_objects_for_execution or notify_read_objects_for_signing for the object
597    /// in question.
598    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>);
599
600    /// Write the output of a Mysticeti fastpath certified transaction.
601    /// Such output cannot be written to the dirty cache right away because
602    /// the transaction may end up rejected by consensus later. We need to make sure
603    /// that it is not visible to any subsequent transaction until we observe it
604    /// from consensus or checkpoints.
605    fn write_fastpath_transaction_outputs(&self, tx_outputs: Arc<TransactionOutputs>);
606
607    /// Attempt to acquire object locks for all of the owned input locks.
608    fn acquire_transaction_locks(
609        &self,
610        epoch_store: &AuthorityPerEpochStore,
611        owned_input_objects: &[ObjectRef],
612        tx_digest: TransactionDigest,
613        signed_transaction: Option<VerifiedSignedTransaction>,
614    ) -> SuiResult;
615
616    /// Write an object entry directly to the cache for testing.
617    /// This allows us to write an object without constructing the entire
618    /// transaction outputs.
619    #[cfg(test)]
620    fn write_object_entry_for_test(&self, object: Object);
621}
622
623pub trait CheckpointCache: Send + Sync {
624    // TODO: In addition to the deprecated methods below, this will eventually include access
625    // to the CheckpointStore
626
627    // DEPRECATED METHODS
628    fn deprecated_get_transaction_checkpoint(
629        &self,
630        digest: &TransactionDigest,
631    ) -> Option<(EpochId, CheckpointSequenceNumber)>;
632
633    fn deprecated_multi_get_transaction_checkpoint(
634        &self,
635        digests: &[TransactionDigest],
636    ) -> Vec<Option<(EpochId, CheckpointSequenceNumber)>>;
637
638    fn deprecated_insert_finalized_transactions(
639        &self,
640        digests: &[TransactionDigest],
641        epoch: EpochId,
642        sequence: CheckpointSequenceNumber,
643    );
644}
645
646pub trait ExecutionCacheReconfigAPI: Send + Sync {
647    fn insert_genesis_object(&self, object: Object);
648    fn bulk_insert_genesis_objects(&self, objects: &[Object]);
649
650    fn set_epoch_start_configuration(&self, epoch_start_config: &EpochStartConfiguration);
651
652    fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]);
653
654    fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>);
655
656    fn expensive_check_sui_conservation(
657        &self,
658        old_epoch_store: &AuthorityPerEpochStore,
659    ) -> SuiResult;
660
661    fn checkpoint_db(&self, path: &Path) -> SuiResult;
662
663    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
664    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
665    fn maybe_reaccumulate_state_hash(
666        &self,
667        cur_epoch_store: &AuthorityPerEpochStore,
668        new_protocol_version: ProtocolVersion,
669    );
670
671    /// Reconfigure the cache itself.
672    /// TODO: this is only needed for ProxyCache to switch between cache impls. It can be removed
673    /// once WritebackCache is the sole cache impl.
674    fn reconfigure_cache<'a>(
675        &'a self,
676        epoch_start_config: &'a EpochStartConfiguration,
677    ) -> BoxFuture<'a, ()>;
678}
679
680// StateSyncAPI is for writing any data that was not the result of transaction execution,
681// but that arrived via state sync. The fact that it came via state sync implies that it
682// is certified output, and can be immediately persisted to the store.
683pub trait StateSyncAPI: Send + Sync {
684    fn insert_transaction_and_effects(
685        &self,
686        transaction: &VerifiedTransaction,
687        transaction_effects: &TransactionEffects,
688    );
689
690    fn multi_insert_transaction_and_effects(
691        &self,
692        transactions_and_effects: &[VerifiedExecutionData],
693    );
694}
695
696pub trait TestingAPI: Send + Sync {
697    fn database_for_testing(&self) -> Arc<AuthorityStore>;
698
699    fn cache_for_testing(&self) -> &WritebackCache;
700}
701
// Generates `ObjectStore`, `ChildObjectResolver`, `BackingPackageStore` and `ParentSync`
// impls for `$implementor`. Each generated method forwards to an `ObjectCacheRead`
// function or to a lookup method already callable on `self`, so the implementor must
// provide those at the invocation site.
macro_rules! implement_storage_traits {
    ($implementor: ident) => {
        impl ObjectStore for $implementor {
            // Fully qualified so the call resolves to the `ObjectCacheRead` method rather
            // than to this `ObjectStore` method of the same name.
            fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
                <Self as ObjectCacheRead>::get_object(self, object_id)
            }

            fn get_object_by_key(
                &self,
                object_id: &ObjectID,
                version: sui_types::base_types::VersionNumber,
            ) -> Option<Object> {
                <Self as ObjectCacheRead>::get_object_by_key(self, object_id, version)
            }
        }

        impl ChildObjectResolver for $implementor {
            // Looks up `child` at a version no greater than `child_version_upper_bound`.
            // Returns `Ok(None)` when nothing is found, the object when it is owned by
            // `parent`, and `InvalidChildObjectAccess` for any other owner.
            fn read_child_object(
                &self,
                parent: &ObjectID,
                child: &ObjectID,
                child_version_upper_bound: SequenceNumber,
            ) -> SuiResult<Option<Object>> {
                let lookup = self.find_object_lt_or_eq_version(*child, child_version_upper_bound);
                let Some(found) = lookup else {
                    return Ok(None);
                };

                let expected_owner = Owner::ObjectOwner((*parent).into());
                if found.owner == expected_owner {
                    Ok(Some(found))
                } else {
                    Err(SuiErrorKind::InvalidChildObjectAccess {
                        object: *child,
                        given_parent: *parent,
                        actual_owner: found.owner.clone(),
                    }
                    .into())
                }
            }

            // Fetches `receiving_object_id` at exactly `receive_object_at_version` and
            // returns it only if it is address-owned by `owner` and has not already been
            // received at that version in `epoch_id`; every other case yields `Ok(None)`.
            fn get_object_received_at_version(
                &self,
                owner: &ObjectID,
                receiving_object_id: &ObjectID,
                receive_object_at_version: SequenceNumber,
                epoch_id: EpochId,
            ) -> SuiResult<Option<Object>> {
                let candidate = <Self as ObjectCacheRead>::get_object_by_key(
                    self,
                    receiving_object_id,
                    receive_object_at_version,
                );
                let Some(candidate) = candidate else {
                    return Ok(None);
                };

                // Two situations are deliberately reported as "object does not exist":
                // * Invalid access (the object is not address-owned by `owner`), and
                // * the object was already received at this version.
                // These cases must remain indistinguishable to the caller, otherwise we
                // risk forks in transaction replay due to possible reordering of
                // transactions during replay.
                if candidate.owner != Owner::AddressOwner((*owner).into()) {
                    return Ok(None);
                }

                // TODO: Add support for receiving consensus objects. For now this assumes fastpath.
                let received_key = FullObjectKey::new(
                    FullObjectID::new(*receiving_object_id, None),
                    receive_object_at_version,
                );
                // Only consulted once ownership has been confirmed, mirroring a
                // short-circuiting `||` between the two checks.
                if self.have_received_object_at_version(received_key, epoch_id) {
                    return Ok(None);
                }

                Ok(Some(candidate))
            }
        }

        impl BackingPackageStore for $implementor {
            fn get_package_object(
                &self,
                package_id: &ObjectID,
            ) -> SuiResult<Option<PackageObject>> {
                <Self as ObjectCacheRead>::get_package_object(self, package_id)
            }
        }

        impl ParentSync for $implementor {
            fn get_latest_parent_entry_ref_deprecated(
                &self,
                object_id: ObjectID,
            ) -> Option<ObjectRef> {
                <Self as ObjectCacheRead>::get_latest_object_ref_or_tombstone(self, object_id)
            }
        }
    };
}
799
// Implements traits for a cache implementation that always goes directly to the store.
//
// Generates `CheckpointCache`, `ExecutionCacheReconfigAPI` and `TestingAPI` impls for
// `$implementor`. The generated code reads `self.store` and calls the `*_impl` helper
// methods on `self`, so the implementor must provide both.
macro_rules! implement_passthrough_traits {
    ($implementor: ident) => {
        impl CheckpointCache for $implementor {
            // Store errors are not propagated: each method panics with "db error".
            fn deprecated_get_transaction_checkpoint(
                &self,
                digest: &TransactionDigest,
            ) -> Option<(EpochId, CheckpointSequenceNumber)> {
                let lookup = self.store.deprecated_get_transaction_checkpoint(digest);
                lookup.expect("db error")
            }

            fn deprecated_multi_get_transaction_checkpoint(
                &self,
                digests: &[TransactionDigest],
            ) -> Vec<Option<(EpochId, CheckpointSequenceNumber)>> {
                let lookups = self
                    .store
                    .deprecated_multi_get_transaction_checkpoint(digests);
                lookups.expect("db error")
            }

            fn deprecated_insert_finalized_transactions(
                &self,
                digests: &[TransactionDigest],
                epoch: EpochId,
                sequence: CheckpointSequenceNumber,
            ) {
                let written = self
                    .store
                    .deprecated_insert_finalized_transactions(digests, epoch, sequence);
                written.expect("db error");
            }
        }

        impl ExecutionCacheReconfigAPI for $implementor {
            fn insert_genesis_object(&self, object: Object) {
                self.insert_genesis_object_impl(object)
            }

            fn bulk_insert_genesis_objects(&self, objects: &[Object]) {
                self.bulk_insert_genesis_objects_impl(objects)
            }

            // Persists the configuration through the store; panics with "db error" on failure.
            fn set_epoch_start_configuration(&self, epoch_start_config: &EpochStartConfiguration) {
                let persisted = self.store.set_epoch_start_configuration(epoch_start_config);
                persisted.expect("db error");
            }

            fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
                self.store.update_epoch_flags_metrics(old, new)
            }

            fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
                self.clear_state_end_of_epoch_impl(execution_guard)
            }

            // The store performs the check; `self` is handed to it as an argument.
            fn expensive_check_sui_conservation(
                &self,
                old_epoch_store: &AuthorityPerEpochStore,
            ) -> SuiResult {
                let store = &self.store;
                store.expensive_check_sui_conservation(self, old_epoch_store)
            }

            fn checkpoint_db(&self, path: &std::path::Path) -> SuiResult {
                let tables = &self.store.perpetual_tables;
                tables.checkpoint_db(path)
            }

            fn maybe_reaccumulate_state_hash(
                &self,
                cur_epoch_store: &AuthorityPerEpochStore,
                new_protocol_version: ProtocolVersion,
            ) {
                let store = &self.store;
                store.maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version)
            }

            fn reconfigure_cache<'a>(
                &'a self,
                _epoch_start_config: &'a EpochStartConfiguration,
            ) -> BoxFuture<'a, ()> {
                // Since we now use WritebackCache directly at startup (if the epoch flag is set),
                // this can be called at reconfiguration time. It is a no-op.
                // TODO: remove this once we completely remove ProxyCache.
                let already_done = std::future::ready(());
                already_done.boxed()
            }
        }

        impl TestingAPI for $implementor {
            // Returns another handle to the same store, not a copy of the data.
            fn database_for_testing(&self) -> Arc<AuthorityStore> {
                Arc::clone(&self.store)
            }

            // Type-checks only when `&$implementor` is (or coerces to) `&WritebackCache`.
            fn cache_for_testing(&self) -> &WritebackCache {
                self
            }
        }
    };
}
900
901use implement_passthrough_traits;
902
903implement_storage_traits!(WritebackCache);
904
905pub trait ExecutionCacheAPI:
906    ObjectCacheRead
907    + ExecutionCacheWrite
908    + ExecutionCacheCommit
909    + ExecutionCacheReconfigAPI
910    + CheckpointCache
911    + StateSyncAPI
912{
913}