use crate::chain_from_chain_id;
use crate::{
    data_fetcher::{
        DataFetcher, Fetchers, NodeStateDumpFetcher, RemoteFetcher, extract_epoch_and_version,
    },
    displays::{
        Pretty,
        transaction_displays::{FullPTB, transform_command_results_to_annotated},
    },
    types::*,
};
use futures::executor::block_on;
use move_binary_format::CompiledModule;
use move_bytecode_utils::module_cache::GetModule;
use move_core_types::{language_storage::ModuleId, resolver::ModuleResolver};
use prometheus::Registry;
use serde::{Deserialize, Serialize};
use similar::{ChangeTag, TextDiff};
use std::{
    collections::{BTreeMap, HashSet},
    path::PathBuf,
    sync::Arc,
    sync::Mutex,
};
use sui_config::node::ExpensiveSafetyCheckConfig;
use sui_core::authority::NodeStateDump;
use sui_execution::Executor;
use sui_framework::BuiltInFramework;
use sui_json_rpc_types::{
    SuiExecutionStatus, SuiTransactionBlockEffects, SuiTransactionBlockEffectsAPI,
};
use sui_protocol_config::{Chain, ProtocolConfig};
use sui_sdk::{SuiClient, SuiClientBuilder};
use sui_types::SUI_DENY_LIST_OBJECT_ID;
use sui_types::error::SuiErrorKind;
use sui_types::execution_params::{
    BalanceWithdrawStatus, ExecutionOrEarlyError, get_early_execution_error,
};
use sui_types::in_memory_storage::InMemoryStorage;
use sui_types::message_envelope::Message;
use sui_types::storage::{PackageObject, get_module};
use sui_types::transaction::GasData;
use sui_types::transaction::TransactionKind::ProgrammableTransaction;
use sui_types::{
    DEEPBOOK_PACKAGE_ID,
    base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber},
    committee::EpochId,
    digests::{ObjectDigest, TransactionDigest},
    error::{ExecutionError, SuiError, SuiResult},
    executable_transaction::VerifiedExecutableTransaction,
    gas::SuiGasStatus,
    inner_temporary_store::InnerTemporaryStore,
    metrics::LimitsMetrics,
    object::{Object, Owner},
    storage::get_module_by_id,
    storage::{BackingPackageStore, ChildObjectResolver, ObjectStore, ParentSync},
    transaction::{
        CheckedInputObjects, InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
        SenderSignedData, Transaction, TransactionDataAPI, TransactionKind, VerifiedTransaction,
    },
};
use tracing::{error, info, trace, warn};

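/// Result of replaying a transaction locally: the on-chain transaction info that was used,
/// every object required for execution, and the locally computed effects and execution status.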
#[derive(Debug, Serialize, Deserialize)]
pub struct ExecutionSandboxState {
    pub transaction_info: OnChainTransactionInfo,
    pub required_objects: Vec<Object>,
    #[serde(skip)]
    pub local_exec_temporary_store: Option<InnerTemporaryStore>,
    pub local_exec_effects: SuiTransactionBlockEffects,
    #[serde(skip)]
    pub local_exec_status: Option<Result<(), ExecutionError>>,
}

impl ExecutionSandboxState {
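    /// Compare the locally computed effects against the on-chain effects; if they differ, the
    /// replay has forked and an `EffectsForked` error carrying a diff is returned.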
    #[allow(clippy::result_large_err)]
    pub fn check_effects(&self) -> Result<(), ReplayEngineError> {
        let SuiTransactionBlockEffects::V1(mut local_effects) = self.local_exec_effects.clone();
        let SuiTransactionBlockEffects::V1(on_chain_effects) =
            self.transaction_info.effects.clone();

        if on_chain_effects.abort_error.is_none() {
            local_effects.abort_error = None;
        }
        let local_effects = SuiTransactionBlockEffects::V1(local_effects);
        let on_chain_effects = SuiTransactionBlockEffects::V1(on_chain_effects);

        if on_chain_effects != local_effects {
            error!("Replay tool forked {}", self.transaction_info.tx_digest);
            let diff = Self::diff_effects(&on_chain_effects, &local_effects);
            println!("{}", diff);
            return Err(ReplayEngineError::EffectsForked {
                digest: self.transaction_info.tx_digest,
                diff: format!("\n{}", diff),
                on_chain: Box::new(on_chain_effects),
                local: Box::new(local_effects),
            });
        }
        Ok(())
    }

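    /// Render a line-by-line diff of the two effects, marking deletions with `---` and
    /// insertions with `+++`.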
    pub fn diff_effects(
        on_chain_effects: &SuiTransactionBlockEffects,
        local_effects: &SuiTransactionBlockEffects,
    ) -> String {
        let on_chain_str = format!("{:#?}", on_chain_effects);
        let local_chain_str = format!("{:#?}", local_effects);
        let mut res = vec![];

        let diff = TextDiff::from_lines(&on_chain_str, &local_chain_str);
        for change in diff.iter_all_changes() {
            let sign = match change.tag() {
                ChangeTag::Delete => "---",
                ChangeTag::Insert => "+++",
                ChangeTag::Equal => " ",
            };
            res.push(format!("{}{}", sign, change));
        }

        res.join("")
    }
}

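/// Summary of one protocol version's lifetime: the epoch and checkpoint ranges in which it was
/// active and the digest of the epoch-change transaction that introduced it.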
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProtocolVersionSummary {
    pub protocol_version: u64,
    pub epoch_start: u64,
    pub epoch_end: u64,
    pub checkpoint_start: Option<u64>,
    pub checkpoint_end: Option<u64>,
    pub epoch_change_tx: TransactionDigest,
}

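/// Object caches used during replay: the live objects the transaction executes against, a
/// package cache, and a cache of objects keyed by (ID, version).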
#[derive(Clone)]
pub struct Storage {
    pub live_objects_store: Arc<Mutex<BTreeMap<ObjectID, Object>>>,

    pub package_cache: Arc<Mutex<BTreeMap<ObjectID, Object>>>,
    pub object_version_cache: Arc<Mutex<BTreeMap<(ObjectID, SequenceNumber), Object>>>,
}

impl std::fmt::Display for Storage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Live object store")?;
        for (id, obj) in self
            .live_objects_store
            .lock()
            .expect("Unable to lock")
            .iter()
        {
            writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
        }
        writeln!(f, "Package cache")?;
        for (id, obj) in self.package_cache.lock().expect("Unable to lock").iter() {
            writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
        }
        writeln!(f, "Object version cache")?;
        for (id, _) in self
            .object_version_cache
            .lock()
            .expect("Unable to lock")
            .iter()
        {
            writeln!(f, "{}: {}", id.0, id.1)?;
        }

        write!(f, "")
    }
}

impl Storage {
    pub fn default() -> Self {
        Self {
            live_objects_store: Arc::new(Mutex::new(BTreeMap::new())),
            package_cache: Arc::new(Mutex::new(BTreeMap::new())),
            object_version_cache: Arc::new(Mutex::new(BTreeMap::new())),
        }
    }

    pub fn all_objects(&self) -> Vec<Object> {
        self.live_objects_store
            .lock()
            .expect("Unable to lock")
            .values()
            .cloned()
            .chain(
                self.package_cache
                    .lock()
                    .expect("Unable to lock")
                    .values()
                    .cloned(),
            )
            .chain(
                self.object_version_cache
                    .lock()
                    .expect("Unable to lock")
                    .values()
                    .cloned(),
            )
            .collect::<Vec<_>>()
    }
}

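/// The replay engine itself: an optional RPC client, protocol-version lookup tables, object
/// caches, a log of storage events observed during execution, and the overrides and retry
/// policy applied when replaying a transaction.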
#[derive(Clone)]
pub struct LocalExec {
    pub client: Option<SuiClient>,
    pub protocol_version_epoch_table: BTreeMap<u64, ProtocolVersionSummary>,
    pub protocol_version_system_package_table: BTreeMap<u64, BTreeMap<ObjectID, SequenceNumber>>,
    pub current_protocol_version: u64,
    pub storage: Storage,
    pub exec_store_events: Arc<Mutex<Vec<ExecutionStoreEvent>>>,
    pub metrics: Arc<LimitsMetrics>,
    pub fetcher: Fetchers,

    pub executor_version: Option<i64>,
    pub protocol_version: Option<i64>,
    pub config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
    pub num_retries_for_timeout: u32,
    pub sleep_period_for_timeout: std::time::Duration,
}

impl LocalExec {
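    /// Download objects at specific versions, retrying on RPC timeouts up to
    /// `num_retries_for_timeout` times and sleeping `sleep_period_for_timeout` between tries.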
    pub async fn multi_download(
        &self,
        objs: &[(ObjectID, SequenceNumber)],
    ) -> Result<Vec<Object>, ReplayEngineError> {
        let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
        while num_retries_for_timeout >= 0 {
            match self.fetcher.multi_get_versioned(objs).await {
                Ok(objs) => return Ok(objs),
                Err(ReplayEngineError::SuiRpcRequestTimeout) => {
                    warn!(
                        "RPC request timed out. Retries left {}. Sleeping for {}s",
                        num_retries_for_timeout,
                        self.sleep_period_for_timeout.as_secs()
                    );
                    num_retries_for_timeout -= 1;
                    tokio::time::sleep(self.sleep_period_for_timeout).await;
                }
                Err(e) => return Err(e),
            }
        }
        Err(ReplayEngineError::SuiRpcRequestTimeout)
    }

    pub async fn multi_download_latest(
        &self,
        objs: &[ObjectID],
    ) -> Result<Vec<Object>, ReplayEngineError> {
        let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
        while num_retries_for_timeout >= 0 {
            match self.fetcher.multi_get_latest(objs).await {
                Ok(objs) => return Ok(objs),
                Err(ReplayEngineError::SuiRpcRequestTimeout) => {
                    warn!(
                        "RPC request timed out. Retries left {}. Sleeping for {}s",
                        num_retries_for_timeout,
                        self.sleep_period_for_timeout.as_secs()
                    );
                    num_retries_for_timeout -= 1;
                    tokio::time::sleep(self.sleep_period_for_timeout).await;
                }
                Err(e) => return Err(e),
            }
        }
        Err(ReplayEngineError::SuiRpcRequestTimeout)
    }

    pub async fn fetch_loaded_child_refs(
        &self,
        tx_digest: &TransactionDigest,
    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
        self.fetcher.get_loaded_child_objects(tx_digest).await
    }

    pub async fn new_from_fn_url(http_url: &str) -> Result<Self, ReplayEngineError> {
        Self::new_for_remote(
            SuiClientBuilder::default()
                .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
                .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
                .build(http_url)
                .await?,
            None,
        )
        .await
    }

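    /// Convenience entry point: connect to `rpc_url`, initialize the protocol-version tables,
    /// and replay `tx_digest`, optionally overriding the executor or protocol version or
    /// supplying extra config objects. A minimal usage sketch (the URL and digest below are
    /// placeholders):
    ///
    /// ```ignore
    /// let sandbox = LocalExec::replay_with_network_config(
    ///     "https://fullnode.mainnet.sui.io:443".to_string(),
    ///     tx_digest,
    ///     ExpensiveSafetyCheckConfig::default(),
    ///     false, // use_authority
    ///     None,  // executor_version
    ///     None,  // protocol_version
    ///     None,  // config_and_versions
    /// )
    /// .await?;
    /// sandbox.check_effects()?;
    /// ```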
    pub async fn replay_with_network_config(
        rpc_url: String,
        tx_digest: TransactionDigest,
        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
        use_authority: bool,
        executor_version: Option<i64>,
        protocol_version: Option<i64>,
        config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
        info!("Using RPC URL: {}", rpc_url);
        LocalExec::new_from_fn_url(&rpc_url)
            .await?
            .init_for_execution()
            .await?
            .execute_transaction(
                &tx_digest,
                expensive_safety_check_config,
                use_authority,
                executor_version,
                protocol_version,
                config_and_versions,
            )
            .await
    }

    pub async fn init_for_execution(mut self) -> Result<Self, ReplayEngineError> {
        self.populate_protocol_version_tables().await?;
        tokio::task::yield_now().await;
        Ok(self)
    }

    pub async fn reset_for_new_execution_with_client(self) -> Result<Self, ReplayEngineError> {
        Self::new_for_remote(
            self.client.expect("Remote client not initialized"),
            Some(self.fetcher.into_remote()),
        )
        .await?
        .init_for_execution()
        .await
    }

    pub async fn new_for_remote(
        client: SuiClient,
        remote_fetcher: Option<RemoteFetcher>,
    ) -> Result<Self, ReplayEngineError> {
        let registry = prometheus::Registry::new();
        let metrics = Arc::new(LimitsMetrics::new(&registry));

        let fetcher = remote_fetcher.unwrap_or(RemoteFetcher::new(client.clone()));

        Ok(Self {
            client: Some(client),
            protocol_version_epoch_table: BTreeMap::new(),
            protocol_version_system_package_table: BTreeMap::new(),
            current_protocol_version: 0,
            exec_store_events: Arc::new(Mutex::new(Vec::new())),
            metrics,
            storage: Storage::default(),
            fetcher: Fetchers::Remote(fetcher),
            num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
            sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
            executor_version: None,
            protocol_version: None,
            config_and_versions: None,
        })
    }

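    /// Build a `LocalExec` that replays from a `NodeStateDump` file at `path`, optionally
    /// falling back to the RPC endpoint in `backup_rpc_url` for data missing from the dump.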
    pub async fn new_for_state_dump(
        path: &str,
        backup_rpc_url: Option<String>,
    ) -> Result<Self, ReplayEngineError> {
        let registry = prometheus::Registry::new();
        let metrics = Arc::new(LimitsMetrics::new(&registry));

        let state = NodeStateDump::read_from_file(&PathBuf::from(path))?;
        let current_protocol_version = state.protocol_version;
        let fetcher = match backup_rpc_url {
            Some(url) => NodeStateDumpFetcher::new(
                state,
                Some(RemoteFetcher::new(
                    SuiClientBuilder::default()
                        .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
                        .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
                        .build(url)
                        .await?,
                )),
            ),
            None => NodeStateDumpFetcher::new(state, None),
        };

        Ok(Self {
            client: None,
            protocol_version_epoch_table: BTreeMap::new(),
            protocol_version_system_package_table: BTreeMap::new(),
            current_protocol_version,
            exec_store_events: Arc::new(Mutex::new(Vec::new())),
            metrics,
            storage: Storage::default(),
            fetcher: Fetchers::NodeStateDump(fetcher),
            num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
            sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
            executor_version: None,
            protocol_version: None,
            config_and_versions: None,
        })
    }

    pub async fn multi_download_and_store(
        &mut self,
        objs: &[(ObjectID, SequenceNumber)],
    ) -> Result<Vec<Object>, ReplayEngineError> {
        let objs = self.multi_download(objs).await?;

        for obj in objs.iter() {
            let o_ref = obj.compute_object_reference();
            self.storage
                .live_objects_store
                .lock()
                .expect("Can't lock")
                .insert(o_ref.0, obj.clone());
            self.storage
                .object_version_cache
                .lock()
                .expect("Cannot lock")
                .insert((o_ref.0, o_ref.1), obj.clone());
            if obj.is_package() {
                self.storage
                    .package_cache
                    .lock()
                    .expect("Cannot lock")
                    .insert(o_ref.0, obj.clone());
            }
        }
        tokio::task::yield_now().await;
        Ok(objs)
    }

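    /// Download the given (non-system) packages at their latest versions together with the
    /// system packages for `protocol_version`, caching every package object downloaded.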
    pub async fn multi_download_relevant_packages_and_store(
        &mut self,
        objs: Vec<ObjectID>,
        protocol_version: u64,
    ) -> Result<Vec<Object>, ReplayEngineError> {
        let syst_packages_objs = if self.protocol_version.is_some_and(|i| i < 0) {
            BuiltInFramework::genesis_objects().collect()
        } else {
            let syst_packages =
                self.system_package_versions_for_protocol_version(protocol_version)?;
            self.multi_download(&syst_packages).await?
        };

        let non_system_package_objs: Vec<_> = objs
            .into_iter()
            .filter(|o| !Self::system_package_ids(self.current_protocol_version).contains(o))
            .collect();
        let objs = self
            .multi_download_latest(&non_system_package_objs)
            .await?
            .into_iter()
            .chain(syst_packages_objs.into_iter());

        for obj in objs.clone() {
            let o_ref = obj.compute_object_reference();
            self.storage
                .object_version_cache
                .lock()
                .expect("Cannot lock")
                .insert((o_ref.0, o_ref.1), obj.clone());
            if obj.is_package() {
                self.storage
                    .package_cache
                    .lock()
                    .expect("Cannot lock")
                    .insert(o_ref.0, obj.clone());
            }
        }
        Ok(objs.collect())
    }

    #[allow(clippy::disallowed_methods, clippy::result_large_err)]
    pub fn download_object(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> Result<Object, ReplayEngineError> {
        if self
            .storage
            .object_version_cache
            .lock()
            .expect("Cannot lock")
            .contains_key(&(*object_id, version))
        {
            return Ok(self
                .storage
                .object_version_cache
                .lock()
                .expect("Cannot lock")
                .get(&(*object_id, version))
                .ok_or(ReplayEngineError::InternalCacheInvariantViolation {
                    id: *object_id,
                    version: Some(version),
                })?
                .clone());
        }

        let o = block_on(self.multi_download(&[(*object_id, version)])).map(|mut q| {
            q.pop().unwrap_or_else(|| {
                panic!(
                    "Downloaded obj response cannot be empty {:?}",
                    (*object_id, version)
                )
            })
        })?;

        let o_ref = o.compute_object_reference();
        self.storage
            .object_version_cache
            .lock()
            .expect("Cannot lock")
            .insert((o_ref.0, o_ref.1), o.clone());
        Ok(o)
    }

    #[allow(clippy::disallowed_methods, clippy::result_large_err)]
    pub fn download_latest_object(
        &self,
        object_id: &ObjectID,
    ) -> Result<Option<Object>, ReplayEngineError> {
        let resp = block_on({
            self.multi_download_latest(std::slice::from_ref(object_id))
        })
        .map(|mut q| {
            q.pop()
                .unwrap_or_else(|| panic!("Downloaded obj response cannot be empty {}", *object_id))
        });

        match resp {
            Ok(v) => Ok(Some(v)),
            Err(ReplayEngineError::ObjectNotExist { id }) => {
                error!(
                    "Could not find object {id} on RPC server. It might have been pruned, deleted, or never existed."
                );
                Ok(None)
            }
            Err(ReplayEngineError::ObjectDeleted {
                id,
                version,
                digest,
            }) => {
                error!("Object {id} {version} {digest} was deleted on RPC server.");
                Ok(None)
            }
            Err(err) => Err(ReplayEngineError::SuiRpcError {
                err: err.to_string(),
            }),
        }
    }

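    /// Fetch a child object at the highest version at or below `version_upper_bound`, checking
    /// the live objects store first and caching anything fetched from the RPC server. Pruned,
    /// deleted, or missing objects are reported as `Ok(None)`.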
    #[allow(clippy::disallowed_methods, clippy::result_large_err)]
    pub fn download_object_by_upper_bound(
        &self,
        object_id: &ObjectID,
        version_upper_bound: VersionNumber,
    ) -> Result<Option<Object>, ReplayEngineError> {
        let local_object = self
            .storage
            .live_objects_store
            .lock()
            .expect("Can't lock")
            .get(object_id)
            .cloned();
        if local_object.is_some() {
            return Ok(local_object);
        }
        let response = block_on({
            self.fetcher
                .get_child_object(object_id, version_upper_bound)
        });
        match response {
            Ok(object) => {
                let obj_ref = object.compute_object_reference();
                self.storage
                    .live_objects_store
                    .lock()
                    .expect("Can't lock")
                    .insert(*object_id, object.clone());
                self.storage
                    .object_version_cache
                    .lock()
                    .expect("Can't lock")
                    .insert((obj_ref.0, obj_ref.1), object.clone());
                Ok(Some(object))
            }
            Err(ReplayEngineError::ObjectNotExist { id }) => {
                error!(
                    "Could not find child object {id} on RPC server. It might have been pruned, deleted, or never existed."
                );
                Ok(None)
            }
            Err(ReplayEngineError::ObjectDeleted {
                id,
                version,
                digest,
            }) => {
                error!("Object {id} {version} {digest} was deleted on RPC server.");
                Ok(None)
            }
            Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
                info!(
                    "Object {id} {version} not found on RPC server -- this may have been pruned or never existed."
                );
                Ok(None)
            }
            Err(err) => Err(ReplayEngineError::SuiRpcError {
                err: err.to_string(),
            }),
        }
    }

    pub async fn get_checkpoint_txs(
        &self,
        checkpoint_id: u64,
    ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
        self.fetcher
            .get_checkpoint_txs(checkpoint_id)
            .await
            .map_err(|e| ReplayEngineError::SuiRpcError { err: e.to_string() })
    }

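    /// Replay every transaction in the given checkpoints and compare each result against the
    /// on-chain effects, returning `(succeeded, total)`. With `terminate_early`, the first
    /// divergence or error aborts the run.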
    pub async fn execute_all_in_checkpoints(
        &mut self,
        checkpoint_ids: &[u64],
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        terminate_early: bool,
        use_authority: bool,
    ) -> Result<(u64, u64), ReplayEngineError> {
        let mut txs = Vec::new();
        for checkpoint_id in checkpoint_ids {
            txs.extend(self.get_checkpoint_txs(*checkpoint_id).await?);
        }
        let num = txs.len();
        let mut succeeded = 0;
        for tx in txs {
            match self
                .execute_transaction(
                    &tx,
                    expensive_safety_check_config.clone(),
                    use_authority,
                    None,
                    None,
                    None,
                )
                .await
                .map(|q| q.check_effects())
            {
                Err(e) | Ok(Err(e)) => {
                    if terminate_early {
                        return Err(e);
                    }
                    error!("Error executing tx: {}, {:#?}", tx, e);
                    continue;
                }
                _ => (),
            }

            succeeded += 1;
        }
        Ok((succeeded, num as u64))
    }

    pub async fn execution_engine_execute_with_tx_info_impl(
        &mut self,
        tx_info: &OnChainTransactionInfo,
        override_transaction_kind: Option<TransactionKind>,
        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
        let tx_digest = &tx_info.tx_digest;
        if tx_info.protocol_version.as_u64() < 16 {
            warn!(
                "Protocol version ({:?}) too old: {}, skipping transaction",
                tx_info.protocol_version, tx_digest
            );
            return Err(ReplayEngineError::TransactionNotSupported {
                digest: *tx_digest,
                reason: "Protocol version too old".to_string(),
            });
        }
        let input_objects = self.initialize_execution_env_state(tx_info).await?;
        assert_eq!(
            &input_objects.filter_shared_objects().len(),
            &tx_info.shared_object_refs.len()
        );
        let protocol_config =
            &ProtocolConfig::get_for_version(tx_info.protocol_version, tx_info.chain);

        let metrics = self.metrics.clone();

        let ov = self.executor_version;

        let executor = get_executor(ov, protocol_config, expensive_safety_check_config);

        let expensive_checks = true;
        let transaction_kind = override_transaction_kind.unwrap_or(tx_info.kind.clone());
        let gas_status = if tx_info.kind.is_system_tx() {
            SuiGasStatus::new_unmetered()
        } else {
            SuiGasStatus::new(
                tx_info.gas_budget,
                tx_info.gas_price,
                tx_info.reference_gas_price,
                protocol_config,
            )
            .expect("Failed to create gas status")
        };
        let gas_data = GasData {
            payment: tx_info.gas.clone(),
            owner: tx_info.gas_owner.unwrap_or(tx_info.sender),
            price: tx_info.gas_price,
            budget: tx_info.gas_budget,
        };
        let checked_input_objects = CheckedInputObjects::new_for_replay(input_objects.clone());
        let early_execution_error = get_early_execution_error(
            tx_digest,
            &checked_input_objects,
            &HashSet::new(),
            &BalanceWithdrawStatus::MaybeSufficient,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };
        let (inner_store, gas_status, effects, _timings, result) = executor
            .execute_transaction_to_effects(
                &self,
                protocol_config,
                metrics.clone(),
                expensive_checks,
                execution_params,
                &tx_info.executed_epoch,
                tx_info.epoch_start_timestamp,
                checked_input_objects,
                gas_data,
                gas_status,
                transaction_kind.clone(),
                tx_info.sender,
                *tx_digest,
                &mut None,
            );

        if let Err(err) = self.pretty_print_for_tracing(
            &gas_status,
            &executor,
            tx_info,
            &transaction_kind,
            protocol_config,
            metrics,
            expensive_checks,
            input_objects.clone(),
        ) {
            error!("Failed to pretty print for tracing: {:?}", err);
        }

        let all_required_objects = self.storage.all_objects();

        let effects =
            SuiTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;

        Ok(ExecutionSandboxState {
            transaction_info: tx_info.clone(),
            required_objects: all_required_objects,
            local_exec_temporary_store: Some(inner_store),
            local_exec_effects: effects,
            local_exec_status: Some(result),
        })
    }

    fn pretty_print_for_tracing(
        &self,
        gas_status: &SuiGasStatus,
        executor: &Arc<dyn Executor + Send + Sync>,
        tx_info: &OnChainTransactionInfo,
        transaction_kind: &TransactionKind,
        protocol_config: &ProtocolConfig,
        metrics: Arc<LimitsMetrics>,
        expensive_checks: bool,
        input_objects: InputObjects,
    ) -> anyhow::Result<()> {
        trace!(target: "replay_gas_info", "{}", Pretty(gas_status));

        let skip_checks = true;
        let gas_data = GasData {
            payment: tx_info.gas.clone(),
            owner: tx_info.gas_owner.unwrap_or(tx_info.sender),
            price: tx_info.gas_price,
            budget: tx_info.gas_budget,
        };
        let checked_input_objects = CheckedInputObjects::new_for_replay(input_objects.clone());
        let early_execution_error = get_early_execution_error(
            &tx_info.tx_digest,
            &checked_input_objects,
            &HashSet::new(),
            &BalanceWithdrawStatus::MaybeSufficient,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };
        if let ProgrammableTransaction(pt) = transaction_kind {
            trace!(
                target: "replay_ptb_info",
                "{}",
                Pretty(&FullPTB {
                    ptb: pt.clone(),
                    results: transform_command_results_to_annotated(
                        executor,
                        &self.clone(),
                        executor.dev_inspect_transaction(
                            &self,
                            protocol_config,
                            metrics,
                            expensive_checks,
                            execution_params,
                            &tx_info.executed_epoch,
                            tx_info.epoch_start_timestamp,
                            CheckedInputObjects::new_for_replay(input_objects),
                            gas_data,
                            SuiGasStatus::new(
                                tx_info.gas_budget,
                                tx_info.gas_price,
                                tx_info.reference_gas_price,
                                protocol_config,
                            )?,
                            transaction_kind.clone(),
                            tx_info.sender,
                            tx_info.sender_signed_data.digest(),
                            skip_checks,
                        )
                        .3
                        .unwrap_or_default(),
                    )?,
                }));
        }
        Ok(())
    }

    #[allow(clippy::result_large_err)]
    pub async fn execution_engine_execute_impl(
        &mut self,
        tx_digest: &TransactionDigest,
        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
        if self.is_remote_replay() {
            assert!(
                !self.protocol_version_system_package_table.is_empty()
                    || !self.protocol_version_epoch_table.is_empty(),
                "Required tables not populated. Must call `init_for_execution` before executing transactions"
            );
        }

        let tx_info = if self.is_remote_replay() {
            self.resolve_tx_components(tx_digest).await?
        } else {
            self.resolve_tx_components_from_dump(tx_digest).await?
        };
        self.execution_engine_execute_with_tx_info_impl(
            &tx_info,
            None,
            expensive_safety_check_config,
        )
        .await
    }

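    /// Re-execute a previously replayed transaction through the certificate path used by
    /// authorities: the transaction is rebuilt from the sandbox state, passed through the
    /// certificate input checks, and executed against an in-memory store seeded with the
    /// required objects.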
    #[allow(clippy::result_large_err)]
    pub async fn certificate_execute_with_sandbox_state(
        pre_run_sandbox: &ExecutionSandboxState,
    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
        let executed_epoch = pre_run_sandbox.transaction_info.executed_epoch;
        let reference_gas_price = pre_run_sandbox.transaction_info.reference_gas_price;
        let epoch_start_timestamp = pre_run_sandbox.transaction_info.epoch_start_timestamp;
        let protocol_config = ProtocolConfig::get_for_version(
            pre_run_sandbox.transaction_info.protocol_version,
            pre_run_sandbox.transaction_info.chain,
        );
        let required_objects = pre_run_sandbox.required_objects.clone();
        let store = InMemoryStorage::new(required_objects.clone());

        let transaction =
            Transaction::new(pre_run_sandbox.transaction_info.sender_signed_data.clone());

        let input_objects = store.read_input_objects_for_transaction(&transaction);
        let executable = VerifiedExecutableTransaction::new_from_quorum_execution(
            VerifiedTransaction::new_unchecked(transaction),
            executed_epoch,
        );
        let (gas_status, input_objects) = sui_transaction_checks::check_certificate_input(
            &executable,
            input_objects,
            &protocol_config,
            reference_gas_price,
        )
        .unwrap();
        let (kind, signer, gas_data) = executable.transaction_data().execution_parts();
        let executor = sui_execution::executor(&protocol_config, true).unwrap();
        let early_execution_error = get_early_execution_error(
            executable.digest(),
            &input_objects,
            &HashSet::new(),
            &BalanceWithdrawStatus::MaybeSufficient,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };
        let (_, _, effects, _timings, exec_res) = executor.execute_transaction_to_effects(
            &store,
            &protocol_config,
            Arc::new(LimitsMetrics::new(&Registry::new())),
            true,
            execution_params,
            &executed_epoch,
            epoch_start_timestamp,
            input_objects,
            gas_data,
            gas_status,
            kind,
            signer,
            *executable.digest(),
            &mut None,
        );

        let effects =
            SuiTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;

        Ok(ExecutionSandboxState {
            transaction_info: pre_run_sandbox.transaction_info.clone(),
            required_objects,
            local_exec_temporary_store: None,
            local_exec_effects: effects,
            local_exec_status: Some(exec_res),
        })
    }

    #[allow(clippy::result_large_err)]
    pub async fn certificate_execute(
        &mut self,
        tx_digest: &TransactionDigest,
        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
        let pre_run_sandbox = self
            .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
            .await?;
        Self::certificate_execute_with_sandbox_state(&pre_run_sandbox).await
    }

    #[allow(clippy::result_large_err)]
    pub async fn execution_engine_execute(
        &mut self,
        tx_digest: &TransactionDigest,
        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
        let sandbox_state = self
            .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
            .await?;

        Ok(sandbox_state)
    }

    #[allow(clippy::result_large_err)]
    pub async fn execute_state_dump(
        &mut self,
        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
    ) -> Result<(ExecutionSandboxState, NodeStateDump), ReplayEngineError> {
        assert!(!self.is_remote_replay());

        let d = match self.fetcher.clone() {
            Fetchers::NodeStateDump(d) => d,
            _ => panic!("Invalid fetcher for state dump"),
        };
        let tx_digest = d.node_state_dump.clone().tx_digest;
        let sandbox_state = self
            .execution_engine_execute_impl(&tx_digest, expensive_safety_check_config)
            .await?;

        Ok((sandbox_state, d.node_state_dump))
    }

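    /// Top-level replay entry point: record any executor/protocol-version overrides and extra
    /// config objects, then replay either through the authority certificate path
    /// (`use_authority`) or directly through the execution engine.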
    #[allow(clippy::result_large_err)]
    pub async fn execute_transaction(
        &mut self,
        tx_digest: &TransactionDigest,
        expensive_safety_check_config: ExpensiveSafetyCheckConfig,
        use_authority: bool,
        executor_version: Option<i64>,
        protocol_version: Option<i64>,
        config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
    ) -> Result<ExecutionSandboxState, ReplayEngineError> {
        self.executor_version = executor_version;
        self.protocol_version = protocol_version;
        self.config_and_versions = config_and_versions;
        if use_authority {
            self.certificate_execute(tx_digest, expensive_safety_check_config.clone())
                .await
        } else {
            self.execution_engine_execute(tx_digest, expensive_safety_check_config)
                .await
        }
    }

    fn system_package_ids(protocol_version: u64) -> Vec<ObjectID> {
        let mut ids = BuiltInFramework::all_package_ids();

        if protocol_version < 5 {
            ids.retain(|id| *id != DEEPBOOK_PACKAGE_ID)
        }
        ids
    }

    #[allow(clippy::result_large_err)]
    pub fn get_or_download_object(
        &self,
        obj_id: &ObjectID,
        package_expected: bool,
    ) -> Result<Option<Object>, ReplayEngineError> {
        if package_expected {
            if let Some(obj) = self
                .storage
                .package_cache
                .lock()
                .expect("Cannot lock")
                .get(obj_id)
            {
                return Ok(Some(obj.clone()));
            };
        } else if let Some(obj) = self
            .storage
            .live_objects_store
            .lock()
            .expect("Can't lock")
            .get(obj_id)
        {
            return Ok(Some(obj.clone()));
        }

        let Some(o) = self.download_latest_object(obj_id)? else {
            return Ok(None);
        };

        if o.is_package() {
            assert!(
                package_expected,
                "Did not expect package but downloaded object is a package: {obj_id}"
            );

            self.storage
                .package_cache
                .lock()
                .expect("Cannot lock")
                .insert(*obj_id, o.clone());
        }
        let o_ref = o.compute_object_reference();
        self.storage
            .object_version_cache
            .lock()
            .expect("Cannot lock")
            .insert((o_ref.0, o_ref.1), o.clone());
        Ok(Some(o))
    }

    pub fn is_remote_replay(&self) -> bool {
        matches!(self.fetcher, Fetchers::Remote(_))
    }

    #[allow(clippy::result_large_err)]
    pub fn system_package_versions_for_protocol_version(
        &self,
        protocol_version: u64,
    ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
        match &self.fetcher {
            Fetchers::Remote(_) => Ok(self
                .protocol_version_system_package_table
                .get(&protocol_version)
                .ok_or(ReplayEngineError::FrameworkObjectVersionTableNotPopulated {
                    protocol_version,
                })?
                .clone()
                .into_iter()
                .collect()),

            Fetchers::NodeStateDump(d) => Ok(d
                .node_state_dump
                .relevant_system_packages
                .iter()
                .map(|w| (w.id, w.version, w.digest))
                .map(|q| (q.0, q.1))
                .collect()),
        }
    }

    pub async fn protocol_ver_to_epoch_map(
        &self,
    ) -> Result<BTreeMap<u64, ProtocolVersionSummary>, ReplayEngineError> {
        let mut range_map = BTreeMap::new();
        let epoch_change_events = self.fetcher.get_epoch_change_events(false).await?;

        let mut tx_digest = *self
            .fetcher
            .get_checkpoint_txs(0)
            .await?
            .first()
            .expect("Genesis TX must be in first checkpoint");
        let (mut start_epoch, mut start_protocol_version, mut start_checkpoint) =
            (0, 1, Some(0u64));

        let (mut curr_epoch, mut curr_protocol_version, mut curr_checkpoint) =
            (start_epoch, start_protocol_version, start_checkpoint);

        (start_epoch, start_protocol_version, start_checkpoint) =
            (curr_epoch, curr_protocol_version, curr_checkpoint);

        let mut end_epoch_tx_digest = tx_digest;

        for event in epoch_change_events {
            (curr_epoch, curr_protocol_version) = extract_epoch_and_version(event.clone())?;
            end_epoch_tx_digest = event.id.tx_digest;

            if start_protocol_version == curr_protocol_version {
                continue;
            }

            curr_checkpoint = self
                .fetcher
                .get_transaction(&event.id.tx_digest)
                .await?
                .checkpoint;
            range_map.insert(
                start_protocol_version,
                ProtocolVersionSummary {
                    protocol_version: start_protocol_version,
                    epoch_start: start_epoch,
                    epoch_end: curr_epoch - 1,
                    checkpoint_start: start_checkpoint,
                    checkpoint_end: curr_checkpoint.map(|x| x - 1),
                    epoch_change_tx: tx_digest,
                },
            );

            start_epoch = curr_epoch;
            start_protocol_version = curr_protocol_version;
            tx_digest = event.id.tx_digest;
            start_checkpoint = curr_checkpoint;
        }

        range_map.insert(
            curr_protocol_version,
            ProtocolVersionSummary {
                protocol_version: curr_protocol_version,
                epoch_start: start_epoch,
                epoch_end: curr_epoch,
                checkpoint_start: curr_checkpoint,
                checkpoint_end: self
                    .fetcher
                    .get_transaction(&end_epoch_tx_digest)
                    .await?
                    .checkpoint,
                epoch_change_tx: tx_digest,
            },
        );

        Ok(range_map)
    }

    pub fn protocol_version_for_epoch(
        epoch: u64,
        mp: &BTreeMap<u64, (TransactionDigest, u64, u64)>,
    ) -> u64 {
        let mut version = 1;
        for (k, v) in mp.iter().rev() {
            if v.1 <= epoch {
                version = *k;
                break;
            }
        }
        version
    }

    pub async fn populate_protocol_version_tables(&mut self) -> Result<(), ReplayEngineError> {
        self.protocol_version_epoch_table = self.protocol_ver_to_epoch_map().await?;

        let system_package_revisions = self.system_package_versions().await?;

        for (
            prot_ver,
            ProtocolVersionSummary {
                epoch_change_tx: tx_digest,
                ..
            },
        ) in self.protocol_version_epoch_table.clone()
        {
            let mut working = if prot_ver <= 1 {
                BTreeMap::new()
            } else {
                self.protocol_version_system_package_table
                    .iter()
                    .rev()
                    .find(|(ver, _)| **ver <= prot_ver)
                    .expect("Prev entry must exist")
                    .1
                    .clone()
            };

            for (id, versions) in system_package_revisions.iter() {
                for ver in versions.iter().rev() {
                    if ver.1 == tx_digest {
                        working.insert(*id, ver.0);
                        break;
                    }
                }
            }
            self.protocol_version_system_package_table
                .insert(prot_ver, working);
        }
        Ok(())
    }

    pub async fn system_package_versions(
        &self,
    ) -> Result<BTreeMap<ObjectID, Vec<(SequenceNumber, TransactionDigest)>>, ReplayEngineError>
    {
        let system_package_ids = Self::system_package_ids(
            *self
                .protocol_version_epoch_table
                .keys()
                .peekable()
                .last()
                .expect("Protocol version epoch table not populated"),
        );
        let mut system_package_objs = self.multi_download_latest(&system_package_ids).await?;

        let mut mapping = BTreeMap::new();

        while !system_package_objs.is_empty() {
            let previous_txs: Vec<_> = system_package_objs
                .iter()
                .map(|o| (o.compute_object_reference(), o.previous_transaction))
                .collect();

            previous_txs.iter().for_each(|((id, ver, _), tx)| {
                mapping.entry(*id).or_insert(vec![]).push((*ver, *tx));
            });

            let previous_ver_refs: Vec<_> = previous_txs
                .iter()
                .filter_map(|(q, _)| {
                    let prev_ver = u64::from(q.1) - 1;
                    if prev_ver == 0 {
                        None
                    } else {
                        Some((q.0, SequenceNumber::from(prev_ver)))
                    }
                })
                .collect();
            system_package_objs = match self.multi_download(&previous_ver_refs).await {
                Ok(packages) => packages,
                Err(ReplayEngineError::ObjectNotExist { id }) => {
                    warn!(
                        "Object {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
                        id
                    );
                    break;
                }
                Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
                    warn!(
                        "Object {} at version {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
                        id, version
                    );
                    break;
                }
                Err(ReplayEngineError::ObjectVersionTooHigh {
                    id,
                    asked_version,
                    latest_version,
                }) => {
                    warn!(
                        "Object {} at version {} does not exist on RPC server. Latest version is {}. This might be due to pruning. Historical replays might not work",
                        id, asked_version, latest_version
                    );
                    break;
                }
                Err(ReplayEngineError::ObjectDeleted {
                    id,
                    version,
                    digest,
                }) => {
                    warn!(
                        "Object {} at version {} digest {} deleted from RPC server. This might be due to pruning. Historical replays might not work",
                        id, version, digest
                    );
                    break;
                }
                Err(e) => return Err(e),
            };
        }
        Ok(mapping)
    }

    pub async fn get_protocol_config(
        &self,
        epoch_id: EpochId,
        chain: Chain,
    ) -> Result<ProtocolConfig, ReplayEngineError> {
        match self.protocol_version {
            Some(x) if x < 0 => Ok(ProtocolConfig::get_for_max_version_UNSAFE()),
            Some(v) => Ok(ProtocolConfig::get_for_version((v as u64).into(), chain)),
            None => self
                .protocol_version_epoch_table
                .iter()
                .rev()
                .find(|(_, rg)| epoch_id >= rg.epoch_start)
                .map(|(p, _rg)| Ok(ProtocolConfig::get_for_version((*p).into(), chain)))
                .unwrap_or_else(|| {
                    Err(ReplayEngineError::ProtocolVersionNotFound { epoch: epoch_id })
                }),
        }
    }

    pub async fn checkpoints_for_epoch(
        &self,
        epoch_id: u64,
    ) -> Result<(u64, u64), ReplayEngineError> {
        let epoch_change_events = self
            .fetcher
            .get_epoch_change_events(true)
            .await?
            .into_iter()
            .collect::<Vec<_>>();
        let (start_checkpoint, start_epoch_idx) = if epoch_id == 0 {
            (0, 1)
        } else {
            let idx = epoch_change_events
                .iter()
                .position(|ev| match extract_epoch_and_version(ev.clone()) {
                    Ok((epoch, _)) => epoch == epoch_id,
                    Err(_) => false,
                })
                .ok_or(ReplayEngineError::EventNotFound { epoch: epoch_id })?;
            let epoch_change_tx = epoch_change_events[idx].id.tx_digest;
            (
                self.fetcher
                    .get_transaction(&epoch_change_tx)
                    .await?
                    .checkpoint
                    .unwrap_or_else(|| {
                        panic!(
                            "Checkpoint for transaction {} not present. Could be due to pruning",
                            epoch_change_tx
                        )
                    }),
                idx,
            )
        };

        let next_epoch_change_tx = epoch_change_events
            .get(start_epoch_idx + 1)
            .map(|v| v.id.tx_digest)
            .ok_or(ReplayEngineError::UnableToDetermineCheckpoint { epoch: epoch_id })?;

        let next_epoch_checkpoint = self
            .fetcher
            .get_transaction(&next_epoch_change_tx)
            .await?
            .checkpoint
            .unwrap_or_else(|| {
                panic!(
                    "Checkpoint for transaction {} not present. Could be due to pruning",
                    next_epoch_change_tx
                )
            });

        Ok((start_checkpoint, next_epoch_checkpoint - 1))
    }

    pub async fn get_epoch_start_timestamp_and_rgp(
        &self,
        epoch_id: u64,
        tx_digest: &TransactionDigest,
    ) -> Result<(u64, u64), ReplayEngineError> {
        if epoch_id == 0 {
            return Err(ReplayEngineError::TransactionNotSupported {
                digest: *tx_digest,
                reason: "Transactions from epoch 0 not supported".to_string(),
            });
        }
        self.fetcher
            .get_epoch_start_timestamp_and_rgp(epoch_id)
            .await
    }

    fn add_config_objects_if_needed(
        &self,
        status: &SuiExecutionStatus,
    ) -> Vec<(ObjectID, SequenceNumber)> {
        match parse_effect_error_for_denied_coins(status) {
            Some(coin_type) => {
                let Some(mut config_id_and_version) = self.config_and_versions.clone() else {
                    panic!(
                        "Need to specify the config object ID and version for '{coin_type}' in order to replay this transaction"
                    );
                };
                if !config_id_and_version
                    .iter()
                    .any(|(id, _)| id == &SUI_DENY_LIST_OBJECT_ID)
                {
                    let deny_list_oid_version = self.download_latest_object(&SUI_DENY_LIST_OBJECT_ID)
                        .ok()
                        .flatten()
                        .expect("Unable to download the deny list object for a transaction that requires it")
                        .version();
                    config_id_and_version.push((SUI_DENY_LIST_OBJECT_ID, deny_list_oid_version));
                }
                config_id_and_version
            }
            None => vec![],
        }
    }

    async fn resolve_tx_components(
        &self,
        tx_digest: &TransactionDigest,
    ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
        assert!(self.is_remote_replay());
        let tx_info = self.fetcher.get_transaction(tx_digest).await?;
        let sender = match tx_info.clone().transaction.unwrap().data {
            sui_json_rpc_types::SuiTransactionBlockData::V1(tx) => tx.sender,
        };
        let SuiTransactionBlockEffects::V1(effects) = tx_info.clone().effects.unwrap();

        let config_objects = self.add_config_objects_if_needed(effects.status());

        let raw_tx_bytes = tx_info.clone().raw_transaction;
        let orig_tx: SenderSignedData = bcs::from_bytes(&raw_tx_bytes).unwrap();
        let input_objs = orig_tx
            .transaction_data()
            .input_objects()
            .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
        let tx_kind_orig = orig_tx.transaction_data().kind();

        let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();

        let shared_object_refs: Vec<ObjectRef> = effects
            .shared_objects()
            .iter()
            .map(|so_ref| {
                if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
                    unimplemented!(
                        "Replay of deleted shared object transactions is not supported yet"
                    );
                } else {
                    so_ref.to_object_ref()
                }
            })
            .collect();
        let gas_data = match tx_info.clone().transaction.unwrap().data {
            sui_json_rpc_types::SuiTransactionBlockData::V1(tx) => tx.gas_data,
        };
        let gas_object_refs: Vec<_> = gas_data
            .payment
            .iter()
            .map(|obj_ref| obj_ref.to_object_ref())
            .collect();
        let receiving_objs = orig_tx
            .transaction_data()
            .receiving_objects()
            .into_iter()
            .map(|(obj_id, version, _)| (obj_id, version))
            .collect();

        let epoch_id = effects.executed_epoch;
        let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());

        let (epoch_start_timestamp, reference_gas_price) = self
            .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
            .await?;

        Ok(OnChainTransactionInfo {
            kind: tx_kind_orig.clone(),
            sender,
            modified_at_versions,
            input_objects: input_objs,
            shared_object_refs,
            gas: gas_object_refs,
            gas_owner: (gas_data.owner != sender).then_some(gas_data.owner),
            gas_price: gas_data.price,
            gas_budget: gas_data.budget,
            executed_epoch: epoch_id,
            dependencies: effects.dependencies().to_vec(),
            effects: SuiTransactionBlockEffects::V1(effects),
            receiving_objs,
            config_objects,
            protocol_version: self.get_protocol_config(epoch_id, chain).await?.version,
            tx_digest: *tx_digest,
            epoch_start_timestamp,
            sender_signed_data: orig_tx.clone(),
            reference_gas_price,
            chain,
        })
    }

    async fn resolve_tx_components_from_dump(
        &self,
        tx_digest: &TransactionDigest,
    ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
        assert!(!self.is_remote_replay());

        let dp = self.fetcher.as_node_state_dump();

        let sender = dp
            .node_state_dump
            .sender_signed_data
            .transaction_data()
            .sender();
        let orig_tx = dp.node_state_dump.sender_signed_data.clone();
        let effects = dp.node_state_dump.computed_effects.clone();
        let effects = SuiTransactionBlockEffects::try_from(effects).unwrap();
        let config_objects = self.add_config_objects_if_needed(effects.status());

        let input_objs = orig_tx
            .transaction_data()
            .input_objects()
            .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
        let tx_kind_orig = orig_tx.transaction_data().kind();

        let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();

        let shared_object_refs: Vec<ObjectRef> = effects
            .shared_objects()
            .iter()
            .map(|so_ref| {
                if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
                    unimplemented!(
                        "Replay of deleted shared object transactions is not supported yet"
                    );
                } else {
                    so_ref.to_object_ref()
                }
            })
            .collect();
        let receiving_objs = orig_tx
            .transaction_data()
            .receiving_objects()
            .into_iter()
            .map(|(obj_id, version, _)| (obj_id, version))
            .collect();

        let epoch_id = dp.node_state_dump.executed_epoch;

        let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());

        let protocol_config =
            ProtocolConfig::get_for_version(dp.node_state_dump.protocol_version.into(), chain);
        let (epoch_start_timestamp, reference_gas_price) = self
            .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
            .await?;
        let gas_data = orig_tx.transaction_data().gas_data();
        let gas_object_refs: Vec<_> = gas_data.clone().payment.into_iter().collect();

        Ok(OnChainTransactionInfo {
            kind: tx_kind_orig.clone(),
            sender,
            modified_at_versions,
            input_objects: input_objs,
            shared_object_refs,
            gas: gas_object_refs,
            gas_owner: (gas_data.owner != sender).then_some(gas_data.owner),
            gas_price: gas_data.price,
            gas_budget: gas_data.budget,
            executed_epoch: epoch_id,
            dependencies: effects.dependencies().to_vec(),
            effects,
            receiving_objs,
            config_objects,
            protocol_version: protocol_config.version,
            tx_digest: *tx_digest,
            epoch_start_timestamp,
            sender_signed_data: orig_tx.clone(),
            reference_gas_price,
            chain,
        })
    }

    async fn resolve_download_input_objects(
        &mut self,
        tx_info: &OnChainTransactionInfo,
        deleted_shared_objects: Vec<ObjectRef>,
    ) -> Result<InputObjects, ReplayEngineError> {
        let mut package_inputs = vec![];
        let mut imm_owned_inputs = vec![];
        let mut shared_inputs = vec![];
        let mut deleted_shared_info_map = BTreeMap::new();

        if !deleted_shared_objects.is_empty() {
            for tx_digest in tx_info.dependencies.iter() {
                let tx_info = self.resolve_tx_components(tx_digest).await?;
                for (obj_id, version, _) in tx_info.shared_object_refs.iter() {
                    deleted_shared_info_map.insert(*obj_id, (tx_info.tx_digest, *version));
                }
            }
        }

        tx_info
            .input_objects
            .iter()
            .map(|kind| match kind {
                InputObjectKind::MovePackage(i) => {
                    package_inputs.push(*i);
                    Ok(())
                }
                InputObjectKind::ImmOrOwnedMoveObject(o_ref) => {
                    imm_owned_inputs.push((o_ref.0, o_ref.1));
                    Ok(())
                }
                InputObjectKind::SharedMoveObject {
                    id,
                    initial_shared_version: _,
                    mutability: _,
                } if !deleted_shared_info_map.contains_key(id) => {
                    if let Some(o) = self
                        .storage
                        .live_objects_store
                        .lock()
                        .expect("Can't lock")
                        .get(id)
                    {
                        shared_inputs.push(o.clone());
                        Ok(())
                    } else {
                        Err(ReplayEngineError::InternalCacheInvariantViolation {
                            id: *id,
                            version: None,
                        })
                    }
                }
                _ => Ok(()),
            })
            .collect::<Result<Vec<_>, _>>()?;

        let mut in_objs = self.multi_download_and_store(&imm_owned_inputs).await?;

        in_objs.extend(
            self.multi_download_relevant_packages_and_store(
                package_inputs,
                tx_info.protocol_version.as_u64(),
            )
            .await?,
        );
        in_objs.extend(shared_inputs);

        let resolved_input_objs = tx_info
            .input_objects
            .iter()
            .flat_map(|kind| match kind {
                InputObjectKind::MovePackage(i) => {
                    Some(ObjectReadResult::new(
                        *kind,
                        self.storage
                            .package_cache
                            .lock()
                            .expect("Cannot lock")
                            .get(i)
                            .unwrap_or(
                                &self
                                    .download_latest_object(i)
                                    .expect("Object download failed")
                                    .expect("Object not found on chain"),
                            )
                            .clone()
                            .into(),
                    ))
                }
                InputObjectKind::ImmOrOwnedMoveObject(o_ref) => Some(ObjectReadResult::new(
                    *kind,
                    self.storage
                        .object_version_cache
                        .lock()
                        .expect("Cannot lock")
                        .get(&(o_ref.0, o_ref.1))
                        .unwrap()
                        .clone()
                        .into(),
                )),
                InputObjectKind::SharedMoveObject { id, .. }
                    if !deleted_shared_info_map.contains_key(id) =>
                {
                    Some(ObjectReadResult::new(
                        *kind,
                        self.storage
                            .live_objects_store
                            .lock()
                            .expect("Can't lock")
                            .get(id)
                            .unwrap()
                            .clone()
                            .into(),
                    ))
                }
                InputObjectKind::SharedMoveObject { id, .. } => {
                    let (digest, version) = deleted_shared_info_map.get(id).unwrap();
                    Some(ObjectReadResult::new(
                        *kind,
                        ObjectReadResultKind::ObjectConsensusStreamEnded(*version, *digest),
                    ))
                }
            })
            .collect();

        Ok(InputObjects::new(resolved_input_objs))
    }

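    /// Prefetch everything the transaction needs before execution: the objects it modified,
    /// its shared, gas, receiving, and config objects, the packages it references, and the
    /// child objects it loaded on chain, returning the resolved input objects.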
    async fn initialize_execution_env_state(
        &mut self,
        tx_info: &OnChainTransactionInfo,
    ) -> Result<InputObjects, ReplayEngineError> {
        self.current_protocol_version = tx_info.protocol_version.as_u64();

        self.multi_download_and_store(&tx_info.modified_at_versions)
            .await?;

        let (shared_refs, deleted_shared_refs): (Vec<ObjectRef>, Vec<ObjectRef>) = tx_info
            .shared_object_refs
            .iter()
            .partition(|r| r.2 != ObjectDigest::OBJECT_DIGEST_DELETED);

        let shared_refs: Vec<_> = shared_refs.iter().map(|r| (r.0, r.1)).collect();
        self.multi_download_and_store(&shared_refs).await?;

        let gas_refs: Vec<_> = tx_info
            .gas
            .iter()
            .filter_map(|w| (w.0 != ObjectID::ZERO).then_some((w.0, w.1)))
            .collect();
        self.multi_download_and_store(&gas_refs).await?;

        let input_objs = self
            .resolve_download_input_objects(tx_info, deleted_shared_refs)
            .await?;

        self.multi_download_and_store(&tx_info.receiving_objs)
            .await?;

        self.multi_download_and_store(&tx_info.config_objects)
            .await?;

        let loaded_child_refs = self.fetch_loaded_child_refs(&tx_info.tx_digest).await?;
        self.multi_download_and_store(&loaded_child_refs).await?;
        tokio::task::yield_now().await;

        Ok(input_objs)
    }
}

impl BackingPackageStore for LocalExec {
    fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
        fn inner(self_: &LocalExec, package_id: &ObjectID) -> SuiResult<Option<Object>> {
            self_
                .get_or_download_object(package_id, true /* package_expected */)
                .map_err(|e| SuiErrorKind::Storage(e.to_string()).into())
        }

        let res = inner(self, package_id);
        self.exec_store_events
            .lock()
            .expect("Unable to lock events list")
            .push(ExecutionStoreEvent::BackingPackageGetPackageObject {
                package_id: *package_id,
                result: res.clone(),
            });
        res.map(|o| o.map(PackageObject::new))
    }
}

impl ChildObjectResolver for LocalExec {
    fn read_child_object(
        &self,
        parent: &ObjectID,
        child: &ObjectID,
        child_version_upper_bound: SequenceNumber,
    ) -> SuiResult<Option<Object>> {
        fn inner(
            self_: &LocalExec,
            parent: &ObjectID,
            child: &ObjectID,
            child_version_upper_bound: SequenceNumber,
        ) -> SuiResult<Option<Object>> {
            let child_object =
                match self_.download_object_by_upper_bound(child, child_version_upper_bound)? {
                    None => return Ok(None),
                    Some(o) => o,
                };
            let child_version = child_object.version();
            if child_object.version() > child_version_upper_bound {
                return Err(SuiErrorKind::Unknown(format!(
                    "Invariant Violation. Replay loaded child_object {child} at version \
                    {child_version} but expected the version to be <= {child_version_upper_bound}"
                ))
                .into());
            }
            let parent = *parent;
            if child_object.owner != Owner::ObjectOwner(parent.into()) {
                return Err(SuiErrorKind::InvalidChildObjectAccess {
                    object: *child,
                    given_parent: parent,
                    actual_owner: child_object.owner.clone(),
                }
                .into());
            }
            Ok(Some(child_object))
        }

        let res = inner(self, parent, child, child_version_upper_bound);
        self.exec_store_events
            .lock()
            .expect("Unable to lock events list")
            .push(
                ExecutionStoreEvent::ChildObjectResolverStoreReadChildObject {
                    parent: *parent,
                    child: *child,
                    result: res.clone(),
                },
            );
        res
    }

    fn get_object_received_at_version(
        &self,
        owner: &ObjectID,
        receiving_object_id: &ObjectID,
        receive_object_at_version: SequenceNumber,
        _epoch_id: EpochId,
    ) -> SuiResult<Option<Object>> {
        fn inner(
            self_: &LocalExec,
            owner: &ObjectID,
            receiving_object_id: &ObjectID,
            receive_object_at_version: SequenceNumber,
        ) -> SuiResult<Option<Object>> {
            let recv_object = match self_.get_object(receiving_object_id) {
                None => return Ok(None),
                Some(o) => o,
            };
            if recv_object.version() != receive_object_at_version {
                let loaded_version = recv_object.version();
                return Err(SuiErrorKind::Unknown(format!(
                    "Invariant Violation. Replay loaded object {receiving_object_id} at version \
                    {loaded_version} but expected the version to be == {receive_object_at_version}"
                )).into());
            }
            if recv_object.owner != Owner::AddressOwner((*owner).into()) {
                return Ok(None);
            }
            Ok(Some(recv_object))
        }

        let res = inner(self, owner, receiving_object_id, receive_object_at_version);
        self.exec_store_events
            .lock()
            .expect("Unable to lock events list")
            .push(ExecutionStoreEvent::ReceiveObject {
                owner: *owner,
                receive: *receiving_object_id,
                receive_at_version: receive_object_at_version,
                result: res.clone(),
            });
        res
    }
}

impl ParentSync for LocalExec {
    fn get_latest_parent_entry_ref_deprecated(&self, object_id: ObjectID) -> Option<ObjectRef> {
        fn inner(self_: &LocalExec, object_id: ObjectID) -> Option<ObjectRef> {
            if let Some(v) = self_
                .storage
                .live_objects_store
                .lock()
                .expect("Can't lock")
                .get(&object_id)
            {
                return Some(v.compute_object_reference());
            }
            None
        }
        let res = inner(self, object_id);
        self.exec_store_events
            .lock()
            .expect("Unable to lock events list")
            .push(
                ExecutionStoreEvent::ParentSyncStoreGetLatestParentEntryRef {
                    object_id,
                    result: res,
                },
            );
        res
    }
}

impl ModuleResolver for LocalExec {
    type Error = SuiError;

    fn get_module(&self, module_id: &ModuleId) -> SuiResult<Option<Vec<u8>>> {
        fn inner(self_: &LocalExec, module_id: &ModuleId) -> SuiResult<Option<Vec<u8>>> {
            get_module(self_, module_id)
        }

        let res = inner(self, module_id);
        self.exec_store_events
            .lock()
            .expect("Unable to lock events list")
            .push(ExecutionStoreEvent::ModuleResolverGetModule {
                module_id: module_id.clone(),
                result: res.clone(),
            });
        res
    }
}

impl ModuleResolver for &mut LocalExec {
    type Error = SuiError;

    fn get_module(&self, module_id: &ModuleId) -> SuiResult<Option<Vec<u8>>> {
        (**self).get_module(module_id)
    }
}

impl ObjectStore for LocalExec {
    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        let res = self
            .storage
            .live_objects_store
            .lock()
            .expect("Can't lock")
            .get(object_id)
            .cloned();
        self.exec_store_events
            .lock()
            .expect("Unable to lock events list")
            .push(ExecutionStoreEvent::ObjectStoreGetObject {
                object_id: *object_id,
                result: Ok(res.clone()),
            });
        res
    }

    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
        let res = self
            .storage
            .live_objects_store
            .lock()
            .expect("Can't lock")
            .get(object_id)
            .and_then(|obj| {
                if obj.version() == version {
                    Some(obj.clone())
                } else {
                    None
                }
            });

        self.exec_store_events
            .lock()
            .expect("Unable to lock events list")
            .push(ExecutionStoreEvent::ObjectStoreGetObjectByKey {
                object_id: *object_id,
                version,
                result: Ok(res.clone()),
            });

        res
    }
}

impl ObjectStore for &mut LocalExec {
    fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
        (**self).get_object(object_id)
    }

    fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
        (**self).get_object_by_key(object_id, version)
    }
}

impl GetModule for LocalExec {
    type Error = SuiError;
    type Item = CompiledModule;

    fn get_module_by_id(&self, id: &ModuleId) -> SuiResult<Option<Self::Item>> {
        let res = get_module_by_id(self, id);

        self.exec_store_events
            .lock()
            .expect("Unable to lock events list")
            .push(ExecutionStoreEvent::GetModuleGetModuleByModuleId {
                id: id.clone(),
                result: res.clone(),
            });
        res
    }
}

pub fn get_executor(
    executor_version_override: Option<i64>,
    protocol_config: &ProtocolConfig,
    _expensive_safety_check_config: ExpensiveSafetyCheckConfig,
) -> Arc<dyn Executor + Send + Sync> {
    let protocol_config = executor_version_override
        .map(|q| {
            let ver = if q < 0 {
                ProtocolConfig::get_for_max_version_UNSAFE().execution_version()
            } else {
                q as u64
            };

            let mut c = protocol_config.clone();
            c.set_execution_version_for_testing(ver);
            c
        })
        .unwrap_or(protocol_config.clone());

    let silent = true;
    sui_execution::executor(&protocol_config, silent)
        .expect("Creating an executor should not fail here")
}

fn parse_effect_error_for_denied_coins(status: &SuiExecutionStatus) -> Option<String> {
    let SuiExecutionStatus::Failure { error } = status else {
        return None;
    };
    parse_denied_error_string(error)
}

fn parse_denied_error_string(error: &str) -> Option<String> {
    let regulated_regex = regex::Regex::new(
        r#"CoinTypeGlobalPause.*?"(.*?)"|AddressDeniedForCoin.*coin_type:.*?"(.*?)""#,
    )
    .unwrap();

    let caps = regulated_regex.captures(error)?;
    Some(caps.get(1).or(caps.get(2))?.as_str().to_string())
}

#[cfg(test)]
mod tests {
    use super::parse_denied_error_string;
    #[test]
    fn test_regex_regulated_coin_errors() {
        let test_bank = vec![
            "CoinTypeGlobalPause { coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
            "AddressDeniedForCoin { address: B, coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
        ];
        let expected_string =
            "39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN";

        for test in &test_bank {
            assert!(parse_denied_error_string(test).unwrap() == expected_string);
        }
    }
}