sui_replay/
replay.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::chain_from_chain_id;
5use crate::{
6    data_fetcher::{
7        DataFetcher, Fetchers, NodeStateDumpFetcher, RemoteFetcher, extract_epoch_and_version,
8    },
9    displays::{
10        Pretty,
11        transaction_displays::{FullPTB, transform_command_results_to_annotated},
12    },
13    types::*,
14};
15use futures::executor::block_on;
16use move_binary_format::CompiledModule;
17use move_bytecode_utils::module_cache::GetModule;
18use move_core_types::resolver::SerializedPackage;
19use move_core_types::{
20    account_address::AccountAddress, language_storage::ModuleId, resolver::ModuleResolver,
21};
22use prometheus::Registry;
23use serde::{Deserialize, Serialize};
24use similar::{ChangeTag, TextDiff};
25use std::{
26    collections::{BTreeMap, HashSet},
27    path::PathBuf,
28    sync::Arc,
29    sync::Mutex,
30};
31use sui_config::node::ExpensiveSafetyCheckConfig;
32use sui_core::authority::NodeStateDump;
33use sui_execution::Executor;
34use sui_framework::BuiltInFramework;
35use sui_json_rpc_types::{
36    SuiExecutionStatus, SuiTransactionBlockEffects, SuiTransactionBlockEffectsAPI,
37};
38use sui_protocol_config::{Chain, ProtocolConfig};
39use sui_sdk::{SuiClient, SuiClientBuilder};
40use sui_types::SUI_DENY_LIST_OBJECT_ID;
41use sui_types::error::SuiErrorKind;
42use sui_types::execution_params::{
43    ExecutionOrEarlyError, FundsWithdrawStatus, get_early_execution_error,
44};
45use sui_types::in_memory_storage::InMemoryStorage;
46use sui_types::message_envelope::Message;
47use sui_types::storage::{PackageObject, get_module, get_package};
48use sui_types::transaction::GasData;
49use sui_types::transaction::TransactionKind::ProgrammableTransaction;
50use sui_types::{
51    DEEPBOOK_PACKAGE_ID,
52    base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber},
53    committee::EpochId,
54    digests::{ObjectDigest, TransactionDigest},
55    error::{ExecutionError, SuiError, SuiResult},
56    executable_transaction::VerifiedExecutableTransaction,
57    gas::SuiGasStatus,
58    inner_temporary_store::InnerTemporaryStore,
59    metrics::ExecutionMetrics,
60    object::{Object, Owner},
61    storage::get_module_by_id,
62    storage::{BackingPackageStore, ChildObjectResolver, ObjectStore, ParentSync},
63    transaction::{
64        CheckedInputObjects, InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
65        SenderSignedData, Transaction, TransactionDataAPI, TransactionKind, VerifiedTransaction,
66    },
67};
68use tracing::{error, info, trace, warn};
69
70// TODO: add persistent cache. But perf is good enough already.
71
/// Snapshot of a single replayed transaction: the on-chain transaction info
/// plus everything produced by the local re-execution, so the two can be
/// compared (see `check_effects`).
#[derive(Debug, Serialize, Deserialize)]
pub struct ExecutionSandboxState {
    /// Information describing the transaction
    pub transaction_info: OnChainTransactionInfo,
    /// All the objects that are required for the execution of the transaction
    pub required_objects: Vec<Object>,
    /// Temporary store from executing this locally in `execute_transaction_to_effects`
    /// (local-only; skipped during serialization)
    #[serde(skip)]
    pub local_exec_temporary_store: Option<InnerTemporaryStore>,
    /// Effects from executing this locally in `execute_transaction_to_effects`
    pub local_exec_effects: SuiTransactionBlockEffects,
    /// Status from executing this locally in `execute_transaction_to_effects`
    /// (local-only; skipped during serialization)
    #[serde(skip)]
    pub local_exec_status: Option<Result<(), ExecutionError>>,
}
87
88impl ExecutionSandboxState {
89    #[allow(clippy::result_large_err)]
90    pub fn check_effects(&self) -> Result<(), ReplayEngineError> {
91        let SuiTransactionBlockEffects::V1(mut local_effects) = self.local_exec_effects.clone();
92        let SuiTransactionBlockEffects::V1(on_chain_effects) =
93            self.transaction_info.effects.clone();
94
95        // Handle backwards compatibility with the new `abort_error` field in
96        // `SuiTransactionBlockEffects`
97        if on_chain_effects.abort_error.is_none() {
98            local_effects.abort_error = None;
99        }
100        let local_effects = SuiTransactionBlockEffects::V1(local_effects);
101        let on_chain_effects = SuiTransactionBlockEffects::V1(on_chain_effects);
102
103        if on_chain_effects != local_effects {
104            error!("Replay tool forked {}", self.transaction_info.tx_digest);
105            let diff = Self::diff_effects(&on_chain_effects, &local_effects);
106            println!("{}", diff);
107            return Err(ReplayEngineError::EffectsForked {
108                digest: self.transaction_info.tx_digest,
109                diff: format!("\n{}", diff),
110                on_chain: Box::new(on_chain_effects),
111                local: Box::new(local_effects),
112            });
113        }
114        Ok(())
115    }
116
117    /// Utility to diff effects in a human readable format
118    pub fn diff_effects(
119        on_chain_effects: &SuiTransactionBlockEffects,
120        local_effects: &SuiTransactionBlockEffects,
121    ) -> String {
122        let on_chain_str = format!("{:#?}", on_chain_effects);
123        let local_chain_str = format!("{:#?}", local_effects);
124        let mut res = vec![];
125
126        let diff = TextDiff::from_lines(&on_chain_str, &local_chain_str);
127        for change in diff.iter_all_changes() {
128            let sign = match change.tag() {
129                ChangeTag::Delete => "---",
130                ChangeTag::Insert => "+++",
131                ChangeTag::Equal => "   ",
132            };
133            res.push(format!("{}{}", sign, change));
134        }
135
136        res.join("")
137    }
138}
139
/// Describes one protocol version's span on chain: the epoch and checkpoint
/// range during which it was active, and the transaction that introduced it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProtocolVersionSummary {
    /// Protocol version at this point
    pub protocol_version: u64,
    /// The first epoch that uses this protocol version
    pub epoch_start: u64,
    /// The last epoch that uses this protocol version
    pub epoch_end: u64,
    /// The first checkpoint in this protocol version
    pub checkpoint_start: Option<u64>,
    /// The last checkpoint in this protocol version
    pub checkpoint_end: Option<u64>,
    /// The transaction which triggered this epoch change
    pub epoch_change_tx: TransactionDigest,
}
155
/// Object state used during replay: a per-run view of live objects plus
/// shareable caches for packages and versioned object contents.
#[derive(Clone)]
pub struct Storage {
    /// These are objects at the frontier of the execution's view
    /// They might not be the latest object currently but they are the latest objects
    /// for the TX at the time it was run
    /// This store cannot be shared between runners
    pub live_objects_store: Arc<Mutex<BTreeMap<ObjectID, Object>>>,

    /// Package cache and object version cache can be shared between runners
    /// Non system packages are immutable so we can cache these
    pub package_cache: Arc<Mutex<BTreeMap<ObjectID, Object>>>,
    /// Object contents are frozen at their versions so we can cache these
    /// We must place system packages here as well
    pub object_version_cache: Arc<Mutex<BTreeMap<(ObjectID, SequenceNumber), Object>>>,
}
171
172impl std::fmt::Display for Storage {
173    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174        writeln!(f, "Live object store")?;
175        for (id, obj) in self
176            .live_objects_store
177            .lock()
178            .expect("Unable to lock")
179            .iter()
180        {
181            writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
182        }
183        writeln!(f, "Package cache")?;
184        for (id, obj) in self.package_cache.lock().expect("Unable to lock").iter() {
185            writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
186        }
187        writeln!(f, "Object version cache")?;
188        for (id, _) in self
189            .object_version_cache
190            .lock()
191            .expect("Unable to lock")
192            .iter()
193        {
194            writeln!(f, "{}: {}", id.0, id.1)?;
195        }
196
197        write!(f, "")
198    }
199}
200
201impl Storage {
202    pub fn default() -> Self {
203        Self {
204            live_objects_store: Arc::new(Mutex::new(BTreeMap::new())),
205            package_cache: Arc::new(Mutex::new(BTreeMap::new())),
206            object_version_cache: Arc::new(Mutex::new(BTreeMap::new())),
207        }
208    }
209
210    pub fn all_objects(&self) -> Vec<Object> {
211        self.live_objects_store
212            .lock()
213            .expect("Unable to lock")
214            .values()
215            .cloned()
216            .chain(
217                self.package_cache
218                    .lock()
219                    .expect("Unable to lock")
220                    .values()
221                    .cloned(),
222            )
223            .chain(
224                self.object_version_cache
225                    .lock()
226                    .expect("Unable to lock")
227                    .values()
228                    .cloned(),
229            )
230            .collect::<Vec<_>>()
231    }
232}
233
/// The main replay driver: holds protocol/version metadata, object storage,
/// and the data fetcher used to re-execute transactions locally.
#[derive(Clone)]
pub struct LocalExec {
    pub client: Option<SuiClient>,
    // For a given protocol version, what TX created it, and what is the valid range of epochs
    // at this protocol version.
    pub protocol_version_epoch_table: BTreeMap<u64, ProtocolVersionSummary>,
    // For a given protocol version, the mapping valid sequence numbers for each framework package
    pub protocol_version_system_package_table: BTreeMap<u64, BTreeMap<ObjectID, SequenceNumber>>,
    // The current protocol version for this execution
    pub current_protocol_version: u64,
    // All state is contained here
    pub storage: Storage,
    // Debug events
    pub exec_store_events: Arc<Mutex<Vec<ExecutionStoreEvent>>>,
    // Execution metrics (throwaway registry; see `new_for_remote`)
    pub metrics: Arc<ExecutionMetrics>,
    // Used for fetching data from the network or remote store
    pub fetcher: Fetchers,

    // One can optionally override the executor version
    // -1 implies use latest version
    pub executor_version: Option<i64>,
    // One can optionally override the protocol version
    // -1 implies use latest version
    // None implies use the protocol version at the time of execution
    pub protocol_version: Option<i64>,
    pub config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
    // Retry policies due to RPC errors
    pub num_retries_for_timeout: u32,
    pub sleep_period_for_timeout: std::time::Duration,
}
265
266impl LocalExec {
267    /// Wrapper around fetcher in case we want to add more functionality
268    /// Such as fetching from local DB from snapshot
269    pub async fn multi_download(
270        &self,
271        objs: &[(ObjectID, SequenceNumber)],
272    ) -> Result<Vec<Object>, ReplayEngineError> {
273        let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
274        while num_retries_for_timeout >= 0 {
275            match self.fetcher.multi_get_versioned(objs).await {
276                Ok(objs) => return Ok(objs),
277                Err(ReplayEngineError::SuiRpcRequestTimeout) => {
278                    warn!(
279                        "RPC request timed out. Retries left {}. Sleeping for {}s",
280                        num_retries_for_timeout,
281                        self.sleep_period_for_timeout.as_secs()
282                    );
283                    num_retries_for_timeout -= 1;
284                    tokio::time::sleep(self.sleep_period_for_timeout).await;
285                }
286                Err(e) => return Err(e),
287            }
288        }
289        Err(ReplayEngineError::SuiRpcRequestTimeout)
290    }
291    /// Wrapper around fetcher in case we want to add more functionality
292    /// Such as fetching from local DB from snapshot
293    pub async fn multi_download_latest(
294        &self,
295        objs: &[ObjectID],
296    ) -> Result<Vec<Object>, ReplayEngineError> {
297        let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
298        while num_retries_for_timeout >= 0 {
299            match self.fetcher.multi_get_latest(objs).await {
300                Ok(objs) => return Ok(objs),
301                Err(ReplayEngineError::SuiRpcRequestTimeout) => {
302                    warn!(
303                        "RPC request timed out. Retries left {}. Sleeping for {}s",
304                        num_retries_for_timeout,
305                        self.sleep_period_for_timeout.as_secs()
306                    );
307                    num_retries_for_timeout -= 1;
308                    tokio::time::sleep(self.sleep_period_for_timeout).await;
309                }
310                Err(e) => return Err(e),
311            }
312        }
313        Err(ReplayEngineError::SuiRpcRequestTimeout)
314    }
315
316    pub async fn fetch_loaded_child_refs(
317        &self,
318        tx_digest: &TransactionDigest,
319    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
320        // Get the child objects loaded
321        self.fetcher.get_loaded_child_objects(tx_digest).await
322    }
323
324    pub async fn new_from_fn_url(http_url: &str) -> Result<Self, ReplayEngineError> {
325        Self::new_for_remote(
326            SuiClientBuilder::default()
327                .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
328                .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
329                .build(http_url)
330                .await?,
331            None,
332        )
333        .await
334    }
335
336    pub async fn replay_with_network_config(
337        rpc_url: String,
338        tx_digest: TransactionDigest,
339        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
340        use_authority: bool,
341        executor_version: Option<i64>,
342        protocol_version: Option<i64>,
343        config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
344    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
345        info!("Using RPC URL: {}", rpc_url);
346        LocalExec::new_from_fn_url(&rpc_url)
347            .await?
348            .init_for_execution()
349            .await?
350            .execute_transaction(
351                &tx_digest,
352                expensive_safety_check_config,
353                use_authority,
354                executor_version,
355                protocol_version,
356                config_and_versions,
357            )
358            .await
359    }
360
    /// This captures the state of the network at a given point in time and populates
    /// protocol version tables including which system packages to fetch.
    /// If this function is called across epoch boundaries, the info might be stale.
    /// But it should only be called once per epoch.
    pub async fn init_for_execution(mut self) -> Result<Self, ReplayEngineError> {
        self.populate_protocol_version_tables().await?;
        // Cooperatively yield to the runtime before returning.
        tokio::task::yield_now().await;
        Ok(self)
    }
370
371    pub async fn reset_for_new_execution_with_client(self) -> Result<Self, ReplayEngineError> {
372        Self::new_for_remote(
373            self.client.expect("Remote client not initialized"),
374            Some(self.fetcher.into_remote()),
375        )
376        .await?
377        .init_for_execution()
378        .await
379    }
380
381    pub async fn new_for_remote(
382        client: SuiClient,
383        remote_fetcher: Option<RemoteFetcher>,
384    ) -> Result<Self, ReplayEngineError> {
385        // Use a throwaway metrics registry for local execution.
386        let registry = prometheus::Registry::new();
387        let metrics = Arc::new(ExecutionMetrics::new(&registry));
388
389        let fetcher = remote_fetcher.unwrap_or(RemoteFetcher::new(client.clone()));
390
391        Ok(Self {
392            client: Some(client),
393            protocol_version_epoch_table: BTreeMap::new(),
394            protocol_version_system_package_table: BTreeMap::new(),
395            current_protocol_version: 0,
396            exec_store_events: Arc::new(Mutex::new(Vec::new())),
397            metrics,
398            storage: Storage::default(),
399            fetcher: Fetchers::Remote(fetcher),
400            // TODO: make these configurable
401            num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
402            sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
403            executor_version: None,
404            protocol_version: None,
405            config_and_versions: None,
406        })
407    }
408
409    pub async fn new_for_state_dump(
410        path: &str,
411        backup_rpc_url: Option<String>,
412    ) -> Result<Self, ReplayEngineError> {
413        // Use a throwaway metrics registry for local execution.
414        let registry = prometheus::Registry::new();
415        let metrics = Arc::new(ExecutionMetrics::new(&registry));
416
417        let state = NodeStateDump::read_from_file(&PathBuf::from(path))?;
418        let current_protocol_version = state.protocol_version;
419        let fetcher = match backup_rpc_url {
420            Some(url) => NodeStateDumpFetcher::new(
421                state,
422                Some(RemoteFetcher::new(
423                    SuiClientBuilder::default()
424                        .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
425                        .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
426                        .build(url)
427                        .await?,
428                )),
429            ),
430            None => NodeStateDumpFetcher::new(state, None),
431        };
432
433        Ok(Self {
434            client: None,
435            protocol_version_epoch_table: BTreeMap::new(),
436            protocol_version_system_package_table: BTreeMap::new(),
437            current_protocol_version,
438            exec_store_events: Arc::new(Mutex::new(Vec::new())),
439            metrics,
440            storage: Storage::default(),
441            fetcher: Fetchers::NodeStateDump(fetcher),
442            // TODO: make these configurable
443            num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
444            sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
445            executor_version: None,
446            protocol_version: None,
447            config_and_versions: None,
448        })
449    }
450
451    pub async fn multi_download_and_store(
452        &mut self,
453        objs: &[(ObjectID, SequenceNumber)],
454    ) -> Result<Vec<Object>, ReplayEngineError> {
455        let objs = self.multi_download(objs).await?;
456
457        // Backfill the store
458        for obj in objs.iter() {
459            let o_ref = obj.compute_object_reference();
460            self.storage
461                .live_objects_store
462                .lock()
463                .expect("Can't lock")
464                .insert(o_ref.0, obj.clone());
465            self.storage
466                .object_version_cache
467                .lock()
468                .expect("Cannot lock")
469                .insert((o_ref.0, o_ref.1), obj.clone());
470            if obj.is_package() {
471                self.storage
472                    .package_cache
473                    .lock()
474                    .expect("Cannot lock")
475                    .insert(o_ref.0, obj.clone());
476            }
477        }
478        tokio::task::yield_now().await;
479        Ok(objs)
480    }
481
482    pub async fn multi_download_relevant_packages_and_store(
483        &mut self,
484        objs: Vec<ObjectID>,
485        protocol_version: u64,
486    ) -> Result<Vec<Object>, ReplayEngineError> {
487        let syst_packages_objs = if self.protocol_version.is_some_and(|i| i < 0) {
488            BuiltInFramework::genesis_objects().collect()
489        } else {
490            let syst_packages =
491                self.system_package_versions_for_protocol_version(protocol_version)?;
492            self.multi_download(&syst_packages).await?
493        };
494
495        // Download latest version of all packages that are not system packages
496        // This is okay since the versions can never change
497        let non_system_package_objs: Vec<_> = objs
498            .into_iter()
499            .filter(|o| !Self::system_package_ids(self.current_protocol_version).contains(o))
500            .collect();
501        let objs = self
502            .multi_download_latest(&non_system_package_objs)
503            .await?
504            .into_iter()
505            .chain(syst_packages_objs.into_iter());
506
507        for obj in objs.clone() {
508            let o_ref = obj.compute_object_reference();
509            // We dont always want the latest in store
510            //self.storage.store.insert(o_ref.0, obj.clone());
511            self.storage
512                .object_version_cache
513                .lock()
514                .expect("Cannot lock")
515                .insert((o_ref.0, o_ref.1), obj.clone());
516            if obj.is_package() {
517                self.storage
518                    .package_cache
519                    .lock()
520                    .expect("Cannot lock")
521                    .insert(o_ref.0, obj.clone());
522            }
523        }
524        Ok(objs.collect())
525    }
526
527    // TODO: remove this after `futures::executor::block_on` is removed.
528    #[allow(clippy::disallowed_methods, clippy::result_large_err)]
529    pub fn download_object(
530        &self,
531        object_id: &ObjectID,
532        version: SequenceNumber,
533    ) -> Result<Object, ReplayEngineError> {
534        if self
535            .storage
536            .object_version_cache
537            .lock()
538            .expect("Cannot lock")
539            .contains_key(&(*object_id, version))
540        {
541            return Ok(self
542                .storage
543                .object_version_cache
544                .lock()
545                .expect("Cannot lock")
546                .get(&(*object_id, version))
547                .ok_or(ReplayEngineError::InternalCacheInvariantViolation {
548                    id: *object_id,
549                    version: Some(version),
550                })?
551                .clone());
552        }
553
554        let o = block_on(self.multi_download(&[(*object_id, version)])).map(|mut q| {
555            q.pop().unwrap_or_else(|| {
556                panic!(
557                    "Downloaded obj response cannot be empty {:?}",
558                    (*object_id, version)
559                )
560            })
561        })?;
562
563        let o_ref = o.compute_object_reference();
564        self.storage
565            .object_version_cache
566            .lock()
567            .expect("Cannot lock")
568            .insert((o_ref.0, o_ref.1), o.clone());
569        Ok(o)
570    }
571
572    // TODO: remove this after `futures::executor::block_on` is removed.
573    #[allow(clippy::disallowed_methods, clippy::result_large_err)]
574    pub fn download_latest_object(
575        &self,
576        object_id: &ObjectID,
577    ) -> Result<Option<Object>, ReplayEngineError> {
578        let resp = block_on({
579            //info!("Downloading latest object {object_id}");
580            self.multi_download_latest(std::slice::from_ref(object_id))
581        })
582        .map(|mut q| {
583            q.pop()
584                .unwrap_or_else(|| panic!("Downloaded obj response cannot be empty {}", *object_id))
585        });
586
587        match resp {
588            Ok(v) => Ok(Some(v)),
589            Err(ReplayEngineError::ObjectNotExist { id }) => {
590                error!(
591                    "Could not find object {id} on RPC server. It might have been pruned, deleted, or never existed."
592                );
593                Ok(None)
594            }
595            Err(ReplayEngineError::ObjectDeleted {
596                id,
597                version,
598                digest,
599            }) => {
600                error!("Object {id} {version} {digest} was deleted on RPC server.");
601                Ok(None)
602            }
603            Err(err) => Err(ReplayEngineError::SuiRpcError {
604                err: err.to_string(),
605            }),
606        }
607    }
608
609    #[allow(clippy::disallowed_methods, clippy::result_large_err)]
610    pub fn download_object_by_upper_bound(
611        &self,
612        object_id: &ObjectID,
613        version_upper_bound: VersionNumber,
614    ) -> Result<Option<Object>, ReplayEngineError> {
615        let local_object = self
616            .storage
617            .live_objects_store
618            .lock()
619            .expect("Can't lock")
620            .get(object_id)
621            .cloned();
622        if local_object.is_some() {
623            return Ok(local_object);
624        }
625        let response = block_on({
626            self.fetcher
627                .get_child_object(object_id, version_upper_bound)
628        });
629        match response {
630            Ok(object) => {
631                let obj_ref = object.compute_object_reference();
632                self.storage
633                    .live_objects_store
634                    .lock()
635                    .expect("Can't lock")
636                    .insert(*object_id, object.clone());
637                self.storage
638                    .object_version_cache
639                    .lock()
640                    .expect("Can't lock")
641                    .insert((obj_ref.0, obj_ref.1), object.clone());
642                Ok(Some(object))
643            }
644            Err(ReplayEngineError::ObjectNotExist { id }) => {
645                error!(
646                    "Could not find child object {id} on RPC server. It might have been pruned, deleted, or never existed."
647                );
648                Ok(None)
649            }
650            Err(ReplayEngineError::ObjectDeleted {
651                id,
652                version,
653                digest,
654            }) => {
655                error!("Object {id} {version} {digest} was deleted on RPC server.");
656                Ok(None)
657            }
658            // This is a child object which was not found in the store (e.g., due to exists
659            // check before creating the dynamic field).
660            Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
661                info!(
662                    "Object {id} {version} not found on RPC server -- this may have been pruned or never existed."
663                );
664                Ok(None)
665            }
666            Err(err) => Err(ReplayEngineError::SuiRpcError {
667                err: err.to_string(),
668            }),
669        }
670    }
671
672    pub async fn get_checkpoint_txs(
673        &self,
674        checkpoint_id: u64,
675    ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
676        self.fetcher
677            .get_checkpoint_txs(checkpoint_id)
678            .await
679            .map_err(|e| ReplayEngineError::SuiRpcError { err: e.to_string() })
680    }
681
682    pub async fn execute_all_in_checkpoints(
683        &mut self,
684        checkpoint_ids: &[u64],
685        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
686        terminate_early: bool,
687        use_authority: bool,
688    ) -> Result<(u64, u64), ReplayEngineError> {
689        // Get all the TXs at this checkpoint
690        let mut txs = Vec::new();
691        for checkpoint_id in checkpoint_ids {
692            txs.extend(self.get_checkpoint_txs(*checkpoint_id).await?);
693        }
694        let num = txs.len();
695        let mut succeeded = 0;
696        for tx in txs {
697            match self
698                .execute_transaction(
699                    &tx,
700                    expensive_safety_check_config.clone(),
701                    use_authority,
702                    None,
703                    None,
704                    None,
705                )
706                .await
707                .map(|q| q.check_effects())
708            {
709                Err(e) | Ok(Err(e)) => {
710                    if terminate_early {
711                        return Err(e);
712                    }
713                    error!("Error executing tx: {},  {:#?}", tx, e);
714                    continue;
715                }
716                _ => (),
717            }
718
719            succeeded += 1;
720        }
721        Ok((succeeded, num as u64))
722    }
723
    /// Core replay entry point: reconstructs the transaction's input state,
    /// builds a matching executor and gas status, re-executes the transaction
    /// locally, and packages everything into an `ExecutionSandboxState`.
    ///
    /// `override_transaction_kind`, when given, replaces the on-chain kind for
    /// this execution. Errors with `TransactionNotSupported` for protocol
    /// versions below 16.
    pub async fn execution_engine_execute_with_tx_info_impl(
        &mut self,
        tx_info: &OnChainTransactionInfo,
        override_transaction_kind: Option<TransactionKind>,
        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
        let tx_digest = &tx_info.tx_digest;
        // Before protocol version 16, the generation of effects depends on the wrapped tombstones.
        // It is not possible to retrieve such data for replay.
        if tx_info.protocol_version.as_u64() < 16 {
            warn!(
                "Protocol version ({:?}) too old: {}, skipping transaction",
                tx_info.protocol_version, tx_digest
            );
            return Err(ReplayEngineError::TransactionNotSupported {
                digest: *tx_digest,
                reason: "Protocol version too old".to_string(),
            });
        }
        // Initialize the state necessary for execution
        // Get the input objects
        let input_objects = self.initialize_execution_env_state(tx_info).await?;
        // Sanity check: every on-chain shared object ref must be accounted for.
        assert_eq!(
            &input_objects.filter_shared_objects().len(),
            &tx_info.shared_object_refs.len()
        );
        // At this point we have all the objects needed for replay

        // This assumes we already initialized the protocol version table `protocol_version_epoch_table`
        let protocol_config =
            &ProtocolConfig::get_for_version(tx_info.protocol_version, tx_info.chain);

        let metrics = self.metrics.clone();

        let ov = self.executor_version;

        // We could probably cache the executor per protocol config
        let executor = get_executor(ov, protocol_config, expensive_safety_check_config);

        // All prep done
        let expensive_checks = true;
        let transaction_kind = override_transaction_kind.unwrap_or(tx_info.kind.clone());
        // System transactions run unmetered; everything else uses the
        // on-chain budget/price/RGP to reconstruct the gas status.
        let gas_status = if tx_info.kind.is_system_tx() {
            SuiGasStatus::new_unmetered()
        } else {
            SuiGasStatus::new(
                tx_info.gas_budget,
                tx_info.gas_price,
                tx_info.reference_gas_price,
                protocol_config,
            )
            .expect("Failed to create gas status")
        };
        let gas_data = GasData {
            payment: tx_info.gas.clone(),
            // Gas owner defaults to the sender when no sponsor was recorded.
            owner: tx_info.gas_owner.unwrap_or(tx_info.sender),
            price: tx_info.gas_price,
            budget: tx_info.gas_budget,
        };
        let checked_input_objects = CheckedInputObjects::new_for_replay(input_objects.clone());
        let early_execution_error = get_early_execution_error(
            tx_digest,
            &checked_input_objects,
            &HashSet::new(),
            // TODO(address-balances): Support balance withdraw status for replay
            &FundsWithdrawStatus::MaybeSufficient,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };
        let (inner_store, gas_status, effects, _timings, result) = executor
            .execute_transaction_to_effects_and_execution_error(
                &self,
                protocol_config,
                metrics.clone(),
                expensive_checks,
                execution_params,
                &tx_info.executed_epoch,
                tx_info.epoch_start_timestamp,
                checked_input_objects,
                gas_data,
                gas_status,
                transaction_kind.clone(),
                None, // compat_args
                tx_info.sender,
                *tx_digest,
                &mut None,
            );

        // Tracing output is best-effort: failures are logged, not propagated.
        if let Err(err) = self.pretty_print_for_tracing(
            &gas_status,
            &executor,
            tx_info,
            &transaction_kind,
            protocol_config,
            metrics,
            expensive_checks,
            input_objects.clone(),
        ) {
            error!("Failed to pretty print for tracing: {:?}", err);
        }

        let all_required_objects = self.storage.all_objects();

        let effects =
            SuiTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;

        Ok(ExecutionSandboxState {
            transaction_info: tx_info.clone(),
            required_objects: all_required_objects,
            local_exec_temporary_store: Some(inner_store),
            local_exec_effects: effects,
            local_exec_status: Some(result),
        })
    }
840
    /// Emits trace-level diagnostics for a replayed transaction: the gas
    /// status under target `replay_gas_info`, and — for programmable
    /// transactions only — a pretty-printed PTB with per-command results
    /// under target `replay_ptb_info`.
    ///
    /// The PTB results are obtained by re-running the transaction through
    /// `executor.dev_inspect_transaction` with `skip_checks = true`, so this
    /// performs a second (inspection-only) execution purely for display.
    /// Returns an error if gas status construction or result annotation fails.
    fn pretty_print_for_tracing(
        &self,
        gas_status: &SuiGasStatus,
        executor: &Arc<dyn Executor + Send + Sync>,
        tx_info: &OnChainTransactionInfo,
        transaction_kind: &TransactionKind,
        protocol_config: &ProtocolConfig,
        metrics: Arc<ExecutionMetrics>,
        expensive_checks: bool,
        input_objects: InputObjects,
    ) -> anyhow::Result<()> {
        trace!(target: "replay_gas_info", "{}", Pretty(gas_status));

        // dev-inspect run: checks are skipped since the real execution already ran
        let skip_checks = true;
        let gas_data = GasData {
            payment: tx_info.gas.clone(),
            owner: tx_info.gas_owner.unwrap_or(tx_info.sender),
            price: tx_info.gas_price,
            budget: tx_info.gas_budget,
        };
        let checked_input_objects = CheckedInputObjects::new_for_replay(input_objects.clone());
        let early_execution_error = get_early_execution_error(
            &tx_info.tx_digest,
            &checked_input_objects,
            &HashSet::new(),
            // TODO(address-balances): Support balance withdraw status for replay
            &FundsWithdrawStatus::MaybeSufficient,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };
        // Only PTBs have command-level results worth pretty-printing.
        if let ProgrammableTransaction(pt) = transaction_kind {
            trace!(
                target: "replay_ptb_info",
                "{}",
                Pretty(&FullPTB {
                    ptb: pt.clone(),
                    results: transform_command_results_to_annotated(
                        protocol_config,
                        executor,
                        &self.clone(),
                        // `.3` selects the per-command execution results from
                        // the dev-inspect tuple; missing results render empty.
                        executor.dev_inspect_transaction(
                            &self,
                            protocol_config,
                            metrics,
                            expensive_checks,
                            execution_params,
                            &tx_info.executed_epoch,
                            tx_info.epoch_start_timestamp,
                            CheckedInputObjects::new_for_replay(input_objects),
                            gas_data,
                            SuiGasStatus::new(
                                tx_info.gas_budget,
                                tx_info.gas_price,
                                tx_info.reference_gas_price,
                                protocol_config,
                            )?,
                            transaction_kind.clone(),
                            None, // compat_args
                            tx_info.sender,
                            tx_info.sender_signed_data.digest(),
                            skip_checks,
                        )
                        .3
                        .unwrap_or_default(),
                    )?,
            }));
        }
        Ok(())
    }
912
913    /// Must be called after `init_for_execution`
914    #[allow(clippy::result_large_err)]
915    pub async fn execution_engine_execute_impl(
916        &mut self,
917        tx_digest: &TransactionDigest,
918        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
919    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
920        if self.is_remote_replay() {
921            assert!(
922                !self.protocol_version_system_package_table.is_empty()
923                    || !self.protocol_version_epoch_table.is_empty(),
924                "Required tables not populated. Must call `init_for_execution` before executing transactions"
925            );
926        }
927
928        let tx_info = if self.is_remote_replay() {
929            self.resolve_tx_components(tx_digest).await?
930        } else {
931            self.resolve_tx_components_from_dump(tx_digest).await?
932        };
933        self.execution_engine_execute_with_tx_info_impl(
934            &tx_info,
935            None,
936            expensive_safety_check_config,
937        )
938        .await
939    }
940
    /// Executes a transaction with the state specified in `pre_run_sandbox`
    /// This is useful for executing a transaction with a specific state
    /// However if the state is invalid, the behavior is undefined.
    ///
    /// Replays the sandboxed transaction through the certificate path: the
    /// sandbox's required objects are loaded into an `InMemoryStorage`, the
    /// original sender-signed data is re-verified and re-checked via
    /// `sui_transaction_checks::check_certificate_input`, and execution runs
    /// with a fresh executor for the sandbox's protocol config. The resulting
    /// sandbox state carries the new effects but no temporary store.
    #[allow(clippy::result_large_err)]
    pub async fn certificate_execute_with_sandbox_state(
        pre_run_sandbox: &ExecutionSandboxState,
    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
        // These cannot be changed and are inherited from the sandbox state
        let executed_epoch = pre_run_sandbox.transaction_info.executed_epoch;
        let reference_gas_price = pre_run_sandbox.transaction_info.reference_gas_price;
        let epoch_start_timestamp = pre_run_sandbox.transaction_info.epoch_start_timestamp;
        let protocol_config = ProtocolConfig::get_for_version(
            pre_run_sandbox.transaction_info.protocol_version,
            pre_run_sandbox.transaction_info.chain,
        );
        let required_objects = pre_run_sandbox.required_objects.clone();
        let store = InMemoryStorage::new(required_objects.clone());

        let transaction =
            Transaction::new(pre_run_sandbox.transaction_info.sender_signed_data.clone());

        // TODO: This will not work for deleted shared objects. We need to persist that information in the sandbox.
        // TODO: A lot of the following code is replicated in several places. We should introduce a few
        // traits and make them shared so that we don't have to fix one by one when we have major execution
        // layer changes.
        let input_objects = store.read_input_objects_for_transaction(&transaction);
        // The transaction is treated as already consensus-ordered at the
        // sandbox's epoch; signature verification is deliberately skipped.
        let executable = VerifiedExecutableTransaction::new_from_consensus(
            VerifiedTransaction::new_unchecked(transaction),
            executed_epoch,
        );
        let (gas_status, input_objects) = sui_transaction_checks::check_certificate_input(
            &executable,
            input_objects,
            &protocol_config,
            reference_gas_price,
        )
        .unwrap();
        let (kind, signer, gas_data) = executable.transaction_data().execution_parts();
        let executor = sui_execution::executor(&protocol_config, true).unwrap();
        let early_execution_error = get_early_execution_error(
            executable.digest(),
            &input_objects,
            &HashSet::new(),
            // TODO(address-balances): Support balance withdraw status for replay
            &FundsWithdrawStatus::MaybeSufficient,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };
        let (_, _, effects, _timings, exec_res) = executor
            .execute_transaction_to_effects_and_execution_error(
                &store,
                &protocol_config,
                Arc::new(ExecutionMetrics::new(&Registry::new())),
                true,
                execution_params,
                &executed_epoch,
                epoch_start_timestamp,
                input_objects,
                gas_data,
                gas_status,
                kind,
                None, // compat_args
                signer,
                *executable.digest(),
                &mut None,
            );

        let effects =
            SuiTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;

        Ok(ExecutionSandboxState {
            transaction_info: pre_run_sandbox.transaction_info.clone(),
            required_objects,
            local_exec_temporary_store: None, // We dont capture it for cert exec run
            local_exec_effects: effects,
            local_exec_status: Some(exec_res),
        })
    }
1021
1022    /// Must be called after `init_for_execution`
1023    /// This executes from `sui_core::authority::AuthorityState::try_execute_immediately`
1024    #[allow(clippy::result_large_err)]
1025    pub async fn certificate_execute(
1026        &mut self,
1027        tx_digest: &TransactionDigest,
1028        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1029    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
1030        // Use the lighterweight execution engine to get the pre-run state
1031        let pre_run_sandbox = self
1032            .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
1033            .await?;
1034        Self::certificate_execute_with_sandbox_state(&pre_run_sandbox).await
1035    }
1036
1037    /// Must be called after `init_for_execution`
1038    /// This executes from `sui_adapter::execution_engine::execute_transaction_to_effects`
1039    #[allow(clippy::result_large_err)]
1040    pub async fn execution_engine_execute(
1041        &mut self,
1042        tx_digest: &TransactionDigest,
1043        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1044    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
1045        let sandbox_state = self
1046            .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
1047            .await?;
1048
1049        Ok(sandbox_state)
1050    }
1051
1052    #[allow(clippy::result_large_err)]
1053    pub async fn execute_state_dump(
1054        &mut self,
1055        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1056    ) -> Result<(ExecutionSandboxState, NodeStateDump), ReplayEngineError> {
1057        assert!(!self.is_remote_replay());
1058
1059        let d = match self.fetcher.clone() {
1060            Fetchers::NodeStateDump(d) => d,
1061            _ => panic!("Invalid fetcher for state dump"),
1062        };
1063        let tx_digest = d.node_state_dump.clone().tx_digest;
1064        let sandbox_state = self
1065            .execution_engine_execute_impl(&tx_digest, expensive_safety_check_config)
1066            .await?;
1067
1068        Ok((sandbox_state, d.node_state_dump))
1069    }
1070
1071    #[allow(clippy::result_large_err)]
1072    pub async fn execute_transaction(
1073        &mut self,
1074        tx_digest: &TransactionDigest,
1075        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1076        use_authority: bool,
1077        executor_version: Option<i64>,
1078        protocol_version: Option<i64>,
1079        config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
1080    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
1081        self.executor_version = executor_version;
1082        self.protocol_version = protocol_version;
1083        self.config_and_versions = config_and_versions;
1084        if use_authority {
1085            self.certificate_execute(tx_digest, expensive_safety_check_config.clone())
1086                .await
1087        } else {
1088            self.execution_engine_execute(tx_digest, expensive_safety_check_config)
1089                .await
1090        }
1091    }
1092    fn system_package_ids(protocol_version: u64) -> Vec<ObjectID> {
1093        let mut ids = BuiltInFramework::all_package_ids();
1094
1095        if protocol_version < 5 {
1096            ids.retain(|id| *id != DEEPBOOK_PACKAGE_ID)
1097        }
1098        ids
1099    }
1100
    /// This is the only function which accesses the network during execution
    ///
    /// Returns the object with ID `obj_id`, consulting the in-memory caches
    /// first (`package_cache` when `package_expected`, otherwise
    /// `live_objects_store`) and falling back to downloading the latest
    /// version from the fetcher. Downloaded packages are inserted into the
    /// package cache, and every downloaded object is recorded in the
    /// object-version cache. Returns `Ok(None)` if the object does not exist.
    ///
    /// Panics if a download yields a package when `package_expected` is false.
    #[allow(clippy::result_large_err)]
    pub fn get_or_download_object(
        &self,
        obj_id: &ObjectID,
        package_expected: bool,
    ) -> Result<Option<Object>, ReplayEngineError> {
        if package_expected {
            // Fast path: package already cached from a prior download.
            if let Some(obj) = self
                .storage
                .package_cache
                .lock()
                .expect("Cannot lock")
                .get(obj_id)
            {
                return Ok(Some(obj.clone()));
            };
            // Check if its a system package because we must've downloaded all
            // TODO: Will return this check once we can download completely for other networks
            // assert!(
            //     !self.system_package_ids().contains(obj_id),
            //     "All system packages should be downloaded already"
            // );
        } else if let Some(obj) = self
            .storage
            .live_objects_store
            .lock()
            .expect("Can't lock")
            .get(obj_id)
        {
            // Fast path: non-package object already present in the live set.
            return Ok(Some(obj.clone()));
        }

        // Cache miss: hit the network for the latest version of the object.
        let Some(o) = self.download_latest_object(obj_id)? else {
            return Ok(None);
        };

        if o.is_package() {
            assert!(
                package_expected,
                "Did not expect package but downloaded object is a package: {obj_id}"
            );

            self.storage
                .package_cache
                .lock()
                .expect("Cannot lock")
                .insert(*obj_id, o.clone());
        }
        // Record (id, version) -> object so later version-specific lookups
        // can be served without re-downloading.
        let o_ref = o.compute_object_reference();
        self.storage
            .object_version_cache
            .lock()
            .expect("Cannot lock")
            .insert((o_ref.0, o_ref.1), o.clone());
        Ok(Some(o))
    }
1158
1159    pub fn is_remote_replay(&self) -> bool {
1160        matches!(self.fetcher, Fetchers::Remote(_))
1161    }
1162
1163    /// Must be called after `populate_protocol_version_tables`
1164    #[allow(clippy::result_large_err)]
1165    pub fn system_package_versions_for_protocol_version(
1166        &self,
1167        protocol_version: u64,
1168    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
1169        match &self.fetcher {
1170            Fetchers::Remote(_) => Ok(self
1171                .protocol_version_system_package_table
1172                .get(&protocol_version)
1173                .ok_or(ReplayEngineError::FrameworkObjectVersionTableNotPopulated {
1174                    protocol_version,
1175                })?
1176                .clone()
1177                .into_iter()
1178                .collect()),
1179
1180            Fetchers::NodeStateDump(d) => Ok(d
1181                .node_state_dump
1182                .relevant_system_packages
1183                .iter()
1184                .map(|w| (w.id, w.version, w.digest))
1185                .map(|q| (q.0, q.1))
1186                .collect()),
1187        }
1188    }
1189
1190    pub async fn protocol_ver_to_epoch_map(
1191        &self,
1192    ) -> Result<BTreeMap<u64, ProtocolVersionSummary>, ReplayEngineError> {
1193        let mut range_map = BTreeMap::new();
1194        let epoch_change_events = self.fetcher.get_epoch_change_events(false).await?;
1195
1196        // Exception for Genesis: Protocol version 1 at epoch 0
1197        let mut tx_digest = *self
1198            .fetcher
1199            .get_checkpoint_txs(0)
1200            .await?
1201            .first()
1202            .expect("Genesis TX must be in first checkpoint");
1203        // Somehow the genesis TX did not emit any event, but we know it was the start of version 1
1204        // So we need to manually add this range
1205        let (mut start_epoch, mut start_protocol_version, mut start_checkpoint) =
1206            (0, 1, Some(0u64));
1207
1208        let (mut curr_epoch, mut curr_protocol_version, mut curr_checkpoint) =
1209            (start_epoch, start_protocol_version, start_checkpoint);
1210
1211        (start_epoch, start_protocol_version, start_checkpoint) =
1212            (curr_epoch, curr_protocol_version, curr_checkpoint);
1213
1214        // This is the final tx digest for the epoch change. We need this to track the final checkpoint
1215        let mut end_epoch_tx_digest = tx_digest;
1216
1217        for event in epoch_change_events {
1218            (curr_epoch, curr_protocol_version) = extract_epoch_and_version(event.clone())?;
1219            end_epoch_tx_digest = event.id.tx_digest;
1220
1221            if start_protocol_version == curr_protocol_version {
1222                // Same range
1223                continue;
1224            }
1225
1226            // Change in prot version
1227            // Find the last checkpoint
1228            curr_checkpoint = self
1229                .fetcher
1230                .get_transaction(&event.id.tx_digest)
1231                .await?
1232                .checkpoint;
1233            // Insert the last range
1234            range_map.insert(
1235                start_protocol_version,
1236                ProtocolVersionSummary {
1237                    protocol_version: start_protocol_version,
1238                    epoch_start: start_epoch,
1239                    epoch_end: curr_epoch - 1,
1240                    checkpoint_start: start_checkpoint,
1241                    checkpoint_end: curr_checkpoint.map(|x| x - 1),
1242                    epoch_change_tx: tx_digest,
1243                },
1244            );
1245
1246            start_epoch = curr_epoch;
1247            start_protocol_version = curr_protocol_version;
1248            tx_digest = event.id.tx_digest;
1249            start_checkpoint = curr_checkpoint;
1250        }
1251
1252        // Insert the last range
1253        range_map.insert(
1254            curr_protocol_version,
1255            ProtocolVersionSummary {
1256                protocol_version: curr_protocol_version,
1257                epoch_start: start_epoch,
1258                epoch_end: curr_epoch,
1259                checkpoint_start: curr_checkpoint,
1260                checkpoint_end: self
1261                    .fetcher
1262                    .get_transaction(&end_epoch_tx_digest)
1263                    .await?
1264                    .checkpoint,
1265                epoch_change_tx: tx_digest,
1266            },
1267        );
1268
1269        Ok(range_map)
1270    }
1271
1272    pub fn protocol_version_for_epoch(
1273        epoch: u64,
1274        mp: &BTreeMap<u64, (TransactionDigest, u64, u64)>,
1275    ) -> u64 {
1276        // Naive impl but works for now
1277        // Can improve with range algos & data structures
1278        let mut version = 1;
1279        for (k, v) in mp.iter().rev() {
1280            if v.1 <= epoch {
1281                version = *k;
1282                break;
1283            }
1284        }
1285        version
1286    }
1287
    /// Populates `protocol_version_epoch_table` (protocol version -> epoch
    /// range summary) and `protocol_version_system_package_table` (protocol
    /// version -> system package versions), so later replays can resolve
    /// the right config and framework packages for any transaction's epoch.
    pub async fn populate_protocol_version_tables(&mut self) -> Result<(), ReplayEngineError> {
        self.protocol_version_epoch_table = self.protocol_ver_to_epoch_map().await?;

        // Full per-package version history, keyed by package ID.
        let system_package_revisions = self.system_package_versions().await?;

        // This can be more efficient but small footprint so okay for now
        //Table is sorted from earliest to latest
        for (
            prot_ver,
            ProtocolVersionSummary {
                epoch_change_tx: tx_digest,
                ..
            },
        ) in self.protocol_version_epoch_table.clone()
        {
            // Use the previous versions protocol version table
            // (packages not upgraded at this version carry over unchanged)
            let mut working = if prot_ver <= 1 {
                BTreeMap::new()
            } else {
                self.protocol_version_system_package_table
                    .iter()
                    .rev()
                    .find(|(ver, _)| **ver <= prot_ver)
                    .expect("Prev entry must exist")
                    .1
                    .clone()
            };

            // Overlay any package version whose upgrade transaction is this
            // protocol version's epoch-change transaction.
            for (id, versions) in system_package_revisions.iter() {
                // Oldest appears first in list, so reverse
                for ver in versions.iter().rev() {
                    if ver.1 == tx_digest {
                        // Found the version for this protocol version
                        working.insert(*id, ver.0);
                        break;
                    }
                }
            }
            self.protocol_version_system_package_table
                .insert(prot_ver, working);
        }
        Ok(())
    }
1331
1332    pub async fn system_package_versions(
1333        &self,
1334    ) -> Result<BTreeMap<ObjectID, Vec<(SequenceNumber, TransactionDigest)>>, ReplayEngineError>
1335    {
1336        let system_package_ids = Self::system_package_ids(
1337            *self
1338                .protocol_version_epoch_table
1339                .keys()
1340                .peekable()
1341                .last()
1342                .expect("Protocol version epoch table not populated"),
1343        );
1344        let mut system_package_objs = self.multi_download_latest(&system_package_ids).await?;
1345
1346        let mut mapping = BTreeMap::new();
1347
1348        // Extract all the transactions which created or mutated this object
1349        while !system_package_objs.is_empty() {
1350            // For the given object and its version, record the transaction which upgraded or created it
1351            let previous_txs: Vec<_> = system_package_objs
1352                .iter()
1353                .map(|o| (o.compute_object_reference(), o.previous_transaction))
1354                .collect();
1355
1356            previous_txs.iter().for_each(|((id, ver, _), tx)| {
1357                mapping.entry(*id).or_insert(vec![]).push((*ver, *tx));
1358            });
1359
1360            // Next round
1361            // Get the previous version of each object if exists
1362            let previous_ver_refs: Vec<_> = previous_txs
1363                .iter()
1364                .filter_map(|(q, _)| {
1365                    let prev_ver = u64::from(q.1) - 1;
1366                    if prev_ver == 0 {
1367                        None
1368                    } else {
1369                        Some((q.0, SequenceNumber::from(prev_ver)))
1370                    }
1371                })
1372                .collect();
1373            system_package_objs = match self.multi_download(&previous_ver_refs).await {
1374                Ok(packages) => packages,
1375                Err(ReplayEngineError::ObjectNotExist { id }) => {
1376                    // This happens when the RPC server prunes older object
1377                    // Replays in the current protocol version will work but old ones might not
1378                    // as we cannot fetch the package
1379                    warn!(
1380                        "Object {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1381                        id
1382                    );
1383                    break;
1384                }
1385                Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
1386                    // This happens when the RPC server prunes older object
1387                    // Replays in the current protocol version will work but old ones might not
1388                    // as we cannot fetch the package
1389                    warn!(
1390                        "Object {} at version {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1391                        id, version
1392                    );
1393                    break;
1394                }
1395                Err(ReplayEngineError::ObjectVersionTooHigh {
1396                    id,
1397                    asked_version,
1398                    latest_version,
1399                }) => {
1400                    warn!(
1401                        "Object {} at version {} does not exist on RPC server. Latest version is {}. This might be due to pruning. Historical replays might not work",
1402                        id, asked_version, latest_version
1403                    );
1404                    break;
1405                }
1406                Err(ReplayEngineError::ObjectDeleted {
1407                    id,
1408                    version,
1409                    digest,
1410                }) => {
1411                    // This happens when the RPC server prunes older object
1412                    // Replays in the current protocol version will work but old ones might not
1413                    // as we cannot fetch the package
1414                    warn!(
1415                        "Object {} at version {} digest {} deleted from RPC server. This might be due to pruning. Historical replays might not work",
1416                        id, version, digest
1417                    );
1418                    break;
1419                }
1420                Err(e) => return Err(e),
1421            };
1422        }
1423        Ok(mapping)
1424    }
1425
1426    pub async fn get_protocol_config(
1427        &self,
1428        epoch_id: EpochId,
1429        chain: Chain,
1430    ) -> Result<ProtocolConfig, ReplayEngineError> {
1431        match self.protocol_version {
1432            Some(x) if x < 0 => Ok(ProtocolConfig::get_for_max_version_UNSAFE()),
1433            Some(v) => Ok(ProtocolConfig::get_for_version((v as u64).into(), chain)),
1434            None => self
1435                .protocol_version_epoch_table
1436                .iter()
1437                .rev()
1438                .find(|(_, rg)| epoch_id >= rg.epoch_start)
1439                .map(|(p, _rg)| Ok(ProtocolConfig::get_for_version((*p).into(), chain)))
1440                .unwrap_or_else(|| {
1441                    Err(ReplayEngineError::ProtocolVersionNotFound { epoch: epoch_id })
1442                }),
1443        }
1444    }
1445
    /// Returns the inclusive (first checkpoint, last checkpoint) range of
    /// `epoch_id`, derived from epoch-change events: the epoch starts at the
    /// checkpoint of its own epoch-change transaction (checkpoint 0 for
    /// epoch 0) and ends one checkpoint before the next epoch's change
    /// transaction. Errors if the relevant events cannot be found; panics if
    /// a needed checkpoint has been pruned from the RPC server.
    pub async fn checkpoints_for_epoch(
        &self,
        epoch_id: u64,
    ) -> Result<(u64, u64), ReplayEngineError> {
        let epoch_change_events = self
            .fetcher
            .get_epoch_change_events(true)
            .await?
            .into_iter()
            .collect::<Vec<_>>();
        // `start_epoch_idx` is the index into `epoch_change_events` of the
        // event that began this epoch; epoch 0 has no event and is pinned to
        // checkpoint 0 with index 1 so that `idx + 1` below lands on epoch 1's
        // change event.
        let (start_checkpoint, start_epoch_idx) = if epoch_id == 0 {
            (0, 1)
        } else {
            let idx = epoch_change_events
                .iter()
                .position(|ev| match extract_epoch_and_version(ev.clone()) {
                    Ok((epoch, _)) => epoch == epoch_id,
                    Err(_) => false,
                })
                .ok_or(ReplayEngineError::EventNotFound { epoch: epoch_id })?;
            let epoch_change_tx = epoch_change_events[idx].id.tx_digest;
            (
                self.fetcher
                    .get_transaction(&epoch_change_tx)
                    .await?
                    .checkpoint
                    .unwrap_or_else(|| {
                        panic!(
                            "Checkpoint for transaction {} not present. Could be due to pruning",
                            epoch_change_tx
                        )
                    }),
                idx,
            )
        };

        // The epoch ends right before the next epoch's change transaction.
        let next_epoch_change_tx = epoch_change_events
            .get(start_epoch_idx + 1)
            .map(|v| v.id.tx_digest)
            .ok_or(ReplayEngineError::UnableToDetermineCheckpoint { epoch: epoch_id })?;

        let next_epoch_checkpoint = self
            .fetcher
            .get_transaction(&next_epoch_change_tx)
            .await?
            .checkpoint
            .unwrap_or_else(|| {
                panic!(
                    "Checkpoint for transaction {} not present. Could be due to pruning",
                    next_epoch_change_tx
                )
            });

        Ok((start_checkpoint, next_epoch_checkpoint - 1))
    }
1501
1502    pub async fn get_epoch_start_timestamp_and_rgp(
1503        &self,
1504        epoch_id: u64,
1505        tx_digest: &TransactionDigest,
1506    ) -> Result<(u64, u64), ReplayEngineError> {
1507        if epoch_id == 0 {
1508            return Err(ReplayEngineError::TransactionNotSupported {
1509                digest: *tx_digest,
1510                reason: "Transactions from epoch 0 not supported".to_string(),
1511            });
1512        }
1513        self.fetcher
1514            .get_epoch_start_timestamp_and_rgp(epoch_id)
1515            .await
1516    }
1517
1518    fn add_config_objects_if_needed(
1519        &self,
1520        status: &SuiExecutionStatus,
1521    ) -> Vec<(ObjectID, SequenceNumber)> {
1522        match parse_effect_error_for_denied_coins(status) {
1523            Some(coin_type) => {
1524                let Some(mut config_id_and_version) = self.config_and_versions.clone() else {
1525                    panic!(
1526                        "Need to specify the config object ID and version for '{coin_type}' in order to replay this transaction"
1527                    );
1528                };
1529                // NB: the version of the deny list object doesn't matter
1530                if !config_id_and_version
1531                    .iter()
1532                    .any(|(id, _)| id == &SUI_DENY_LIST_OBJECT_ID)
1533                {
1534                    let deny_list_oid_version = self.download_latest_object(&SUI_DENY_LIST_OBJECT_ID)
1535                        .ok()
1536                        .flatten()
1537                        .expect("Unable to download the deny list object for a transaction that requires it")
1538                        .version();
1539                    config_id_and_version.push((SUI_DENY_LIST_OBJECT_ID, deny_list_oid_version));
1540                }
1541                config_id_and_version
1542            }
1543            None => vec![],
1544        }
1545    }
1546
1547    async fn resolve_tx_components(
1548        &self,
1549        tx_digest: &TransactionDigest,
1550    ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1551        assert!(self.is_remote_replay());
1552        // Fetch full transaction content
1553        let tx_info = self.fetcher.get_transaction(tx_digest).await?;
1554        let sender = match tx_info.clone().transaction.unwrap().data {
1555            sui_json_rpc_types::SuiTransactionBlockData::V1(tx) => tx.sender,
1556        };
1557        let SuiTransactionBlockEffects::V1(effects) = tx_info.clone().effects.unwrap();
1558
1559        let config_objects = self.add_config_objects_if_needed(effects.status());
1560
1561        let raw_tx_bytes = tx_info.clone().raw_transaction;
1562        let orig_tx: SenderSignedData = bcs::from_bytes(&raw_tx_bytes).unwrap();
1563        let input_objs = orig_tx
1564            .transaction_data()
1565            .input_objects()
1566            .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1567        let tx_kind_orig = orig_tx.transaction_data().kind();
1568
1569        // Download the objects at the version right before the execution of this TX
1570        let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1571
1572        let shared_object_refs: Vec<ObjectRef> = effects
1573            .shared_objects()
1574            .iter()
1575            .map(|so_ref| {
1576                if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1577                    unimplemented!(
1578                        "Replay of deleted shared object transactions is not supported yet"
1579                    );
1580                } else {
1581                    so_ref.to_object_ref()
1582                }
1583            })
1584            .collect();
1585        let gas_data = match tx_info.clone().transaction.unwrap().data {
1586            sui_json_rpc_types::SuiTransactionBlockData::V1(tx) => tx.gas_data,
1587        };
1588        let gas_object_refs: Vec<_> = gas_data
1589            .payment
1590            .iter()
1591            .map(|obj_ref| obj_ref.to_object_ref())
1592            .collect();
1593        let receiving_objs = orig_tx
1594            .transaction_data()
1595            .receiving_objects()
1596            .into_iter()
1597            .map(|(obj_id, version, _)| (obj_id, version))
1598            .collect();
1599
1600        let epoch_id = effects.executed_epoch;
1601        let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1602
1603        // Extract the epoch start timestamp
1604        let (epoch_start_timestamp, reference_gas_price) = self
1605            .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1606            .await?;
1607
1608        Ok(OnChainTransactionInfo {
1609            kind: tx_kind_orig.clone(),
1610            sender,
1611            modified_at_versions,
1612            input_objects: input_objs,
1613            shared_object_refs,
1614            gas: gas_object_refs,
1615            gas_owner: (gas_data.owner != sender).then_some(gas_data.owner),
1616            gas_price: gas_data.price,
1617            gas_budget: gas_data.budget,
1618            executed_epoch: epoch_id,
1619            dependencies: effects.dependencies().to_vec(),
1620            effects: SuiTransactionBlockEffects::V1(effects),
1621            receiving_objs,
1622            config_objects,
1623            // Find the protocol version for this epoch
1624            // This assumes we already initialized the protocol version table `protocol_version_epoch_table`
1625            protocol_version: self.get_protocol_config(epoch_id, chain).await?.version,
1626            tx_digest: *tx_digest,
1627            epoch_start_timestamp,
1628            sender_signed_data: orig_tx.clone(),
1629            reference_gas_price,
1630            chain,
1631        })
1632    }
1633
1634    async fn resolve_tx_components_from_dump(
1635        &self,
1636        tx_digest: &TransactionDigest,
1637    ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1638        assert!(!self.is_remote_replay());
1639
1640        let dp = self.fetcher.as_node_state_dump();
1641
1642        let sender = dp
1643            .node_state_dump
1644            .sender_signed_data
1645            .transaction_data()
1646            .sender();
1647        let orig_tx = dp.node_state_dump.sender_signed_data.clone();
1648        let effects = dp.node_state_dump.computed_effects.clone();
1649        let effects = SuiTransactionBlockEffects::try_from(effects).unwrap();
1650        // Config objects don't show up in the node state dump so they need to be provided.
1651        let config_objects = self.add_config_objects_if_needed(effects.status());
1652
1653        // Fetch full transaction content
1654        //let tx_info = self.fetcher.get_transaction(tx_digest).await?;
1655
1656        let input_objs = orig_tx
1657            .transaction_data()
1658            .input_objects()
1659            .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1660        let tx_kind_orig = orig_tx.transaction_data().kind();
1661
1662        // Download the objects at the version right before the execution of this TX
1663        let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1664
1665        let shared_object_refs: Vec<ObjectRef> = effects
1666            .shared_objects()
1667            .iter()
1668            .map(|so_ref| {
1669                if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1670                    unimplemented!(
1671                        "Replay of deleted shared object transactions is not supported yet"
1672                    );
1673                } else {
1674                    so_ref.to_object_ref()
1675                }
1676            })
1677            .collect();
1678        let receiving_objs = orig_tx
1679            .transaction_data()
1680            .receiving_objects()
1681            .into_iter()
1682            .map(|(obj_id, version, _)| (obj_id, version))
1683            .collect();
1684
1685        let epoch_id = dp.node_state_dump.executed_epoch;
1686
1687        let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1688
1689        let protocol_config =
1690            ProtocolConfig::get_for_version(dp.node_state_dump.protocol_version.into(), chain);
1691        // Extract the epoch start timestamp
1692        let (epoch_start_timestamp, reference_gas_price) = self
1693            .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1694            .await?;
1695        let gas_data = orig_tx.transaction_data().gas_data();
1696        let gas_object_refs: Vec<_> = gas_data.clone().payment.into_iter().collect();
1697
1698        Ok(OnChainTransactionInfo {
1699            kind: tx_kind_orig.clone(),
1700            sender,
1701            modified_at_versions,
1702            input_objects: input_objs,
1703            shared_object_refs,
1704            gas: gas_object_refs,
1705            gas_owner: (gas_data.owner != sender).then_some(gas_data.owner),
1706            gas_price: gas_data.price,
1707            gas_budget: gas_data.budget,
1708            executed_epoch: epoch_id,
1709            dependencies: effects.dependencies().to_vec(),
1710            effects,
1711            receiving_objs,
1712            config_objects,
1713            protocol_version: protocol_config.version,
1714            tx_digest: *tx_digest,
1715            epoch_start_timestamp,
1716            sender_signed_data: orig_tx.clone(),
1717            reference_gas_price,
1718            chain,
1719        })
1720    }
1721
1722    async fn resolve_download_input_objects(
1723        &mut self,
1724        tx_info: &OnChainTransactionInfo,
1725        deleted_shared_objects: Vec<ObjectRef>,
1726    ) -> Result<InputObjects, ReplayEngineError> {
1727        // Download the input objects
1728        let mut package_inputs = vec![];
1729        let mut imm_owned_inputs = vec![];
1730        let mut shared_inputs = vec![];
1731        let mut deleted_shared_info_map = BTreeMap::new();
1732
1733        // for deleted shared objects, we need to look at the transaction dependencies to find the
1734        // correct transaction dependency for a deleted shared object.
1735        if !deleted_shared_objects.is_empty() {
1736            for tx_digest in tx_info.dependencies.iter() {
1737                let tx_info = self.resolve_tx_components(tx_digest).await?;
1738                for (obj_id, version, _) in tx_info.shared_object_refs.iter() {
1739                    deleted_shared_info_map.insert(*obj_id, (tx_info.tx_digest, *version));
1740                }
1741            }
1742        }
1743
1744        tx_info
1745            .input_objects
1746            .iter()
1747            .map(|kind| match kind {
1748                InputObjectKind::MovePackage(i) => {
1749                    package_inputs.push(*i);
1750                    Ok(())
1751                }
1752                InputObjectKind::ImmOrOwnedMoveObject(o_ref) => {
1753                    imm_owned_inputs.push((o_ref.0, o_ref.1));
1754                    Ok(())
1755                }
1756                InputObjectKind::SharedMoveObject {
1757                    id,
1758                    initial_shared_version: _,
1759                    mutability: _,
1760                } if !deleted_shared_info_map.contains_key(id) => {
1761                    // We already downloaded
1762                    if let Some(o) = self
1763                        .storage
1764                        .live_objects_store
1765                        .lock()
1766                        .expect("Can't lock")
1767                        .get(id)
1768                    {
1769                        shared_inputs.push(o.clone());
1770                        Ok(())
1771                    } else {
1772                        Err(ReplayEngineError::InternalCacheInvariantViolation {
1773                            id: *id,
1774                            version: None,
1775                        })
1776                    }
1777                }
1778                _ => Ok(()),
1779            })
1780            .collect::<Result<Vec<_>, _>>()?;
1781
1782        // Download the imm and owned objects
1783        let mut in_objs = self.multi_download_and_store(&imm_owned_inputs).await?;
1784
1785        // For packages, download latest if non framework
1786        // If framework, download relevant for the current protocol version
1787        in_objs.extend(
1788            self.multi_download_relevant_packages_and_store(
1789                package_inputs,
1790                tx_info.protocol_version.as_u64(),
1791            )
1792            .await?,
1793        );
1794        // Add shared objects
1795        in_objs.extend(shared_inputs);
1796
1797        // TODO(Zhe): Account for cancelled transaction assigned version here, and tests.
1798        let resolved_input_objs = tx_info
1799            .input_objects
1800            .iter()
1801            .flat_map(|kind| match kind {
1802                InputObjectKind::MovePackage(i) => {
1803                    // Okay to unwrap since we downloaded it
1804                    Some(ObjectReadResult::new(
1805                        *kind,
1806                        self.storage
1807                            .package_cache
1808                            .lock()
1809                            .expect("Cannot lock")
1810                            .get(i)
1811                            .unwrap_or(
1812                                &self
1813                                    .download_latest_object(i)
1814                                    .expect("Object download failed")
1815                                    .expect("Object not found on chain"),
1816                            )
1817                            .clone()
1818                            .into(),
1819                    ))
1820                }
1821                InputObjectKind::ImmOrOwnedMoveObject(o_ref) => Some(ObjectReadResult::new(
1822                    *kind,
1823                    self.storage
1824                        .object_version_cache
1825                        .lock()
1826                        .expect("Cannot lock")
1827                        .get(&(o_ref.0, o_ref.1))
1828                        .unwrap()
1829                        .clone()
1830                        .into(),
1831                )),
1832                InputObjectKind::SharedMoveObject { id, .. }
1833                    if !deleted_shared_info_map.contains_key(id) =>
1834                {
1835                    // we already downloaded
1836                    Some(ObjectReadResult::new(
1837                        *kind,
1838                        self.storage
1839                            .live_objects_store
1840                            .lock()
1841                            .expect("Can't lock")
1842                            .get(id)
1843                            .unwrap()
1844                            .clone()
1845                            .into(),
1846                    ))
1847                }
1848                InputObjectKind::SharedMoveObject { id, .. } => {
1849                    let (digest, version) = deleted_shared_info_map.get(id).unwrap();
1850                    Some(ObjectReadResult::new(
1851                        *kind,
1852                        ObjectReadResultKind::ObjectConsensusStreamEnded(*version, *digest),
1853                    ))
1854                }
1855            })
1856            .collect();
1857
1858        Ok(InputObjects::new(resolved_input_objs))
1859    }
1860
    /// Given the OnChainTransactionInfo, download and store the input objects, and other info necessary
    /// for execution.
    ///
    /// The downloads below populate the local caches (live objects store,
    /// object version cache) that `resolve_download_input_objects` later reads,
    /// so the ordering of these steps matters.
    async fn initialize_execution_env_state(
        &mut self,
        tx_info: &OnChainTransactionInfo,
    ) -> Result<InputObjects, ReplayEngineError> {
        // We need this for other activities in this session
        self.current_protocol_version = tx_info.protocol_version.as_u64();

        // Download the objects at the version right before the execution of this TX
        self.multi_download_and_store(&tx_info.modified_at_versions)
            .await?;

        // Split shared refs into live ones and ones whose digest marks them as
        // deleted; the deleted ones are resolved via dependencies downstream.
        let (shared_refs, deleted_shared_refs): (Vec<ObjectRef>, Vec<ObjectRef>) = tx_info
            .shared_object_refs
            .iter()
            .partition(|r| r.2 != ObjectDigest::OBJECT_DIGEST_DELETED);

        // Download shared objects at the version right before the execution of this TX
        let shared_refs: Vec<_> = shared_refs.iter().map(|r| (r.0, r.1)).collect();
        self.multi_download_and_store(&shared_refs).await?;

        // Download gas (although this should already be in cache from modified at versions?)
        // NOTE(review): the ObjectID::ZERO filter presumably skips placeholder
        // gas payments (e.g. system transactions) — confirm against callers.
        let gas_refs: Vec<_> = tx_info
            .gas
            .iter()
            .filter_map(|w| (w.0 != ObjectID::ZERO).then_some((w.0, w.1)))
            .collect();
        self.multi_download_and_store(&gas_refs).await?;

        // Fetch the input objects we know from the raw transaction
        let input_objs = self
            .resolve_download_input_objects(tx_info, deleted_shared_refs)
            .await?;

        // Fetch the receiving objects
        self.multi_download_and_store(&tx_info.receiving_objs)
            .await?;

        // Fetch specified config objects if any
        self.multi_download_and_store(&tx_info.config_objects)
            .await?;

        // Prep the object runtime for dynamic fields
        // Download the child objects accessed at the version right before the execution of this TX
        let loaded_child_refs = self.fetch_loaded_child_refs(&tx_info.tx_digest).await?;
        self.multi_download_and_store(&loaded_child_refs).await?;
        // Give other tasks on this runtime a chance to run before execution.
        tokio::task::yield_now().await;

        Ok(input_objs)
    }
1912}
1913
1914// <---------------------  Implement necessary traits for LocalExec to work with exec engine ----------------------->
1915
1916impl BackingPackageStore for LocalExec {
1917    /// In this case we might need to download a dependency package which was not present in the
1918    /// modified at versions list because packages are immutable
1919    fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
1920        fn inner(self_: &LocalExec, package_id: &ObjectID) -> SuiResult<Option<Object>> {
1921            // If package not present fetch it from the network
1922            self_
1923                .get_or_download_object(package_id, true /* we expect a Move package*/)
1924                .map_err(|e| SuiErrorKind::Storage(e.to_string()).into())
1925        }
1926
1927        let res = inner(self, package_id);
1928        self.exec_store_events
1929            .lock()
1930            .expect("Unable to lock events list")
1931            .push(ExecutionStoreEvent::BackingPackageGetPackageObject {
1932                package_id: *package_id,
1933                result: res.clone(),
1934            });
1935        res.map(|o| o.map(PackageObject::new))
1936    }
1937}
1938
1939impl ChildObjectResolver for LocalExec {
1940    /// This uses `get_object`, which does not download from the network
1941    /// Hence all objects must be in store already
1942    fn read_child_object(
1943        &self,
1944        parent: &ObjectID,
1945        child: &ObjectID,
1946        child_version_upper_bound: SequenceNumber,
1947    ) -> SuiResult<Option<Object>> {
1948        fn inner(
1949            self_: &LocalExec,
1950            parent: &ObjectID,
1951            child: &ObjectID,
1952            child_version_upper_bound: SequenceNumber,
1953        ) -> SuiResult<Option<Object>> {
1954            let child_object =
1955                match self_.download_object_by_upper_bound(child, child_version_upper_bound)? {
1956                    None => return Ok(None),
1957                    Some(o) => o,
1958                };
1959            let child_version = child_object.version();
1960            if child_object.version() > child_version_upper_bound {
1961                return Err(SuiErrorKind::Unknown(format!(
1962                    "Invariant Violation. Replay loaded child_object {child} at version \
1963                    {child_version} but expected the version to be <= {child_version_upper_bound}"
1964                ))
1965                .into());
1966            }
1967            let parent = *parent;
1968            if child_object.owner != Owner::ObjectOwner(parent.into()) {
1969                return Err(SuiErrorKind::InvalidChildObjectAccess {
1970                    object: *child,
1971                    given_parent: parent,
1972                    actual_owner: child_object.owner.clone(),
1973                }
1974                .into());
1975            }
1976            Ok(Some(child_object))
1977        }
1978
1979        let res = inner(self, parent, child, child_version_upper_bound);
1980        self.exec_store_events
1981            .lock()
1982            .expect("Unable to lock events list")
1983            .push(
1984                ExecutionStoreEvent::ChildObjectResolverStoreReadChildObject {
1985                    parent: *parent,
1986                    child: *child,
1987                    result: res.clone(),
1988                },
1989            );
1990        res
1991    }
1992
1993    fn get_object_received_at_version(
1994        &self,
1995        owner: &ObjectID,
1996        receiving_object_id: &ObjectID,
1997        receive_object_at_version: SequenceNumber,
1998        _epoch_id: EpochId,
1999    ) -> SuiResult<Option<Object>> {
2000        fn inner(
2001            self_: &LocalExec,
2002            owner: &ObjectID,
2003            receiving_object_id: &ObjectID,
2004            receive_object_at_version: SequenceNumber,
2005        ) -> SuiResult<Option<Object>> {
2006            let recv_object = match self_.get_object(receiving_object_id) {
2007                None => return Ok(None),
2008                Some(o) => o,
2009            };
2010            if recv_object.version() != receive_object_at_version {
2011                return Err(SuiErrorKind::Unknown(format!(
2012                    "Invariant Violation. Replay loaded child_object {receiving_object_id} at version \
2013                    {receive_object_at_version} but expected the version to be == {receive_object_at_version}"
2014                )).into());
2015            }
2016            if recv_object.owner != Owner::AddressOwner((*owner).into()) {
2017                return Ok(None);
2018            }
2019            Ok(Some(recv_object))
2020        }
2021
2022        let res = inner(self, owner, receiving_object_id, receive_object_at_version);
2023        self.exec_store_events
2024            .lock()
2025            .expect("Unable to lock events list")
2026            .push(ExecutionStoreEvent::ReceiveObject {
2027                owner: *owner,
2028                receive: *receiving_object_id,
2029                receive_at_version: receive_object_at_version,
2030                result: res.clone(),
2031            });
2032        res
2033    }
2034}
2035
2036impl ParentSync for LocalExec {
2037    /// The objects here much already exist in the store because we downloaded them earlier
2038    /// No download from network
2039    fn get_latest_parent_entry_ref_deprecated(&self, object_id: ObjectID) -> Option<ObjectRef> {
2040        fn inner(self_: &LocalExec, object_id: ObjectID) -> Option<ObjectRef> {
2041            if let Some(v) = self_
2042                .storage
2043                .live_objects_store
2044                .lock()
2045                .expect("Can't lock")
2046                .get(&object_id)
2047            {
2048                return Some(v.compute_object_reference());
2049            }
2050            None
2051        }
2052        let res = inner(self, object_id);
2053        self.exec_store_events
2054            .lock()
2055            .expect("Unable to lock events list")
2056            .push(
2057                ExecutionStoreEvent::ParentSyncStoreGetLatestParentEntryRef {
2058                    object_id,
2059                    result: res,
2060                },
2061            );
2062        res
2063    }
2064}
2065
2066impl ModuleResolver for LocalExec {
2067    type Error = SuiError;
2068
2069    /// This fetches a module which must already be present in the store
2070    /// We do not download
2071    fn get_module(&self, module_id: &ModuleId) -> SuiResult<Option<Vec<u8>>> {
2072        fn inner(self_: &LocalExec, module_id: &ModuleId) -> SuiResult<Option<Vec<u8>>> {
2073            get_module(self_, module_id)
2074        }
2075
2076        let res = inner(self, module_id);
2077        self.exec_store_events
2078            .lock()
2079            .expect("Unable to lock events list")
2080            .push(ExecutionStoreEvent::ModuleResolverGetModule {
2081                module_id: module_id.clone(),
2082                result: res.clone(),
2083            });
2084        res
2085    }
2086
2087    fn get_packages_static<const N: usize>(
2088        &self,
2089        ids: [AccountAddress; N],
2090    ) -> Result<[Option<SerializedPackage>; N], Self::Error> {
2091        let mut res = [const { None }; N];
2092        for i in 0..N {
2093            res[i] = get_package(self, &ids[i].into())?;
2094        }
2095        Ok(res)
2096    }
2097
2098    fn get_packages<'a>(
2099        &self,
2100        ids: impl ExactSizeIterator<Item = &'a AccountAddress>,
2101    ) -> Result<Vec<Option<SerializedPackage>>, Self::Error> {
2102        ids.map(|id| get_package(self, &(*id).into())).collect()
2103    }
2104}
2105
2106impl ModuleResolver for &mut LocalExec {
2107    type Error = SuiError;
2108
2109    fn get_module(&self, module_id: &ModuleId) -> SuiResult<Option<Vec<u8>>> {
2110        // Recording event here will be double-counting since its already recorded in the get_module fn
2111        (**self).get_module(module_id)
2112    }
2113
2114    fn get_packages<'a>(
2115        &self,
2116        ids: impl ExactSizeIterator<Item = &'a AccountAddress>,
2117    ) -> Result<Vec<Option<SerializedPackage>>, Self::Error> {
2118        (**self).get_packages(ids)
2119    }
2120
2121    fn get_packages_static<const N: usize>(
2122        &self,
2123        ids: [AccountAddress; N],
2124    ) -> Result<[Option<SerializedPackage>; N], Self::Error> {
2125        (**self).get_packages_static(ids)
2126    }
2127}
2128
2129impl ObjectStore for LocalExec {
2130    /// The object must be present in store by normal process we used to backfill store in init
2131    /// We dont download if not present
2132    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
2133        let res = self
2134            .storage
2135            .live_objects_store
2136            .lock()
2137            .expect("Can't lock")
2138            .get(object_id)
2139            .cloned();
2140        self.exec_store_events
2141            .lock()
2142            .expect("Unable to lock events list")
2143            .push(ExecutionStoreEvent::ObjectStoreGetObject {
2144                object_id: *object_id,
2145                result: Ok(res.clone()),
2146            });
2147        res
2148    }
2149
2150    /// The object must be present in store by normal process we used to backfill store in init
2151    /// We dont download if not present
2152    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
2153        let res = self
2154            .storage
2155            .live_objects_store
2156            .lock()
2157            .expect("Can't lock")
2158            .get(object_id)
2159            .and_then(|obj| {
2160                if obj.version() == version {
2161                    Some(obj.clone())
2162                } else {
2163                    None
2164                }
2165            });
2166
2167        self.exec_store_events
2168            .lock()
2169            .expect("Unable to lock events list")
2170            .push(ExecutionStoreEvent::ObjectStoreGetObjectByKey {
2171                object_id: *object_id,
2172                version,
2173                result: Ok(res.clone()),
2174            });
2175
2176        res
2177    }
2178}
2179
impl ObjectStore for &mut LocalExec {
    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        // Delegates to `LocalExec::get_object`; recording an event here would
        // double-count since the delegate already records it.
        (**self).get_object(object_id)
    }

    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
        // Delegates to `LocalExec::get_object_by_key`; event recorded there.
        (**self).get_object_by_key(object_id, version)
    }
}
2191
2192impl GetModule for LocalExec {
2193    type Error = SuiError;
2194    type Item = CompiledModule;
2195
2196    fn get_module_by_id(&self, id: &ModuleId) -> SuiResult<Option<Self::Item>> {
2197        let res = get_module_by_id(self, id);
2198
2199        self.exec_store_events
2200            .lock()
2201            .expect("Unable to lock events list")
2202            .push(ExecutionStoreEvent::GetModuleGetModuleByModuleId {
2203                id: id.clone(),
2204                result: res.clone(),
2205            });
2206        res
2207    }
2208}
2209
2210// <--------------------- Util functions ----------------------->
2211
2212pub fn get_executor(
2213    executor_version_override: Option<i64>,
2214    protocol_config: &ProtocolConfig,
2215    _expensive_safety_check_config: ExpensiveSafetyCheckConfig,
2216) -> Arc<dyn Executor + Send + Sync> {
2217    let protocol_config = executor_version_override
2218        .map(|q| {
2219            let ver = if q < 0 {
2220                ProtocolConfig::get_for_max_version_UNSAFE().execution_version()
2221            } else {
2222                q as u64
2223            };
2224
2225            let mut c = protocol_config.clone();
2226            c.set_execution_version_for_testing(ver);
2227            c
2228        })
2229        .unwrap_or(protocol_config.clone());
2230
2231    let silent = true;
2232    sui_execution::executor(&protocol_config, silent)
2233        .expect("Creating an executor should not fail here")
2234}
2235
2236fn parse_effect_error_for_denied_coins(status: &SuiExecutionStatus) -> Option<String> {
2237    let SuiExecutionStatus::Failure { error } = status else {
2238        return None;
2239    };
2240    parse_denied_error_string(error)
2241}
2242
2243fn parse_denied_error_string(error: &str) -> Option<String> {
2244    let regulated_regex = regex::Regex::new(
2245        r#"CoinTypeGlobalPause.*?"(.*?)"|AddressDeniedForCoin.*coin_type:.*?"(.*?)""#,
2246    )
2247    .unwrap();
2248
2249    let caps = regulated_regex.captures(error)?;
2250    Some(caps.get(1).or(caps.get(2))?.as_str().to_string())
2251}
2252
#[cfg(test)]
mod tests {
    use super::parse_denied_error_string;

    /// Both regulated-coin failure shapes should yield the embedded coin type.
    #[test]
    fn test_regex_regulated_coin_errors() {
        let test_bank = [
            "CoinTypeGlobalPause { coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
            "AddressDeniedForCoin { address: B, coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
        ];
        let expected_string =
            "39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN";

        for test in &test_bank {
            // assert_eq! reports both sides on failure, unlike assert!(a == b).
            assert_eq!(parse_denied_error_string(test).unwrap(), expected_string);
        }
    }
}