sui_replay/
replay.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::chain_from_chain_id;
5use crate::{
6    data_fetcher::{
7        DataFetcher, Fetchers, NodeStateDumpFetcher, RemoteFetcher, extract_epoch_and_version,
8    },
9    displays::{
10        Pretty,
11        transaction_displays::{FullPTB, transform_command_results_to_annotated},
12    },
13    types::*,
14};
15use futures::executor::block_on;
16use move_binary_format::CompiledModule;
17use move_bytecode_utils::module_cache::GetModule;
18use move_core_types::{language_storage::ModuleId, resolver::ModuleResolver};
19use prometheus::Registry;
20use serde::{Deserialize, Serialize};
21use similar::{ChangeTag, TextDiff};
22use std::{
23    collections::{BTreeMap, HashSet},
24    path::PathBuf,
25    sync::Arc,
26    sync::Mutex,
27};
28use sui_config::node::ExpensiveSafetyCheckConfig;
29use sui_core::authority::NodeStateDump;
30use sui_execution::Executor;
31use sui_framework::BuiltInFramework;
32use sui_json_rpc_types::{
33    SuiExecutionStatus, SuiTransactionBlockEffects, SuiTransactionBlockEffectsAPI,
34};
35use sui_protocol_config::{Chain, ProtocolConfig};
36use sui_sdk::{SuiClient, SuiClientBuilder};
37use sui_types::SUI_DENY_LIST_OBJECT_ID;
38use sui_types::error::SuiErrorKind;
39use sui_types::execution_params::{
40    BalanceWithdrawStatus, ExecutionOrEarlyError, get_early_execution_error,
41};
42use sui_types::in_memory_storage::InMemoryStorage;
43use sui_types::message_envelope::Message;
44use sui_types::storage::{PackageObject, get_module};
45use sui_types::transaction::GasData;
46use sui_types::transaction::TransactionKind::ProgrammableTransaction;
47use sui_types::{
48    DEEPBOOK_PACKAGE_ID,
49    base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber},
50    committee::EpochId,
51    digests::{ObjectDigest, TransactionDigest},
52    error::{ExecutionError, SuiError, SuiResult},
53    executable_transaction::VerifiedExecutableTransaction,
54    gas::SuiGasStatus,
55    inner_temporary_store::InnerTemporaryStore,
56    metrics::LimitsMetrics,
57    object::{Object, Owner},
58    storage::get_module_by_id,
59    storage::{BackingPackageStore, ChildObjectResolver, ObjectStore, ParentSync},
60    transaction::{
61        CheckedInputObjects, InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
62        SenderSignedData, Transaction, TransactionDataAPI, TransactionKind, VerifiedTransaction,
63    },
64};
65use tracing::{error, info, trace, warn};
66
67// TODO: add persistent cache. But perf is good enough already.
68
69#[derive(Debug, Serialize, Deserialize)]
70pub struct ExecutionSandboxState {
71    /// Information describing the transaction
72    pub transaction_info: OnChainTransactionInfo,
73    /// All the objects that are required for the execution of the transaction
74    pub required_objects: Vec<Object>,
75    /// Temporary store from executing this locally in `execute_transaction_to_effects`
76    #[serde(skip)]
77    pub local_exec_temporary_store: Option<InnerTemporaryStore>,
78    /// Effects from executing this locally in `execute_transaction_to_effects`
79    pub local_exec_effects: SuiTransactionBlockEffects,
80    /// Status from executing this locally in `execute_transaction_to_effects`
81    #[serde(skip)]
82    pub local_exec_status: Option<Result<(), ExecutionError>>,
83}
84
85impl ExecutionSandboxState {
86    #[allow(clippy::result_large_err)]
87    pub fn check_effects(&self) -> Result<(), ReplayEngineError> {
88        let SuiTransactionBlockEffects::V1(mut local_effects) = self.local_exec_effects.clone();
89        let SuiTransactionBlockEffects::V1(on_chain_effects) =
90            self.transaction_info.effects.clone();
91
92        // Handle backwards compatibility with the new `abort_error` field in
93        // `SuiTransactionBlockEffects`
94        if on_chain_effects.abort_error.is_none() {
95            local_effects.abort_error = None;
96        }
97        let local_effects = SuiTransactionBlockEffects::V1(local_effects);
98        let on_chain_effects = SuiTransactionBlockEffects::V1(on_chain_effects);
99
100        if on_chain_effects != local_effects {
101            error!("Replay tool forked {}", self.transaction_info.tx_digest);
102            let diff = Self::diff_effects(&on_chain_effects, &local_effects);
103            println!("{}", diff);
104            return Err(ReplayEngineError::EffectsForked {
105                digest: self.transaction_info.tx_digest,
106                diff: format!("\n{}", diff),
107                on_chain: Box::new(on_chain_effects),
108                local: Box::new(local_effects),
109            });
110        }
111        Ok(())
112    }
113
114    /// Utility to diff effects in a human readable format
115    pub fn diff_effects(
116        on_chain_effects: &SuiTransactionBlockEffects,
117        local_effects: &SuiTransactionBlockEffects,
118    ) -> String {
119        let on_chain_str = format!("{:#?}", on_chain_effects);
120        let local_chain_str = format!("{:#?}", local_effects);
121        let mut res = vec![];
122
123        let diff = TextDiff::from_lines(&on_chain_str, &local_chain_str);
124        for change in diff.iter_all_changes() {
125            let sign = match change.tag() {
126                ChangeTag::Delete => "---",
127                ChangeTag::Insert => "+++",
128                ChangeTag::Equal => "   ",
129            };
130            res.push(format!("{}{}", sign, change));
131        }
132
133        res.join("")
134    }
135}
136
137#[derive(Debug, Clone, PartialEq, Eq)]
138pub struct ProtocolVersionSummary {
139    /// Protocol version at this point
140    pub protocol_version: u64,
141    /// The first epoch that uses this protocol version
142    pub epoch_start: u64,
143    /// The last epoch that uses this protocol version
144    pub epoch_end: u64,
145    /// The first checkpoint in this protocol v ersion
146    pub checkpoint_start: Option<u64>,
147    /// The last checkpoint in this protocol version
148    pub checkpoint_end: Option<u64>,
149    /// The transaction which triggered this epoch change
150    pub epoch_change_tx: TransactionDigest,
151}
152
153#[derive(Clone)]
154pub struct Storage {
155    /// These are objects at the frontier of the execution's view
156    /// They might not be the latest object currently but they are the latest objects
157    /// for the TX at the time it was run
158    /// This store cannot be shared between runners
159    pub live_objects_store: Arc<Mutex<BTreeMap<ObjectID, Object>>>,
160
161    /// Package cache and object version cache can be shared between runners
162    /// Non system packages are immutable so we can cache these
163    pub package_cache: Arc<Mutex<BTreeMap<ObjectID, Object>>>,
164    /// Object contents are frozen at their versions so we can cache these
165    /// We must place system packages here as well
166    pub object_version_cache: Arc<Mutex<BTreeMap<(ObjectID, SequenceNumber), Object>>>,
167}
168
169impl std::fmt::Display for Storage {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        writeln!(f, "Live object store")?;
172        for (id, obj) in self
173            .live_objects_store
174            .lock()
175            .expect("Unable to lock")
176            .iter()
177        {
178            writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
179        }
180        writeln!(f, "Package cache")?;
181        for (id, obj) in self.package_cache.lock().expect("Unable to lock").iter() {
182            writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
183        }
184        writeln!(f, "Object version cache")?;
185        for (id, _) in self
186            .object_version_cache
187            .lock()
188            .expect("Unable to lock")
189            .iter()
190        {
191            writeln!(f, "{}: {}", id.0, id.1)?;
192        }
193
194        write!(f, "")
195    }
196}
197
198impl Storage {
199    pub fn default() -> Self {
200        Self {
201            live_objects_store: Arc::new(Mutex::new(BTreeMap::new())),
202            package_cache: Arc::new(Mutex::new(BTreeMap::new())),
203            object_version_cache: Arc::new(Mutex::new(BTreeMap::new())),
204        }
205    }
206
207    pub fn all_objects(&self) -> Vec<Object> {
208        self.live_objects_store
209            .lock()
210            .expect("Unable to lock")
211            .values()
212            .cloned()
213            .chain(
214                self.package_cache
215                    .lock()
216                    .expect("Unable to lock")
217                    .values()
218                    .cloned(),
219            )
220            .chain(
221                self.object_version_cache
222                    .lock()
223                    .expect("Unable to lock")
224                    .values()
225                    .cloned(),
226            )
227            .collect::<Vec<_>>()
228    }
229}
230
231#[derive(Clone)]
232pub struct LocalExec {
233    pub client: Option<SuiClient>,
234    // For a given protocol version, what TX created it, and what is the valid range of epochs
235    // at this protocol version.
236    pub protocol_version_epoch_table: BTreeMap<u64, ProtocolVersionSummary>,
237    // For a given protocol version, the mapping valid sequence numbers for each framework package
238    pub protocol_version_system_package_table: BTreeMap<u64, BTreeMap<ObjectID, SequenceNumber>>,
239    // The current protocol version for this execution
240    pub current_protocol_version: u64,
241    // All state is contained here
242    pub storage: Storage,
243    // Debug events
244    pub exec_store_events: Arc<Mutex<Vec<ExecutionStoreEvent>>>,
245    // Debug events
246    pub metrics: Arc<LimitsMetrics>,
247    // Used for fetching data from the network or remote store
248    pub fetcher: Fetchers,
249
250    // One can optionally override the executor version
251    // -1 implies use latest version
252    pub executor_version: Option<i64>,
253    // One can optionally override the protocol version
254    // -1 implies use latest version
255    // None implies use the protocol version at the time of execution
256    pub protocol_version: Option<i64>,
257    pub config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
258    // Retry policies due to RPC errors
259    pub num_retries_for_timeout: u32,
260    pub sleep_period_for_timeout: std::time::Duration,
261}
262
263impl LocalExec {
264    /// Wrapper around fetcher in case we want to add more functionality
265    /// Such as fetching from local DB from snapshot
266    pub async fn multi_download(
267        &self,
268        objs: &[(ObjectID, SequenceNumber)],
269    ) -> Result<Vec<Object>, ReplayEngineError> {
270        let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
271        while num_retries_for_timeout >= 0 {
272            match self.fetcher.multi_get_versioned(objs).await {
273                Ok(objs) => return Ok(objs),
274                Err(ReplayEngineError::SuiRpcRequestTimeout) => {
275                    warn!(
276                        "RPC request timed out. Retries left {}. Sleeping for {}s",
277                        num_retries_for_timeout,
278                        self.sleep_period_for_timeout.as_secs()
279                    );
280                    num_retries_for_timeout -= 1;
281                    tokio::time::sleep(self.sleep_period_for_timeout).await;
282                }
283                Err(e) => return Err(e),
284            }
285        }
286        Err(ReplayEngineError::SuiRpcRequestTimeout)
287    }
288    /// Wrapper around fetcher in case we want to add more functionality
289    /// Such as fetching from local DB from snapshot
290    pub async fn multi_download_latest(
291        &self,
292        objs: &[ObjectID],
293    ) -> Result<Vec<Object>, ReplayEngineError> {
294        let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
295        while num_retries_for_timeout >= 0 {
296            match self.fetcher.multi_get_latest(objs).await {
297                Ok(objs) => return Ok(objs),
298                Err(ReplayEngineError::SuiRpcRequestTimeout) => {
299                    warn!(
300                        "RPC request timed out. Retries left {}. Sleeping for {}s",
301                        num_retries_for_timeout,
302                        self.sleep_period_for_timeout.as_secs()
303                    );
304                    num_retries_for_timeout -= 1;
305                    tokio::time::sleep(self.sleep_period_for_timeout).await;
306                }
307                Err(e) => return Err(e),
308            }
309        }
310        Err(ReplayEngineError::SuiRpcRequestTimeout)
311    }
312
313    pub async fn fetch_loaded_child_refs(
314        &self,
315        tx_digest: &TransactionDigest,
316    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
317        // Get the child objects loaded
318        self.fetcher.get_loaded_child_objects(tx_digest).await
319    }
320
321    pub async fn new_from_fn_url(http_url: &str) -> Result<Self, ReplayEngineError> {
322        Self::new_for_remote(
323            SuiClientBuilder::default()
324                .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
325                .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
326                .build(http_url)
327                .await?,
328            None,
329        )
330        .await
331    }
332
333    pub async fn replay_with_network_config(
334        rpc_url: String,
335        tx_digest: TransactionDigest,
336        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
337        use_authority: bool,
338        executor_version: Option<i64>,
339        protocol_version: Option<i64>,
340        config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
341    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
342        info!("Using RPC URL: {}", rpc_url);
343        LocalExec::new_from_fn_url(&rpc_url)
344            .await?
345            .init_for_execution()
346            .await?
347            .execute_transaction(
348                &tx_digest,
349                expensive_safety_check_config,
350                use_authority,
351                executor_version,
352                protocol_version,
353                config_and_versions,
354            )
355            .await
356    }
357
358    /// This captures the state of the network at a given point in time and populates
359    /// prptocol version tables including which system packages to fetch
360    /// If this function is called across epoch boundaries, the info might be stale.
361    /// But it should only be called once per epoch.
362    pub async fn init_for_execution(mut self) -> Result<Self, ReplayEngineError> {
363        self.populate_protocol_version_tables().await?;
364        tokio::task::yield_now().await;
365        Ok(self)
366    }
367
368    pub async fn reset_for_new_execution_with_client(self) -> Result<Self, ReplayEngineError> {
369        Self::new_for_remote(
370            self.client.expect("Remote client not initialized"),
371            Some(self.fetcher.into_remote()),
372        )
373        .await?
374        .init_for_execution()
375        .await
376    }
377
378    pub async fn new_for_remote(
379        client: SuiClient,
380        remote_fetcher: Option<RemoteFetcher>,
381    ) -> Result<Self, ReplayEngineError> {
382        // Use a throwaway metrics registry for local execution.
383        let registry = prometheus::Registry::new();
384        let metrics = Arc::new(LimitsMetrics::new(&registry));
385
386        let fetcher = remote_fetcher.unwrap_or(RemoteFetcher::new(client.clone()));
387
388        Ok(Self {
389            client: Some(client),
390            protocol_version_epoch_table: BTreeMap::new(),
391            protocol_version_system_package_table: BTreeMap::new(),
392            current_protocol_version: 0,
393            exec_store_events: Arc::new(Mutex::new(Vec::new())),
394            metrics,
395            storage: Storage::default(),
396            fetcher: Fetchers::Remote(fetcher),
397            // TODO: make these configurable
398            num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
399            sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
400            executor_version: None,
401            protocol_version: None,
402            config_and_versions: None,
403        })
404    }
405
406    pub async fn new_for_state_dump(
407        path: &str,
408        backup_rpc_url: Option<String>,
409    ) -> Result<Self, ReplayEngineError> {
410        // Use a throwaway metrics registry for local execution.
411        let registry = prometheus::Registry::new();
412        let metrics = Arc::new(LimitsMetrics::new(&registry));
413
414        let state = NodeStateDump::read_from_file(&PathBuf::from(path))?;
415        let current_protocol_version = state.protocol_version;
416        let fetcher = match backup_rpc_url {
417            Some(url) => NodeStateDumpFetcher::new(
418                state,
419                Some(RemoteFetcher::new(
420                    SuiClientBuilder::default()
421                        .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
422                        .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
423                        .build(url)
424                        .await?,
425                )),
426            ),
427            None => NodeStateDumpFetcher::new(state, None),
428        };
429
430        Ok(Self {
431            client: None,
432            protocol_version_epoch_table: BTreeMap::new(),
433            protocol_version_system_package_table: BTreeMap::new(),
434            current_protocol_version,
435            exec_store_events: Arc::new(Mutex::new(Vec::new())),
436            metrics,
437            storage: Storage::default(),
438            fetcher: Fetchers::NodeStateDump(fetcher),
439            // TODO: make these configurable
440            num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
441            sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
442            executor_version: None,
443            protocol_version: None,
444            config_and_versions: None,
445        })
446    }
447
448    pub async fn multi_download_and_store(
449        &mut self,
450        objs: &[(ObjectID, SequenceNumber)],
451    ) -> Result<Vec<Object>, ReplayEngineError> {
452        let objs = self.multi_download(objs).await?;
453
454        // Backfill the store
455        for obj in objs.iter() {
456            let o_ref = obj.compute_object_reference();
457            self.storage
458                .live_objects_store
459                .lock()
460                .expect("Can't lock")
461                .insert(o_ref.0, obj.clone());
462            self.storage
463                .object_version_cache
464                .lock()
465                .expect("Cannot lock")
466                .insert((o_ref.0, o_ref.1), obj.clone());
467            if obj.is_package() {
468                self.storage
469                    .package_cache
470                    .lock()
471                    .expect("Cannot lock")
472                    .insert(o_ref.0, obj.clone());
473            }
474        }
475        tokio::task::yield_now().await;
476        Ok(objs)
477    }
478
479    pub async fn multi_download_relevant_packages_and_store(
480        &mut self,
481        objs: Vec<ObjectID>,
482        protocol_version: u64,
483    ) -> Result<Vec<Object>, ReplayEngineError> {
484        let syst_packages_objs = if self.protocol_version.is_some_and(|i| i < 0) {
485            BuiltInFramework::genesis_objects().collect()
486        } else {
487            let syst_packages =
488                self.system_package_versions_for_protocol_version(protocol_version)?;
489            self.multi_download(&syst_packages).await?
490        };
491
492        // Download latest version of all packages that are not system packages
493        // This is okay since the versions can never change
494        let non_system_package_objs: Vec<_> = objs
495            .into_iter()
496            .filter(|o| !Self::system_package_ids(self.current_protocol_version).contains(o))
497            .collect();
498        let objs = self
499            .multi_download_latest(&non_system_package_objs)
500            .await?
501            .into_iter()
502            .chain(syst_packages_objs.into_iter());
503
504        for obj in objs.clone() {
505            let o_ref = obj.compute_object_reference();
506            // We dont always want the latest in store
507            //self.storage.store.insert(o_ref.0, obj.clone());
508            self.storage
509                .object_version_cache
510                .lock()
511                .expect("Cannot lock")
512                .insert((o_ref.0, o_ref.1), obj.clone());
513            if obj.is_package() {
514                self.storage
515                    .package_cache
516                    .lock()
517                    .expect("Cannot lock")
518                    .insert(o_ref.0, obj.clone());
519            }
520        }
521        Ok(objs.collect())
522    }
523
524    // TODO: remove this after `futures::executor::block_on` is removed.
525    #[allow(clippy::disallowed_methods, clippy::result_large_err)]
526    pub fn download_object(
527        &self,
528        object_id: &ObjectID,
529        version: SequenceNumber,
530    ) -> Result<Object, ReplayEngineError> {
531        if self
532            .storage
533            .object_version_cache
534            .lock()
535            .expect("Cannot lock")
536            .contains_key(&(*object_id, version))
537        {
538            return Ok(self
539                .storage
540                .object_version_cache
541                .lock()
542                .expect("Cannot lock")
543                .get(&(*object_id, version))
544                .ok_or(ReplayEngineError::InternalCacheInvariantViolation {
545                    id: *object_id,
546                    version: Some(version),
547                })?
548                .clone());
549        }
550
551        let o = block_on(self.multi_download(&[(*object_id, version)])).map(|mut q| {
552            q.pop().unwrap_or_else(|| {
553                panic!(
554                    "Downloaded obj response cannot be empty {:?}",
555                    (*object_id, version)
556                )
557            })
558        })?;
559
560        let o_ref = o.compute_object_reference();
561        self.storage
562            .object_version_cache
563            .lock()
564            .expect("Cannot lock")
565            .insert((o_ref.0, o_ref.1), o.clone());
566        Ok(o)
567    }
568
569    // TODO: remove this after `futures::executor::block_on` is removed.
570    #[allow(clippy::disallowed_methods, clippy::result_large_err)]
571    pub fn download_latest_object(
572        &self,
573        object_id: &ObjectID,
574    ) -> Result<Option<Object>, ReplayEngineError> {
575        let resp = block_on({
576            //info!("Downloading latest object {object_id}");
577            self.multi_download_latest(std::slice::from_ref(object_id))
578        })
579        .map(|mut q| {
580            q.pop()
581                .unwrap_or_else(|| panic!("Downloaded obj response cannot be empty {}", *object_id))
582        });
583
584        match resp {
585            Ok(v) => Ok(Some(v)),
586            Err(ReplayEngineError::ObjectNotExist { id }) => {
587                error!(
588                    "Could not find object {id} on RPC server. It might have been pruned, deleted, or never existed."
589                );
590                Ok(None)
591            }
592            Err(ReplayEngineError::ObjectDeleted {
593                id,
594                version,
595                digest,
596            }) => {
597                error!("Object {id} {version} {digest} was deleted on RPC server.");
598                Ok(None)
599            }
600            Err(err) => Err(ReplayEngineError::SuiRpcError {
601                err: err.to_string(),
602            }),
603        }
604    }
605
606    #[allow(clippy::disallowed_methods, clippy::result_large_err)]
607    pub fn download_object_by_upper_bound(
608        &self,
609        object_id: &ObjectID,
610        version_upper_bound: VersionNumber,
611    ) -> Result<Option<Object>, ReplayEngineError> {
612        let local_object = self
613            .storage
614            .live_objects_store
615            .lock()
616            .expect("Can't lock")
617            .get(object_id)
618            .cloned();
619        if local_object.is_some() {
620            return Ok(local_object);
621        }
622        let response = block_on({
623            self.fetcher
624                .get_child_object(object_id, version_upper_bound)
625        });
626        match response {
627            Ok(object) => {
628                let obj_ref = object.compute_object_reference();
629                self.storage
630                    .live_objects_store
631                    .lock()
632                    .expect("Can't lock")
633                    .insert(*object_id, object.clone());
634                self.storage
635                    .object_version_cache
636                    .lock()
637                    .expect("Can't lock")
638                    .insert((obj_ref.0, obj_ref.1), object.clone());
639                Ok(Some(object))
640            }
641            Err(ReplayEngineError::ObjectNotExist { id }) => {
642                error!(
643                    "Could not find child object {id} on RPC server. It might have been pruned, deleted, or never existed."
644                );
645                Ok(None)
646            }
647            Err(ReplayEngineError::ObjectDeleted {
648                id,
649                version,
650                digest,
651            }) => {
652                error!("Object {id} {version} {digest} was deleted on RPC server.");
653                Ok(None)
654            }
655            // This is a child object which was not found in the store (e.g., due to exists
656            // check before creating the dynamic field).
657            Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
658                info!(
659                    "Object {id} {version} not found on RPC server -- this may have been pruned or never existed."
660                );
661                Ok(None)
662            }
663            Err(err) => Err(ReplayEngineError::SuiRpcError {
664                err: err.to_string(),
665            }),
666        }
667    }
668
669    pub async fn get_checkpoint_txs(
670        &self,
671        checkpoint_id: u64,
672    ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
673        self.fetcher
674            .get_checkpoint_txs(checkpoint_id)
675            .await
676            .map_err(|e| ReplayEngineError::SuiRpcError { err: e.to_string() })
677    }
678
679    pub async fn execute_all_in_checkpoints(
680        &mut self,
681        checkpoint_ids: &[u64],
682        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
683        terminate_early: bool,
684        use_authority: bool,
685    ) -> Result<(u64, u64), ReplayEngineError> {
686        // Get all the TXs at this checkpoint
687        let mut txs = Vec::new();
688        for checkpoint_id in checkpoint_ids {
689            txs.extend(self.get_checkpoint_txs(*checkpoint_id).await?);
690        }
691        let num = txs.len();
692        let mut succeeded = 0;
693        for tx in txs {
694            match self
695                .execute_transaction(
696                    &tx,
697                    expensive_safety_check_config.clone(),
698                    use_authority,
699                    None,
700                    None,
701                    None,
702                )
703                .await
704                .map(|q| q.check_effects())
705            {
706                Err(e) | Ok(Err(e)) => {
707                    if terminate_early {
708                        return Err(e);
709                    }
710                    error!("Error executing tx: {},  {:#?}", tx, e);
711                    continue;
712                }
713                _ => (),
714            }
715
716            succeeded += 1;
717        }
718        Ok((succeeded, num as u64))
719    }
720
721    pub async fn execution_engine_execute_with_tx_info_impl(
722        &mut self,
723        tx_info: &OnChainTransactionInfo,
724        override_transaction_kind: Option<TransactionKind>,
725        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
726    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
727        let tx_digest = &tx_info.tx_digest;
728        // Before protocol version 16, the generation of effects depends on the wrapped tombstones.
729        // It is not possible to retrieve such data for replay.
730        if tx_info.protocol_version.as_u64() < 16 {
731            warn!(
732                "Protocol version ({:?}) too old: {}, skipping transaction",
733                tx_info.protocol_version, tx_digest
734            );
735            return Err(ReplayEngineError::TransactionNotSupported {
736                digest: *tx_digest,
737                reason: "Protocol version too old".to_string(),
738            });
739        }
740        // Initialize the state necessary for execution
741        // Get the input objects
742        let input_objects = self.initialize_execution_env_state(tx_info).await?;
743        assert_eq!(
744            &input_objects.filter_shared_objects().len(),
745            &tx_info.shared_object_refs.len()
746        );
747        // At this point we have all the objects needed for replay
748
749        // This assumes we already initialized the protocol version table `protocol_version_epoch_table`
750        let protocol_config =
751            &ProtocolConfig::get_for_version(tx_info.protocol_version, tx_info.chain);
752
753        let metrics = self.metrics.clone();
754
755        let ov = self.executor_version;
756
757        // We could probably cache the executor per protocol config
758        let executor = get_executor(ov, protocol_config, expensive_safety_check_config);
759
760        // All prep done
761        let expensive_checks = true;
762        let transaction_kind = override_transaction_kind.unwrap_or(tx_info.kind.clone());
763        let gas_status = if tx_info.kind.is_system_tx() {
764            SuiGasStatus::new_unmetered()
765        } else {
766            SuiGasStatus::new(
767                tx_info.gas_budget,
768                tx_info.gas_price,
769                tx_info.reference_gas_price,
770                protocol_config,
771            )
772            .expect("Failed to create gas status")
773        };
774        let gas_data = GasData {
775            payment: tx_info.gas.clone(),
776            owner: tx_info.gas_owner.unwrap_or(tx_info.sender),
777            price: tx_info.gas_price,
778            budget: tx_info.gas_budget,
779        };
780        let checked_input_objects = CheckedInputObjects::new_for_replay(input_objects.clone());
781        let early_execution_error = get_early_execution_error(
782            tx_digest,
783            &checked_input_objects,
784            &HashSet::new(),
785            // TODO(address-balances): Support balance withdraw status for replay
786            &BalanceWithdrawStatus::MaybeSufficient,
787        );
788        let execution_params = match early_execution_error {
789            Some(error) => ExecutionOrEarlyError::Err(error),
790            None => ExecutionOrEarlyError::Ok(()),
791        };
792        let (inner_store, gas_status, effects, _timings, result) = executor
793            .execute_transaction_to_effects(
794                &self,
795                protocol_config,
796                metrics.clone(),
797                expensive_checks,
798                execution_params,
799                &tx_info.executed_epoch,
800                tx_info.epoch_start_timestamp,
801                checked_input_objects,
802                gas_data,
803                gas_status,
804                transaction_kind.clone(),
805                tx_info.sender,
806                *tx_digest,
807                &mut None,
808            );
809
810        if let Err(err) = self.pretty_print_for_tracing(
811            &gas_status,
812            &executor,
813            tx_info,
814            &transaction_kind,
815            protocol_config,
816            metrics,
817            expensive_checks,
818            input_objects.clone(),
819        ) {
820            error!("Failed to pretty print for tracing: {:?}", err);
821        }
822
823        let all_required_objects = self.storage.all_objects();
824
825        let effects =
826            SuiTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;
827
828        Ok(ExecutionSandboxState {
829            transaction_info: tx_info.clone(),
830            required_objects: all_required_objects,
831            local_exec_temporary_store: Some(inner_store),
832            local_exec_effects: effects,
833            local_exec_status: Some(result),
834        })
835    }
836
837    fn pretty_print_for_tracing(
838        &self,
839        gas_status: &SuiGasStatus,
840        executor: &Arc<dyn Executor + Send + Sync>,
841        tx_info: &OnChainTransactionInfo,
842        transaction_kind: &TransactionKind,
843        protocol_config: &ProtocolConfig,
844        metrics: Arc<LimitsMetrics>,
845        expensive_checks: bool,
846        input_objects: InputObjects,
847    ) -> anyhow::Result<()> {
848        trace!(target: "replay_gas_info", "{}", Pretty(gas_status));
849
850        let skip_checks = true;
851        let gas_data = GasData {
852            payment: tx_info.gas.clone(),
853            owner: tx_info.gas_owner.unwrap_or(tx_info.sender),
854            price: tx_info.gas_price,
855            budget: tx_info.gas_budget,
856        };
857        let checked_input_objects = CheckedInputObjects::new_for_replay(input_objects.clone());
858        let early_execution_error = get_early_execution_error(
859            &tx_info.tx_digest,
860            &checked_input_objects,
861            &HashSet::new(),
862            // TODO(address-balances): Support balance withdraw status for replay
863            &BalanceWithdrawStatus::MaybeSufficient,
864        );
865        let execution_params = match early_execution_error {
866            Some(error) => ExecutionOrEarlyError::Err(error),
867            None => ExecutionOrEarlyError::Ok(()),
868        };
869        if let ProgrammableTransaction(pt) = transaction_kind {
870            trace!(
871                target: "replay_ptb_info",
872                "{}",
873                Pretty(&FullPTB {
874                    ptb: pt.clone(),
875                    results: transform_command_results_to_annotated(
876                        executor,
877                        &self.clone(),
878                        executor.dev_inspect_transaction(
879                            &self,
880                            protocol_config,
881                            metrics,
882                            expensive_checks,
883                            execution_params,
884                            &tx_info.executed_epoch,
885                            tx_info.epoch_start_timestamp,
886                            CheckedInputObjects::new_for_replay(input_objects),
887                            gas_data,
888                            SuiGasStatus::new(
889                                tx_info.gas_budget,
890                                tx_info.gas_price,
891                                tx_info.reference_gas_price,
892                                protocol_config,
893                            )?,
894                            transaction_kind.clone(),
895                            tx_info.sender,
896                            tx_info.sender_signed_data.digest(),
897                            skip_checks,
898                        )
899                        .3
900                        .unwrap_or_default(),
901                    )?,
902            }));
903        }
904        Ok(())
905    }
906
907    /// Must be called after `init_for_execution`
908    #[allow(clippy::result_large_err)]
909    pub async fn execution_engine_execute_impl(
910        &mut self,
911        tx_digest: &TransactionDigest,
912        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
913    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
914        if self.is_remote_replay() {
915            assert!(
916                !self.protocol_version_system_package_table.is_empty()
917                    || !self.protocol_version_epoch_table.is_empty(),
918                "Required tables not populated. Must call `init_for_execution` before executing transactions"
919            );
920        }
921
922        let tx_info = if self.is_remote_replay() {
923            self.resolve_tx_components(tx_digest).await?
924        } else {
925            self.resolve_tx_components_from_dump(tx_digest).await?
926        };
927        self.execution_engine_execute_with_tx_info_impl(
928            &tx_info,
929            None,
930            expensive_safety_check_config,
931        )
932        .await
933    }
934
935    /// Executes a transaction with the state specified in `pre_run_sandbox`
936    /// This is useful for executing a transaction with a specific state
937    /// However if the state in invalid, the behavior is undefined.
938    #[allow(clippy::result_large_err)]
939    pub async fn certificate_execute_with_sandbox_state(
940        pre_run_sandbox: &ExecutionSandboxState,
941    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
942        // These cannot be changed and are inherited from the sandbox state
943        let executed_epoch = pre_run_sandbox.transaction_info.executed_epoch;
944        let reference_gas_price = pre_run_sandbox.transaction_info.reference_gas_price;
945        let epoch_start_timestamp = pre_run_sandbox.transaction_info.epoch_start_timestamp;
946        let protocol_config = ProtocolConfig::get_for_version(
947            pre_run_sandbox.transaction_info.protocol_version,
948            pre_run_sandbox.transaction_info.chain,
949        );
950        let required_objects = pre_run_sandbox.required_objects.clone();
951        let store = InMemoryStorage::new(required_objects.clone());
952
953        let transaction =
954            Transaction::new(pre_run_sandbox.transaction_info.sender_signed_data.clone());
955
956        // TODO: This will not work for deleted shared objects. We need to persist that information in the sandbox.
957        // TODO: A lot of the following code is replicated in several places. We should introduce a few
958        // traits and make them shared so that we don't have to fix one by one when we have major execution
959        // layer changes.
960        let input_objects = store.read_input_objects_for_transaction(&transaction);
961        let executable = VerifiedExecutableTransaction::new_from_quorum_execution(
962            VerifiedTransaction::new_unchecked(transaction),
963            executed_epoch,
964        );
965        let (gas_status, input_objects) = sui_transaction_checks::check_certificate_input(
966            &executable,
967            input_objects,
968            &protocol_config,
969            reference_gas_price,
970        )
971        .unwrap();
972        let (kind, signer, gas_data) = executable.transaction_data().execution_parts();
973        let executor = sui_execution::executor(&protocol_config, true).unwrap();
974        let early_execution_error = get_early_execution_error(
975            executable.digest(),
976            &input_objects,
977            &HashSet::new(),
978            // TODO(address-balances): Support balance withdraw status for replay
979            &BalanceWithdrawStatus::MaybeSufficient,
980        );
981        let execution_params = match early_execution_error {
982            Some(error) => ExecutionOrEarlyError::Err(error),
983            None => ExecutionOrEarlyError::Ok(()),
984        };
985        let (_, _, effects, _timings, exec_res) = executor.execute_transaction_to_effects(
986            &store,
987            &protocol_config,
988            Arc::new(LimitsMetrics::new(&Registry::new())),
989            true,
990            execution_params,
991            &executed_epoch,
992            epoch_start_timestamp,
993            input_objects,
994            gas_data,
995            gas_status,
996            kind,
997            signer,
998            *executable.digest(),
999            &mut None,
1000        );
1001
1002        let effects =
1003            SuiTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;
1004
1005        Ok(ExecutionSandboxState {
1006            transaction_info: pre_run_sandbox.transaction_info.clone(),
1007            required_objects,
1008            local_exec_temporary_store: None, // We dont capture it for cert exec run
1009            local_exec_effects: effects,
1010            local_exec_status: Some(exec_res),
1011        })
1012    }
1013
1014    /// Must be called after `init_for_execution`
1015    /// This executes from `sui_core::authority::AuthorityState::try_execute_immediately`
1016    #[allow(clippy::result_large_err)]
1017    pub async fn certificate_execute(
1018        &mut self,
1019        tx_digest: &TransactionDigest,
1020        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1021    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
1022        // Use the lighterweight execution engine to get the pre-run state
1023        let pre_run_sandbox = self
1024            .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
1025            .await?;
1026        Self::certificate_execute_with_sandbox_state(&pre_run_sandbox).await
1027    }
1028
1029    /// Must be called after `init_for_execution`
1030    /// This executes from `sui_adapter::execution_engine::execute_transaction_to_effects`
1031    #[allow(clippy::result_large_err)]
1032    pub async fn execution_engine_execute(
1033        &mut self,
1034        tx_digest: &TransactionDigest,
1035        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1036    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
1037        let sandbox_state = self
1038            .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
1039            .await?;
1040
1041        Ok(sandbox_state)
1042    }
1043
1044    #[allow(clippy::result_large_err)]
1045    pub async fn execute_state_dump(
1046        &mut self,
1047        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1048    ) -> Result<(ExecutionSandboxState, NodeStateDump), ReplayEngineError> {
1049        assert!(!self.is_remote_replay());
1050
1051        let d = match self.fetcher.clone() {
1052            Fetchers::NodeStateDump(d) => d,
1053            _ => panic!("Invalid fetcher for state dump"),
1054        };
1055        let tx_digest = d.node_state_dump.clone().tx_digest;
1056        let sandbox_state = self
1057            .execution_engine_execute_impl(&tx_digest, expensive_safety_check_config)
1058            .await?;
1059
1060        Ok((sandbox_state, d.node_state_dump))
1061    }
1062
1063    #[allow(clippy::result_large_err)]
1064    pub async fn execute_transaction(
1065        &mut self,
1066        tx_digest: &TransactionDigest,
1067        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1068        use_authority: bool,
1069        executor_version: Option<i64>,
1070        protocol_version: Option<i64>,
1071        config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
1072    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
1073        self.executor_version = executor_version;
1074        self.protocol_version = protocol_version;
1075        self.config_and_versions = config_and_versions;
1076        if use_authority {
1077            self.certificate_execute(tx_digest, expensive_safety_check_config.clone())
1078                .await
1079        } else {
1080            self.execution_engine_execute(tx_digest, expensive_safety_check_config)
1081                .await
1082        }
1083    }
1084    fn system_package_ids(protocol_version: u64) -> Vec<ObjectID> {
1085        let mut ids = BuiltInFramework::all_package_ids();
1086
1087        if protocol_version < 5 {
1088            ids.retain(|id| *id != DEEPBOOK_PACKAGE_ID)
1089        }
1090        ids
1091    }
1092
1093    /// This is the only function which accesses the network during execution
1094    #[allow(clippy::result_large_err)]
1095    pub fn get_or_download_object(
1096        &self,
1097        obj_id: &ObjectID,
1098        package_expected: bool,
1099    ) -> Result<Option<Object>, ReplayEngineError> {
1100        if package_expected {
1101            if let Some(obj) = self
1102                .storage
1103                .package_cache
1104                .lock()
1105                .expect("Cannot lock")
1106                .get(obj_id)
1107            {
1108                return Ok(Some(obj.clone()));
1109            };
1110            // Check if its a system package because we must've downloaded all
1111            // TODO: Will return this check once we can download completely for other networks
1112            // assert!(
1113            //     !self.system_package_ids().contains(obj_id),
1114            //     "All system packages should be downloaded already"
1115            // );
1116        } else if let Some(obj) = self
1117            .storage
1118            .live_objects_store
1119            .lock()
1120            .expect("Can't lock")
1121            .get(obj_id)
1122        {
1123            return Ok(Some(obj.clone()));
1124        }
1125
1126        let Some(o) = self.download_latest_object(obj_id)? else {
1127            return Ok(None);
1128        };
1129
1130        if o.is_package() {
1131            assert!(
1132                package_expected,
1133                "Did not expect package but downloaded object is a package: {obj_id}"
1134            );
1135
1136            self.storage
1137                .package_cache
1138                .lock()
1139                .expect("Cannot lock")
1140                .insert(*obj_id, o.clone());
1141        }
1142        let o_ref = o.compute_object_reference();
1143        self.storage
1144            .object_version_cache
1145            .lock()
1146            .expect("Cannot lock")
1147            .insert((o_ref.0, o_ref.1), o.clone());
1148        Ok(Some(o))
1149    }
1150
1151    pub fn is_remote_replay(&self) -> bool {
1152        matches!(self.fetcher, Fetchers::Remote(_))
1153    }
1154
1155    /// Must be called after `populate_protocol_version_tables`
1156    #[allow(clippy::result_large_err)]
1157    pub fn system_package_versions_for_protocol_version(
1158        &self,
1159        protocol_version: u64,
1160    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
1161        match &self.fetcher {
1162            Fetchers::Remote(_) => Ok(self
1163                .protocol_version_system_package_table
1164                .get(&protocol_version)
1165                .ok_or(ReplayEngineError::FrameworkObjectVersionTableNotPopulated {
1166                    protocol_version,
1167                })?
1168                .clone()
1169                .into_iter()
1170                .collect()),
1171
1172            Fetchers::NodeStateDump(d) => Ok(d
1173                .node_state_dump
1174                .relevant_system_packages
1175                .iter()
1176                .map(|w| (w.id, w.version, w.digest))
1177                .map(|q| (q.0, q.1))
1178                .collect()),
1179        }
1180    }
1181
1182    pub async fn protocol_ver_to_epoch_map(
1183        &self,
1184    ) -> Result<BTreeMap<u64, ProtocolVersionSummary>, ReplayEngineError> {
1185        let mut range_map = BTreeMap::new();
1186        let epoch_change_events = self.fetcher.get_epoch_change_events(false).await?;
1187
1188        // Exception for Genesis: Protocol version 1 at epoch 0
1189        let mut tx_digest = *self
1190            .fetcher
1191            .get_checkpoint_txs(0)
1192            .await?
1193            .first()
1194            .expect("Genesis TX must be in first checkpoint");
1195        // Somehow the genesis TX did not emit any event, but we know it was the start of version 1
1196        // So we need to manually add this range
1197        let (mut start_epoch, mut start_protocol_version, mut start_checkpoint) =
1198            (0, 1, Some(0u64));
1199
1200        let (mut curr_epoch, mut curr_protocol_version, mut curr_checkpoint) =
1201            (start_epoch, start_protocol_version, start_checkpoint);
1202
1203        (start_epoch, start_protocol_version, start_checkpoint) =
1204            (curr_epoch, curr_protocol_version, curr_checkpoint);
1205
1206        // This is the final tx digest for the epoch change. We need this to track the final checkpoint
1207        let mut end_epoch_tx_digest = tx_digest;
1208
1209        for event in epoch_change_events {
1210            (curr_epoch, curr_protocol_version) = extract_epoch_and_version(event.clone())?;
1211            end_epoch_tx_digest = event.id.tx_digest;
1212
1213            if start_protocol_version == curr_protocol_version {
1214                // Same range
1215                continue;
1216            }
1217
1218            // Change in prot version
1219            // Find the last checkpoint
1220            curr_checkpoint = self
1221                .fetcher
1222                .get_transaction(&event.id.tx_digest)
1223                .await?
1224                .checkpoint;
1225            // Insert the last range
1226            range_map.insert(
1227                start_protocol_version,
1228                ProtocolVersionSummary {
1229                    protocol_version: start_protocol_version,
1230                    epoch_start: start_epoch,
1231                    epoch_end: curr_epoch - 1,
1232                    checkpoint_start: start_checkpoint,
1233                    checkpoint_end: curr_checkpoint.map(|x| x - 1),
1234                    epoch_change_tx: tx_digest,
1235                },
1236            );
1237
1238            start_epoch = curr_epoch;
1239            start_protocol_version = curr_protocol_version;
1240            tx_digest = event.id.tx_digest;
1241            start_checkpoint = curr_checkpoint;
1242        }
1243
1244        // Insert the last range
1245        range_map.insert(
1246            curr_protocol_version,
1247            ProtocolVersionSummary {
1248                protocol_version: curr_protocol_version,
1249                epoch_start: start_epoch,
1250                epoch_end: curr_epoch,
1251                checkpoint_start: curr_checkpoint,
1252                checkpoint_end: self
1253                    .fetcher
1254                    .get_transaction(&end_epoch_tx_digest)
1255                    .await?
1256                    .checkpoint,
1257                epoch_change_tx: tx_digest,
1258            },
1259        );
1260
1261        Ok(range_map)
1262    }
1263
1264    pub fn protocol_version_for_epoch(
1265        epoch: u64,
1266        mp: &BTreeMap<u64, (TransactionDigest, u64, u64)>,
1267    ) -> u64 {
1268        // Naive impl but works for now
1269        // Can improve with range algos & data structures
1270        let mut version = 1;
1271        for (k, v) in mp.iter().rev() {
1272            if v.1 <= epoch {
1273                version = *k;
1274                break;
1275            }
1276        }
1277        version
1278    }
1279
1280    pub async fn populate_protocol_version_tables(&mut self) -> Result<(), ReplayEngineError> {
1281        self.protocol_version_epoch_table = self.protocol_ver_to_epoch_map().await?;
1282
1283        let system_package_revisions = self.system_package_versions().await?;
1284
1285        // This can be more efficient but small footprint so okay for now
1286        //Table is sorted from earliest to latest
1287        for (
1288            prot_ver,
1289            ProtocolVersionSummary {
1290                epoch_change_tx: tx_digest,
1291                ..
1292            },
1293        ) in self.protocol_version_epoch_table.clone()
1294        {
1295            // Use the previous versions protocol version table
1296            let mut working = if prot_ver <= 1 {
1297                BTreeMap::new()
1298            } else {
1299                self.protocol_version_system_package_table
1300                    .iter()
1301                    .rev()
1302                    .find(|(ver, _)| **ver <= prot_ver)
1303                    .expect("Prev entry must exist")
1304                    .1
1305                    .clone()
1306            };
1307
1308            for (id, versions) in system_package_revisions.iter() {
1309                // Oldest appears first in list, so reverse
1310                for ver in versions.iter().rev() {
1311                    if ver.1 == tx_digest {
1312                        // Found the version for this protocol version
1313                        working.insert(*id, ver.0);
1314                        break;
1315                    }
1316                }
1317            }
1318            self.protocol_version_system_package_table
1319                .insert(prot_ver, working);
1320        }
1321        Ok(())
1322    }
1323
1324    pub async fn system_package_versions(
1325        &self,
1326    ) -> Result<BTreeMap<ObjectID, Vec<(SequenceNumber, TransactionDigest)>>, ReplayEngineError>
1327    {
1328        let system_package_ids = Self::system_package_ids(
1329            *self
1330                .protocol_version_epoch_table
1331                .keys()
1332                .peekable()
1333                .last()
1334                .expect("Protocol version epoch table not populated"),
1335        );
1336        let mut system_package_objs = self.multi_download_latest(&system_package_ids).await?;
1337
1338        let mut mapping = BTreeMap::new();
1339
1340        // Extract all the transactions which created or mutated this object
1341        while !system_package_objs.is_empty() {
1342            // For the given object and its version, record the transaction which upgraded or created it
1343            let previous_txs: Vec<_> = system_package_objs
1344                .iter()
1345                .map(|o| (o.compute_object_reference(), o.previous_transaction))
1346                .collect();
1347
1348            previous_txs.iter().for_each(|((id, ver, _), tx)| {
1349                mapping.entry(*id).or_insert(vec![]).push((*ver, *tx));
1350            });
1351
1352            // Next round
1353            // Get the previous version of each object if exists
1354            let previous_ver_refs: Vec<_> = previous_txs
1355                .iter()
1356                .filter_map(|(q, _)| {
1357                    let prev_ver = u64::from(q.1) - 1;
1358                    if prev_ver == 0 {
1359                        None
1360                    } else {
1361                        Some((q.0, SequenceNumber::from(prev_ver)))
1362                    }
1363                })
1364                .collect();
1365            system_package_objs = match self.multi_download(&previous_ver_refs).await {
1366                Ok(packages) => packages,
1367                Err(ReplayEngineError::ObjectNotExist { id }) => {
1368                    // This happens when the RPC server prunes older object
1369                    // Replays in the current protocol version will work but old ones might not
1370                    // as we cannot fetch the package
1371                    warn!(
1372                        "Object {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1373                        id
1374                    );
1375                    break;
1376                }
1377                Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
1378                    // This happens when the RPC server prunes older object
1379                    // Replays in the current protocol version will work but old ones might not
1380                    // as we cannot fetch the package
1381                    warn!(
1382                        "Object {} at version {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1383                        id, version
1384                    );
1385                    break;
1386                }
1387                Err(ReplayEngineError::ObjectVersionTooHigh {
1388                    id,
1389                    asked_version,
1390                    latest_version,
1391                }) => {
1392                    warn!(
1393                        "Object {} at version {} does not exist on RPC server. Latest version is {}. This might be due to pruning. Historical replays might not work",
1394                        id, asked_version, latest_version
1395                    );
1396                    break;
1397                }
1398                Err(ReplayEngineError::ObjectDeleted {
1399                    id,
1400                    version,
1401                    digest,
1402                }) => {
1403                    // This happens when the RPC server prunes older object
1404                    // Replays in the current protocol version will work but old ones might not
1405                    // as we cannot fetch the package
1406                    warn!(
1407                        "Object {} at version {} digest {} deleted from RPC server. This might be due to pruning. Historical replays might not work",
1408                        id, version, digest
1409                    );
1410                    break;
1411                }
1412                Err(e) => return Err(e),
1413            };
1414        }
1415        Ok(mapping)
1416    }
1417
1418    pub async fn get_protocol_config(
1419        &self,
1420        epoch_id: EpochId,
1421        chain: Chain,
1422    ) -> Result<ProtocolConfig, ReplayEngineError> {
1423        match self.protocol_version {
1424            Some(x) if x < 0 => Ok(ProtocolConfig::get_for_max_version_UNSAFE()),
1425            Some(v) => Ok(ProtocolConfig::get_for_version((v as u64).into(), chain)),
1426            None => self
1427                .protocol_version_epoch_table
1428                .iter()
1429                .rev()
1430                .find(|(_, rg)| epoch_id >= rg.epoch_start)
1431                .map(|(p, _rg)| Ok(ProtocolConfig::get_for_version((*p).into(), chain)))
1432                .unwrap_or_else(|| {
1433                    Err(ReplayEngineError::ProtocolVersionNotFound { epoch: epoch_id })
1434                }),
1435        }
1436    }
1437
1438    pub async fn checkpoints_for_epoch(
1439        &self,
1440        epoch_id: u64,
1441    ) -> Result<(u64, u64), ReplayEngineError> {
1442        let epoch_change_events = self
1443            .fetcher
1444            .get_epoch_change_events(true)
1445            .await?
1446            .into_iter()
1447            .collect::<Vec<_>>();
1448        let (start_checkpoint, start_epoch_idx) = if epoch_id == 0 {
1449            (0, 1)
1450        } else {
1451            let idx = epoch_change_events
1452                .iter()
1453                .position(|ev| match extract_epoch_and_version(ev.clone()) {
1454                    Ok((epoch, _)) => epoch == epoch_id,
1455                    Err(_) => false,
1456                })
1457                .ok_or(ReplayEngineError::EventNotFound { epoch: epoch_id })?;
1458            let epoch_change_tx = epoch_change_events[idx].id.tx_digest;
1459            (
1460                self.fetcher
1461                    .get_transaction(&epoch_change_tx)
1462                    .await?
1463                    .checkpoint
1464                    .unwrap_or_else(|| {
1465                        panic!(
1466                            "Checkpoint for transaction {} not present. Could be due to pruning",
1467                            epoch_change_tx
1468                        )
1469                    }),
1470                idx,
1471            )
1472        };
1473
1474        let next_epoch_change_tx = epoch_change_events
1475            .get(start_epoch_idx + 1)
1476            .map(|v| v.id.tx_digest)
1477            .ok_or(ReplayEngineError::UnableToDetermineCheckpoint { epoch: epoch_id })?;
1478
1479        let next_epoch_checkpoint = self
1480            .fetcher
1481            .get_transaction(&next_epoch_change_tx)
1482            .await?
1483            .checkpoint
1484            .unwrap_or_else(|| {
1485                panic!(
1486                    "Checkpoint for transaction {} not present. Could be due to pruning",
1487                    next_epoch_change_tx
1488                )
1489            });
1490
1491        Ok((start_checkpoint, next_epoch_checkpoint - 1))
1492    }
1493
1494    pub async fn get_epoch_start_timestamp_and_rgp(
1495        &self,
1496        epoch_id: u64,
1497        tx_digest: &TransactionDigest,
1498    ) -> Result<(u64, u64), ReplayEngineError> {
1499        if epoch_id == 0 {
1500            return Err(ReplayEngineError::TransactionNotSupported {
1501                digest: *tx_digest,
1502                reason: "Transactions from epoch 0 not supported".to_string(),
1503            });
1504        }
1505        self.fetcher
1506            .get_epoch_start_timestamp_and_rgp(epoch_id)
1507            .await
1508    }
1509
1510    fn add_config_objects_if_needed(
1511        &self,
1512        status: &SuiExecutionStatus,
1513    ) -> Vec<(ObjectID, SequenceNumber)> {
1514        match parse_effect_error_for_denied_coins(status) {
1515            Some(coin_type) => {
1516                let Some(mut config_id_and_version) = self.config_and_versions.clone() else {
1517                    panic!(
1518                        "Need to specify the config object ID and version for '{coin_type}' in order to replay this transaction"
1519                    );
1520                };
1521                // NB: the version of the deny list object doesn't matter
1522                if !config_id_and_version
1523                    .iter()
1524                    .any(|(id, _)| id == &SUI_DENY_LIST_OBJECT_ID)
1525                {
1526                    let deny_list_oid_version = self.download_latest_object(&SUI_DENY_LIST_OBJECT_ID)
1527                        .ok()
1528                        .flatten()
1529                        .expect("Unable to download the deny list object for a transaction that requires it")
1530                        .version();
1531                    config_id_and_version.push((SUI_DENY_LIST_OBJECT_ID, deny_list_oid_version));
1532                }
1533                config_id_and_version
1534            }
1535            None => vec![],
1536        }
1537    }
1538
1539    async fn resolve_tx_components(
1540        &self,
1541        tx_digest: &TransactionDigest,
1542    ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1543        assert!(self.is_remote_replay());
1544        // Fetch full transaction content
1545        let tx_info = self.fetcher.get_transaction(tx_digest).await?;
1546        let sender = match tx_info.clone().transaction.unwrap().data {
1547            sui_json_rpc_types::SuiTransactionBlockData::V1(tx) => tx.sender,
1548        };
1549        let SuiTransactionBlockEffects::V1(effects) = tx_info.clone().effects.unwrap();
1550
1551        let config_objects = self.add_config_objects_if_needed(effects.status());
1552
1553        let raw_tx_bytes = tx_info.clone().raw_transaction;
1554        let orig_tx: SenderSignedData = bcs::from_bytes(&raw_tx_bytes).unwrap();
1555        let input_objs = orig_tx
1556            .transaction_data()
1557            .input_objects()
1558            .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1559        let tx_kind_orig = orig_tx.transaction_data().kind();
1560
1561        // Download the objects at the version right before the execution of this TX
1562        let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1563
1564        let shared_object_refs: Vec<ObjectRef> = effects
1565            .shared_objects()
1566            .iter()
1567            .map(|so_ref| {
1568                if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1569                    unimplemented!(
1570                        "Replay of deleted shared object transactions is not supported yet"
1571                    );
1572                } else {
1573                    so_ref.to_object_ref()
1574                }
1575            })
1576            .collect();
1577        let gas_data = match tx_info.clone().transaction.unwrap().data {
1578            sui_json_rpc_types::SuiTransactionBlockData::V1(tx) => tx.gas_data,
1579        };
1580        let gas_object_refs: Vec<_> = gas_data
1581            .payment
1582            .iter()
1583            .map(|obj_ref| obj_ref.to_object_ref())
1584            .collect();
1585        let receiving_objs = orig_tx
1586            .transaction_data()
1587            .receiving_objects()
1588            .into_iter()
1589            .map(|(obj_id, version, _)| (obj_id, version))
1590            .collect();
1591
1592        let epoch_id = effects.executed_epoch;
1593        let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1594
1595        // Extract the epoch start timestamp
1596        let (epoch_start_timestamp, reference_gas_price) = self
1597            .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1598            .await?;
1599
1600        Ok(OnChainTransactionInfo {
1601            kind: tx_kind_orig.clone(),
1602            sender,
1603            modified_at_versions,
1604            input_objects: input_objs,
1605            shared_object_refs,
1606            gas: gas_object_refs,
1607            gas_owner: (gas_data.owner != sender).then_some(gas_data.owner),
1608            gas_price: gas_data.price,
1609            gas_budget: gas_data.budget,
1610            executed_epoch: epoch_id,
1611            dependencies: effects.dependencies().to_vec(),
1612            effects: SuiTransactionBlockEffects::V1(effects),
1613            receiving_objs,
1614            config_objects,
1615            // Find the protocol version for this epoch
1616            // This assumes we already initialized the protocol version table `protocol_version_epoch_table`
1617            protocol_version: self.get_protocol_config(epoch_id, chain).await?.version,
1618            tx_digest: *tx_digest,
1619            epoch_start_timestamp,
1620            sender_signed_data: orig_tx.clone(),
1621            reference_gas_price,
1622            chain,
1623        })
1624    }
1625
1626    async fn resolve_tx_components_from_dump(
1627        &self,
1628        tx_digest: &TransactionDigest,
1629    ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1630        assert!(!self.is_remote_replay());
1631
1632        let dp = self.fetcher.as_node_state_dump();
1633
1634        let sender = dp
1635            .node_state_dump
1636            .sender_signed_data
1637            .transaction_data()
1638            .sender();
1639        let orig_tx = dp.node_state_dump.sender_signed_data.clone();
1640        let effects = dp.node_state_dump.computed_effects.clone();
1641        let effects = SuiTransactionBlockEffects::try_from(effects).unwrap();
1642        // Config objects don't show up in the node state dump so they need to be provided.
1643        let config_objects = self.add_config_objects_if_needed(effects.status());
1644
1645        // Fetch full transaction content
1646        //let tx_info = self.fetcher.get_transaction(tx_digest).await?;
1647
1648        let input_objs = orig_tx
1649            .transaction_data()
1650            .input_objects()
1651            .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1652        let tx_kind_orig = orig_tx.transaction_data().kind();
1653
1654        // Download the objects at the version right before the execution of this TX
1655        let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1656
1657        let shared_object_refs: Vec<ObjectRef> = effects
1658            .shared_objects()
1659            .iter()
1660            .map(|so_ref| {
1661                if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1662                    unimplemented!(
1663                        "Replay of deleted shared object transactions is not supported yet"
1664                    );
1665                } else {
1666                    so_ref.to_object_ref()
1667                }
1668            })
1669            .collect();
1670        let receiving_objs = orig_tx
1671            .transaction_data()
1672            .receiving_objects()
1673            .into_iter()
1674            .map(|(obj_id, version, _)| (obj_id, version))
1675            .collect();
1676
1677        let epoch_id = dp.node_state_dump.executed_epoch;
1678
1679        let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1680
1681        let protocol_config =
1682            ProtocolConfig::get_for_version(dp.node_state_dump.protocol_version.into(), chain);
1683        // Extract the epoch start timestamp
1684        let (epoch_start_timestamp, reference_gas_price) = self
1685            .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1686            .await?;
1687        let gas_data = orig_tx.transaction_data().gas_data();
1688        let gas_object_refs: Vec<_> = gas_data.clone().payment.into_iter().collect();
1689
1690        Ok(OnChainTransactionInfo {
1691            kind: tx_kind_orig.clone(),
1692            sender,
1693            modified_at_versions,
1694            input_objects: input_objs,
1695            shared_object_refs,
1696            gas: gas_object_refs,
1697            gas_owner: (gas_data.owner != sender).then_some(gas_data.owner),
1698            gas_price: gas_data.price,
1699            gas_budget: gas_data.budget,
1700            executed_epoch: epoch_id,
1701            dependencies: effects.dependencies().to_vec(),
1702            effects,
1703            receiving_objs,
1704            config_objects,
1705            protocol_version: protocol_config.version,
1706            tx_digest: *tx_digest,
1707            epoch_start_timestamp,
1708            sender_signed_data: orig_tx.clone(),
1709            reference_gas_price,
1710            chain,
1711        })
1712    }
1713
1714    async fn resolve_download_input_objects(
1715        &mut self,
1716        tx_info: &OnChainTransactionInfo,
1717        deleted_shared_objects: Vec<ObjectRef>,
1718    ) -> Result<InputObjects, ReplayEngineError> {
1719        // Download the input objects
1720        let mut package_inputs = vec![];
1721        let mut imm_owned_inputs = vec![];
1722        let mut shared_inputs = vec![];
1723        let mut deleted_shared_info_map = BTreeMap::new();
1724
1725        // for deleted shared objects, we need to look at the transaction dependencies to find the
1726        // correct transaction dependency for a deleted shared object.
1727        if !deleted_shared_objects.is_empty() {
1728            for tx_digest in tx_info.dependencies.iter() {
1729                let tx_info = self.resolve_tx_components(tx_digest).await?;
1730                for (obj_id, version, _) in tx_info.shared_object_refs.iter() {
1731                    deleted_shared_info_map.insert(*obj_id, (tx_info.tx_digest, *version));
1732                }
1733            }
1734        }
1735
1736        tx_info
1737            .input_objects
1738            .iter()
1739            .map(|kind| match kind {
1740                InputObjectKind::MovePackage(i) => {
1741                    package_inputs.push(*i);
1742                    Ok(())
1743                }
1744                InputObjectKind::ImmOrOwnedMoveObject(o_ref) => {
1745                    imm_owned_inputs.push((o_ref.0, o_ref.1));
1746                    Ok(())
1747                }
1748                InputObjectKind::SharedMoveObject {
1749                    id,
1750                    initial_shared_version: _,
1751                    mutability: _,
1752                } if !deleted_shared_info_map.contains_key(id) => {
1753                    // We already downloaded
1754                    if let Some(o) = self
1755                        .storage
1756                        .live_objects_store
1757                        .lock()
1758                        .expect("Can't lock")
1759                        .get(id)
1760                    {
1761                        shared_inputs.push(o.clone());
1762                        Ok(())
1763                    } else {
1764                        Err(ReplayEngineError::InternalCacheInvariantViolation {
1765                            id: *id,
1766                            version: None,
1767                        })
1768                    }
1769                }
1770                _ => Ok(()),
1771            })
1772            .collect::<Result<Vec<_>, _>>()?;
1773
1774        // Download the imm and owned objects
1775        let mut in_objs = self.multi_download_and_store(&imm_owned_inputs).await?;
1776
1777        // For packages, download latest if non framework
1778        // If framework, download relevant for the current protocol version
1779        in_objs.extend(
1780            self.multi_download_relevant_packages_and_store(
1781                package_inputs,
1782                tx_info.protocol_version.as_u64(),
1783            )
1784            .await?,
1785        );
1786        // Add shared objects
1787        in_objs.extend(shared_inputs);
1788
1789        // TODO(Zhe): Account for cancelled transaction assigned version here, and tests.
1790        let resolved_input_objs = tx_info
1791            .input_objects
1792            .iter()
1793            .flat_map(|kind| match kind {
1794                InputObjectKind::MovePackage(i) => {
1795                    // Okay to unwrap since we downloaded it
1796                    Some(ObjectReadResult::new(
1797                        *kind,
1798                        self.storage
1799                            .package_cache
1800                            .lock()
1801                            .expect("Cannot lock")
1802                            .get(i)
1803                            .unwrap_or(
1804                                &self
1805                                    .download_latest_object(i)
1806                                    .expect("Object download failed")
1807                                    .expect("Object not found on chain"),
1808                            )
1809                            .clone()
1810                            .into(),
1811                    ))
1812                }
1813                InputObjectKind::ImmOrOwnedMoveObject(o_ref) => Some(ObjectReadResult::new(
1814                    *kind,
1815                    self.storage
1816                        .object_version_cache
1817                        .lock()
1818                        .expect("Cannot lock")
1819                        .get(&(o_ref.0, o_ref.1))
1820                        .unwrap()
1821                        .clone()
1822                        .into(),
1823                )),
1824                InputObjectKind::SharedMoveObject { id, .. }
1825                    if !deleted_shared_info_map.contains_key(id) =>
1826                {
1827                    // we already downloaded
1828                    Some(ObjectReadResult::new(
1829                        *kind,
1830                        self.storage
1831                            .live_objects_store
1832                            .lock()
1833                            .expect("Can't lock")
1834                            .get(id)
1835                            .unwrap()
1836                            .clone()
1837                            .into(),
1838                    ))
1839                }
1840                InputObjectKind::SharedMoveObject { id, .. } => {
1841                    let (digest, version) = deleted_shared_info_map.get(id).unwrap();
1842                    Some(ObjectReadResult::new(
1843                        *kind,
1844                        ObjectReadResultKind::ObjectConsensusStreamEnded(*version, *digest),
1845                    ))
1846                }
1847            })
1848            .collect();
1849
1850        Ok(InputObjects::new(resolved_input_objs))
1851    }
1852
1853    /// Given the OnChainTransactionInfo, download and store the input objects, and other info necessary
1854    /// for execution
1855    async fn initialize_execution_env_state(
1856        &mut self,
1857        tx_info: &OnChainTransactionInfo,
1858    ) -> Result<InputObjects, ReplayEngineError> {
1859        // We need this for other activities in this session
1860        self.current_protocol_version = tx_info.protocol_version.as_u64();
1861
1862        // Download the objects at the version right before the execution of this TX
1863        self.multi_download_and_store(&tx_info.modified_at_versions)
1864            .await?;
1865
1866        let (shared_refs, deleted_shared_refs): (Vec<ObjectRef>, Vec<ObjectRef>) = tx_info
1867            .shared_object_refs
1868            .iter()
1869            .partition(|r| r.2 != ObjectDigest::OBJECT_DIGEST_DELETED);
1870
1871        // Download shared objects at the version right before the execution of this TX
1872        let shared_refs: Vec<_> = shared_refs.iter().map(|r| (r.0, r.1)).collect();
1873        self.multi_download_and_store(&shared_refs).await?;
1874
1875        // Download gas (although this should already be in cache from modified at versions?)
1876        let gas_refs: Vec<_> = tx_info
1877            .gas
1878            .iter()
1879            .filter_map(|w| (w.0 != ObjectID::ZERO).then_some((w.0, w.1)))
1880            .collect();
1881        self.multi_download_and_store(&gas_refs).await?;
1882
1883        // Fetch the input objects we know from the raw transaction
1884        let input_objs = self
1885            .resolve_download_input_objects(tx_info, deleted_shared_refs)
1886            .await?;
1887
1888        // Fetch the receiving objects
1889        self.multi_download_and_store(&tx_info.receiving_objs)
1890            .await?;
1891
1892        // Fetch specified config objects if any
1893        self.multi_download_and_store(&tx_info.config_objects)
1894            .await?;
1895
1896        // Prep the object runtime for dynamic fields
1897        // Download the child objects accessed at the version right before the execution of this TX
1898        let loaded_child_refs = self.fetch_loaded_child_refs(&tx_info.tx_digest).await?;
1899        self.multi_download_and_store(&loaded_child_refs).await?;
1900        tokio::task::yield_now().await;
1901
1902        Ok(input_objs)
1903    }
1904}
1905
1906// <---------------------  Implement necessary traits for LocalExec to work with exec engine ----------------------->
1907
1908impl BackingPackageStore for LocalExec {
1909    /// In this case we might need to download a dependency package which was not present in the
1910    /// modified at versions list because packages are immutable
1911    fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
1912        fn inner(self_: &LocalExec, package_id: &ObjectID) -> SuiResult<Option<Object>> {
1913            // If package not present fetch it from the network
1914            self_
1915                .get_or_download_object(package_id, true /* we expect a Move package*/)
1916                .map_err(|e| SuiErrorKind::Storage(e.to_string()).into())
1917        }
1918
1919        let res = inner(self, package_id);
1920        self.exec_store_events
1921            .lock()
1922            .expect("Unable to lock events list")
1923            .push(ExecutionStoreEvent::BackingPackageGetPackageObject {
1924                package_id: *package_id,
1925                result: res.clone(),
1926            });
1927        res.map(|o| o.map(PackageObject::new))
1928    }
1929}
1930
1931impl ChildObjectResolver for LocalExec {
1932    /// This uses `get_object`, which does not download from the network
1933    /// Hence all objects must be in store already
1934    fn read_child_object(
1935        &self,
1936        parent: &ObjectID,
1937        child: &ObjectID,
1938        child_version_upper_bound: SequenceNumber,
1939    ) -> SuiResult<Option<Object>> {
1940        fn inner(
1941            self_: &LocalExec,
1942            parent: &ObjectID,
1943            child: &ObjectID,
1944            child_version_upper_bound: SequenceNumber,
1945        ) -> SuiResult<Option<Object>> {
1946            let child_object =
1947                match self_.download_object_by_upper_bound(child, child_version_upper_bound)? {
1948                    None => return Ok(None),
1949                    Some(o) => o,
1950                };
1951            let child_version = child_object.version();
1952            if child_object.version() > child_version_upper_bound {
1953                return Err(SuiErrorKind::Unknown(format!(
1954                    "Invariant Violation. Replay loaded child_object {child} at version \
1955                    {child_version} but expected the version to be <= {child_version_upper_bound}"
1956                ))
1957                .into());
1958            }
1959            let parent = *parent;
1960            if child_object.owner != Owner::ObjectOwner(parent.into()) {
1961                return Err(SuiErrorKind::InvalidChildObjectAccess {
1962                    object: *child,
1963                    given_parent: parent,
1964                    actual_owner: child_object.owner.clone(),
1965                }
1966                .into());
1967            }
1968            Ok(Some(child_object))
1969        }
1970
1971        let res = inner(self, parent, child, child_version_upper_bound);
1972        self.exec_store_events
1973            .lock()
1974            .expect("Unable to lock events list")
1975            .push(
1976                ExecutionStoreEvent::ChildObjectResolverStoreReadChildObject {
1977                    parent: *parent,
1978                    child: *child,
1979                    result: res.clone(),
1980                },
1981            );
1982        res
1983    }
1984
1985    fn get_object_received_at_version(
1986        &self,
1987        owner: &ObjectID,
1988        receiving_object_id: &ObjectID,
1989        receive_object_at_version: SequenceNumber,
1990        _epoch_id: EpochId,
1991    ) -> SuiResult<Option<Object>> {
1992        fn inner(
1993            self_: &LocalExec,
1994            owner: &ObjectID,
1995            receiving_object_id: &ObjectID,
1996            receive_object_at_version: SequenceNumber,
1997        ) -> SuiResult<Option<Object>> {
1998            let recv_object = match self_.get_object(receiving_object_id) {
1999                None => return Ok(None),
2000                Some(o) => o,
2001            };
2002            if recv_object.version() != receive_object_at_version {
2003                return Err(SuiErrorKind::Unknown(format!(
2004                    "Invariant Violation. Replay loaded child_object {receiving_object_id} at version \
2005                    {receive_object_at_version} but expected the version to be == {receive_object_at_version}"
2006                )).into());
2007            }
2008            if recv_object.owner != Owner::AddressOwner((*owner).into()) {
2009                return Ok(None);
2010            }
2011            Ok(Some(recv_object))
2012        }
2013
2014        let res = inner(self, owner, receiving_object_id, receive_object_at_version);
2015        self.exec_store_events
2016            .lock()
2017            .expect("Unable to lock events list")
2018            .push(ExecutionStoreEvent::ReceiveObject {
2019                owner: *owner,
2020                receive: *receiving_object_id,
2021                receive_at_version: receive_object_at_version,
2022                result: res.clone(),
2023            });
2024        res
2025    }
2026}
2027
2028impl ParentSync for LocalExec {
2029    /// The objects here much already exist in the store because we downloaded them earlier
2030    /// No download from network
2031    fn get_latest_parent_entry_ref_deprecated(&self, object_id: ObjectID) -> Option<ObjectRef> {
2032        fn inner(self_: &LocalExec, object_id: ObjectID) -> Option<ObjectRef> {
2033            if let Some(v) = self_
2034                .storage
2035                .live_objects_store
2036                .lock()
2037                .expect("Can't lock")
2038                .get(&object_id)
2039            {
2040                return Some(v.compute_object_reference());
2041            }
2042            None
2043        }
2044        let res = inner(self, object_id);
2045        self.exec_store_events
2046            .lock()
2047            .expect("Unable to lock events list")
2048            .push(
2049                ExecutionStoreEvent::ParentSyncStoreGetLatestParentEntryRef {
2050                    object_id,
2051                    result: res,
2052                },
2053            );
2054        res
2055    }
2056}
2057
2058impl ModuleResolver for LocalExec {
2059    type Error = SuiError;
2060
2061    /// This fetches a module which must already be present in the store
2062    /// We do not download
2063    fn get_module(&self, module_id: &ModuleId) -> SuiResult<Option<Vec<u8>>> {
2064        fn inner(self_: &LocalExec, module_id: &ModuleId) -> SuiResult<Option<Vec<u8>>> {
2065            get_module(self_, module_id)
2066        }
2067
2068        let res = inner(self, module_id);
2069        self.exec_store_events
2070            .lock()
2071            .expect("Unable to lock events list")
2072            .push(ExecutionStoreEvent::ModuleResolverGetModule {
2073                module_id: module_id.clone(),
2074                result: res.clone(),
2075            });
2076        res
2077    }
2078}
2079
2080impl ModuleResolver for &mut LocalExec {
2081    type Error = SuiError;
2082
2083    fn get_module(&self, module_id: &ModuleId) -> SuiResult<Option<Vec<u8>>> {
2084        // Recording event here will be double-counting since its already recorded in the get_module fn
2085        (**self).get_module(module_id)
2086    }
2087}
2088
2089impl ObjectStore for LocalExec {
2090    /// The object must be present in store by normal process we used to backfill store in init
2091    /// We dont download if not present
2092    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
2093        let res = self
2094            .storage
2095            .live_objects_store
2096            .lock()
2097            .expect("Can't lock")
2098            .get(object_id)
2099            .cloned();
2100        self.exec_store_events
2101            .lock()
2102            .expect("Unable to lock events list")
2103            .push(ExecutionStoreEvent::ObjectStoreGetObject {
2104                object_id: *object_id,
2105                result: Ok(res.clone()),
2106            });
2107        res
2108    }
2109
2110    /// The object must be present in store by normal process we used to backfill store in init
2111    /// We dont download if not present
2112    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
2113        let res = self
2114            .storage
2115            .live_objects_store
2116            .lock()
2117            .expect("Can't lock")
2118            .get(object_id)
2119            .and_then(|obj| {
2120                if obj.version() == version {
2121                    Some(obj.clone())
2122                } else {
2123                    None
2124                }
2125            });
2126
2127        self.exec_store_events
2128            .lock()
2129            .expect("Unable to lock events list")
2130            .push(ExecutionStoreEvent::ObjectStoreGetObjectByKey {
2131                object_id: *object_id,
2132                version,
2133                result: Ok(res.clone()),
2134            });
2135
2136        res
2137    }
2138}
2139
2140impl ObjectStore for &mut LocalExec {
2141    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
2142        // Recording event here will be double-counting since its already recorded in the get_module fn
2143        (**self).get_object(object_id)
2144    }
2145
2146    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
2147        // Recording event here will be double-counting since its already recorded in the get_module fn
2148        (**self).get_object_by_key(object_id, version)
2149    }
2150}
2151
2152impl GetModule for LocalExec {
2153    type Error = SuiError;
2154    type Item = CompiledModule;
2155
2156    fn get_module_by_id(&self, id: &ModuleId) -> SuiResult<Option<Self::Item>> {
2157        let res = get_module_by_id(self, id);
2158
2159        self.exec_store_events
2160            .lock()
2161            .expect("Unable to lock events list")
2162            .push(ExecutionStoreEvent::GetModuleGetModuleByModuleId {
2163                id: id.clone(),
2164                result: res.clone(),
2165            });
2166        res
2167    }
2168}
2169
2170// <--------------------- Util functions ----------------------->
2171
2172pub fn get_executor(
2173    executor_version_override: Option<i64>,
2174    protocol_config: &ProtocolConfig,
2175    _expensive_safety_check_config: ExpensiveSafetyCheckConfig,
2176) -> Arc<dyn Executor + Send + Sync> {
2177    let protocol_config = executor_version_override
2178        .map(|q| {
2179            let ver = if q < 0 {
2180                ProtocolConfig::get_for_max_version_UNSAFE().execution_version()
2181            } else {
2182                q as u64
2183            };
2184
2185            let mut c = protocol_config.clone();
2186            c.set_execution_version_for_testing(ver);
2187            c
2188        })
2189        .unwrap_or(protocol_config.clone());
2190
2191    let silent = true;
2192    sui_execution::executor(&protocol_config, silent)
2193        .expect("Creating an executor should not fail here")
2194}
2195
2196fn parse_effect_error_for_denied_coins(status: &SuiExecutionStatus) -> Option<String> {
2197    let SuiExecutionStatus::Failure { error } = status else {
2198        return None;
2199    };
2200    parse_denied_error_string(error)
2201}
2202
2203fn parse_denied_error_string(error: &str) -> Option<String> {
2204    let regulated_regex = regex::Regex::new(
2205        r#"CoinTypeGlobalPause.*?"(.*?)"|AddressDeniedForCoin.*coin_type:.*?"(.*?)""#,
2206    )
2207    .unwrap();
2208
2209    let caps = regulated_regex.captures(error)?;
2210    Some(caps.get(1).or(caps.get(2))?.as_str().to_string())
2211}
2212
#[cfg(test)]
mod tests {
    use super::parse_denied_error_string;

    /// Coin type embedded in the sample error strings below.
    const RCOIN_TYPE: &str =
        "39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN";

    /// Both regulated-coin error shapes yield the quoted coin type.
    #[test]
    fn test_regex_regulated_coin_errors() {
        let test_bank = [
            "CoinTypeGlobalPause { coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
            "AddressDeniedForCoin { address: B, coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
        ];

        for test in test_bank {
            // `assert_eq!` prints both sides on failure, unlike `assert!(a == b)`.
            assert_eq!(
                parse_denied_error_string(test).as_deref(),
                Some(RCOIN_TYPE),
                "failed to parse {test:?}"
            );
        }
    }

    /// Errors that are not regulated-coin denials, or that lack a quoted coin type, yield
    /// `None`.
    #[test]
    fn test_regex_ignores_unrelated_errors() {
        let test_bank = [
            "",
            "InsufficientGas",
            "CoinTypeGlobalPause",
            "AddressDeniedForCoin { address: B }",
        ];

        for test in test_bank {
            assert_eq!(
                parse_denied_error_string(test),
                None,
                "unexpected match for {test:?}"
            );
        }
    }
}