sui_core/execution_cache.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::accumulators::funds_read::AccountFundsRead;
use crate::authority::AuthorityStore;
use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::authority::authority_store::{ExecutionLockWriteGuard, SuiLockResult};
use crate::authority::backpressure::BackpressureManager;
use crate::authority::epoch_start_configuration::EpochFlag;
use crate::authority::epoch_start_configuration::EpochStartConfiguration;
use crate::global_state_hasher::GlobalStateHashStore;
use crate::transaction_outputs::TransactionOutputs;
use either::Either;
use itertools::Itertools;
use sui_types::accumulator_event::AccumulatorEvent;
use sui_types::bridge::Bridge;

use futures::{FutureExt, future::BoxFuture};
use prometheus::Registry;
use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;
use sui_config::ExecutionCacheConfig;
use sui_protocol_config::ProtocolVersion;
use sui_types::base_types::{FullObjectID, VerifiedExecutionData};
use sui_types::digests::{TransactionDigest, TransactionEffectsDigest};
use sui_types::effects::{TransactionEffects, TransactionEvents};
use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use sui_types::object::Object;
use sui_types::storage::{
    BackingPackageStore, BackingStore, ChildObjectResolver, FullObjectKey, MarkerValue, ObjectKey,
    ObjectOrTombstone, ObjectStore, PackageObject, ParentSync,
};
use sui_types::sui_system_state::SuiSystemState;
use sui_types::transaction::VerifiedTransaction;
use sui_types::{
    base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber},
    object::Owner,
    storage::InputKey,
};
use tracing::instrument;
use typed_store::rocks::DBBatch;

pub(crate) mod cache_types;
pub mod metrics;
mod object_locks;
pub mod writeback_cache;

pub use writeback_cache::WritebackCache;

use metrics::ExecutionCacheMetrics;

// If you have Arc<ExecutionCache>, you cannot return a reference to it as
// an &Arc<dyn ExecutionCacheRead> (for example), because the trait object is a fat pointer.
// So, in order to be able to return &Arc<dyn T>, we create all the converted trait objects
// (aka fat pointers) up front and return references to them.
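// For example, an &Arc<WritebackCache> cannot be handed out in place as an
// &Arc<dyn ObjectCacheRead>: the trait-object Arc is a (data pointer, vtable pointer)
// pair with a different layout, so each conversion must produce a new Arc value,
// which is what the fields below hold.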
#[derive(Clone)]
pub struct ExecutionCacheTraitPointers {
    pub object_cache_reader: Arc<dyn ObjectCacheRead>,
    pub transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    pub cache_writer: Arc<dyn ExecutionCacheWrite>,
    pub backing_store: Arc<dyn BackingStore + Send + Sync>,
    pub child_object_resolver: Arc<dyn ChildObjectResolver + Send + Sync>,
    pub backing_package_store: Arc<dyn BackingPackageStore + Send + Sync>,
    pub object_store: Arc<dyn ObjectStore + Send + Sync>,
    pub reconfig_api: Arc<dyn ExecutionCacheReconfigAPI>,
    pub global_state_hash_store: Arc<dyn GlobalStateHashStore>,
    pub checkpoint_cache: Arc<dyn CheckpointCache>,
    pub state_sync_store: Arc<dyn StateSyncAPI>,
    pub cache_commit: Arc<dyn ExecutionCacheCommit>,
    pub testing_api: Arc<dyn TestingAPI>,
    pub account_funds_read: Arc<dyn AccountFundsRead>,
}

impl ExecutionCacheTraitPointers {
    pub fn new<T>(cache: Arc<T>) -> Self
    where
        T: ObjectCacheRead
            + TransactionCacheRead
            + ExecutionCacheWrite
            + BackingStore
            + BackingPackageStore
            + ObjectStore
            + ExecutionCacheReconfigAPI
            + GlobalStateHashStore
            + CheckpointCache
            + StateSyncAPI
            + ExecutionCacheCommit
            + TestingAPI
            + AccountFundsRead
            + 'static,
    {
        Self {
            object_cache_reader: cache.clone(),
            transaction_cache_reader: cache.clone(),
            cache_writer: cache.clone(),
            backing_store: cache.clone(),
            child_object_resolver: cache.clone(),
            backing_package_store: cache.clone(),
            object_store: cache.clone(),
            reconfig_api: cache.clone(),
            global_state_hash_store: cache.clone(),
            checkpoint_cache: cache.clone(),
            state_sync_store: cache.clone(),
            cache_commit: cache.clone(),
            testing_api: cache.clone(),
            account_funds_read: cache.clone(),
        }
    }
}

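/// Build the execution cache (currently always a [`WritebackCache`]) and wrap it in
/// [`ExecutionCacheTraitPointers`] so that callers can borrow it through each of its
/// trait interfaces.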
pub fn build_execution_cache(
    cache_config: &ExecutionCacheConfig,
    prometheus_registry: &Registry,
    store: &Arc<AuthorityStore>,
    backpressure_manager: Arc<BackpressureManager>,
) -> ExecutionCacheTraitPointers {
    let execution_cache_metrics = Arc::new(ExecutionCacheMetrics::new(prometheus_registry));

    ExecutionCacheTraitPointers::new(
        WritebackCache::new(
            cache_config,
            store.clone(),
            execution_cache_metrics,
            backpressure_manager,
        )
        .into(),
    )
}

/// Should only be used for sui-tool or tests. Nodes must use `build_execution_cache`, which
/// uses the epoch_start_config to prevent the cache impl from switching except at epoch boundaries.
pub fn build_execution_cache_from_env(
    prometheus_registry: &Registry,
    store: &Arc<AuthorityStore>,
) -> ExecutionCacheTraitPointers {
    let execution_cache_metrics = Arc::new(ExecutionCacheMetrics::new(prometheus_registry));

    ExecutionCacheTraitPointers::new(
        WritebackCache::new(
            &Default::default(),
            store.clone(),
            execution_cache_metrics,
            BackpressureManager::new_for_tests(),
        )
        .into(),
    )
}

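/// Transaction outputs paired with the RocksDB write batch that persists them.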
pub type Batch = (Vec<Arc<TransactionOutputs>>, DBBatch);

pub trait ExecutionCacheCommit: Send + Sync {
    /// Build a DBBatch containing the given transaction outputs.
    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch;

    /// Durably commit the outputs of the given transactions to the database.
    /// Will be called by CheckpointExecutor to ensure that transaction outputs are
    /// written durably before marking a checkpoint as finalized.
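    ///
    /// Illustrative two-step call sequence (a sketch, not an actual call site):
    /// ```ignore
    /// let batch = cache.build_db_batch(epoch, &digests);
    /// cache.commit_transaction_outputs(epoch, batch, &digests);
    /// ```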
    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        batch: Batch,
        digests: &[TransactionDigest],
    );

    /// Durably commit a transaction to the database. Used to store any transactions
    /// that cannot be reconstructed at start-up by consensus replay. Currently the only
    /// case of this is RandomnessStateUpdate.
    fn persist_transaction(&self, transaction: &VerifiedExecutableTransaction);

    // Number of pending uncommitted transactions
    fn approximate_pending_transaction_count(&self) -> u64;
}

pub trait ObjectCacheRead: Send + Sync {
    fn get_package_object(&self, id: &ObjectID) -> SuiResult<Option<PackageObject>>;
    fn force_reload_system_packages(&self, system_package_ids: &[ObjectID]);

    fn get_object(&self, id: &ObjectID) -> Option<Object>;

    fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
        let mut ret = Vec::with_capacity(objects.len());
        for object_id in objects {
            ret.push(self.get_object(object_id));
        }
        ret
    }

    fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef>;

    fn get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Option<(ObjectKey, ObjectOrTombstone)>;

    fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object>;

    fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>>;

    fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool;

    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool>;

    /// Load a list of objects from the store by object reference.
    /// If they exist in the store, they are returned directly.
    /// If any object is missing, we try to figure out the best error to return.
    /// If the object we are asking for is currently locked at a future version, we know this
    /// transaction is out-of-date and we return an ObjectVersionUnavailableForConsumption error,
    /// which indicates this is not retriable.
    /// Otherwise, we return an ObjectNotFound error, which indicates this is retriable.
    fn multi_get_objects_with_more_accurate_error_return(
        &self,
        object_refs: &[ObjectRef],
    ) -> Result<Vec<Object>, SuiError> {
        let objects = self
            .multi_get_objects_by_key(&object_refs.iter().map(ObjectKey::from).collect::<Vec<_>>());
        let mut result = Vec::new();
        for (object_opt, object_ref) in objects.into_iter().zip(object_refs) {
            match object_opt {
                None => {
                    let live_objref = self._get_live_objref(object_ref.0)?;
                    let error = if live_objref.1 >= object_ref.1 {
                        UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *object_ref,
                            current_version: live_objref.1,
                        }
                    } else {
                        UserInputError::ObjectNotFound {
                            object_id: object_ref.0,
                            version: Some(object_ref.1),
                        }
                    };
                    return Err(SuiErrorKind::UserInputError { error }.into());
                }
                Some(object) => {
                    result.push(object);
                }
            }
        }
        assert_eq!(result.len(), object_refs.len());
        Ok(result)
    }

    /// Used by the execution scheduler to determine if input objects are ready. Distinct from
    /// multi_get_objects_by_key because it also consults markers to handle the case where an object
    /// will never become available (e.g. because it has already been received by some other transaction).
    fn multi_input_objects_available(
        &self,
        keys: &[InputKey],
        receiving_objects: &HashSet<InputKey>,
        epoch: EpochId,
    ) -> Vec<bool> {
        let mut results = vec![false; keys.len()];
        let non_canceled_keys = keys.iter().enumerate().filter(|(idx, key)| {
            if key.is_cancelled() {
                // Shared objects in canceled transactions are always available.
                results[*idx] = true;
                false
            } else {
                true
            }
        });
        let (move_object_keys, package_object_keys): (Vec<_>, Vec<_>) = non_canceled_keys
            .partition_map(|(idx, key)| match key {
                InputKey::VersionedObject { id, version } => Either::Left((idx, (id, version))),
                InputKey::Package { id } => Either::Right((idx, id)),
            });

        for ((idx, (id, version)), has_key) in move_object_keys.iter().zip(
            self.multi_object_exists_by_key(
                &move_object_keys
                    .iter()
                    .map(|(_, k)| ObjectKey(k.0.id(), *k.1))
                    .collect::<Vec<_>>(),
            )
            .into_iter(),
        ) {
            // If the key exists at the specified version, then the object is available.
            if has_key {
                results[*idx] = true;
            } else if receiving_objects.contains(&InputKey::VersionedObject {
                id: **id,
                version: **version,
            }) {
                // There could be a more recent version of this object, and the object at the
                // specified version could have already been pruned. In such a case `has_key` will
                // be false, but since this is a receiving object we should mark it as available if
                // we can determine that an object with a version greater than or equal to the
                // specified version exists or was deleted. We then mark it as available to let
                // the transaction through so it can fail at execution.
                let is_available = self
                    .get_object(&id.id())
                    .map(|obj| obj.version() >= **version)
                    .unwrap_or(false)
                    || self.fastpath_stream_ended_at_version_or_after(id.id(), **version, epoch);
                results[*idx] = is_available;
            } else {
                // If the object is an already-removed consensus object, mark it as available if the
                // version for that object is in the marker table.
                let is_consensus_stream_ended = self
                    .get_consensus_stream_end_tx_digest(FullObjectKey::new(**id, **version), epoch)
                    .is_some();
                results[*idx] = is_consensus_stream_ended;
            }
        }

        package_object_keys.into_iter().for_each(|(idx, id)| {
            // get_package_object() only errors when the object is not a package, so we return false on error.
            // Errors are possible when this gets called on uncertified transactions.
            results[idx] = self.get_package_object(id).is_ok_and(|p| p.is_some());
        });

        results
    }

    fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool>;

    /// Return the object with a version less than or equal to the provided sequence number.
    /// This is used by the indexer to find the correct version of a dynamic field child object.
    /// We do not store the version of the child object, but because of Lamport timestamps,
    /// we know the child must have a version number less than or equal to the parent's.
    fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> Option<Object>;

    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult;

    // This method is considered "private" - only used by multi_get_objects_with_more_accurate_error_return
    fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef>;

    // Check that the given set of objects are live at the given version. This is used as a
    // safety check before execution, and could potentially be deleted or changed to a debug_assert
    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult;

    fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState>;

    fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge>;

    // Marker methods

    /// Get the marker at a specific version
    fn get_marker_value(&self, object_key: FullObjectKey, epoch_id: EpochId)
    -> Option<MarkerValue>;

    /// Get the latest marker for a given object.
    fn get_latest_marker(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> Option<(SequenceNumber, MarkerValue)>;

    /// If the given consensus object stream was ended, return the related
    /// version and transaction digest.
    fn get_last_consensus_stream_end_info(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> Option<(SequenceNumber, TransactionDigest)> {
        match self.get_latest_marker(object_id, epoch_id) {
            Some((version, MarkerValue::ConsensusStreamEnded(digest))) => Some((version, digest)),
            _ => None,
        }
    }

    /// If the given consensus object stream was ended at the specified version,
    /// return the related transaction digest.
    fn get_consensus_stream_end_tx_digest(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> Option<TransactionDigest> {
        match self.get_marker_value(object_key, epoch_id) {
            Some(MarkerValue::ConsensusStreamEnded(digest)) => Some(digest),
            _ => None,
        }
    }

    fn have_received_object_at_version(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> bool {
        matches!(
            self.get_marker_value(object_key, epoch_id),
            Some(MarkerValue::Received)
        )
    }

    fn fastpath_stream_ended_at_version_or_after(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> bool {
        let full_id = FullObjectID::Fastpath(object_id); // function explicitly assumes "fastpath"
        matches!(
            self.get_latest_marker(full_id, epoch_id),
            Some((marker_version, MarkerValue::FastpathStreamEnded)) if marker_version >= version
        )
    }

    /// Return the watermark for the highest checkpoint for which we've pruned objects.
    fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber>;

    /// Given a list of input and receiving objects for a transaction,
    /// wait until all of them become available, so that the transaction
    /// can start execution.
    /// `input_and_receiving_keys` contains both input objects and receiving
    /// input objects, including canceled objects.
    /// TODO: Eventually this can return the results of the object reads,
    /// so that execution does not need to load them again.
    fn notify_read_input_objects<'a>(
        &'a self,
        input_and_receiving_keys: &'a [InputKey],
        receiving_keys: &'a HashSet<InputKey>,
        epoch: EpochId,
    ) -> BoxFuture<'a, ()>;
}

pub trait TransactionCacheRead: Send + Sync {
    fn multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<Arc<VerifiedTransaction>>>;

    fn get_transaction_block(
        &self,
        digest: &TransactionDigest,
    ) -> Option<Arc<VerifiedTransaction>> {
        self.multi_get_transaction_blocks(&[*digest])
            .pop()
            .expect("multi-get must return correct number of items")
    }

    #[instrument(level = "trace", skip_all)]
    fn get_transactions_and_serialized_sizes(
        &self,
        digests: &[TransactionDigest],
    ) -> SuiResult<Vec<Option<(VerifiedTransaction, usize)>>> {
        let txns = self.multi_get_transaction_blocks(digests);
        txns.into_iter()
            .map(|txn| {
                txn.map(|txn| {
                    // Note: if the transaction is read from the db, calling serialized_size
                    // wastes some effort relative to just reading the raw bytes from the db.
                    // However, transactions should usually be fetched from the cache.
                    match txn.serialized_size() {
                        Ok(size) => Ok(((*txn).clone(), size)),
                        Err(e) => Err(e),
                    }
                })
                .transpose()
            })
            .collect::<Result<Vec<_>, _>>()
    }

    fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEffectsDigest>>;

    fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
        self.multi_get_executed_effects_digests(&[*digest])
            .pop()
            .expect("multi-get must return correct number of items")
            .is_some()
    }

    fn multi_get_executed_effects(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEffects>> {
        let effects_digests = self.multi_get_executed_effects_digests(digests);
        assert_eq!(effects_digests.len(), digests.len());

        let mut results = vec![None; digests.len()];
        let mut fetch_digests = Vec::with_capacity(digests.len());
        let mut fetch_indices = Vec::with_capacity(digests.len());

        for (i, digest) in effects_digests.into_iter().enumerate() {
            if let Some(digest) = digest {
                fetch_digests.push(digest);
                fetch_indices.push(i);
            }
        }

        let effects = self.multi_get_effects(&fetch_digests);
        for (i, effects) in fetch_indices.into_iter().zip(effects.into_iter()) {
            results[i] = effects;
        }

        results
    }

    fn get_executed_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
        self.multi_get_executed_effects(&[*digest])
            .pop()
            .expect("multi-get must return correct number of items")
    }

    fn transaction_executed_in_last_epoch(
        &self,
        digest: &TransactionDigest,
        current_epoch: EpochId,
    ) -> bool;

    fn multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> Vec<Option<TransactionEffects>>;

    fn get_effects(&self, digest: &TransactionEffectsDigest) -> Option<TransactionEffects> {
        self.multi_get_effects(&[*digest])
            .pop()
            .expect("multi-get must return correct number of items")
    }

    fn multi_get_events(&self, digests: &[TransactionDigest]) -> Vec<Option<TransactionEvents>>;

    fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
        self.multi_get_events(&[*digest])
            .pop()
            .expect("multi-get must return correct number of items")
    }

    fn get_unchanged_loaded_runtime_objects(
        &self,
        digest: &TransactionDigest,
    ) -> Option<Vec<ObjectKey>>;

    fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>>;

    fn notify_read_executed_effects_digests<'a>(
        &'a self,
        task_name: &'static str,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>>;

    /// Wait until the effects of the given transactions are available and return them.
    /// WARNING: If calling this on a transaction that could be reverted, you must be
    /// sure that this function cannot be called during reconfiguration. The best way to
    /// do this is to wrap your future in EpochStore::within_alive_epoch. Holding an
    /// ExecutionLockReadGuard would also prevent reconfig from happening while waiting,
    /// but this is very dangerous, as it could prevent reconfiguration from ever
    /// occurring!
    ///
    /// This function panics if any of the requested effects are not found. Use this in
    /// critical paths where effects are expected to exist (e.g., checkpoint building,
    /// consensus commit processing). For non-critical paths where effects may have been
    /// pruned (e.g., serving historical data to clients), use `notify_read_executed_effects_may_fail`.
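    ///
    /// Illustrative call shape (a sketch, not an actual call site):
    /// ```ignore
    /// let effects = cache.notify_read_executed_effects("my_task", &digests).await;
    /// ```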
    fn notify_read_executed_effects<'a>(
        &'a self,
        task_name: &'static str,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, Vec<TransactionEffects>> {
        async move {
            self.notify_read_executed_effects_may_fail(task_name, digests)
                .await
                .unwrap_or_else(|e| panic!("effects must exist: {e}"))
        }
        .boxed()
    }

    /// Returns an error if any of the requested effects have been pruned from the database.
    /// Use this in non-critical paths where effects may not exist (e.g., serving historical
    /// data that may have been pruned). For critical paths where effects must exist,
    /// use `notify_read_executed_effects`.
    fn notify_read_executed_effects_may_fail<'a>(
        &'a self,
        task_name: &'static str,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, SuiResult<Vec<TransactionEffects>>> {
        async move {
            let effects_digests = self
                .notify_read_executed_effects_digests(task_name, digests)
                .await;
            self.multi_get_effects(&effects_digests)
                .into_iter()
                .zip(digests)
                .map(|(e, digest)| {
                    e.ok_or_else(|| {
                        SuiError::from(SuiErrorKind::TransactionEffectsNotFound { digest: *digest })
                    })
                })
                .collect()
        }
        .boxed()
    }
}

pub trait ExecutionCacheWrite: Send + Sync {
    /// Write the output of a transaction.
    ///
    /// Because of the child object consistency rule (readers that observe parents must observe all
    /// children of that parent, up to the parent's version bound), implementations of this method
    /// must not write any top-level (address-owned or shared) objects before they have written all
    /// of the object-owned objects (i.e. child objects) in the `objects` list.
    ///
    /// In the future, we may modify this method to expose finer-grained information about
    /// parent/child relationships. (This may be especially necessary for distributed object
    /// storage, but is unlikely to be an issue before we tackle that problem).
    ///
    /// This function may evict the mutable input objects (and successfully received objects) of
    /// the transaction from the cache, since they cannot be read by any other transaction.
    ///
    /// Any write performed by this method immediately notifies any waiter that has previously
    /// called notify_read_objects_for_execution or notify_read_objects_for_signing for the object
    /// in question.
    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>);

    /// Validate owned object versions and digests without acquiring locks.
    /// Used to validate transaction input before submitting or voting to accept the transaction.
    fn validate_owned_object_versions(&self, owned_input_objects: &[ObjectRef]) -> SuiResult;

    /// Write an object entry directly to the cache for testing.
    /// This allows us to write an object without constructing the entire
    /// transaction outputs.
    #[cfg(test)]
    fn write_object_entry_for_test(&self, object: Object);
}

pub trait CheckpointCache: Send + Sync {
    // TODO: In addition to the deprecated methods below, this will eventually include access
    // to the CheckpointStore

    // DEPRECATED METHODS
    fn deprecated_get_transaction_checkpoint(
        &self,
        digest: &TransactionDigest,
    ) -> Option<(EpochId, CheckpointSequenceNumber)>;

    fn deprecated_multi_get_transaction_checkpoint(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<(EpochId, CheckpointSequenceNumber)>>;

    fn deprecated_insert_finalized_transactions(
        &self,
        digests: &[TransactionDigest],
        epoch: EpochId,
        sequence: CheckpointSequenceNumber,
    );
}

pub trait ExecutionCacheReconfigAPI: Send + Sync {
    fn insert_genesis_object(&self, object: Object);
    fn bulk_insert_genesis_objects(&self, objects: &[Object]);

    fn set_epoch_start_configuration(&self, epoch_start_config: &EpochStartConfiguration);

    fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]);

    fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>);

    fn expensive_check_sui_conservation(
        &self,
        old_epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult;

    fn checkpoint_db(&self, path: &Path) -> SuiResult;

    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
    /// It re-accumulates the state hash for the new epoch if simplified_unwrap_then_delete is enabled.
    fn maybe_reaccumulate_state_hash(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        new_protocol_version: ProtocolVersion,
    );

    /// Reconfigure the cache itself.
    /// TODO: this is only needed for ProxyCache to switch between cache impls. It can be removed
    /// once WritebackCache is the sole cache impl.
    fn reconfigure_cache<'a>(
        &'a self,
        epoch_start_config: &'a EpochStartConfiguration,
    ) -> BoxFuture<'a, ()>;
}

// StateSyncAPI is for writing any data that was not the result of transaction execution,
// but that arrived via state sync. The fact that it came via state sync implies that it
// is certified output, and can be immediately persisted to the store.
pub trait StateSyncAPI: Send + Sync {
    fn insert_transaction_and_effects(
        &self,
        transaction: &VerifiedTransaction,
        transaction_effects: &TransactionEffects,
    );

    fn multi_insert_transaction_and_effects(
        &self,
        transactions_and_effects: &[VerifiedExecutionData],
    );
}

pub trait TestingAPI: Send + Sync {
    fn database_for_testing(&self) -> Arc<AuthorityStore>;

    fn cache_for_testing(&self) -> &WritebackCache;
}

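// Implement the storage traits (ObjectStore, ChildObjectResolver, BackingPackageStore,
// ParentSync) for a cache type by delegating to its ObjectCacheRead implementation.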
macro_rules! implement_storage_traits {
    ($implementor: ident) => {
        impl ObjectStore for $implementor {
            fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
                ObjectCacheRead::get_object(self, object_id)
            }

            fn get_object_by_key(
                &self,
                object_id: &ObjectID,
                version: sui_types::base_types::VersionNumber,
            ) -> Option<Object> {
                ObjectCacheRead::get_object_by_key(self, object_id, version)
            }
        }

        impl ChildObjectResolver for $implementor {
            fn read_child_object(
                &self,
                parent: &ObjectID,
                child: &ObjectID,
                child_version_upper_bound: SequenceNumber,
            ) -> SuiResult<Option<Object>> {
                let Some(child_object) =
                    self.find_object_lt_or_eq_version(*child, child_version_upper_bound)
                else {
                    return Ok(None);
                };

                let parent = *parent;
                if child_object.owner != Owner::ObjectOwner(parent.into()) {
                    return Err(SuiErrorKind::InvalidChildObjectAccess {
                        object: *child,
                        given_parent: parent,
                        actual_owner: child_object.owner.clone(),
                    }
                    .into());
                }
                Ok(Some(child_object))
            }

            fn get_object_received_at_version(
                &self,
                owner: &ObjectID,
                receiving_object_id: &ObjectID,
                receive_object_at_version: SequenceNumber,
                epoch_id: EpochId,
            ) -> SuiResult<Option<Object>> {
                let Some(recv_object) = ObjectCacheRead::get_object_by_key(
                    self,
                    receiving_object_id,
                    receive_object_at_version,
                ) else {
                    return Ok(None);
                };

                // Check for:
                // * Invalid access -- treat it as though the object does not exist; or
                // * the object having already been received at this version -- likewise treat it as though it doesn't exist.
                // These two cases must remain indistinguishable to the caller, otherwise we risk forks in
                // transaction replay due to possible reordering of transactions during replay.
                if recv_object.owner != Owner::AddressOwner((*owner).into())
                    || self.have_received_object_at_version(
                        // TODO: Add support for receiving consensus objects. For now this assumes fastpath.
                        FullObjectKey::new(
                            FullObjectID::new(*receiving_object_id, None),
                            receive_object_at_version,
                        ),
                        epoch_id,
                    )
                {
                    return Ok(None);
                }

                Ok(Some(recv_object))
            }
        }

        impl BackingPackageStore for $implementor {
            fn get_package_object(
                &self,
                package_id: &ObjectID,
            ) -> SuiResult<Option<PackageObject>> {
                ObjectCacheRead::get_package_object(self, package_id)
            }
        }

        impl ParentSync for $implementor {
            fn get_latest_parent_entry_ref_deprecated(
                &self,
                object_id: ObjectID,
            ) -> Option<ObjectRef> {
                ObjectCacheRead::get_latest_object_ref_or_tombstone(self, object_id)
            }
        }
    };
}

// Implement traits for a cache implementation whose methods always go directly to the store.
macro_rules! implement_passthrough_traits {
    ($implementor: ident) => {
        impl CheckpointCache for $implementor {
            fn deprecated_get_transaction_checkpoint(
                &self,
                digest: &TransactionDigest,
            ) -> Option<(EpochId, CheckpointSequenceNumber)> {
                self.store
                    .deprecated_get_transaction_checkpoint(digest)
                    .expect("db error")
            }

            fn deprecated_multi_get_transaction_checkpoint(
                &self,
                digests: &[TransactionDigest],
            ) -> Vec<Option<(EpochId, CheckpointSequenceNumber)>> {
                self.store
                    .deprecated_multi_get_transaction_checkpoint(digests)
                    .expect("db error")
            }

            fn deprecated_insert_finalized_transactions(
                &self,
                digests: &[TransactionDigest],
                epoch: EpochId,
                sequence: CheckpointSequenceNumber,
            ) {
                self.store
                    .deprecated_insert_finalized_transactions(digests, epoch, sequence)
                    .expect("db error");
            }
        }

        impl ExecutionCacheReconfigAPI for $implementor {
            fn insert_genesis_object(&self, object: Object) {
                self.insert_genesis_object_impl(object)
            }

            fn bulk_insert_genesis_objects(&self, objects: &[Object]) {
                self.bulk_insert_genesis_objects_impl(objects)
            }

            fn set_epoch_start_configuration(&self, epoch_start_config: &EpochStartConfiguration) {
                self.store
                    .set_epoch_start_configuration(epoch_start_config)
                    .expect("db error");
            }

            fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
                self.store.update_epoch_flags_metrics(old, new)
            }

            fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
                self.clear_state_end_of_epoch_impl(execution_guard)
            }

            fn expensive_check_sui_conservation(
                &self,
                old_epoch_store: &AuthorityPerEpochStore,
            ) -> SuiResult {
                self.store
                    .expensive_check_sui_conservation(self, old_epoch_store)
            }

            fn checkpoint_db(&self, path: &std::path::Path) -> SuiResult {
                self.store.perpetual_tables.checkpoint_db(path)
            }

            fn maybe_reaccumulate_state_hash(
                &self,
                cur_epoch_store: &AuthorityPerEpochStore,
                new_protocol_version: ProtocolVersion,
            ) {
                self.store
                    .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version)
            }

            fn reconfigure_cache<'a>(
                &'a self,
                _: &'a EpochStartConfiguration,
            ) -> BoxFuture<'a, ()> {
                // Since we now use WritebackCache directly at startup (if the epoch flag is set),
                // this can be called at reconfiguration time. It is a no-op.
                // TODO: remove this once we completely remove ProxyCache.
                std::future::ready(()).boxed()
            }
        }

        impl TestingAPI for $implementor {
            fn database_for_testing(&self) -> Arc<AuthorityStore> {
                self.store.clone()
            }

            fn cache_for_testing(&self) -> &WritebackCache {
                self
            }
        }
    };
}

use implement_passthrough_traits;

implement_storage_traits!(WritebackCache);

pub trait ExecutionCacheAPI:
    ObjectCacheRead
    + ExecutionCacheWrite
    + ExecutionCacheCommit
    + ExecutionCacheReconfigAPI
    + CheckpointCache
    + StateSyncAPI
{
}