sui_core/
execution_cache.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::accumulators::funds_read::AccountFundsRead;
5use crate::authority::AuthorityStore;
6use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
7use crate::authority::authority_store::{ExecutionLockWriteGuard, SuiLockResult};
8use crate::authority::backpressure::BackpressureManager;
9use crate::authority::epoch_start_configuration::EpochFlag;
10use crate::authority::epoch_start_configuration::EpochStartConfiguration;
11use crate::global_state_hasher::GlobalStateHashStore;
12use crate::transaction_outputs::TransactionOutputs;
13use either::Either;
14use itertools::Itertools;
15use sui_types::accumulator_event::AccumulatorEvent;
16use sui_types::bridge::Bridge;
17
18use futures::{FutureExt, future::BoxFuture};
19use prometheus::Registry;
20use std::collections::HashSet;
21use std::path::Path;
22use std::sync::Arc;
23use sui_config::ExecutionCacheConfig;
24use sui_protocol_config::ProtocolVersion;
25use sui_types::base_types::{FullObjectID, VerifiedExecutionData};
26use sui_types::digests::{TransactionDigest, TransactionEffectsDigest};
27use sui_types::effects::{TransactionEffects, TransactionEvents};
28use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
29use sui_types::executable_transaction::VerifiedExecutableTransaction;
30use sui_types::messages_checkpoint::CheckpointSequenceNumber;
31use sui_types::object::Object;
32use sui_types::storage::{
33    BackingPackageStore, BackingStore, ChildObjectResolver, FullObjectKey, MarkerValue, ObjectKey,
34    ObjectOrTombstone, ObjectStore, PackageObject, ParentSync,
35};
36use sui_types::sui_system_state::SuiSystemState;
37use sui_types::transaction::VerifiedTransaction;
38use sui_types::{
39    base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber},
40    object::Owner,
41    storage::InputKey,
42};
43use tracing::instrument;
44use typed_store::rocks::DBBatch;
45
46pub(crate) mod cache_types;
47pub mod metrics;
48mod object_locks;
49pub mod writeback_cache;
50
51pub use writeback_cache::WritebackCache;
52
53use metrics::ExecutionCacheMetrics;
54
// If you have Arc<ExecutionCache>, you cannot return a reference to it as
// an &Arc<dyn ExecutionCacheRead> (for example), because the trait object is a fat pointer.
// So, in order to be able to return &Arc<dyn T>, we create all the converted trait objects
// (aka fat pointers) up front and return references to them.
/// Bundle of pre-converted trait-object pointers, all referring to the same
/// underlying cache implementation (see the note above on why they are
/// created eagerly).
#[derive(Clone)]
pub struct ExecutionCacheTraitPointers {
    pub object_cache_reader: Arc<dyn ObjectCacheRead>,
    pub transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    pub cache_writer: Arc<dyn ExecutionCacheWrite>,
    pub backing_store: Arc<dyn BackingStore + Send + Sync>,
    pub child_object_resolver: Arc<dyn ChildObjectResolver + Send + Sync>,
    pub backing_package_store: Arc<dyn BackingPackageStore + Send + Sync>,
    pub object_store: Arc<dyn ObjectStore + Send + Sync>,
    pub reconfig_api: Arc<dyn ExecutionCacheReconfigAPI>,
    pub global_state_hash_store: Arc<dyn GlobalStateHashStore>,
    pub checkpoint_cache: Arc<dyn CheckpointCache>,
    pub state_sync_store: Arc<dyn StateSyncAPI>,
    pub cache_commit: Arc<dyn ExecutionCacheCommit>,
    pub testing_api: Arc<dyn TestingAPI>,
    pub account_funds_read: Arc<dyn AccountFundsRead>,
}
76
77impl ExecutionCacheTraitPointers {
78    pub fn new<T>(cache: Arc<T>) -> Self
79    where
80        T: ObjectCacheRead
81            + TransactionCacheRead
82            + ExecutionCacheWrite
83            + BackingStore
84            + BackingPackageStore
85            + ObjectStore
86            + ExecutionCacheReconfigAPI
87            + GlobalStateHashStore
88            + CheckpointCache
89            + StateSyncAPI
90            + ExecutionCacheCommit
91            + TestingAPI
92            + AccountFundsRead
93            + 'static,
94    {
95        Self {
96            object_cache_reader: cache.clone(),
97            transaction_cache_reader: cache.clone(),
98            cache_writer: cache.clone(),
99            backing_store: cache.clone(),
100            child_object_resolver: cache.clone(),
101            backing_package_store: cache.clone(),
102            object_store: cache.clone(),
103            reconfig_api: cache.clone(),
104            global_state_hash_store: cache.clone(),
105            checkpoint_cache: cache.clone(),
106            state_sync_store: cache.clone(),
107            cache_commit: cache.clone(),
108            testing_api: cache.clone(),
109            account_funds_read: cache.clone(),
110        }
111    }
112}
113
114pub fn build_execution_cache(
115    cache_config: &ExecutionCacheConfig,
116    prometheus_registry: &Registry,
117    store: &Arc<AuthorityStore>,
118    backpressure_manager: Arc<BackpressureManager>,
119) -> ExecutionCacheTraitPointers {
120    let execution_cache_metrics = Arc::new(ExecutionCacheMetrics::new(prometheus_registry));
121
122    ExecutionCacheTraitPointers::new(
123        WritebackCache::new(
124            cache_config,
125            store.clone(),
126            execution_cache_metrics,
127            backpressure_manager,
128        )
129        .into(),
130    )
131}
132
133/// Should only be used for sui-tool or tests. Nodes must use build_execution_cache which
134/// uses the epoch_start_config to prevent cache impl from switching except at epoch boundaries.
135pub fn build_execution_cache_from_env(
136    prometheus_registry: &Registry,
137    store: &Arc<AuthorityStore>,
138) -> ExecutionCacheTraitPointers {
139    let execution_cache_metrics = Arc::new(ExecutionCacheMetrics::new(prometheus_registry));
140
141    ExecutionCacheTraitPointers::new(
142        WritebackCache::new(
143            &Default::default(),
144            store.clone(),
145            execution_cache_metrics,
146            BackpressureManager::new_for_tests(),
147        )
148        .into(),
149    )
150}
151
/// Transaction outputs staged for commit, paired with the `DBBatch` that persists them.
pub type Batch = (Vec<Arc<TransactionOutputs>>, DBBatch);
153
/// Commit-side API: durably persisting executed transaction outputs.
pub trait ExecutionCacheCommit: Send + Sync {
    /// Build a DBBatch containing the given transaction outputs.
    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch;

    /// Durably commit the outputs of the given transactions to the database.
    /// Will be called by CheckpointExecutor to ensure that transaction outputs are
    /// written durably before marking a checkpoint as finalized.
    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        batch: Batch,
        digests: &[TransactionDigest],
    );

    /// Durably commit a transaction to the database. Used to store any transactions
    /// that cannot be reconstructed at start-up by consensus replay. Currently the only
    /// case of this is RandomnessStateUpdate.
    fn persist_transaction(&self, transaction: &VerifiedExecutableTransaction);

    /// Approximate number of executed transactions whose outputs are not yet committed.
    fn approximate_pending_transaction_count(&self) -> u64;
}
176
/// Read-side cache API for objects: lookups by id/version, existence and
/// availability checks, marker-table queries, and owned-object locks.
pub trait ObjectCacheRead: Send + Sync {
    fn get_package_object(&self, id: &ObjectID) -> SuiResult<Option<PackageObject>>;
    fn force_reload_system_packages(&self, system_package_ids: &[ObjectID]);

    fn get_object(&self, id: &ObjectID) -> Option<Object>;

    /// Batched form of `get_object`, preserving input order.
    fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
        let mut ret = Vec::with_capacity(objects.len());
        for object_id in objects {
            ret.push(self.get_object(object_id));
        }
        ret
    }

    fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef>;

    fn get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Option<(ObjectKey, ObjectOrTombstone)>;

    fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object>;

    fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>>;

    fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool;

    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool>;

    /// Load a list of objects from the store by object reference.
    /// If they exist in the store, they are returned directly.
    /// If any object is missing, we try to figure out the best error to return.
    /// If the object we are asking is currently locked at a future version, we know this
    /// transaction is out-of-date and we return a ObjectVersionUnavailableForConsumption,
    /// which indicates this is not retriable.
    /// Otherwise, we return a ObjectNotFound error, which indicates this is retriable.
    fn multi_get_objects_with_more_accurate_error_return(
        &self,
        object_refs: &[ObjectRef],
    ) -> Result<Vec<Object>, SuiError> {
        let objects = self
            .multi_get_objects_by_key(&object_refs.iter().map(ObjectKey::from).collect::<Vec<_>>());
        let mut result = Vec::new();
        for (object_opt, object_ref) in objects.into_iter().zip(object_refs) {
            match object_opt {
                None => {
                    // Compare the requested version against the current live
                    // version to pick the most accurate (retriable vs not) error.
                    let live_objref = self._get_live_objref(object_ref.0)?;
                    let error = if live_objref.1 >= object_ref.1 {
                        UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *object_ref,
                            current_version: live_objref.1,
                        }
                    } else {
                        UserInputError::ObjectNotFound {
                            object_id: object_ref.0,
                            version: Some(object_ref.1),
                        }
                    };
                    return Err(SuiErrorKind::UserInputError { error }.into());
                }
                Some(object) => {
                    result.push(object);
                }
            }
        }
        assert_eq!(result.len(), object_refs.len());
        Ok(result)
    }

    /// Used by execution scheduler to determine if input objects are ready. Distinct from multi_get_object_by_key
    /// because it also consults markers to handle the case where an object will never become available (e.g.
    /// because it has been received by some other transaction already).
    fn multi_input_objects_available(
        &self,
        keys: &[InputKey],
        receiving_objects: &HashSet<InputKey>,
        epoch: EpochId,
    ) -> Vec<bool> {
        let mut results = vec![false; keys.len()];
        let non_canceled_keys = keys.iter().enumerate().filter(|(idx, key)| {
            if key.is_cancelled() {
                // Shared objects in canceled transactions are always available.
                results[*idx] = true;
                false
            } else {
                true
            }
        });
        // Split the remaining keys into versioned Move objects and packages,
        // carrying each key's index into `results` alongside it.
        let (move_object_keys, package_object_keys): (Vec<_>, Vec<_>) = non_canceled_keys
            .partition_map(|(idx, key)| match key {
                InputKey::VersionedObject { id, version } => Either::Left((idx, (id, version))),
                InputKey::Package { id } => Either::Right((idx, id)),
            });

        for ((idx, (id, version)), has_key) in move_object_keys.iter().zip(
            self.multi_object_exists_by_key(
                &move_object_keys
                    .iter()
                    .map(|(_, k)| ObjectKey(k.0.id(), *k.1))
                    .collect::<Vec<_>>(),
            )
            .into_iter(),
        ) {
            // If the key exists at the specified version, then the object is available.
            if has_key {
                results[*idx] = true;
            } else if receiving_objects.contains(&InputKey::VersionedObject {
                id: **id,
                version: **version,
            }) {
                // There could be a more recent version of this object, and the object at the
                // specified version could have already been pruned. In such a case `has_key` will
                // be false, but since this is a receiving object we should mark it as available if
                // we can determine that an object with a version greater than or equal to the
                // specified version exists or was deleted. We will then mark it as available
                // to let the transaction through so it can fail at execution.
                let is_available = self
                    .get_object(&id.id())
                    .map(|obj| obj.version() >= **version)
                    .unwrap_or(false)
                    || self.fastpath_stream_ended_at_version_or_after(id.id(), **version, epoch);
                results[*idx] = is_available;
            } else {
                // If the object is an already-removed consensus object, mark it as available if the
                // version for that object is in the marker table.
                let is_consensus_stream_ended = self
                    .get_consensus_stream_end_tx_digest(FullObjectKey::new(**id, **version), epoch)
                    .is_some();
                results[*idx] = is_consensus_stream_ended;
            }
        }

        package_object_keys.into_iter().for_each(|(idx, id)| {
            // get_package_object() only errors when the object is not a package, so returning false on error.
            // Error is possible when this gets called on uncertified transactions.
            results[idx] = self.get_package_object(id).is_ok_and(|p| p.is_some());
        });

        results
    }

    fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool>;

    /// Return the object with version less than or eq to the provided seq number.
    /// This is used by indexer to find the correct version of dynamic field child object.
    /// We do not store the version of the child object, but because of lamport timestamp,
    /// we know the child must have version number less than or eq to the parent.
    fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> Option<Object>;

    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult;

    // This method is considered "private" - only used by multi_get_objects_with_more_accurate_error_return
    fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef>;

    // Check that the given set of objects are live at the given version. This is used as a
    // safety check before execution, and could potentially be deleted or changed to a debug_assert
    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult;

    fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState>;

    fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge>;

    // Marker methods

    /// Get the marker at a specific version
    fn get_marker_value(&self, object_key: FullObjectKey, epoch_id: EpochId)
    -> Option<MarkerValue>;

    /// Get the latest marker for a given object.
    fn get_latest_marker(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> Option<(SequenceNumber, MarkerValue)>;

    /// If the given consensus object stream was ended, return related
    /// version and transaction digest.
    fn get_last_consensus_stream_end_info(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> Option<(SequenceNumber, TransactionDigest)> {
        match self.get_latest_marker(object_id, epoch_id) {
            Some((version, MarkerValue::ConsensusStreamEnded(digest))) => Some((version, digest)),
            _ => None,
        }
    }

    /// If the given consensus object stream was ended at the specified version,
    /// return related transaction digest.
    fn get_consensus_stream_end_tx_digest(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> Option<TransactionDigest> {
        match self.get_marker_value(object_key, epoch_id) {
            Some(MarkerValue::ConsensusStreamEnded(digest)) => Some(digest),
            _ => None,
        }
    }

    /// True iff a `Received` marker exists for the object at exactly this key's version.
    fn have_received_object_at_version(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> bool {
        matches!(
            self.get_marker_value(object_key, epoch_id),
            Some(MarkerValue::Received)
        )
    }

    /// True iff the latest marker for the (fastpath) object is `FastpathStreamEnded`
    /// at a version >= the given version.
    fn fastpath_stream_ended_at_version_or_after(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
        epoch_id: EpochId,
    ) -> bool {
        let full_id = FullObjectID::Fastpath(object_id); // function explicitly assumes "fastpath"
        matches!(
            self.get_latest_marker(full_id, epoch_id),
            Some((marker_version, MarkerValue::FastpathStreamEnded)) if marker_version >= version
        )
    }

    /// Return the watermark for the highest checkpoint for which we've pruned objects.
    fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber>;

    /// Given a list of input and receiving objects for a transaction,
    /// wait until all of them become available, so that the transaction
    /// can start execution.
    /// `input_and_receiving_keys` contains both input objects and receiving
    /// input objects, including canceled objects.
    /// TODO: Eventually this can return the objects read results,
    /// so that execution does not need to load them again.
    fn notify_read_input_objects<'a>(
        &'a self,
        input_and_receiving_keys: &'a [InputKey],
        receiving_keys: &'a HashSet<InputKey>,
        epoch: EpochId,
    ) -> BoxFuture<'a, ()>;
}
423
424pub trait TransactionCacheRead: Send + Sync {
425    fn multi_get_transaction_blocks(
426        &self,
427        digests: &[TransactionDigest],
428    ) -> Vec<Option<Arc<VerifiedTransaction>>>;
429
430    fn get_transaction_block(
431        &self,
432        digest: &TransactionDigest,
433    ) -> Option<Arc<VerifiedTransaction>> {
434        self.multi_get_transaction_blocks(&[*digest])
435            .pop()
436            .expect("multi-get must return correct number of items")
437    }
438
439    #[instrument(level = "trace", skip_all)]
440    fn get_transactions_and_serialized_sizes(
441        &self,
442        digests: &[TransactionDigest],
443    ) -> SuiResult<Vec<Option<(VerifiedTransaction, usize)>>> {
444        let txns = self.multi_get_transaction_blocks(digests);
445        txns.into_iter()
446            .map(|txn| {
447                txn.map(|txn| {
448                    // Note: if the transaction is read from the db, we are wasting some
449                    // effort relative to reading the raw bytes from the db instead of
450                    // calling serialized_size. However, transactions should usually be
451                    // fetched from cache.
452                    match txn.serialized_size() {
453                        Ok(size) => Ok(((*txn).clone(), size)),
454                        Err(e) => Err(e),
455                    }
456                })
457                .transpose()
458            })
459            .collect::<Result<Vec<_>, _>>()
460    }
461
462    fn multi_get_executed_effects_digests(
463        &self,
464        digests: &[TransactionDigest],
465    ) -> Vec<Option<TransactionEffectsDigest>>;
466
467    fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
468        self.multi_get_executed_effects_digests(&[*digest])
469            .pop()
470            .expect("multi-get must return correct number of items")
471            .is_some()
472    }
473
474    fn multi_get_executed_effects(
475        &self,
476        digests: &[TransactionDigest],
477    ) -> Vec<Option<TransactionEffects>> {
478        let effects_digests = self.multi_get_executed_effects_digests(digests);
479        assert_eq!(effects_digests.len(), digests.len());
480
481        let mut results = vec![None; digests.len()];
482        let mut fetch_digests = Vec::with_capacity(digests.len());
483        let mut fetch_indices = Vec::with_capacity(digests.len());
484
485        for (i, digest) in effects_digests.into_iter().enumerate() {
486            if let Some(digest) = digest {
487                fetch_digests.push(digest);
488                fetch_indices.push(i);
489            }
490        }
491
492        let effects = self.multi_get_effects(&fetch_digests);
493        for (i, effects) in fetch_indices.into_iter().zip(effects.into_iter()) {
494            results[i] = effects;
495        }
496
497        results
498    }
499
500    fn get_executed_effects(&self, digest: &TransactionDigest) -> Option<TransactionEffects> {
501        self.multi_get_executed_effects(&[*digest])
502            .pop()
503            .expect("multi-get must return correct number of items")
504    }
505
506    fn transaction_executed_in_last_epoch(
507        &self,
508        digest: &TransactionDigest,
509        current_epoch: EpochId,
510    ) -> bool;
511
512    fn multi_get_effects(
513        &self,
514        digests: &[TransactionEffectsDigest],
515    ) -> Vec<Option<TransactionEffects>>;
516
517    fn get_effects(&self, digest: &TransactionEffectsDigest) -> Option<TransactionEffects> {
518        self.multi_get_effects(&[*digest])
519            .pop()
520            .expect("multi-get must return correct number of items")
521    }
522
523    fn multi_get_events(&self, digests: &[TransactionDigest]) -> Vec<Option<TransactionEvents>>;
524
525    fn get_events(&self, digest: &TransactionDigest) -> Option<TransactionEvents> {
526        self.multi_get_events(&[*digest])
527            .pop()
528            .expect("multi-get must return correct number of items")
529    }
530
531    fn get_unchanged_loaded_runtime_objects(
532        &self,
533        digest: &TransactionDigest,
534    ) -> Option<Vec<ObjectKey>>;
535
536    fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>>;
537
538    fn notify_read_executed_effects_digests<'a>(
539        &'a self,
540        task_name: &'static str,
541        digests: &'a [TransactionDigest],
542    ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>>;
543
544    /// Wait until the effects of the given transactions are available and return them.
545    /// WARNING: If calling this on a transaction that could be reverted, you must be
546    /// sure that this function cannot be called during reconfiguration. The best way to
547    /// do this is to wrap your future in EpochStore::within_alive_epoch. Holding an
548    /// ExecutionLockReadGuard would also prevent reconfig from happening while waiting,
549    /// but this is very dangerous, as it could prevent reconfiguration from ever
550    /// occurring!
551    ///
552    /// This function panics if any of the requested effects are not found. Use this in
553    /// critical paths where effects are expected to exist (e.g., checkpoint building,
554    /// consensus commit processing). For non-critical paths where effects may have been
555    /// pruned (e.g., serving historical data to clients), use `notify_read_executed_effects_may_fail`.
556    fn notify_read_executed_effects<'a>(
557        &'a self,
558        task_name: &'static str,
559        digests: &'a [TransactionDigest],
560    ) -> BoxFuture<'a, Vec<TransactionEffects>> {
561        async move {
562            self.notify_read_executed_effects_may_fail(task_name, digests)
563                .await
564                .unwrap_or_else(|e| panic!("effects must exist: {e}"))
565        }
566        .boxed()
567    }
568
569    /// Returns an error if any of the requested effects have been pruned from the database.
570    /// Use this in non-critical paths where effects may not exist (e.g., serving historical
571    /// data that may have been pruned). For critical paths where effects must exist,
572    /// use `notify_read_executed_effects`.
573    fn notify_read_executed_effects_may_fail<'a>(
574        &'a self,
575        task_name: &'static str,
576        digests: &'a [TransactionDigest],
577    ) -> BoxFuture<'a, SuiResult<Vec<TransactionEffects>>> {
578        async move {
579            let effects_digests = self
580                .notify_read_executed_effects_digests(task_name, digests)
581                .await;
582            self.multi_get_effects(&effects_digests)
583                .into_iter()
584                .zip(digests)
585                .map(|(e, digest)| {
586                    e.ok_or_else(|| {
587                        SuiError::from(SuiErrorKind::TransactionEffectsNotFound { digest: *digest })
588                    })
589                })
590                .collect()
591        }
592        .boxed()
593    }
594
595    /// Get the execution outputs of a mysticeti fastpath certified transaction, if it exists.
596    fn get_mysticeti_fastpath_outputs(
597        &self,
598        tx_digest: &TransactionDigest,
599    ) -> Option<Arc<TransactionOutputs>>;
600
601    /// Wait until the outputs of the given transactions are available
602    /// in the temporary buffer holding mysticeti fastpath outputs.
603    fn notify_read_fastpath_transaction_outputs<'a>(
604        &'a self,
605        tx_digests: &'a [TransactionDigest],
606    ) -> BoxFuture<'a, Vec<Arc<TransactionOutputs>>>;
607}
608
/// Write-side API: recording the outputs of transaction execution in the cache.
pub trait ExecutionCacheWrite: Send + Sync {
    /// Write the output of a transaction.
    ///
    /// Because of the child object consistency rule (readers that observe parents must observe all
    /// children of that parent, up to the parent's version bound), implementations of this method
    /// must not write any top-level (address-owned or shared) objects before they have written all
    /// of the object-owned objects (i.e. child objects) in the `objects` list.
    ///
    /// In the future, we may modify this method to expose finer-grained information about
    /// parent/child relationships. (This may be especially necessary for distributed object
    /// storage, but is unlikely to be an issue before we tackle that problem).
    ///
    /// This function may evict the mutable input objects (and successfully received objects) of
    /// transaction from the cache, since they cannot be read by any other transaction.
    ///
    /// Any write performed by this method immediately notifies any waiter that has previously
    /// called notify_read_objects_for_execution or notify_read_objects_for_signing for the object
    /// in question.
    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>);

    /// Write the output of a Mysticeti fastpath certified transaction.
    /// Such output cannot be written to the dirty cache right away because
    /// the transaction may end up rejected by consensus later. We need to make sure
    /// that it is not visible to any subsequent transaction until we observe it
    /// from consensus or checkpoints.
    fn write_fastpath_transaction_outputs(&self, tx_outputs: Arc<TransactionOutputs>);

    /// Validate owned object versions and digests without acquiring locks.
    /// Used to validate transaction input before submitting or voting to accept the transaction.
    fn validate_owned_object_versions(&self, owned_input_objects: &[ObjectRef]) -> SuiResult;

    /// Write an object entry directly to the cache for testing.
    /// This allows us to write an object without constructing the entire
    /// transaction outputs.
    #[cfg(test)]
    fn write_object_entry_for_test(&self, object: Object);
}
646
/// Access to per-transaction checkpoint assignment data (deprecated storage).
pub trait CheckpointCache: Send + Sync {
    // TODO: In addition to the deprecated methods below, this will eventually include access
    // to the CheckpointStore

    // DEPRECATED METHODS
    /// Look up the (epoch, checkpoint sequence) a transaction was finalized in, if recorded.
    fn deprecated_get_transaction_checkpoint(
        &self,
        digest: &TransactionDigest,
    ) -> Option<(EpochId, CheckpointSequenceNumber)>;

    /// Batched form of `deprecated_get_transaction_checkpoint`, preserving input order.
    fn deprecated_multi_get_transaction_checkpoint(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<(EpochId, CheckpointSequenceNumber)>>;

    /// Record the given transactions as finalized in the given epoch/checkpoint.
    fn deprecated_insert_finalized_transactions(
        &self,
        digests: &[TransactionDigest],
        epoch: EpochId,
        sequence: CheckpointSequenceNumber,
    );
}
669
/// Operations needed at genesis and at epoch boundaries (reconfiguration).
pub trait ExecutionCacheReconfigAPI: Send + Sync {
    /// Insert a single genesis object.
    fn insert_genesis_object(&self, object: Object);
    /// Insert many genesis objects in one call.
    fn bulk_insert_genesis_objects(&self, objects: &[Object]);

    fn set_epoch_start_configuration(&self, epoch_start_config: &EpochStartConfiguration);

    fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]);

    /// Clear end-of-epoch state; the caller must hold the execution write lock
    /// (witnessed by the guard parameter).
    fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>);

    /// Expensive consistency check run at the epoch boundary.
    fn expensive_check_sui_conservation(
        &self,
        old_epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult;

    /// Write a checkpoint (snapshot) of the database to `path`.
    fn checkpoint_db(&self, path: &Path) -> SuiResult;

    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
    fn maybe_reaccumulate_state_hash(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        new_protocol_version: ProtocolVersion,
    );

    /// Reconfigure the cache itself.
    /// TODO: this is only needed for ProxyCache to switch between cache impls. It can be removed
    /// once WritebackCache is the sole cache impl.
    fn reconfigure_cache<'a>(
        &'a self,
        epoch_start_config: &'a EpochStartConfiguration,
    ) -> BoxFuture<'a, ()>;
}
703
/// StateSyncAPI is for writing any data that was not the result of transaction execution,
/// but that arrived via state sync. The fact that it came via state sync implies that it
/// is certified output, and can be immediately persisted to the store.
pub trait StateSyncAPI: Send + Sync {
    /// Persist a single transaction together with its effects.
    fn insert_transaction_and_effects(
        &self,
        transaction: &VerifiedTransaction,
        transaction_effects: &TransactionEffects,
    );

    /// Batched form of `insert_transaction_and_effects`.
    fn multi_insert_transaction_and_effects(
        &self,
        transactions_and_effects: &[VerifiedExecutionData],
    );
}
719
/// Test-only escape hatches for reaching the underlying store and cache impl.
pub trait TestingAPI: Send + Sync {
    fn database_for_testing(&self) -> Arc<AuthorityStore>;

    fn cache_for_testing(&self) -> &WritebackCache;
}
725
/// Implements the generic storage traits (`ObjectStore`, `ChildObjectResolver`,
/// `BackingPackageStore`, `ParentSync`) for a cache type by delegating each method
/// to the corresponding `ObjectCacheRead` implementation on the same type.
macro_rules! implement_storage_traits {
    ($implementor: ident) => {
        impl ObjectStore for $implementor {
            // Latest-version object lookup, routed through the cache read path.
            fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
                ObjectCacheRead::get_object(self, object_id)
            }

            // Exact-version object lookup, routed through the cache read path.
            fn get_object_by_key(
                &self,
                object_id: &ObjectID,
                version: sui_types::base_types::VersionNumber,
            ) -> Option<Object> {
                ObjectCacheRead::get_object_by_key(self, object_id, version)
            }
        }

        impl ChildObjectResolver for $implementor {
            /// Reads the child object at the highest version <= `child_version_upper_bound`.
            /// Returns `Ok(None)` if no such version exists, and an error if the object
            /// found at that version is not owned by `parent`.
            fn read_child_object(
                &self,
                parent: &ObjectID,
                child: &ObjectID,
                child_version_upper_bound: SequenceNumber,
            ) -> SuiResult<Option<Object>> {
                let Some(child_object) =
                    self.find_object_lt_or_eq_version(*child, child_version_upper_bound)
                else {
                    return Ok(None);
                };

                let parent = *parent;
                // Ownership check: the object must actually be a child of `parent`;
                // otherwise report an invalid-access error with the real owner attached.
                if child_object.owner != Owner::ObjectOwner(parent.into()) {
                    return Err(SuiErrorKind::InvalidChildObjectAccess {
                        object: *child,
                        given_parent: parent,
                        actual_owner: child_object.owner.clone(),
                    }
                    .into());
                }
                Ok(Some(child_object))
            }

            /// Looks up an object being received at an exact version. Returns `Ok(None)`
            /// unless the object exists at that version, is address-owned by `owner`, and
            /// has not already been received at that version in `epoch_id`.
            fn get_object_received_at_version(
                &self,
                owner: &ObjectID,
                receiving_object_id: &ObjectID,
                receive_object_at_version: SequenceNumber,
                epoch_id: EpochId,
            ) -> SuiResult<Option<Object>> {
                let Some(recv_object) = ObjectCacheRead::get_object_by_key(
                    self,
                    receiving_object_id,
                    receive_object_at_version,
                ) else {
                    return Ok(None);
                };

                // Check for:
                // * Invalid access -- treat as the object does not exist. Or;
                // * If we've already received the object at the version -- then treat it as though it doesn't exist.
                // These two cases must remain indistinguishable to the caller otherwise we risk forks in
                // transaction replay due to possible reordering of transactions during replay.
                if recv_object.owner != Owner::AddressOwner((*owner).into())
                    || self.have_received_object_at_version(
                        // TODO: Add support for receiving consensus objects. For now this assumes fastpath.
                        FullObjectKey::new(
                            FullObjectID::new(*receiving_object_id, None),
                            receive_object_at_version,
                        ),
                        epoch_id,
                    )
                {
                    return Ok(None);
                }

                Ok(Some(recv_object))
            }
        }

        impl BackingPackageStore for $implementor {
            // Package lookups delegate to the cache's package read path.
            fn get_package_object(
                &self,
                package_id: &ObjectID,
            ) -> SuiResult<Option<PackageObject>> {
                ObjectCacheRead::get_package_object(self, package_id)
            }
        }

        impl ParentSync for $implementor {
            // Deprecated latest-ref (or tombstone) lookup, kept for legacy callers.
            fn get_latest_parent_entry_ref_deprecated(
                &self,
                object_id: ObjectID,
            ) -> Option<ObjectRef> {
                ObjectCacheRead::get_latest_object_ref_or_tombstone(self, object_id)
            }
        }
    };
}
823
/// Implements traits (`CheckpointCache`, `ExecutionCacheReconfigAPI`, `TestingAPI`) for a
/// cache implementation that always goes directly to the store (`self.store`).
/// `.expect("db error")` treats any database failure on these paths as fatal.
macro_rules! implement_passthrough_traits {
    ($implementor: ident) => {
        impl CheckpointCache for $implementor {
            // Reads the (epoch, checkpoint) a transaction was finalized in, straight from the store.
            fn deprecated_get_transaction_checkpoint(
                &self,
                digest: &TransactionDigest,
            ) -> Option<(EpochId, CheckpointSequenceNumber)> {
                self.store
                    .deprecated_get_transaction_checkpoint(digest)
                    .expect("db error")
            }

            // Batch form of the lookup above; one entry per input digest.
            fn deprecated_multi_get_transaction_checkpoint(
                &self,
                digests: &[TransactionDigest],
            ) -> Vec<Option<(EpochId, CheckpointSequenceNumber)>> {
                self.store
                    .deprecated_multi_get_transaction_checkpoint(digests)
                    .expect("db error")
            }

            // Records the finalizing (epoch, checkpoint) for a batch of transactions.
            fn deprecated_insert_finalized_transactions(
                &self,
                digests: &[TransactionDigest],
                epoch: EpochId,
                sequence: CheckpointSequenceNumber,
            ) {
                self.store
                    .deprecated_insert_finalized_transactions(digests, epoch, sequence)
                    .expect("db error");
            }
        }

        impl ExecutionCacheReconfigAPI for $implementor {
            fn insert_genesis_object(&self, object: Object) {
                self.insert_genesis_object_impl(object)
            }

            fn bulk_insert_genesis_objects(&self, objects: &[Object]) {
                self.bulk_insert_genesis_objects_impl(objects)
            }

            fn set_epoch_start_configuration(&self, epoch_start_config: &EpochStartConfiguration) {
                self.store
                    .set_epoch_start_configuration(epoch_start_config)
                    .expect("db error");
            }

            fn update_epoch_flags_metrics(&self, old: &[EpochFlag], new: &[EpochFlag]) {
                self.store.update_epoch_flags_metrics(old, new)
            }

            // Caller must hold the execution write lock, as witnessed by the guard parameter.
            fn clear_state_end_of_epoch(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
                self.clear_state_end_of_epoch_impl(execution_guard)
            }

            fn expensive_check_sui_conservation(
                &self,
                old_epoch_store: &AuthorityPerEpochStore,
            ) -> SuiResult {
                self.store
                    .expensive_check_sui_conservation(self, old_epoch_store)
            }

            // Snapshots the perpetual tables to `path`.
            fn checkpoint_db(&self, path: &std::path::Path) -> SuiResult {
                self.store.perpetual_tables.checkpoint_db(path)
            }

            fn maybe_reaccumulate_state_hash(
                &self,
                cur_epoch_store: &AuthorityPerEpochStore,
                new_protocol_version: ProtocolVersion,
            ) {
                self.store
                    .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version)
            }

            fn reconfigure_cache<'a>(
                &'a self,
                _: &'a EpochStartConfiguration,
            ) -> BoxFuture<'a, ()> {
                // Since we now use WritebackCache directly at startup (if the epoch flag is set),
                // this can be called at reconfiguration time. It is a no-op.
                // TODO: remove this once we completely remove ProxyCache.
                std::future::ready(()).boxed()
            }
        }

        impl TestingAPI for $implementor {
            fn database_for_testing(&self) -> Arc<AuthorityStore> {
                self.store.clone()
            }

            // The implementor itself is the WritebackCache here.
            fn cache_for_testing(&self) -> &WritebackCache {
                self
            }
        }
    };
}
924
925use implement_passthrough_traits;
926
// Wire up the generic storage traits for WritebackCache via delegation to its
// ObjectCacheRead implementation (see `implement_storage_traits!` above).
implement_storage_traits!(WritebackCache);
928
/// Umbrella trait that bundles the full set of execution-cache API surfaces into a
/// single bound, so callers can name one trait instead of listing every capability.
pub trait ExecutionCacheAPI:
    ObjectCacheRead
    + ExecutionCacheWrite
    + ExecutionCacheCommit
    + ExecutionCacheReconfigAPI
    + CheckpointCache
    + StateSyncAPI
{
}