1use crate::chain_from_chain_id;
5use crate::{
6 data_fetcher::{
7 DataFetcher, Fetchers, NodeStateDumpFetcher, RemoteFetcher, extract_epoch_and_version,
8 },
9 displays::{
10 Pretty,
11 transaction_displays::{FullPTB, transform_command_results_to_annotated},
12 },
13 types::*,
14};
15use futures::executor::block_on;
16use move_binary_format::CompiledModule;
17use move_bytecode_utils::module_cache::GetModule;
18use move_core_types::resolver::SerializedPackage;
19use move_core_types::{
20 account_address::AccountAddress, language_storage::ModuleId, resolver::ModuleResolver,
21};
22use prometheus::Registry;
23use serde::{Deserialize, Serialize};
24use similar::{ChangeTag, TextDiff};
25use std::{
26 collections::{BTreeMap, HashSet},
27 path::PathBuf,
28 sync::Arc,
29 sync::Mutex,
30};
31use sui_config::node::ExpensiveSafetyCheckConfig;
32use sui_core::authority::NodeStateDump;
33use sui_execution::Executor;
34use sui_framework::BuiltInFramework;
35use sui_json_rpc_types::{
36 SuiExecutionStatus, SuiTransactionBlockEffects, SuiTransactionBlockEffectsAPI,
37};
38use sui_protocol_config::{Chain, ProtocolConfig};
39use sui_sdk::{SuiClient, SuiClientBuilder};
40use sui_types::SUI_DENY_LIST_OBJECT_ID;
41use sui_types::error::SuiErrorKind;
42use sui_types::execution_params::{
43 ExecutionOrEarlyError, FundsWithdrawStatus, get_early_execution_error,
44};
45use sui_types::in_memory_storage::InMemoryStorage;
46use sui_types::message_envelope::Message;
47use sui_types::storage::{PackageObject, get_module, get_package};
48use sui_types::transaction::GasData;
49use sui_types::transaction::TransactionKind::ProgrammableTransaction;
50use sui_types::{
51 DEEPBOOK_PACKAGE_ID,
52 base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber},
53 committee::EpochId,
54 digests::{ObjectDigest, TransactionDigest},
55 error::{ExecutionError, SuiError, SuiResult},
56 executable_transaction::VerifiedExecutableTransaction,
57 gas::SuiGasStatus,
58 inner_temporary_store::InnerTemporaryStore,
59 metrics::ExecutionMetrics,
60 object::{Object, Owner},
61 storage::get_module_by_id,
62 storage::{BackingPackageStore, ChildObjectResolver, ObjectStore, ParentSync},
63 transaction::{
64 CheckedInputObjects, InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
65 SenderSignedData, Transaction, TransactionDataAPI, TransactionKind, VerifiedTransaction,
66 },
67};
68use tracing::{error, info, trace, warn};
69
70#[derive(Debug, Serialize, Deserialize)]
73pub struct ExecutionSandboxState {
74 pub transaction_info: OnChainTransactionInfo,
76 pub required_objects: Vec<Object>,
78 #[serde(skip)]
80 pub local_exec_temporary_store: Option<InnerTemporaryStore>,
81 pub local_exec_effects: SuiTransactionBlockEffects,
83 #[serde(skip)]
85 pub local_exec_status: Option<Result<(), ExecutionError>>,
86}
87
88impl ExecutionSandboxState {
89 #[allow(clippy::result_large_err)]
90 pub fn check_effects(&self) -> Result<(), ReplayEngineError> {
91 let SuiTransactionBlockEffects::V1(mut local_effects) = self.local_exec_effects.clone();
92 let SuiTransactionBlockEffects::V1(on_chain_effects) =
93 self.transaction_info.effects.clone();
94
95 if on_chain_effects.abort_error.is_none() {
98 local_effects.abort_error = None;
99 }
100 let local_effects = SuiTransactionBlockEffects::V1(local_effects);
101 let on_chain_effects = SuiTransactionBlockEffects::V1(on_chain_effects);
102
103 if on_chain_effects != local_effects {
104 error!("Replay tool forked {}", self.transaction_info.tx_digest);
105 let diff = Self::diff_effects(&on_chain_effects, &local_effects);
106 println!("{}", diff);
107 return Err(ReplayEngineError::EffectsForked {
108 digest: self.transaction_info.tx_digest,
109 diff: format!("\n{}", diff),
110 on_chain: Box::new(on_chain_effects),
111 local: Box::new(local_effects),
112 });
113 }
114 Ok(())
115 }
116
117 pub fn diff_effects(
119 on_chain_effects: &SuiTransactionBlockEffects,
120 local_effects: &SuiTransactionBlockEffects,
121 ) -> String {
122 let on_chain_str = format!("{:#?}", on_chain_effects);
123 let local_chain_str = format!("{:#?}", local_effects);
124 let mut res = vec![];
125
126 let diff = TextDiff::from_lines(&on_chain_str, &local_chain_str);
127 for change in diff.iter_all_changes() {
128 let sign = match change.tag() {
129 ChangeTag::Delete => "---",
130 ChangeTag::Insert => "+++",
131 ChangeTag::Equal => " ",
132 };
133 res.push(format!("{}{}", sign, change));
134 }
135
136 res.join("")
137 }
138}
139
140#[derive(Debug, Clone, PartialEq, Eq)]
141pub struct ProtocolVersionSummary {
142 pub protocol_version: u64,
144 pub epoch_start: u64,
146 pub epoch_end: u64,
148 pub checkpoint_start: Option<u64>,
150 pub checkpoint_end: Option<u64>,
152 pub epoch_change_tx: TransactionDigest,
154}
155
156#[derive(Clone)]
157pub struct Storage {
158 pub live_objects_store: Arc<Mutex<BTreeMap<ObjectID, Object>>>,
163
164 pub package_cache: Arc<Mutex<BTreeMap<ObjectID, Object>>>,
167 pub object_version_cache: Arc<Mutex<BTreeMap<(ObjectID, SequenceNumber), Object>>>,
170}
171
172impl std::fmt::Display for Storage {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 writeln!(f, "Live object store")?;
175 for (id, obj) in self
176 .live_objects_store
177 .lock()
178 .expect("Unable to lock")
179 .iter()
180 {
181 writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
182 }
183 writeln!(f, "Package cache")?;
184 for (id, obj) in self.package_cache.lock().expect("Unable to lock").iter() {
185 writeln!(f, "{}: {:?}", id, obj.compute_object_reference())?;
186 }
187 writeln!(f, "Object version cache")?;
188 for (id, _) in self
189 .object_version_cache
190 .lock()
191 .expect("Unable to lock")
192 .iter()
193 {
194 writeln!(f, "{}: {}", id.0, id.1)?;
195 }
196
197 write!(f, "")
198 }
199}
200
201impl Storage {
202 pub fn default() -> Self {
203 Self {
204 live_objects_store: Arc::new(Mutex::new(BTreeMap::new())),
205 package_cache: Arc::new(Mutex::new(BTreeMap::new())),
206 object_version_cache: Arc::new(Mutex::new(BTreeMap::new())),
207 }
208 }
209
210 pub fn all_objects(&self) -> Vec<Object> {
211 self.live_objects_store
212 .lock()
213 .expect("Unable to lock")
214 .values()
215 .cloned()
216 .chain(
217 self.package_cache
218 .lock()
219 .expect("Unable to lock")
220 .values()
221 .cloned(),
222 )
223 .chain(
224 self.object_version_cache
225 .lock()
226 .expect("Unable to lock")
227 .values()
228 .cloned(),
229 )
230 .collect::<Vec<_>>()
231 }
232}
233
234#[derive(Clone)]
235pub struct LocalExec {
236 pub client: Option<SuiClient>,
237 pub protocol_version_epoch_table: BTreeMap<u64, ProtocolVersionSummary>,
240 pub protocol_version_system_package_table: BTreeMap<u64, BTreeMap<ObjectID, SequenceNumber>>,
242 pub current_protocol_version: u64,
244 pub storage: Storage,
246 pub exec_store_events: Arc<Mutex<Vec<ExecutionStoreEvent>>>,
248 pub metrics: Arc<ExecutionMetrics>,
250 pub fetcher: Fetchers,
252
253 pub executor_version: Option<i64>,
256 pub protocol_version: Option<i64>,
260 pub config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
261 pub num_retries_for_timeout: u32,
263 pub sleep_period_for_timeout: std::time::Duration,
264}
265
266impl LocalExec {
267 pub async fn multi_download(
270 &self,
271 objs: &[(ObjectID, SequenceNumber)],
272 ) -> Result<Vec<Object>, ReplayEngineError> {
273 let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
274 while num_retries_for_timeout >= 0 {
275 match self.fetcher.multi_get_versioned(objs).await {
276 Ok(objs) => return Ok(objs),
277 Err(ReplayEngineError::SuiRpcRequestTimeout) => {
278 warn!(
279 "RPC request timed out. Retries left {}. Sleeping for {}s",
280 num_retries_for_timeout,
281 self.sleep_period_for_timeout.as_secs()
282 );
283 num_retries_for_timeout -= 1;
284 tokio::time::sleep(self.sleep_period_for_timeout).await;
285 }
286 Err(e) => return Err(e),
287 }
288 }
289 Err(ReplayEngineError::SuiRpcRequestTimeout)
290 }
291 pub async fn multi_download_latest(
294 &self,
295 objs: &[ObjectID],
296 ) -> Result<Vec<Object>, ReplayEngineError> {
297 let mut num_retries_for_timeout = self.num_retries_for_timeout as i64;
298 while num_retries_for_timeout >= 0 {
299 match self.fetcher.multi_get_latest(objs).await {
300 Ok(objs) => return Ok(objs),
301 Err(ReplayEngineError::SuiRpcRequestTimeout) => {
302 warn!(
303 "RPC request timed out. Retries left {}. Sleeping for {}s",
304 num_retries_for_timeout,
305 self.sleep_period_for_timeout.as_secs()
306 );
307 num_retries_for_timeout -= 1;
308 tokio::time::sleep(self.sleep_period_for_timeout).await;
309 }
310 Err(e) => return Err(e),
311 }
312 }
313 Err(ReplayEngineError::SuiRpcRequestTimeout)
314 }
315
316 pub async fn fetch_loaded_child_refs(
317 &self,
318 tx_digest: &TransactionDigest,
319 ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
320 self.fetcher.get_loaded_child_objects(tx_digest).await
322 }
323
324 pub async fn new_from_fn_url(http_url: &str) -> Result<Self, ReplayEngineError> {
325 Self::new_for_remote(
326 SuiClientBuilder::default()
327 .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
328 .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
329 .build(http_url)
330 .await?,
331 None,
332 )
333 .await
334 }
335
336 pub async fn replay_with_network_config(
337 rpc_url: String,
338 tx_digest: TransactionDigest,
339 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
340 use_authority: bool,
341 executor_version: Option<i64>,
342 protocol_version: Option<i64>,
343 config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
344 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
345 info!("Using RPC URL: {}", rpc_url);
346 LocalExec::new_from_fn_url(&rpc_url)
347 .await?
348 .init_for_execution()
349 .await?
350 .execute_transaction(
351 &tx_digest,
352 expensive_safety_check_config,
353 use_authority,
354 executor_version,
355 protocol_version,
356 config_and_versions,
357 )
358 .await
359 }
360
361 pub async fn init_for_execution(mut self) -> Result<Self, ReplayEngineError> {
366 self.populate_protocol_version_tables().await?;
367 tokio::task::yield_now().await;
368 Ok(self)
369 }
370
371 pub async fn reset_for_new_execution_with_client(self) -> Result<Self, ReplayEngineError> {
372 Self::new_for_remote(
373 self.client.expect("Remote client not initialized"),
374 Some(self.fetcher.into_remote()),
375 )
376 .await?
377 .init_for_execution()
378 .await
379 }
380
381 pub async fn new_for_remote(
382 client: SuiClient,
383 remote_fetcher: Option<RemoteFetcher>,
384 ) -> Result<Self, ReplayEngineError> {
385 let registry = prometheus::Registry::new();
387 let metrics = Arc::new(ExecutionMetrics::new(®istry));
388
389 let fetcher = remote_fetcher.unwrap_or(RemoteFetcher::new(client.clone()));
390
391 Ok(Self {
392 client: Some(client),
393 protocol_version_epoch_table: BTreeMap::new(),
394 protocol_version_system_package_table: BTreeMap::new(),
395 current_protocol_version: 0,
396 exec_store_events: Arc::new(Mutex::new(Vec::new())),
397 metrics,
398 storage: Storage::default(),
399 fetcher: Fetchers::Remote(fetcher),
400 num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
402 sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
403 executor_version: None,
404 protocol_version: None,
405 config_and_versions: None,
406 })
407 }
408
409 pub async fn new_for_state_dump(
410 path: &str,
411 backup_rpc_url: Option<String>,
412 ) -> Result<Self, ReplayEngineError> {
413 let registry = prometheus::Registry::new();
415 let metrics = Arc::new(ExecutionMetrics::new(®istry));
416
417 let state = NodeStateDump::read_from_file(&PathBuf::from(path))?;
418 let current_protocol_version = state.protocol_version;
419 let fetcher = match backup_rpc_url {
420 Some(url) => NodeStateDumpFetcher::new(
421 state,
422 Some(RemoteFetcher::new(
423 SuiClientBuilder::default()
424 .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
425 .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
426 .build(url)
427 .await?,
428 )),
429 ),
430 None => NodeStateDumpFetcher::new(state, None),
431 };
432
433 Ok(Self {
434 client: None,
435 protocol_version_epoch_table: BTreeMap::new(),
436 protocol_version_system_package_table: BTreeMap::new(),
437 current_protocol_version,
438 exec_store_events: Arc::new(Mutex::new(Vec::new())),
439 metrics,
440 storage: Storage::default(),
441 fetcher: Fetchers::NodeStateDump(fetcher),
442 num_retries_for_timeout: RPC_TIMEOUT_ERR_NUM_RETRIES,
444 sleep_period_for_timeout: RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
445 executor_version: None,
446 protocol_version: None,
447 config_and_versions: None,
448 })
449 }
450
451 pub async fn multi_download_and_store(
452 &mut self,
453 objs: &[(ObjectID, SequenceNumber)],
454 ) -> Result<Vec<Object>, ReplayEngineError> {
455 let objs = self.multi_download(objs).await?;
456
457 for obj in objs.iter() {
459 let o_ref = obj.compute_object_reference();
460 self.storage
461 .live_objects_store
462 .lock()
463 .expect("Can't lock")
464 .insert(o_ref.0, obj.clone());
465 self.storage
466 .object_version_cache
467 .lock()
468 .expect("Cannot lock")
469 .insert((o_ref.0, o_ref.1), obj.clone());
470 if obj.is_package() {
471 self.storage
472 .package_cache
473 .lock()
474 .expect("Cannot lock")
475 .insert(o_ref.0, obj.clone());
476 }
477 }
478 tokio::task::yield_now().await;
479 Ok(objs)
480 }
481
482 pub async fn multi_download_relevant_packages_and_store(
483 &mut self,
484 objs: Vec<ObjectID>,
485 protocol_version: u64,
486 ) -> Result<Vec<Object>, ReplayEngineError> {
487 let syst_packages_objs = if self.protocol_version.is_some_and(|i| i < 0) {
488 BuiltInFramework::genesis_objects().collect()
489 } else {
490 let syst_packages =
491 self.system_package_versions_for_protocol_version(protocol_version)?;
492 self.multi_download(&syst_packages).await?
493 };
494
495 let non_system_package_objs: Vec<_> = objs
498 .into_iter()
499 .filter(|o| !Self::system_package_ids(self.current_protocol_version).contains(o))
500 .collect();
501 let objs = self
502 .multi_download_latest(&non_system_package_objs)
503 .await?
504 .into_iter()
505 .chain(syst_packages_objs.into_iter());
506
507 for obj in objs.clone() {
508 let o_ref = obj.compute_object_reference();
509 self.storage
512 .object_version_cache
513 .lock()
514 .expect("Cannot lock")
515 .insert((o_ref.0, o_ref.1), obj.clone());
516 if obj.is_package() {
517 self.storage
518 .package_cache
519 .lock()
520 .expect("Cannot lock")
521 .insert(o_ref.0, obj.clone());
522 }
523 }
524 Ok(objs.collect())
525 }
526
527 #[allow(clippy::disallowed_methods, clippy::result_large_err)]
529 pub fn download_object(
530 &self,
531 object_id: &ObjectID,
532 version: SequenceNumber,
533 ) -> Result<Object, ReplayEngineError> {
534 if self
535 .storage
536 .object_version_cache
537 .lock()
538 .expect("Cannot lock")
539 .contains_key(&(*object_id, version))
540 {
541 return Ok(self
542 .storage
543 .object_version_cache
544 .lock()
545 .expect("Cannot lock")
546 .get(&(*object_id, version))
547 .ok_or(ReplayEngineError::InternalCacheInvariantViolation {
548 id: *object_id,
549 version: Some(version),
550 })?
551 .clone());
552 }
553
554 let o = block_on(self.multi_download(&[(*object_id, version)])).map(|mut q| {
555 q.pop().unwrap_or_else(|| {
556 panic!(
557 "Downloaded obj response cannot be empty {:?}",
558 (*object_id, version)
559 )
560 })
561 })?;
562
563 let o_ref = o.compute_object_reference();
564 self.storage
565 .object_version_cache
566 .lock()
567 .expect("Cannot lock")
568 .insert((o_ref.0, o_ref.1), o.clone());
569 Ok(o)
570 }
571
572 #[allow(clippy::disallowed_methods, clippy::result_large_err)]
574 pub fn download_latest_object(
575 &self,
576 object_id: &ObjectID,
577 ) -> Result<Option<Object>, ReplayEngineError> {
578 let resp = block_on({
579 self.multi_download_latest(std::slice::from_ref(object_id))
581 })
582 .map(|mut q| {
583 q.pop()
584 .unwrap_or_else(|| panic!("Downloaded obj response cannot be empty {}", *object_id))
585 });
586
587 match resp {
588 Ok(v) => Ok(Some(v)),
589 Err(ReplayEngineError::ObjectNotExist { id }) => {
590 error!(
591 "Could not find object {id} on RPC server. It might have been pruned, deleted, or never existed."
592 );
593 Ok(None)
594 }
595 Err(ReplayEngineError::ObjectDeleted {
596 id,
597 version,
598 digest,
599 }) => {
600 error!("Object {id} {version} {digest} was deleted on RPC server.");
601 Ok(None)
602 }
603 Err(err) => Err(ReplayEngineError::SuiRpcError {
604 err: err.to_string(),
605 }),
606 }
607 }
608
609 #[allow(clippy::disallowed_methods, clippy::result_large_err)]
610 pub fn download_object_by_upper_bound(
611 &self,
612 object_id: &ObjectID,
613 version_upper_bound: VersionNumber,
614 ) -> Result<Option<Object>, ReplayEngineError> {
615 let local_object = self
616 .storage
617 .live_objects_store
618 .lock()
619 .expect("Can't lock")
620 .get(object_id)
621 .cloned();
622 if local_object.is_some() {
623 return Ok(local_object);
624 }
625 let response = block_on({
626 self.fetcher
627 .get_child_object(object_id, version_upper_bound)
628 });
629 match response {
630 Ok(object) => {
631 let obj_ref = object.compute_object_reference();
632 self.storage
633 .live_objects_store
634 .lock()
635 .expect("Can't lock")
636 .insert(*object_id, object.clone());
637 self.storage
638 .object_version_cache
639 .lock()
640 .expect("Can't lock")
641 .insert((obj_ref.0, obj_ref.1), object.clone());
642 Ok(Some(object))
643 }
644 Err(ReplayEngineError::ObjectNotExist { id }) => {
645 error!(
646 "Could not find child object {id} on RPC server. It might have been pruned, deleted, or never existed."
647 );
648 Ok(None)
649 }
650 Err(ReplayEngineError::ObjectDeleted {
651 id,
652 version,
653 digest,
654 }) => {
655 error!("Object {id} {version} {digest} was deleted on RPC server.");
656 Ok(None)
657 }
658 Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
661 info!(
662 "Object {id} {version} not found on RPC server -- this may have been pruned or never existed."
663 );
664 Ok(None)
665 }
666 Err(err) => Err(ReplayEngineError::SuiRpcError {
667 err: err.to_string(),
668 }),
669 }
670 }
671
672 pub async fn get_checkpoint_txs(
673 &self,
674 checkpoint_id: u64,
675 ) -> Result<Vec<TransactionDigest>, ReplayEngineError> {
676 self.fetcher
677 .get_checkpoint_txs(checkpoint_id)
678 .await
679 .map_err(|e| ReplayEngineError::SuiRpcError { err: e.to_string() })
680 }
681
682 pub async fn execute_all_in_checkpoints(
683 &mut self,
684 checkpoint_ids: &[u64],
685 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
686 terminate_early: bool,
687 use_authority: bool,
688 ) -> Result<(u64, u64), ReplayEngineError> {
689 let mut txs = Vec::new();
691 for checkpoint_id in checkpoint_ids {
692 txs.extend(self.get_checkpoint_txs(*checkpoint_id).await?);
693 }
694 let num = txs.len();
695 let mut succeeded = 0;
696 for tx in txs {
697 match self
698 .execute_transaction(
699 &tx,
700 expensive_safety_check_config.clone(),
701 use_authority,
702 None,
703 None,
704 None,
705 )
706 .await
707 .map(|q| q.check_effects())
708 {
709 Err(e) | Ok(Err(e)) => {
710 if terminate_early {
711 return Err(e);
712 }
713 error!("Error executing tx: {}, {:#?}", tx, e);
714 continue;
715 }
716 _ => (),
717 }
718
719 succeeded += 1;
720 }
721 Ok((succeeded, num as u64))
722 }
723
724 pub async fn execution_engine_execute_with_tx_info_impl(
725 &mut self,
726 tx_info: &OnChainTransactionInfo,
727 override_transaction_kind: Option<TransactionKind>,
728 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
729 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
730 let tx_digest = &tx_info.tx_digest;
731 if tx_info.protocol_version.as_u64() < 16 {
734 warn!(
735 "Protocol version ({:?}) too old: {}, skipping transaction",
736 tx_info.protocol_version, tx_digest
737 );
738 return Err(ReplayEngineError::TransactionNotSupported {
739 digest: *tx_digest,
740 reason: "Protocol version too old".to_string(),
741 });
742 }
743 let input_objects = self.initialize_execution_env_state(tx_info).await?;
746 assert_eq!(
747 &input_objects.filter_shared_objects().len(),
748 &tx_info.shared_object_refs.len()
749 );
750 let protocol_config =
754 &ProtocolConfig::get_for_version(tx_info.protocol_version, tx_info.chain);
755
756 let metrics = self.metrics.clone();
757
758 let ov = self.executor_version;
759
760 let executor = get_executor(ov, protocol_config, expensive_safety_check_config);
762
763 let expensive_checks = true;
765 let transaction_kind = override_transaction_kind.unwrap_or(tx_info.kind.clone());
766 let gas_status = if tx_info.kind.is_system_tx() {
767 SuiGasStatus::new_unmetered()
768 } else {
769 SuiGasStatus::new(
770 tx_info.gas_budget,
771 tx_info.gas_price,
772 tx_info.reference_gas_price,
773 protocol_config,
774 )
775 .expect("Failed to create gas status")
776 };
777 let gas_data = GasData {
778 payment: tx_info.gas.clone(),
779 owner: tx_info.gas_owner.unwrap_or(tx_info.sender),
780 price: tx_info.gas_price,
781 budget: tx_info.gas_budget,
782 };
783 let checked_input_objects = CheckedInputObjects::new_for_replay(input_objects.clone());
784 let early_execution_error = get_early_execution_error(
785 tx_digest,
786 &checked_input_objects,
787 &HashSet::new(),
788 &FundsWithdrawStatus::MaybeSufficient,
790 );
791 let execution_params = match early_execution_error {
792 Some(error) => ExecutionOrEarlyError::Err(error),
793 None => ExecutionOrEarlyError::Ok(()),
794 };
795 let (inner_store, gas_status, effects, _timings, result) = executor
796 .execute_transaction_to_effects_and_execution_error(
797 &self,
798 protocol_config,
799 metrics.clone(),
800 expensive_checks,
801 execution_params,
802 &tx_info.executed_epoch,
803 tx_info.epoch_start_timestamp,
804 checked_input_objects,
805 gas_data,
806 gas_status,
807 transaction_kind.clone(),
808 None, tx_info.sender,
810 *tx_digest,
811 &mut None,
812 );
813
814 if let Err(err) = self.pretty_print_for_tracing(
815 &gas_status,
816 &executor,
817 tx_info,
818 &transaction_kind,
819 protocol_config,
820 metrics,
821 expensive_checks,
822 input_objects.clone(),
823 ) {
824 error!("Failed to pretty print for tracing: {:?}", err);
825 }
826
827 let all_required_objects = self.storage.all_objects();
828
829 let effects =
830 SuiTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;
831
832 Ok(ExecutionSandboxState {
833 transaction_info: tx_info.clone(),
834 required_objects: all_required_objects,
835 local_exec_temporary_store: Some(inner_store),
836 local_exec_effects: effects,
837 local_exec_status: Some(result),
838 })
839 }
840
841 fn pretty_print_for_tracing(
842 &self,
843 gas_status: &SuiGasStatus,
844 executor: &Arc<dyn Executor + Send + Sync>,
845 tx_info: &OnChainTransactionInfo,
846 transaction_kind: &TransactionKind,
847 protocol_config: &ProtocolConfig,
848 metrics: Arc<ExecutionMetrics>,
849 expensive_checks: bool,
850 input_objects: InputObjects,
851 ) -> anyhow::Result<()> {
852 trace!(target: "replay_gas_info", "{}", Pretty(gas_status));
853
854 let skip_checks = true;
855 let gas_data = GasData {
856 payment: tx_info.gas.clone(),
857 owner: tx_info.gas_owner.unwrap_or(tx_info.sender),
858 price: tx_info.gas_price,
859 budget: tx_info.gas_budget,
860 };
861 let checked_input_objects = CheckedInputObjects::new_for_replay(input_objects.clone());
862 let early_execution_error = get_early_execution_error(
863 &tx_info.tx_digest,
864 &checked_input_objects,
865 &HashSet::new(),
866 &FundsWithdrawStatus::MaybeSufficient,
868 );
869 let execution_params = match early_execution_error {
870 Some(error) => ExecutionOrEarlyError::Err(error),
871 None => ExecutionOrEarlyError::Ok(()),
872 };
873 if let ProgrammableTransaction(pt) = transaction_kind {
874 trace!(
875 target: "replay_ptb_info",
876 "{}",
877 Pretty(&FullPTB {
878 ptb: pt.clone(),
879 results: transform_command_results_to_annotated(
880 protocol_config,
881 executor,
882 &self.clone(),
883 executor.dev_inspect_transaction(
884 &self,
885 protocol_config,
886 metrics,
887 expensive_checks,
888 execution_params,
889 &tx_info.executed_epoch,
890 tx_info.epoch_start_timestamp,
891 CheckedInputObjects::new_for_replay(input_objects),
892 gas_data,
893 SuiGasStatus::new(
894 tx_info.gas_budget,
895 tx_info.gas_price,
896 tx_info.reference_gas_price,
897 protocol_config,
898 )?,
899 transaction_kind.clone(),
900 None, tx_info.sender,
902 tx_info.sender_signed_data.digest(),
903 skip_checks,
904 )
905 .3
906 .unwrap_or_default(),
907 )?,
908 }));
909 }
910 Ok(())
911 }
912
913 #[allow(clippy::result_large_err)]
915 pub async fn execution_engine_execute_impl(
916 &mut self,
917 tx_digest: &TransactionDigest,
918 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
919 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
920 if self.is_remote_replay() {
921 assert!(
922 !self.protocol_version_system_package_table.is_empty()
923 || !self.protocol_version_epoch_table.is_empty(),
924 "Required tables not populated. Must call `init_for_execution` before executing transactions"
925 );
926 }
927
928 let tx_info = if self.is_remote_replay() {
929 self.resolve_tx_components(tx_digest).await?
930 } else {
931 self.resolve_tx_components_from_dump(tx_digest).await?
932 };
933 self.execution_engine_execute_with_tx_info_impl(
934 &tx_info,
935 None,
936 expensive_safety_check_config,
937 )
938 .await
939 }
940
941 #[allow(clippy::result_large_err)]
945 pub async fn certificate_execute_with_sandbox_state(
946 pre_run_sandbox: &ExecutionSandboxState,
947 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
948 let executed_epoch = pre_run_sandbox.transaction_info.executed_epoch;
950 let reference_gas_price = pre_run_sandbox.transaction_info.reference_gas_price;
951 let epoch_start_timestamp = pre_run_sandbox.transaction_info.epoch_start_timestamp;
952 let protocol_config = ProtocolConfig::get_for_version(
953 pre_run_sandbox.transaction_info.protocol_version,
954 pre_run_sandbox.transaction_info.chain,
955 );
956 let required_objects = pre_run_sandbox.required_objects.clone();
957 let store = InMemoryStorage::new(required_objects.clone());
958
959 let transaction =
960 Transaction::new(pre_run_sandbox.transaction_info.sender_signed_data.clone());
961
962 let input_objects = store.read_input_objects_for_transaction(&transaction);
967 let executable = VerifiedExecutableTransaction::new_from_consensus(
968 VerifiedTransaction::new_unchecked(transaction),
969 executed_epoch,
970 );
971 let (gas_status, input_objects) = sui_transaction_checks::check_certificate_input(
972 &executable,
973 input_objects,
974 &protocol_config,
975 reference_gas_price,
976 )
977 .unwrap();
978 let (kind, signer, gas_data) = executable.transaction_data().execution_parts();
979 let executor = sui_execution::executor(&protocol_config, true).unwrap();
980 let early_execution_error = get_early_execution_error(
981 executable.digest(),
982 &input_objects,
983 &HashSet::new(),
984 &FundsWithdrawStatus::MaybeSufficient,
986 );
987 let execution_params = match early_execution_error {
988 Some(error) => ExecutionOrEarlyError::Err(error),
989 None => ExecutionOrEarlyError::Ok(()),
990 };
991 let (_, _, effects, _timings, exec_res) = executor
992 .execute_transaction_to_effects_and_execution_error(
993 &store,
994 &protocol_config,
995 Arc::new(ExecutionMetrics::new(&Registry::new())),
996 true,
997 execution_params,
998 &executed_epoch,
999 epoch_start_timestamp,
1000 input_objects,
1001 gas_data,
1002 gas_status,
1003 kind,
1004 None, signer,
1006 *executable.digest(),
1007 &mut None,
1008 );
1009
1010 let effects =
1011 SuiTransactionBlockEffects::try_from(effects).map_err(ReplayEngineError::from)?;
1012
1013 Ok(ExecutionSandboxState {
1014 transaction_info: pre_run_sandbox.transaction_info.clone(),
1015 required_objects,
1016 local_exec_temporary_store: None, local_exec_effects: effects,
1018 local_exec_status: Some(exec_res),
1019 })
1020 }
1021
1022 #[allow(clippy::result_large_err)]
1025 pub async fn certificate_execute(
1026 &mut self,
1027 tx_digest: &TransactionDigest,
1028 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1029 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
1030 let pre_run_sandbox = self
1032 .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
1033 .await?;
1034 Self::certificate_execute_with_sandbox_state(&pre_run_sandbox).await
1035 }
1036
1037 #[allow(clippy::result_large_err)]
1040 pub async fn execution_engine_execute(
1041 &mut self,
1042 tx_digest: &TransactionDigest,
1043 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1044 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
1045 let sandbox_state = self
1046 .execution_engine_execute_impl(tx_digest, expensive_safety_check_config)
1047 .await?;
1048
1049 Ok(sandbox_state)
1050 }
1051
1052 #[allow(clippy::result_large_err)]
1053 pub async fn execute_state_dump(
1054 &mut self,
1055 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1056 ) -> Result<(ExecutionSandboxState, NodeStateDump), ReplayEngineError> {
1057 assert!(!self.is_remote_replay());
1058
1059 let d = match self.fetcher.clone() {
1060 Fetchers::NodeStateDump(d) => d,
1061 _ => panic!("Invalid fetcher for state dump"),
1062 };
1063 let tx_digest = d.node_state_dump.clone().tx_digest;
1064 let sandbox_state = self
1065 .execution_engine_execute_impl(&tx_digest, expensive_safety_check_config)
1066 .await?;
1067
1068 Ok((sandbox_state, d.node_state_dump))
1069 }
1070
1071 #[allow(clippy::result_large_err)]
1072 pub async fn execute_transaction(
1073 &mut self,
1074 tx_digest: &TransactionDigest,
1075 expensive_safety_check_config: ExpensiveSafetyCheckConfig,
1076 use_authority: bool,
1077 executor_version: Option<i64>,
1078 protocol_version: Option<i64>,
1079 config_and_versions: Option<Vec<(ObjectID, SequenceNumber)>>,
1080 ) -> Result<ExecutionSandboxState, ReplayEngineError> {
1081 self.executor_version = executor_version;
1082 self.protocol_version = protocol_version;
1083 self.config_and_versions = config_and_versions;
1084 if use_authority {
1085 self.certificate_execute(tx_digest, expensive_safety_check_config.clone())
1086 .await
1087 } else {
1088 self.execution_engine_execute(tx_digest, expensive_safety_check_config)
1089 .await
1090 }
1091 }
1092 fn system_package_ids(protocol_version: u64) -> Vec<ObjectID> {
1093 let mut ids = BuiltInFramework::all_package_ids();
1094
1095 if protocol_version < 5 {
1096 ids.retain(|id| *id != DEEPBOOK_PACKAGE_ID)
1097 }
1098 ids
1099 }
1100
1101 #[allow(clippy::result_large_err)]
1103 pub fn get_or_download_object(
1104 &self,
1105 obj_id: &ObjectID,
1106 package_expected: bool,
1107 ) -> Result<Option<Object>, ReplayEngineError> {
1108 if package_expected {
1109 if let Some(obj) = self
1110 .storage
1111 .package_cache
1112 .lock()
1113 .expect("Cannot lock")
1114 .get(obj_id)
1115 {
1116 return Ok(Some(obj.clone()));
1117 };
1118 } else if let Some(obj) = self
1125 .storage
1126 .live_objects_store
1127 .lock()
1128 .expect("Can't lock")
1129 .get(obj_id)
1130 {
1131 return Ok(Some(obj.clone()));
1132 }
1133
1134 let Some(o) = self.download_latest_object(obj_id)? else {
1135 return Ok(None);
1136 };
1137
1138 if o.is_package() {
1139 assert!(
1140 package_expected,
1141 "Did not expect package but downloaded object is a package: {obj_id}"
1142 );
1143
1144 self.storage
1145 .package_cache
1146 .lock()
1147 .expect("Cannot lock")
1148 .insert(*obj_id, o.clone());
1149 }
1150 let o_ref = o.compute_object_reference();
1151 self.storage
1152 .object_version_cache
1153 .lock()
1154 .expect("Cannot lock")
1155 .insert((o_ref.0, o_ref.1), o.clone());
1156 Ok(Some(o))
1157 }
1158
1159 pub fn is_remote_replay(&self) -> bool {
1160 matches!(self.fetcher, Fetchers::Remote(_))
1161 }
1162
1163 #[allow(clippy::result_large_err)]
1165 pub fn system_package_versions_for_protocol_version(
1166 &self,
1167 protocol_version: u64,
1168 ) -> Result<Vec<(ObjectID, SequenceNumber)>, ReplayEngineError> {
1169 match &self.fetcher {
1170 Fetchers::Remote(_) => Ok(self
1171 .protocol_version_system_package_table
1172 .get(&protocol_version)
1173 .ok_or(ReplayEngineError::FrameworkObjectVersionTableNotPopulated {
1174 protocol_version,
1175 })?
1176 .clone()
1177 .into_iter()
1178 .collect()),
1179
1180 Fetchers::NodeStateDump(d) => Ok(d
1181 .node_state_dump
1182 .relevant_system_packages
1183 .iter()
1184 .map(|w| (w.id, w.version, w.digest))
1185 .map(|q| (q.0, q.1))
1186 .collect()),
1187 }
1188 }
1189
1190 pub async fn protocol_ver_to_epoch_map(
1191 &self,
1192 ) -> Result<BTreeMap<u64, ProtocolVersionSummary>, ReplayEngineError> {
1193 let mut range_map = BTreeMap::new();
1194 let epoch_change_events = self.fetcher.get_epoch_change_events(false).await?;
1195
1196 let mut tx_digest = *self
1198 .fetcher
1199 .get_checkpoint_txs(0)
1200 .await?
1201 .first()
1202 .expect("Genesis TX must be in first checkpoint");
1203 let (mut start_epoch, mut start_protocol_version, mut start_checkpoint) =
1206 (0, 1, Some(0u64));
1207
1208 let (mut curr_epoch, mut curr_protocol_version, mut curr_checkpoint) =
1209 (start_epoch, start_protocol_version, start_checkpoint);
1210
1211 (start_epoch, start_protocol_version, start_checkpoint) =
1212 (curr_epoch, curr_protocol_version, curr_checkpoint);
1213
1214 let mut end_epoch_tx_digest = tx_digest;
1216
1217 for event in epoch_change_events {
1218 (curr_epoch, curr_protocol_version) = extract_epoch_and_version(event.clone())?;
1219 end_epoch_tx_digest = event.id.tx_digest;
1220
1221 if start_protocol_version == curr_protocol_version {
1222 continue;
1224 }
1225
1226 curr_checkpoint = self
1229 .fetcher
1230 .get_transaction(&event.id.tx_digest)
1231 .await?
1232 .checkpoint;
1233 range_map.insert(
1235 start_protocol_version,
1236 ProtocolVersionSummary {
1237 protocol_version: start_protocol_version,
1238 epoch_start: start_epoch,
1239 epoch_end: curr_epoch - 1,
1240 checkpoint_start: start_checkpoint,
1241 checkpoint_end: curr_checkpoint.map(|x| x - 1),
1242 epoch_change_tx: tx_digest,
1243 },
1244 );
1245
1246 start_epoch = curr_epoch;
1247 start_protocol_version = curr_protocol_version;
1248 tx_digest = event.id.tx_digest;
1249 start_checkpoint = curr_checkpoint;
1250 }
1251
1252 range_map.insert(
1254 curr_protocol_version,
1255 ProtocolVersionSummary {
1256 protocol_version: curr_protocol_version,
1257 epoch_start: start_epoch,
1258 epoch_end: curr_epoch,
1259 checkpoint_start: curr_checkpoint,
1260 checkpoint_end: self
1261 .fetcher
1262 .get_transaction(&end_epoch_tx_digest)
1263 .await?
1264 .checkpoint,
1265 epoch_change_tx: tx_digest,
1266 },
1267 );
1268
1269 Ok(range_map)
1270 }
1271
1272 pub fn protocol_version_for_epoch(
1273 epoch: u64,
1274 mp: &BTreeMap<u64, (TransactionDigest, u64, u64)>,
1275 ) -> u64 {
1276 let mut version = 1;
1279 for (k, v) in mp.iter().rev() {
1280 if v.1 <= epoch {
1281 version = *k;
1282 break;
1283 }
1284 }
1285 version
1286 }
1287
1288 pub async fn populate_protocol_version_tables(&mut self) -> Result<(), ReplayEngineError> {
1289 self.protocol_version_epoch_table = self.protocol_ver_to_epoch_map().await?;
1290
1291 let system_package_revisions = self.system_package_versions().await?;
1292
1293 for (
1296 prot_ver,
1297 ProtocolVersionSummary {
1298 epoch_change_tx: tx_digest,
1299 ..
1300 },
1301 ) in self.protocol_version_epoch_table.clone()
1302 {
1303 let mut working = if prot_ver <= 1 {
1305 BTreeMap::new()
1306 } else {
1307 self.protocol_version_system_package_table
1308 .iter()
1309 .rev()
1310 .find(|(ver, _)| **ver <= prot_ver)
1311 .expect("Prev entry must exist")
1312 .1
1313 .clone()
1314 };
1315
1316 for (id, versions) in system_package_revisions.iter() {
1317 for ver in versions.iter().rev() {
1319 if ver.1 == tx_digest {
1320 working.insert(*id, ver.0);
1322 break;
1323 }
1324 }
1325 }
1326 self.protocol_version_system_package_table
1327 .insert(prot_ver, working);
1328 }
1329 Ok(())
1330 }
1331
1332 pub async fn system_package_versions(
1333 &self,
1334 ) -> Result<BTreeMap<ObjectID, Vec<(SequenceNumber, TransactionDigest)>>, ReplayEngineError>
1335 {
1336 let system_package_ids = Self::system_package_ids(
1337 *self
1338 .protocol_version_epoch_table
1339 .keys()
1340 .peekable()
1341 .last()
1342 .expect("Protocol version epoch table not populated"),
1343 );
1344 let mut system_package_objs = self.multi_download_latest(&system_package_ids).await?;
1345
1346 let mut mapping = BTreeMap::new();
1347
1348 while !system_package_objs.is_empty() {
1350 let previous_txs: Vec<_> = system_package_objs
1352 .iter()
1353 .map(|o| (o.compute_object_reference(), o.previous_transaction))
1354 .collect();
1355
1356 previous_txs.iter().for_each(|((id, ver, _), tx)| {
1357 mapping.entry(*id).or_insert(vec![]).push((*ver, *tx));
1358 });
1359
1360 let previous_ver_refs: Vec<_> = previous_txs
1363 .iter()
1364 .filter_map(|(q, _)| {
1365 let prev_ver = u64::from(q.1) - 1;
1366 if prev_ver == 0 {
1367 None
1368 } else {
1369 Some((q.0, SequenceNumber::from(prev_ver)))
1370 }
1371 })
1372 .collect();
1373 system_package_objs = match self.multi_download(&previous_ver_refs).await {
1374 Ok(packages) => packages,
1375 Err(ReplayEngineError::ObjectNotExist { id }) => {
1376 warn!(
1380 "Object {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1381 id
1382 );
1383 break;
1384 }
1385 Err(ReplayEngineError::ObjectVersionNotFound { id, version }) => {
1386 warn!(
1390 "Object {} at version {} does not exist on RPC server. This might be due to pruning. Historical replays might not work",
1391 id, version
1392 );
1393 break;
1394 }
1395 Err(ReplayEngineError::ObjectVersionTooHigh {
1396 id,
1397 asked_version,
1398 latest_version,
1399 }) => {
1400 warn!(
1401 "Object {} at version {} does not exist on RPC server. Latest version is {}. This might be due to pruning. Historical replays might not work",
1402 id, asked_version, latest_version
1403 );
1404 break;
1405 }
1406 Err(ReplayEngineError::ObjectDeleted {
1407 id,
1408 version,
1409 digest,
1410 }) => {
1411 warn!(
1415 "Object {} at version {} digest {} deleted from RPC server. This might be due to pruning. Historical replays might not work",
1416 id, version, digest
1417 );
1418 break;
1419 }
1420 Err(e) => return Err(e),
1421 };
1422 }
1423 Ok(mapping)
1424 }
1425
1426 pub async fn get_protocol_config(
1427 &self,
1428 epoch_id: EpochId,
1429 chain: Chain,
1430 ) -> Result<ProtocolConfig, ReplayEngineError> {
1431 match self.protocol_version {
1432 Some(x) if x < 0 => Ok(ProtocolConfig::get_for_max_version_UNSAFE()),
1433 Some(v) => Ok(ProtocolConfig::get_for_version((v as u64).into(), chain)),
1434 None => self
1435 .protocol_version_epoch_table
1436 .iter()
1437 .rev()
1438 .find(|(_, rg)| epoch_id >= rg.epoch_start)
1439 .map(|(p, _rg)| Ok(ProtocolConfig::get_for_version((*p).into(), chain)))
1440 .unwrap_or_else(|| {
1441 Err(ReplayEngineError::ProtocolVersionNotFound { epoch: epoch_id })
1442 }),
1443 }
1444 }
1445
1446 pub async fn checkpoints_for_epoch(
1447 &self,
1448 epoch_id: u64,
1449 ) -> Result<(u64, u64), ReplayEngineError> {
1450 let epoch_change_events = self
1451 .fetcher
1452 .get_epoch_change_events(true)
1453 .await?
1454 .into_iter()
1455 .collect::<Vec<_>>();
1456 let (start_checkpoint, start_epoch_idx) = if epoch_id == 0 {
1457 (0, 1)
1458 } else {
1459 let idx = epoch_change_events
1460 .iter()
1461 .position(|ev| match extract_epoch_and_version(ev.clone()) {
1462 Ok((epoch, _)) => epoch == epoch_id,
1463 Err(_) => false,
1464 })
1465 .ok_or(ReplayEngineError::EventNotFound { epoch: epoch_id })?;
1466 let epoch_change_tx = epoch_change_events[idx].id.tx_digest;
1467 (
1468 self.fetcher
1469 .get_transaction(&epoch_change_tx)
1470 .await?
1471 .checkpoint
1472 .unwrap_or_else(|| {
1473 panic!(
1474 "Checkpoint for transaction {} not present. Could be due to pruning",
1475 epoch_change_tx
1476 )
1477 }),
1478 idx,
1479 )
1480 };
1481
1482 let next_epoch_change_tx = epoch_change_events
1483 .get(start_epoch_idx + 1)
1484 .map(|v| v.id.tx_digest)
1485 .ok_or(ReplayEngineError::UnableToDetermineCheckpoint { epoch: epoch_id })?;
1486
1487 let next_epoch_checkpoint = self
1488 .fetcher
1489 .get_transaction(&next_epoch_change_tx)
1490 .await?
1491 .checkpoint
1492 .unwrap_or_else(|| {
1493 panic!(
1494 "Checkpoint for transaction {} not present. Could be due to pruning",
1495 next_epoch_change_tx
1496 )
1497 });
1498
1499 Ok((start_checkpoint, next_epoch_checkpoint - 1))
1500 }
1501
1502 pub async fn get_epoch_start_timestamp_and_rgp(
1503 &self,
1504 epoch_id: u64,
1505 tx_digest: &TransactionDigest,
1506 ) -> Result<(u64, u64), ReplayEngineError> {
1507 if epoch_id == 0 {
1508 return Err(ReplayEngineError::TransactionNotSupported {
1509 digest: *tx_digest,
1510 reason: "Transactions from epoch 0 not supported".to_string(),
1511 });
1512 }
1513 self.fetcher
1514 .get_epoch_start_timestamp_and_rgp(epoch_id)
1515 .await
1516 }
1517
1518 fn add_config_objects_if_needed(
1519 &self,
1520 status: &SuiExecutionStatus,
1521 ) -> Vec<(ObjectID, SequenceNumber)> {
1522 match parse_effect_error_for_denied_coins(status) {
1523 Some(coin_type) => {
1524 let Some(mut config_id_and_version) = self.config_and_versions.clone() else {
1525 panic!(
1526 "Need to specify the config object ID and version for '{coin_type}' in order to replay this transaction"
1527 );
1528 };
1529 if !config_id_and_version
1531 .iter()
1532 .any(|(id, _)| id == &SUI_DENY_LIST_OBJECT_ID)
1533 {
1534 let deny_list_oid_version = self.download_latest_object(&SUI_DENY_LIST_OBJECT_ID)
1535 .ok()
1536 .flatten()
1537 .expect("Unable to download the deny list object for a transaction that requires it")
1538 .version();
1539 config_id_and_version.push((SUI_DENY_LIST_OBJECT_ID, deny_list_oid_version));
1540 }
1541 config_id_and_version
1542 }
1543 None => vec![],
1544 }
1545 }
1546
1547 async fn resolve_tx_components(
1548 &self,
1549 tx_digest: &TransactionDigest,
1550 ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1551 assert!(self.is_remote_replay());
1552 let tx_info = self.fetcher.get_transaction(tx_digest).await?;
1554 let sender = match tx_info.clone().transaction.unwrap().data {
1555 sui_json_rpc_types::SuiTransactionBlockData::V1(tx) => tx.sender,
1556 };
1557 let SuiTransactionBlockEffects::V1(effects) = tx_info.clone().effects.unwrap();
1558
1559 let config_objects = self.add_config_objects_if_needed(effects.status());
1560
1561 let raw_tx_bytes = tx_info.clone().raw_transaction;
1562 let orig_tx: SenderSignedData = bcs::from_bytes(&raw_tx_bytes).unwrap();
1563 let input_objs = orig_tx
1564 .transaction_data()
1565 .input_objects()
1566 .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1567 let tx_kind_orig = orig_tx.transaction_data().kind();
1568
1569 let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1571
1572 let shared_object_refs: Vec<ObjectRef> = effects
1573 .shared_objects()
1574 .iter()
1575 .map(|so_ref| {
1576 if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1577 unimplemented!(
1578 "Replay of deleted shared object transactions is not supported yet"
1579 );
1580 } else {
1581 so_ref.to_object_ref()
1582 }
1583 })
1584 .collect();
1585 let gas_data = match tx_info.clone().transaction.unwrap().data {
1586 sui_json_rpc_types::SuiTransactionBlockData::V1(tx) => tx.gas_data,
1587 };
1588 let gas_object_refs: Vec<_> = gas_data
1589 .payment
1590 .iter()
1591 .map(|obj_ref| obj_ref.to_object_ref())
1592 .collect();
1593 let receiving_objs = orig_tx
1594 .transaction_data()
1595 .receiving_objects()
1596 .into_iter()
1597 .map(|(obj_id, version, _)| (obj_id, version))
1598 .collect();
1599
1600 let epoch_id = effects.executed_epoch;
1601 let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1602
1603 let (epoch_start_timestamp, reference_gas_price) = self
1605 .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1606 .await?;
1607
1608 Ok(OnChainTransactionInfo {
1609 kind: tx_kind_orig.clone(),
1610 sender,
1611 modified_at_versions,
1612 input_objects: input_objs,
1613 shared_object_refs,
1614 gas: gas_object_refs,
1615 gas_owner: (gas_data.owner != sender).then_some(gas_data.owner),
1616 gas_price: gas_data.price,
1617 gas_budget: gas_data.budget,
1618 executed_epoch: epoch_id,
1619 dependencies: effects.dependencies().to_vec(),
1620 effects: SuiTransactionBlockEffects::V1(effects),
1621 receiving_objs,
1622 config_objects,
1623 protocol_version: self.get_protocol_config(epoch_id, chain).await?.version,
1626 tx_digest: *tx_digest,
1627 epoch_start_timestamp,
1628 sender_signed_data: orig_tx.clone(),
1629 reference_gas_price,
1630 chain,
1631 })
1632 }
1633
1634 async fn resolve_tx_components_from_dump(
1635 &self,
1636 tx_digest: &TransactionDigest,
1637 ) -> Result<OnChainTransactionInfo, ReplayEngineError> {
1638 assert!(!self.is_remote_replay());
1639
1640 let dp = self.fetcher.as_node_state_dump();
1641
1642 let sender = dp
1643 .node_state_dump
1644 .sender_signed_data
1645 .transaction_data()
1646 .sender();
1647 let orig_tx = dp.node_state_dump.sender_signed_data.clone();
1648 let effects = dp.node_state_dump.computed_effects.clone();
1649 let effects = SuiTransactionBlockEffects::try_from(effects).unwrap();
1650 let config_objects = self.add_config_objects_if_needed(effects.status());
1652
1653 let input_objs = orig_tx
1657 .transaction_data()
1658 .input_objects()
1659 .map_err(|e| ReplayEngineError::UserInputError { err: e })?;
1660 let tx_kind_orig = orig_tx.transaction_data().kind();
1661
1662 let modified_at_versions: Vec<(ObjectID, SequenceNumber)> = effects.modified_at_versions();
1664
1665 let shared_object_refs: Vec<ObjectRef> = effects
1666 .shared_objects()
1667 .iter()
1668 .map(|so_ref| {
1669 if so_ref.digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1670 unimplemented!(
1671 "Replay of deleted shared object transactions is not supported yet"
1672 );
1673 } else {
1674 so_ref.to_object_ref()
1675 }
1676 })
1677 .collect();
1678 let receiving_objs = orig_tx
1679 .transaction_data()
1680 .receiving_objects()
1681 .into_iter()
1682 .map(|(obj_id, version, _)| (obj_id, version))
1683 .collect();
1684
1685 let epoch_id = dp.node_state_dump.executed_epoch;
1686
1687 let chain = chain_from_chain_id(self.fetcher.get_chain_id().await?.as_str());
1688
1689 let protocol_config =
1690 ProtocolConfig::get_for_version(dp.node_state_dump.protocol_version.into(), chain);
1691 let (epoch_start_timestamp, reference_gas_price) = self
1693 .get_epoch_start_timestamp_and_rgp(epoch_id, tx_digest)
1694 .await?;
1695 let gas_data = orig_tx.transaction_data().gas_data();
1696 let gas_object_refs: Vec<_> = gas_data.clone().payment.into_iter().collect();
1697
1698 Ok(OnChainTransactionInfo {
1699 kind: tx_kind_orig.clone(),
1700 sender,
1701 modified_at_versions,
1702 input_objects: input_objs,
1703 shared_object_refs,
1704 gas: gas_object_refs,
1705 gas_owner: (gas_data.owner != sender).then_some(gas_data.owner),
1706 gas_price: gas_data.price,
1707 gas_budget: gas_data.budget,
1708 executed_epoch: epoch_id,
1709 dependencies: effects.dependencies().to_vec(),
1710 effects,
1711 receiving_objs,
1712 config_objects,
1713 protocol_version: protocol_config.version,
1714 tx_digest: *tx_digest,
1715 epoch_start_timestamp,
1716 sender_signed_data: orig_tx.clone(),
1717 reference_gas_price,
1718 chain,
1719 })
1720 }
1721
1722 async fn resolve_download_input_objects(
1723 &mut self,
1724 tx_info: &OnChainTransactionInfo,
1725 deleted_shared_objects: Vec<ObjectRef>,
1726 ) -> Result<InputObjects, ReplayEngineError> {
1727 let mut package_inputs = vec![];
1729 let mut imm_owned_inputs = vec![];
1730 let mut shared_inputs = vec![];
1731 let mut deleted_shared_info_map = BTreeMap::new();
1732
1733 if !deleted_shared_objects.is_empty() {
1736 for tx_digest in tx_info.dependencies.iter() {
1737 let tx_info = self.resolve_tx_components(tx_digest).await?;
1738 for (obj_id, version, _) in tx_info.shared_object_refs.iter() {
1739 deleted_shared_info_map.insert(*obj_id, (tx_info.tx_digest, *version));
1740 }
1741 }
1742 }
1743
1744 tx_info
1745 .input_objects
1746 .iter()
1747 .map(|kind| match kind {
1748 InputObjectKind::MovePackage(i) => {
1749 package_inputs.push(*i);
1750 Ok(())
1751 }
1752 InputObjectKind::ImmOrOwnedMoveObject(o_ref) => {
1753 imm_owned_inputs.push((o_ref.0, o_ref.1));
1754 Ok(())
1755 }
1756 InputObjectKind::SharedMoveObject {
1757 id,
1758 initial_shared_version: _,
1759 mutability: _,
1760 } if !deleted_shared_info_map.contains_key(id) => {
1761 if let Some(o) = self
1763 .storage
1764 .live_objects_store
1765 .lock()
1766 .expect("Can't lock")
1767 .get(id)
1768 {
1769 shared_inputs.push(o.clone());
1770 Ok(())
1771 } else {
1772 Err(ReplayEngineError::InternalCacheInvariantViolation {
1773 id: *id,
1774 version: None,
1775 })
1776 }
1777 }
1778 _ => Ok(()),
1779 })
1780 .collect::<Result<Vec<_>, _>>()?;
1781
1782 let mut in_objs = self.multi_download_and_store(&imm_owned_inputs).await?;
1784
1785 in_objs.extend(
1788 self.multi_download_relevant_packages_and_store(
1789 package_inputs,
1790 tx_info.protocol_version.as_u64(),
1791 )
1792 .await?,
1793 );
1794 in_objs.extend(shared_inputs);
1796
1797 let resolved_input_objs = tx_info
1799 .input_objects
1800 .iter()
1801 .flat_map(|kind| match kind {
1802 InputObjectKind::MovePackage(i) => {
1803 Some(ObjectReadResult::new(
1805 *kind,
1806 self.storage
1807 .package_cache
1808 .lock()
1809 .expect("Cannot lock")
1810 .get(i)
1811 .unwrap_or(
1812 &self
1813 .download_latest_object(i)
1814 .expect("Object download failed")
1815 .expect("Object not found on chain"),
1816 )
1817 .clone()
1818 .into(),
1819 ))
1820 }
1821 InputObjectKind::ImmOrOwnedMoveObject(o_ref) => Some(ObjectReadResult::new(
1822 *kind,
1823 self.storage
1824 .object_version_cache
1825 .lock()
1826 .expect("Cannot lock")
1827 .get(&(o_ref.0, o_ref.1))
1828 .unwrap()
1829 .clone()
1830 .into(),
1831 )),
1832 InputObjectKind::SharedMoveObject { id, .. }
1833 if !deleted_shared_info_map.contains_key(id) =>
1834 {
1835 Some(ObjectReadResult::new(
1837 *kind,
1838 self.storage
1839 .live_objects_store
1840 .lock()
1841 .expect("Can't lock")
1842 .get(id)
1843 .unwrap()
1844 .clone()
1845 .into(),
1846 ))
1847 }
1848 InputObjectKind::SharedMoveObject { id, .. } => {
1849 let (digest, version) = deleted_shared_info_map.get(id).unwrap();
1850 Some(ObjectReadResult::new(
1851 *kind,
1852 ObjectReadResultKind::ObjectConsensusStreamEnded(*version, *digest),
1853 ))
1854 }
1855 })
1856 .collect();
1857
1858 Ok(InputObjects::new(resolved_input_objs))
1859 }
1860
1861 async fn initialize_execution_env_state(
1864 &mut self,
1865 tx_info: &OnChainTransactionInfo,
1866 ) -> Result<InputObjects, ReplayEngineError> {
1867 self.current_protocol_version = tx_info.protocol_version.as_u64();
1869
1870 self.multi_download_and_store(&tx_info.modified_at_versions)
1872 .await?;
1873
1874 let (shared_refs, deleted_shared_refs): (Vec<ObjectRef>, Vec<ObjectRef>) = tx_info
1875 .shared_object_refs
1876 .iter()
1877 .partition(|r| r.2 != ObjectDigest::OBJECT_DIGEST_DELETED);
1878
1879 let shared_refs: Vec<_> = shared_refs.iter().map(|r| (r.0, r.1)).collect();
1881 self.multi_download_and_store(&shared_refs).await?;
1882
1883 let gas_refs: Vec<_> = tx_info
1885 .gas
1886 .iter()
1887 .filter_map(|w| (w.0 != ObjectID::ZERO).then_some((w.0, w.1)))
1888 .collect();
1889 self.multi_download_and_store(&gas_refs).await?;
1890
1891 let input_objs = self
1893 .resolve_download_input_objects(tx_info, deleted_shared_refs)
1894 .await?;
1895
1896 self.multi_download_and_store(&tx_info.receiving_objs)
1898 .await?;
1899
1900 self.multi_download_and_store(&tx_info.config_objects)
1902 .await?;
1903
1904 let loaded_child_refs = self.fetch_loaded_child_refs(&tx_info.tx_digest).await?;
1907 self.multi_download_and_store(&loaded_child_refs).await?;
1908 tokio::task::yield_now().await;
1909
1910 Ok(input_objs)
1911 }
1912}
1913
1914impl BackingPackageStore for LocalExec {
1917 fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
1920 fn inner(self_: &LocalExec, package_id: &ObjectID) -> SuiResult<Option<Object>> {
1921 self_
1923 .get_or_download_object(package_id, true )
1924 .map_err(|e| SuiErrorKind::Storage(e.to_string()).into())
1925 }
1926
1927 let res = inner(self, package_id);
1928 self.exec_store_events
1929 .lock()
1930 .expect("Unable to lock events list")
1931 .push(ExecutionStoreEvent::BackingPackageGetPackageObject {
1932 package_id: *package_id,
1933 result: res.clone(),
1934 });
1935 res.map(|o| o.map(PackageObject::new))
1936 }
1937}
1938
1939impl ChildObjectResolver for LocalExec {
1940 fn read_child_object(
1943 &self,
1944 parent: &ObjectID,
1945 child: &ObjectID,
1946 child_version_upper_bound: SequenceNumber,
1947 ) -> SuiResult<Option<Object>> {
1948 fn inner(
1949 self_: &LocalExec,
1950 parent: &ObjectID,
1951 child: &ObjectID,
1952 child_version_upper_bound: SequenceNumber,
1953 ) -> SuiResult<Option<Object>> {
1954 let child_object =
1955 match self_.download_object_by_upper_bound(child, child_version_upper_bound)? {
1956 None => return Ok(None),
1957 Some(o) => o,
1958 };
1959 let child_version = child_object.version();
1960 if child_object.version() > child_version_upper_bound {
1961 return Err(SuiErrorKind::Unknown(format!(
1962 "Invariant Violation. Replay loaded child_object {child} at version \
1963 {child_version} but expected the version to be <= {child_version_upper_bound}"
1964 ))
1965 .into());
1966 }
1967 let parent = *parent;
1968 if child_object.owner != Owner::ObjectOwner(parent.into()) {
1969 return Err(SuiErrorKind::InvalidChildObjectAccess {
1970 object: *child,
1971 given_parent: parent,
1972 actual_owner: child_object.owner.clone(),
1973 }
1974 .into());
1975 }
1976 Ok(Some(child_object))
1977 }
1978
1979 let res = inner(self, parent, child, child_version_upper_bound);
1980 self.exec_store_events
1981 .lock()
1982 .expect("Unable to lock events list")
1983 .push(
1984 ExecutionStoreEvent::ChildObjectResolverStoreReadChildObject {
1985 parent: *parent,
1986 child: *child,
1987 result: res.clone(),
1988 },
1989 );
1990 res
1991 }
1992
1993 fn get_object_received_at_version(
1994 &self,
1995 owner: &ObjectID,
1996 receiving_object_id: &ObjectID,
1997 receive_object_at_version: SequenceNumber,
1998 _epoch_id: EpochId,
1999 ) -> SuiResult<Option<Object>> {
2000 fn inner(
2001 self_: &LocalExec,
2002 owner: &ObjectID,
2003 receiving_object_id: &ObjectID,
2004 receive_object_at_version: SequenceNumber,
2005 ) -> SuiResult<Option<Object>> {
2006 let recv_object = match self_.get_object(receiving_object_id) {
2007 None => return Ok(None),
2008 Some(o) => o,
2009 };
2010 if recv_object.version() != receive_object_at_version {
2011 return Err(SuiErrorKind::Unknown(format!(
2012 "Invariant Violation. Replay loaded child_object {receiving_object_id} at version \
2013 {receive_object_at_version} but expected the version to be == {receive_object_at_version}"
2014 )).into());
2015 }
2016 if recv_object.owner != Owner::AddressOwner((*owner).into()) {
2017 return Ok(None);
2018 }
2019 Ok(Some(recv_object))
2020 }
2021
2022 let res = inner(self, owner, receiving_object_id, receive_object_at_version);
2023 self.exec_store_events
2024 .lock()
2025 .expect("Unable to lock events list")
2026 .push(ExecutionStoreEvent::ReceiveObject {
2027 owner: *owner,
2028 receive: *receiving_object_id,
2029 receive_at_version: receive_object_at_version,
2030 result: res.clone(),
2031 });
2032 res
2033 }
2034}
2035
2036impl ParentSync for LocalExec {
2037 fn get_latest_parent_entry_ref_deprecated(&self, object_id: ObjectID) -> Option<ObjectRef> {
2040 fn inner(self_: &LocalExec, object_id: ObjectID) -> Option<ObjectRef> {
2041 if let Some(v) = self_
2042 .storage
2043 .live_objects_store
2044 .lock()
2045 .expect("Can't lock")
2046 .get(&object_id)
2047 {
2048 return Some(v.compute_object_reference());
2049 }
2050 None
2051 }
2052 let res = inner(self, object_id);
2053 self.exec_store_events
2054 .lock()
2055 .expect("Unable to lock events list")
2056 .push(
2057 ExecutionStoreEvent::ParentSyncStoreGetLatestParentEntryRef {
2058 object_id,
2059 result: res,
2060 },
2061 );
2062 res
2063 }
2064}
2065
2066impl ModuleResolver for LocalExec {
2067 type Error = SuiError;
2068
2069 fn get_module(&self, module_id: &ModuleId) -> SuiResult<Option<Vec<u8>>> {
2072 fn inner(self_: &LocalExec, module_id: &ModuleId) -> SuiResult<Option<Vec<u8>>> {
2073 get_module(self_, module_id)
2074 }
2075
2076 let res = inner(self, module_id);
2077 self.exec_store_events
2078 .lock()
2079 .expect("Unable to lock events list")
2080 .push(ExecutionStoreEvent::ModuleResolverGetModule {
2081 module_id: module_id.clone(),
2082 result: res.clone(),
2083 });
2084 res
2085 }
2086
2087 fn get_packages_static<const N: usize>(
2088 &self,
2089 ids: [AccountAddress; N],
2090 ) -> Result<[Option<SerializedPackage>; N], Self::Error> {
2091 let mut res = [const { None }; N];
2092 for i in 0..N {
2093 res[i] = get_package(self, &ids[i].into())?;
2094 }
2095 Ok(res)
2096 }
2097
2098 fn get_packages<'a>(
2099 &self,
2100 ids: impl ExactSizeIterator<Item = &'a AccountAddress>,
2101 ) -> Result<Vec<Option<SerializedPackage>>, Self::Error> {
2102 ids.map(|id| get_package(self, &(*id).into())).collect()
2103 }
2104}
2105
2106impl ModuleResolver for &mut LocalExec {
2107 type Error = SuiError;
2108
2109 fn get_module(&self, module_id: &ModuleId) -> SuiResult<Option<Vec<u8>>> {
2110 (**self).get_module(module_id)
2112 }
2113
2114 fn get_packages<'a>(
2115 &self,
2116 ids: impl ExactSizeIterator<Item = &'a AccountAddress>,
2117 ) -> Result<Vec<Option<SerializedPackage>>, Self::Error> {
2118 (**self).get_packages(ids)
2119 }
2120
2121 fn get_packages_static<const N: usize>(
2122 &self,
2123 ids: [AccountAddress; N],
2124 ) -> Result<[Option<SerializedPackage>; N], Self::Error> {
2125 (**self).get_packages_static(ids)
2126 }
2127}
2128
2129impl ObjectStore for LocalExec {
2130 fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
2133 let res = self
2134 .storage
2135 .live_objects_store
2136 .lock()
2137 .expect("Can't lock")
2138 .get(object_id)
2139 .cloned();
2140 self.exec_store_events
2141 .lock()
2142 .expect("Unable to lock events list")
2143 .push(ExecutionStoreEvent::ObjectStoreGetObject {
2144 object_id: *object_id,
2145 result: Ok(res.clone()),
2146 });
2147 res
2148 }
2149
2150 fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
2153 let res = self
2154 .storage
2155 .live_objects_store
2156 .lock()
2157 .expect("Can't lock")
2158 .get(object_id)
2159 .and_then(|obj| {
2160 if obj.version() == version {
2161 Some(obj.clone())
2162 } else {
2163 None
2164 }
2165 });
2166
2167 self.exec_store_events
2168 .lock()
2169 .expect("Unable to lock events list")
2170 .push(ExecutionStoreEvent::ObjectStoreGetObjectByKey {
2171 object_id: *object_id,
2172 version,
2173 result: Ok(res.clone()),
2174 });
2175
2176 res
2177 }
2178}
2179
2180impl ObjectStore for &mut LocalExec {
2181 fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
2182 (**self).get_object(object_id)
2184 }
2185
2186 fn get_object_by_key(&self, object_id: &ObjectID, version: VersionNumber) -> Option<Object> {
2187 (**self).get_object_by_key(object_id, version)
2189 }
2190}
2191
2192impl GetModule for LocalExec {
2193 type Error = SuiError;
2194 type Item = CompiledModule;
2195
2196 fn get_module_by_id(&self, id: &ModuleId) -> SuiResult<Option<Self::Item>> {
2197 let res = get_module_by_id(self, id);
2198
2199 self.exec_store_events
2200 .lock()
2201 .expect("Unable to lock events list")
2202 .push(ExecutionStoreEvent::GetModuleGetModuleByModuleId {
2203 id: id.clone(),
2204 result: res.clone(),
2205 });
2206 res
2207 }
2208}
2209
2210pub fn get_executor(
2213 executor_version_override: Option<i64>,
2214 protocol_config: &ProtocolConfig,
2215 _expensive_safety_check_config: ExpensiveSafetyCheckConfig,
2216) -> Arc<dyn Executor + Send + Sync> {
2217 let protocol_config = executor_version_override
2218 .map(|q| {
2219 let ver = if q < 0 {
2220 ProtocolConfig::get_for_max_version_UNSAFE().execution_version()
2221 } else {
2222 q as u64
2223 };
2224
2225 let mut c = protocol_config.clone();
2226 c.set_execution_version_for_testing(ver);
2227 c
2228 })
2229 .unwrap_or(protocol_config.clone());
2230
2231 let silent = true;
2232 sui_execution::executor(&protocol_config, silent)
2233 .expect("Creating an executor should not fail here")
2234}
2235
2236fn parse_effect_error_for_denied_coins(status: &SuiExecutionStatus) -> Option<String> {
2237 let SuiExecutionStatus::Failure { error } = status else {
2238 return None;
2239 };
2240 parse_denied_error_string(error)
2241}
2242
2243fn parse_denied_error_string(error: &str) -> Option<String> {
2244 let regulated_regex = regex::Regex::new(
2245 r#"CoinTypeGlobalPause.*?"(.*?)"|AddressDeniedForCoin.*coin_type:.*?"(.*?)""#,
2246 )
2247 .unwrap();
2248
2249 let caps = regulated_regex.captures(error)?;
2250 Some(caps.get(1).or(caps.get(2))?.as_str().to_string())
2251}
2252
2253#[cfg(test)]
2254mod tests {
2255 use super::parse_denied_error_string;
2256 #[test]
2257 fn test_regex_regulated_coin_errors() {
2258 let test_bank = vec![
2259 "CoinTypeGlobalPause { coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
2260 "AddressDeniedForCoin { address: B, coin_type: \"39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN\" }",
2261 ];
2262 let expected_string =
2263 "39a572c071784c280ee8ee8c683477e059d1381abc4366f9a58ffac3f350a254::rcoin::RCOIN";
2264
2265 for test in &test_bank {
2266 assert!(parse_denied_error_string(test).unwrap() == expected_string);
2267 }
2268 }
2269}