1use crate::accumulators::coin_reservations::CachingCoinReservationResolver;
6use crate::accumulators::funds_read::AccountFundsRead;
7use crate::accumulators::object_funds_checker::ObjectFundsChecker;
8use crate::accumulators::object_funds_checker::metrics::ObjectFundsCheckerMetrics;
9use crate::accumulators::transaction_rewriting::rewrite_transaction_for_coin_reservations;
10use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
11use crate::checkpoints::CheckpointBuilderError;
12use crate::checkpoints::CheckpointBuilderResult;
13use crate::congestion_tracker::CongestionTracker;
14use crate::execution_cache::ExecutionCacheTraitPointers;
15use crate::execution_cache::TransactionCacheRead;
16use crate::execution_cache::writeback_cache::WritebackCache;
17use crate::execution_scheduler::ExecutionScheduler;
18use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
19use crate::gasless_rate_limiter::ConsensusGaslessCounter;
20use crate::jsonrpc_index::CoinIndexKey2;
21use crate::rpc_index::RpcIndexStore;
22use crate::traffic_controller::TrafficController;
23use crate::traffic_controller::metrics::TrafficControllerMetrics;
24use crate::transaction_outputs::TransactionOutputs;
25use crate::verify_indexes::{fix_indexes, verify_indexes};
26use arc_swap::{ArcSwap, ArcSwapOption, Guard};
27use async_trait::async_trait;
28use authority_per_epoch_store::CertLockGuard;
29use dashmap::DashMap;
30use fastcrypto::encoding::Base58;
31use fastcrypto::encoding::Encoding;
32use fastcrypto::hash::MultisetHash;
33use itertools::Itertools;
34use move_binary_format::CompiledModule;
35use move_binary_format::binary_config::BinaryConfig;
36use move_core_types::annotated_value::MoveStructLayout;
37use move_core_types::language_storage::ModuleId;
38use mysten_common::ZipDebugEqIteratorExt;
39use mysten_common::{assert_reachable, fatal};
40use parking_lot::Mutex;
41use prometheus::{
42 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
43 register_histogram_vec_with_registry, register_histogram_with_registry,
44 register_int_counter_vec_with_registry, register_int_counter_with_registry,
45 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
46};
47use serde::de::DeserializeOwned;
48use serde::{Deserialize, Serialize};
49use shared_object_version_manager::AssignedVersions;
50use shared_object_version_manager::Schedulable;
51use std::collections::BTreeMap;
52use std::collections::BTreeSet;
53use std::fs::File;
54use std::io::Write;
55use std::path::{Path, PathBuf};
56use std::sync::atomic::Ordering;
57use std::time::Duration;
58use std::time::Instant;
59use std::time::SystemTime;
60use std::time::UNIX_EPOCH;
61use std::{
62 collections::{HashMap, HashSet},
63 fs,
64 pin::Pin,
65 str::FromStr,
66 sync::Arc,
67 vec,
68};
69use sui_config::NodeConfig;
70use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
71use sui_execution::Executor;
72use sui_protocol_config::PerObjectCongestionControlMode;
73use sui_types::accumulator_root::AccumulatorObjId;
74use sui_types::crypto::RandomnessRound;
75use sui_types::dynamic_field::visitor as DFV;
76use sui_types::execution::ExecutionOutput;
77use sui_types::execution::ExecutionTimeObservationKey;
78use sui_types::execution::ExecutionTiming;
79use sui_types::execution_params::ExecutionOrEarlyError;
80use sui_types::execution_params::FundsWithdrawStatus;
81use sui_types::execution_params::get_early_execution_error;
82use sui_types::execution_status::ExecutionStatus;
83use sui_types::inner_temporary_store::PackageStoreWithFallback;
84use sui_types::layout_resolver::LayoutResolver;
85use sui_types::layout_resolver::into_struct_layout;
86use sui_types::messages_consensus::AuthorityCapabilitiesV2;
87use sui_types::node_role::NodeRole;
88use sui_types::object::bounded_visitor::BoundedVisitor;
89use sui_types::storage::ChildObjectResolver;
90use sui_types::storage::InputKey;
91use sui_types::storage::TrackingBackingStore;
92use sui_types::traffic_control::{
93 PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
94};
95use sui_types::transaction_executor::SimulateTransactionResult;
96use sui_types::transaction_executor::TransactionChecks;
97use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
98use tap::TapFallible;
99use tokio::sync::RwLock;
100use tokio::sync::mpsc::unbounded_channel;
101use tokio::sync::watch::error::RecvError;
102use tokio::sync::{mpsc, oneshot};
103use tokio::task::JoinHandle;
104use tokio::time::timeout;
105use tracing::{debug, error, info, instrument, warn};
106
107use self::authority_store::ExecutionLockWriteGuard;
108use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
109pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
110use mysten_metrics::{monitored_scope, spawn_monitored_task};
111
112use crate::jsonrpc_index::IndexStore;
113use crate::jsonrpc_index::{
114 CoinInfo, IndexStoreCacheUpdates, IndexStoreCacheUpdatesWithLocks, ObjectIndexChanges,
115};
116use mysten_common::{debug_fatal, debug_fatal_no_invariant};
117use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
118use sui_config::genesis::Genesis;
119use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
120use sui_framework::{BuiltInFramework, SystemPackage};
121use sui_json_rpc_types::{
122 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
123 SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
124 SuiTransactionBlockEvents, TransactionFilter,
125};
126use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
127use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
128use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
129use sui_types::accumulator_root::AccumulatorValue;
130use sui_types::authenticator_state::get_authenticator_state;
131use sui_types::balance::Balance;
132use sui_types::coin_reservation;
133use sui_types::committee::{EpochId, ProtocolVersion};
134use sui_types::crypto::{AuthoritySignInfo, Signer, default_hash};
135use sui_types::deny_list_v1::check_coin_deny_list_v1;
136use sui_types::digests::ChainIdentifier;
137use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
138use sui_types::effects::{
139 InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
140 TransactionEvents, VerifiedSignedTransactionEffects,
141};
142use sui_types::error::{ExecutionError, SuiErrorKind, UserInputError};
143use sui_types::event::EventID;
144use sui_types::executable_transaction::VerifiedExecutableTransaction;
145use sui_types::execution_status::ExecutionErrorKind;
146use sui_types::gas::{GasCostSummary, SuiGasStatus};
147use sui_types::inner_temporary_store::{
148 InnerTemporaryStore, ObjectMap, TemporaryModuleResolver, TxCoins, WrittenObjects,
149};
150use sui_types::message_envelope::Message;
151use sui_types::messages_checkpoint::{
152 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
153 CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
154 CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
155 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
156};
157use sui_types::messages_grpc::{
158 LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
159 TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
160};
161use sui_types::metrics::{BytecodeVerifierMetrics, ExecutionMetrics};
162use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
163use sui_types::signature::GenericSignature;
164use sui_types::storage::{
165 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
166};
167use sui_types::sui_system_state::SuiSystemStateTrait;
168use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
169use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
170use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
171use sui_types::{
172 SUI_SYSTEM_ADDRESS,
173 base_types::*,
174 committee::Committee,
175 crypto::AuthoritySignature,
176 error::{SuiError, SuiResult},
177 object::{Object, ObjectRead},
178 transaction::*,
179};
180use sui_types::{TypeTag, is_system_package};
181use typed_store::TypedStoreError;
182use typed_store::rocks::StagedBatch;
183
184use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
185use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
186use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
187use crate::authority::authority_store_pruner::{
188 AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
189};
190use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
191use crate::authority::epoch_start_configuration::EpochStartConfiguration;
192use crate::checkpoints::CheckpointStore;
193use crate::epoch::committee_store::CommitteeStore;
194use crate::execution_cache::{
195 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
196 ObjectCacheRead, StateSyncAPI,
197};
198use crate::execution_driver::execution_process;
199use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
200use crate::metrics::LatencyObserver;
201use crate::metrics::RateTracker;
202use crate::module_cache_metrics::ResolverMetrics;
203use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
204use crate::stake_aggregator::StakeAggregator;
205use crate::subscription_handler::SubscriptionHandler;
206use crate::transaction_input_loader::TransactionInputLoader;
207
208#[cfg(msim)]
209pub use crate::checkpoints::checkpoint_executor::utils::{
210 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
211};
212
213#[cfg(msim)]
214use sui_types::committee::CommitteeTrait;
215use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
216
217#[cfg(test)]
218#[path = "unit_tests/authority_tests.rs"]
219pub mod authority_tests;
220
221#[cfg(test)]
222#[path = "unit_tests/transaction_tests.rs"]
223pub mod transaction_tests;
224
225#[cfg(test)]
226#[path = "unit_tests/batch_transaction_tests.rs"]
227mod batch_transaction_tests;
228
229#[cfg(test)]
230#[path = "unit_tests/move_integration_tests.rs"]
231pub mod move_integration_tests;
232
233#[cfg(test)]
234#[path = "unit_tests/gas_tests.rs"]
235mod gas_tests;
236
237#[cfg(test)]
238#[path = "unit_tests/gas_data_tests.rs"]
239mod gas_data_tests;
240
241#[cfg(test)]
242#[path = "unit_tests/batch_verification_tests.rs"]
243mod batch_verification_tests;
244
245#[cfg(test)]
246#[path = "unit_tests/coin_deny_list_tests.rs"]
247mod coin_deny_list_tests;
248
249#[cfg(test)]
250#[path = "unit_tests/auth_unit_test_utils.rs"]
251pub mod auth_unit_test_utils;
252
253pub mod authority_test_utils;
254
255pub mod authority_per_epoch_store;
256pub mod authority_per_epoch_store_pruner;
257
258pub mod authority_store_pruner;
259pub mod authority_store_tables;
260pub mod authority_store_types;
261pub mod congestion_log;
262pub mod consensus_tx_status_cache;
263pub(crate) mod epoch_marker_key;
264pub mod epoch_start_configuration;
265pub mod execution_time_estimator;
266pub mod shared_object_congestion_tracker;
267pub mod shared_object_version_manager;
268pub mod submitted_transaction_cache;
269pub mod test_authority_builder;
270pub mod transaction_deferral;
271pub mod transaction_reject_reason_cache;
272mod weighted_moving_average;
273
274pub(crate) mod authority_store;
275pub mod backpressure;
276
/// Prometheus metrics owned by the authority.
///
/// Every Prometheus field is registered against a caller-supplied registry in
/// [`AuthorityMetrics::new`]; the per-field descriptions below follow the help
/// strings used at registration time.
pub struct AuthorityMetrics {
    /// Total number of transaction orders.
    tx_orders: IntCounter,
    /// Total number of transaction certificates handled.
    total_certs: IntCounter,
    /// Total number of transaction effects produced.
    total_effects: IntCounter,
    /// Number of transactions involving shared objects.
    pub shared_obj_tx: IntCounter,
    /// Number of sponsored transactions.
    sponsored_tx: IntCounter,
    /// Number of transaction orders that had already been processed previously.
    tx_already_processed: IntCounter,
    /// Distribution of the number of input objects per transaction.
    num_input_objs: Histogram,
    /// Distribution of the number of shared input objects per transaction.
    num_shared_objects: Histogram,
    /// Distribution of transaction batch sizes.
    batch_size: Histogram,

    /// Latency of voting on transactions without signing.
    authority_state_handle_vote_transaction_latency: Histogram,

    /// Latency of actual certificate executions.
    internal_execution_latency: Histogram,
    /// Latency of loading input objects for execution (uses the low-latency buckets).
    execution_load_input_objects_latency: Histogram,
    /// Latency of executing certificates, before committing the results.
    prepare_certificate_latency: Histogram,
    /// Latency of committing certificate execution results.
    commit_certificate_latency: Histogram,
    /// Latency of checkpointing dbs.
    db_checkpoint_latency: Histogram,

    /// Certificates enqueued to the execution scheduler, labelled by `result`.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    /// Certificates pending in the execution scheduler with at least one missing input object.
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    /// Executing certificates, including queued and actually running ones.
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    /// Time transactions spend waiting in the queue, in seconds.
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    /// Cumulative number of transactions executed by the execution driver.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    /// Cumulative number of transactions paused by the execution driver.
    pub(crate) execution_driver_paused_transactions: IntCounter,
    /// Number of transactions pending in the execution driver dispatch queue.
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    /// Delay between a transaction becoming ready for execution and starting to execute.
    pub(crate) execution_queueing_delay_s: Histogram,
    /// Ratio of computation gas to VM execution latency.
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    /// Ratio of computation gas to certificate execution latency, including the commit.
    pub(crate) execution_gas_latency_ratio: Histogram,

    /// Total number of consensus transactions skipped.
    pub(crate) skipped_consensus_txns: IntCounter,
    /// Total number of consensus transactions skipped because of a local cache hit.
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
    /// Number of times each transaction appears in its first consensus commit.
    pub(crate) consensus_handler_duplicate_tx_count: Histogram,

    /// Whether the authority is currently overloaded and in load-shedding mode.
    pub(crate) authority_overload_status: IntGauge,
    /// Percentage of transactions shed while the authority is in load-shedding mode.
    pub(crate) authority_load_shedding_percentage: IntGauge,

    /// Number of times each source indicates transaction overload, labelled by `source`.
    pub(crate) transaction_overload_sources: IntCounterVec,

    /// Total number of events emitted in post processing.
    post_processing_total_events_emitted: IntCounter,
    /// Total number of transactions indexed in post processing.
    post_processing_total_tx_indexed: IntCounter,
    /// Total number of transactions that finished event processing in post processing.
    post_processing_total_tx_had_event_processed: IntCounter,
    /// Total number of failures in post processing.
    post_processing_total_failures: IntCounter,

    /// Transactions processed by the consensus handler, labelled by `class`.
    pub consensus_handler_processed: IntCounterVec,
    /// Sizes of each type of transaction processed by the consensus handler, labelled by `class`.
    pub consensus_handler_transaction_sizes: HistogramVec,
    /// Transactions deferred by the consensus handler.
    pub consensus_handler_deferred_transactions: IntCounter,
    /// Transactions deferred by the consensus handler due to congestion.
    pub consensus_handler_congested_transactions: IntCounter,
    /// Transactions deferred due to unpaid consensus amplification.
    pub consensus_handler_unpaid_amplification_deferrals: IntCounter,
    /// Transactions cancelled by the consensus handler.
    pub consensus_handler_cancelled_transactions: IntCounter,
    /// Max object costs for congestion control in the current consensus commit,
    /// labelled by `commit_type`. Exported under the name
    /// `consensus_handler_max_congestion_control_object_costs`.
    pub consensus_handler_max_object_costs: IntGaugeVec,
    /// Committed subdags, labelled by `authority`.
    pub consensus_committed_subdags: IntCounterVec,
    /// Accumulator deposit (merge) events processed in settlement.
    pub accumulator_deposits: IntCounter,
    /// Accumulator withdrawal (split) events processed in settlement.
    pub accumulator_withdrawals: IntCounter,
    /// Committed consensus messages, labelled by `authority`.
    pub consensus_committed_messages: IntGaugeVec,
    /// Certified and user transactions committed, labelled by `authority` (the submitter).
    pub consensus_committed_user_transactions: IntGaugeVec,
    /// User transactions finalized, labelled by `authority` (the submitter).
    pub consensus_finalized_user_transactions: IntGaugeVec,
    /// User transactions rejected, labelled by `authority` (the submitter).
    pub consensus_rejected_user_transactions: IntGaugeVec,
    /// Throughput calculated from consensus output, based on unique transactions.
    pub consensus_calculated_throughput: IntGauge,
    /// The currently active calculated throughput profile.
    pub consensus_calculated_throughput_profile: IntGauge,
    /// Blocks processed by the consensus block handler.
    pub consensus_block_handler_block_processed: IntCounter,
    /// Transactions processed by the consensus block handler, labelled by `outcome`.
    pub consensus_block_handler_txn_processed: IntCounterVec,
    /// Fastpath transactions sent for execution by the consensus transaction handler.
    pub consensus_block_handler_fastpath_executions: IntCounter,
    /// Bias/delay from consensus timestamp to system time.
    pub consensus_timestamp_bias: Histogram,

    /// Execution metrics bundle, constructed against the same registry.
    pub execution_metrics: Arc<ExecutionMetrics>,

    /// Bytecode verifier metrics bundle, constructed against the same registry.
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zkLogin signatures.
    pub zklogin_sig_count: IntCounter,
    /// Counter exported as `multisig_sig_count` (presumably the number of
    /// multisig signatures seen; confirm against the increment sites).
    pub multisig_sig_count: IntCounter,

    /// Latency observer for execution queueing; not registered with Prometheus in `new`.
    pub execution_queueing_latency: LatencyObserver,

    /// Rate tracker constructed with a 10-second `Duration` in `new`
    /// (presumably tracks how fast transactions become ready; confirm at call sites).
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    /// Rate tracker constructed with a 10-second `Duration` in `new`
    /// (presumably tracks the execution rate; confirm at call sites).
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
375
/// Histogram bucket boundaries for positive integer quantities (object counts,
/// batch sizes, transaction sizes): 1, 2, 5, 7 per decade from 1 up to 10 million.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, 7.0,
    10.0, 20.0, 50.0, 70.0,
    100.0, 200.0, 500.0, 700.0,
    1000.0, 2000.0, 5000.0, 7000.0,
    10000.0, 20000.0, 50000.0, 70000.0,
    100000.0, 200000.0, 500000.0, 700000.0,
    1000000.0, 2000000.0, 5000000.0, 7000000.0,
    10000000.0,
];

/// Histogram bucket boundaries, in seconds, for general latency metrics
/// (0.5 ms up to 90 s).
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
    1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
    10.0, 20.0, 30.0, 60.0, 90.0,
];

/// Histogram bucket boundaries, in seconds, for operations expected to be fast:
/// finer resolution at the low end (starting at 10 µs), extending up to 100 s.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005,
    0.0001, 0.0002, 0.0005,
    0.001, 0.002, 0.005,
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0,
];

/// Histogram bucket boundaries, in seconds, for the consensus timestamp bias
/// metric. Includes a negative boundary and is densest between 0.2 s and 0.5 s.
const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[
    -2.0, 0.0,
    0.2, 0.22, 0.24, 0.26, 0.28,
    0.3, 0.32, 0.34, 0.36, 0.38,
    0.4, 0.45, 0.5, 0.8,
    2.0, 10.0, 30.0,
];

/// Histogram bucket boundaries for the gas-to-latency ratio metrics.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0,
    100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0,
    1000.0, 2000.0, 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0,
    10000.0, 50000.0, 100000.0, 1000000.0,
];

/// Value (10^18) presumably given to the gas coin used for dev-inspect
/// execution — confirm against the dev-inspect call site.
pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000_000_000;

/// Two-second timeout; judging by the name, it bounds how long to wait for
/// fastpath inputs — confirm against the call site.
pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_secs(2);
412
413impl AuthorityMetrics {
414 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
415 Self {
416 tx_orders: register_int_counter_with_registry!(
417 "total_transaction_orders",
418 "Total number of transaction orders",
419 registry,
420 )
421 .unwrap(),
422 total_certs: register_int_counter_with_registry!(
423 "total_transaction_certificates",
424 "Total number of transaction certificates handled",
425 registry,
426 )
427 .unwrap(),
428 total_effects: register_int_counter_with_registry!(
430 "total_transaction_effects",
431 "Total number of transaction effects produced",
432 registry,
433 )
434 .unwrap(),
435
436 shared_obj_tx: register_int_counter_with_registry!(
437 "num_shared_obj_tx",
438 "Number of transactions involving shared objects",
439 registry,
440 )
441 .unwrap(),
442
443 sponsored_tx: register_int_counter_with_registry!(
444 "num_sponsored_tx",
445 "Number of sponsored transactions",
446 registry,
447 )
448 .unwrap(),
449
450 tx_already_processed: register_int_counter_with_registry!(
451 "num_tx_already_processed",
452 "Number of transaction orders already processed previously",
453 registry,
454 )
455 .unwrap(),
456 num_input_objs: register_histogram_with_registry!(
457 "num_input_objects",
458 "Distribution of number of input TX objects per TX",
459 POSITIVE_INT_BUCKETS.to_vec(),
460 registry,
461 )
462 .unwrap(),
463 num_shared_objects: register_histogram_with_registry!(
464 "num_shared_objects",
465 "Number of shared input objects per TX",
466 POSITIVE_INT_BUCKETS.to_vec(),
467 registry,
468 )
469 .unwrap(),
470 batch_size: register_histogram_with_registry!(
471 "batch_size",
472 "Distribution of size of transaction batch",
473 POSITIVE_INT_BUCKETS.to_vec(),
474 registry,
475 )
476 .unwrap(),
477 authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
478 "authority_state_handle_vote_transaction_latency",
479 "Latency of voting on transactions without signing",
480 LATENCY_SEC_BUCKETS.to_vec(),
481 registry,
482 )
483 .unwrap(),
484 internal_execution_latency: register_histogram_with_registry!(
485 "authority_state_internal_execution_latency",
486 "Latency of actual certificate executions",
487 LATENCY_SEC_BUCKETS.to_vec(),
488 registry,
489 )
490 .unwrap(),
491 execution_load_input_objects_latency: register_histogram_with_registry!(
492 "authority_state_execution_load_input_objects_latency",
493 "Latency of loading input objects for execution",
494 LOW_LATENCY_SEC_BUCKETS.to_vec(),
495 registry,
496 )
497 .unwrap(),
498 prepare_certificate_latency: register_histogram_with_registry!(
499 "authority_state_prepare_certificate_latency",
500 "Latency of executing certificates, before committing the results",
501 LATENCY_SEC_BUCKETS.to_vec(),
502 registry,
503 )
504 .unwrap(),
505 commit_certificate_latency: register_histogram_with_registry!(
506 "authority_state_commit_certificate_latency",
507 "Latency of committing certificate execution results",
508 LATENCY_SEC_BUCKETS.to_vec(),
509 registry,
510 )
511 .unwrap(),
512 db_checkpoint_latency: register_histogram_with_registry!(
513 "db_checkpoint_latency",
514 "Latency of checkpointing dbs",
515 LATENCY_SEC_BUCKETS.to_vec(),
516 registry,
517 ).unwrap(),
518 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
519 "transaction_manager_num_enqueued_certificates",
520 "Current number of certificates enqueued to ExecutionScheduler",
521 &["result"],
522 registry,
523 )
524 .unwrap(),
525 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
526 "transaction_manager_num_pending_certificates",
527 "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
528 registry,
529 )
530 .unwrap(),
531 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
532 "transaction_manager_num_executing_certificates",
533 "Number of executing certificates, including queued and actually running certificates",
534 registry,
535 )
536 .unwrap(),
537 authority_overload_status: register_int_gauge_with_registry!(
538 "authority_overload_status",
539 "Whether authority is current experiencing overload and enters load shedding mode.",
540 registry)
541 .unwrap(),
542 authority_load_shedding_percentage: register_int_gauge_with_registry!(
543 "authority_load_shedding_percentage",
544 "The percentage of transactions is shed when the authority is in load shedding mode.",
545 registry)
546 .unwrap(),
547 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
548 "transaction_manager_transaction_queue_age_s",
549 "Time spent in waiting for transaction in the queue",
550 LATENCY_SEC_BUCKETS.to_vec(),
551 registry,
552 )
553 .unwrap(),
554 transaction_overload_sources: register_int_counter_vec_with_registry!(
555 "transaction_overload_sources",
556 "Number of times each source indicates transaction overload.",
557 &["source"],
558 registry)
559 .unwrap(),
560 execution_driver_executed_transactions: register_int_counter_with_registry!(
561 "execution_driver_executed_transactions",
562 "Cumulative number of transaction executed by execution driver",
563 registry,
564 )
565 .unwrap(),
566 execution_driver_paused_transactions: register_int_counter_with_registry!(
567 "execution_driver_paused_transactions",
568 "Cumulative number of transactions paused by execution driver",
569 registry,
570 )
571 .unwrap(),
572 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
573 "execution_driver_dispatch_queue",
574 "Number of transaction pending in execution driver dispatch queue",
575 registry,
576 )
577 .unwrap(),
578 execution_queueing_delay_s: register_histogram_with_registry!(
579 "execution_queueing_delay_s",
580 "Queueing delay between a transaction is ready for execution until it starts executing.",
581 LATENCY_SEC_BUCKETS.to_vec(),
582 registry
583 )
584 .unwrap(),
585 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
586 "prepare_cert_gas_latency_ratio",
587 "The ratio of computation gas divided by VM execution latency.",
588 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
589 registry
590 )
591 .unwrap(),
592 execution_gas_latency_ratio: register_histogram_with_registry!(
593 "execution_gas_latency_ratio",
594 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
595 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
596 registry
597 )
598 .unwrap(),
599 skipped_consensus_txns: register_int_counter_with_registry!(
600 "skipped_consensus_txns",
601 "Total number of consensus transactions skipped",
602 registry,
603 )
604 .unwrap(),
605 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
606 "skipped_consensus_txns_cache_hit",
607 "Total number of consensus transactions skipped because of local cache hit",
608 registry,
609 )
610 .unwrap(),
611 consensus_handler_duplicate_tx_count: register_histogram_with_registry!(
612 "consensus_handler_duplicate_tx_count",
613 "Number of times each transaction appears in its first consensus commit",
614 POSITIVE_INT_BUCKETS.to_vec(),
615 registry,
616 )
617 .unwrap(),
618 post_processing_total_events_emitted: register_int_counter_with_registry!(
619 "post_processing_total_events_emitted",
620 "Total number of events emitted in post processing",
621 registry,
622 )
623 .unwrap(),
624 post_processing_total_tx_indexed: register_int_counter_with_registry!(
625 "post_processing_total_tx_indexed",
626 "Total number of txes indexed in post processing",
627 registry,
628 )
629 .unwrap(),
630 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
631 "post_processing_total_tx_had_event_processed",
632 "Total number of txes finished event processing in post processing",
633 registry,
634 )
635 .unwrap(),
636 post_processing_total_failures: register_int_counter_with_registry!(
637 "post_processing_total_failures",
638 "Total number of failure in post processing",
639 registry,
640 )
641 .unwrap(),
642 consensus_handler_processed: register_int_counter_vec_with_registry!(
643 "consensus_handler_processed",
644 "Number of transactions processed by consensus handler",
645 &["class"],
646 registry
647 ).unwrap(),
648 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
649 "consensus_handler_transaction_sizes",
650 "Sizes of each type of transactions processed by consensus handler",
651 &["class"],
652 POSITIVE_INT_BUCKETS.to_vec(),
653 registry
654 ).unwrap(),
655 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
656 "consensus_handler_deferred_transactions",
657 "Number of transactions deferred by consensus handler",
658 registry,
659 ).unwrap(),
660 consensus_handler_congested_transactions: register_int_counter_with_registry!(
661 "consensus_handler_congested_transactions",
662 "Number of transactions deferred by consensus handler due to congestion",
663 registry,
664 ).unwrap(),
665 consensus_handler_unpaid_amplification_deferrals: register_int_counter_with_registry!(
666 "consensus_handler_unpaid_amplification_deferrals",
667 "Number of transactions deferred due to unpaid consensus amplification",
668 registry,
669 ).unwrap(),
670 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
671 "consensus_handler_cancelled_transactions",
672 "Number of transactions cancelled by consensus handler",
673 registry,
674 ).unwrap(),
675 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
676 "consensus_handler_max_congestion_control_object_costs",
677 "Max object costs for congestion control in the current consensus commit",
678 &["commit_type"],
679 registry,
680 ).unwrap(),
681 consensus_committed_subdags: register_int_counter_vec_with_registry!(
682 "consensus_committed_subdags",
683 "Number of committed subdags, sliced by author",
684 &["authority"],
685 registry,
686 ).unwrap(),
687 accumulator_deposits: register_int_counter_with_registry!(
688 "accumulator_deposits",
689 "Total accumulator deposit (merge) events processed in settlement",
690 registry,
691 ).unwrap(),
692 accumulator_withdrawals: register_int_counter_with_registry!(
693 "accumulator_withdrawals",
694 "Total accumulator withdrawal (split) events processed in settlement",
695 registry,
696 ).unwrap(),
697 consensus_committed_messages: register_int_gauge_vec_with_registry!(
698 "consensus_committed_messages",
699 "Total number of committed consensus messages, sliced by author",
700 &["authority"],
701 registry,
702 ).unwrap(),
703 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
704 "consensus_committed_user_transactions",
705 "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
706 &["authority"],
707 registry,
708 ).unwrap(),
709 consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
710 "consensus_finalized_user_transactions",
711 "Number of user transactions finalized, sliced by submitter",
712 &["authority"],
713 registry,
714 ).unwrap(),
715 consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
716 "consensus_rejected_user_transactions",
717 "Number of user transactions rejected, sliced by submitter",
718 &["authority"],
719 registry,
720 ).unwrap(),
721 execution_metrics: Arc::new(ExecutionMetrics::new(registry)),
722 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
723 zklogin_sig_count: register_int_counter_with_registry!(
724 "zklogin_sig_count",
725 "Count of zkLogin signatures",
726 registry,
727 )
728 .unwrap(),
729 multisig_sig_count: register_int_counter_with_registry!(
730 "multisig_sig_count",
731 "Count of zkLogin signatures",
732 registry,
733 )
734 .unwrap(),
735 consensus_calculated_throughput: register_int_gauge_with_registry!(
736 "consensus_calculated_throughput",
737 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
738 registry,
739 ).unwrap(),
740 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
741 "consensus_calculated_throughput_profile",
742 "The current active calculated throughput profile",
743 registry
744 ).unwrap(),
745 consensus_block_handler_block_processed: register_int_counter_with_registry!(
746 "consensus_block_handler_block_processed",
747 "Number of blocks processed by consensus block handler.",
748 registry
749 ).unwrap(),
750 consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
751 "consensus_block_handler_txn_processed",
752 "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
753 &["outcome"],
754 registry
755 ).unwrap(),
756 consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
757 "consensus_block_handler_fastpath_executions",
758 "Number of fastpath transactions sent for execution by consensus transaction handler",
759 registry,
760 ).unwrap(),
761 consensus_timestamp_bias: register_histogram_with_registry!(
762 "consensus_timestamp_bias",
763 "Bias/delay from consensus timestamp to system time",
764 TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
765 registry
766 ).unwrap(),
767 execution_queueing_latency: LatencyObserver::new(),
768 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
769 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
770 }
771 }
772}
773
/// Pinned, reference-counted signer trait object producing
/// [`AuthoritySignature`]s, usable across threads (`Send + Sync`).
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
781
/// Per-transaction execution parameters, assembled with the `with_*` builder
/// methods on top of [`ExecutionEnv::default`].
#[derive(Debug, Clone)]
pub struct ExecutionEnv {
    /// Versions assigned for this execution; the type's default value unless
    /// overridden via `with_assigned_versions`.
    pub assigned_versions: AssignedVersions,
    /// Effects digest the execution is expected to produce, if one is known.
    /// `None` by default.
    pub expected_effects_digest: Option<TransactionEffectsDigest>,
    /// Funds-withdraw status; defaults to `MaybeSufficient` and is switched to
    /// `Insufficient` by `with_insufficient_funds`.
    pub funds_withdraw_status: FundsWithdrawStatus,
    /// Digests of transactions acting as barrier dependencies (presumably
    /// transactions that must complete first — confirm against the scheduler).
    /// Empty by default.
    pub barrier_dependencies: Vec<TransactionDigest>,
}
798
799impl Default for ExecutionEnv {
800 fn default() -> Self {
801 Self {
802 assigned_versions: Default::default(),
803 expected_effects_digest: None,
804 funds_withdraw_status: FundsWithdrawStatus::MaybeSufficient,
805 barrier_dependencies: Default::default(),
806 }
807 }
808}
809
810impl ExecutionEnv {
811 pub fn new() -> Self {
812 Default::default()
813 }
814
815 pub fn with_expected_effects_digest(
816 mut self,
817 expected_effects_digest: TransactionEffectsDigest,
818 ) -> Self {
819 self.expected_effects_digest = Some(expected_effects_digest);
820 self
821 }
822
823 pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
824 self.assigned_versions = assigned_versions;
825 self
826 }
827
828 pub fn with_insufficient_funds(mut self) -> Self {
829 self.funds_withdraw_status = FundsWithdrawStatus::Insufficient;
830 self
831 }
832
833 pub fn with_barrier_dependencies(
834 mut self,
835 barrier_dependencies: BTreeSet<TransactionDigest>,
836 ) -> Self {
837 self.barrier_dependencies = barrier_dependencies.into_iter().collect();
838 self
839 }
840}
841
/// Holds per-transaction effects-digest overrides parsed from the node's fork
/// recovery configuration.
#[derive(Debug)]
pub struct ForkRecoveryState {
    /// Map from transaction digest to the effects digest configured for it.
    /// Empty when no fork recovery configuration is supplied.
    transaction_overrides:
        parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
}
848
849impl Default for ForkRecoveryState {
850 fn default() -> Self {
851 Self {
852 transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
853 }
854 }
855}
856
857impl ForkRecoveryState {
858 pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
859 let Some(config) = config else {
860 return Ok(Self::default());
861 };
862
863 let mut transaction_overrides = HashMap::new();
864 for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
865 let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
866 SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
867 })?;
868 let effects_digest =
869 TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
870 SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
871 })?;
872 transaction_overrides.insert(tx_digest, effects_digest);
873 }
874
875 Ok(Self {
876 transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
877 })
878 }
879
880 pub fn get_transaction_override(
881 &self,
882 tx_digest: &TransactionDigest,
883 ) -> Option<TransactionEffectsDigest> {
884 self.transaction_overrides.read().get(tx_digest).copied()
885 }
886}
887
/// Pair of a staged batch and the index-store cache updates that go with it;
/// this is the value carried by the `pending_post_processing` receivers.
pub type PostProcessingOutput = (StagedBatch, IndexStoreCacheUpdates);
889
/// Central state object of an authority node: identity, stores, caches,
/// schedulers, metrics and configuration used by the signing and execution paths.
pub struct AuthorityState {
    /// Name of this authority.
    pub name: AuthorityName,
    /// Signer used by this authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads input and receiving objects for signing and for execution.
    input_loader: TransactionInputLoader,
    /// Bundle of execution-cache handles (for example `account_funds_read`).
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    /// Resolver passed to funds-withdrawal processing and to coin-reservation
    /// transaction rewriting.
    coin_reservation_resolver: Arc<CachingCoinReservationResolver>,

    /// Per-epoch store, held behind an `ArcSwap`.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// Read-write lock wrapping an epoch id; the execution path compares the
    /// guarded value against the epoch store's epoch.
    execution_lock: RwLock<EpochId>,

    /// Optional index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional RPC index store.
    pub rpc_index: Option<Arc<RpcIndexStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint store; also used to read epoch state commitments and to record
    /// detected transaction forks.
    pub checkpoint_store: Arc<CheckpointStore>,

    committee_store: Arc<CommitteeStore>,

    /// Scheduler consulted for execution overload checks.
    execution_scheduler: Arc<ExecutionScheduler>,

    /// One-shot sender kept in a mutex; not read within this chunk.
    // NOTE(review): presumably signals shutdown of transaction execution — confirm.
    #[allow(unused)]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    pub metrics: Arc<AuthorityMetrics>,
    // NOTE(review): the two underscore-prefixed pruners are stored but not read
    // here; presumably held only to keep them alive — confirm.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    db_checkpoint_config: DBCheckpointConfig,

    /// Node configuration (deny configs, overload config, debug-dump config, ...).
    pub config: NodeConfig,

    /// Overload flag and load-shedding percentage read by the overload checks.
    pub overload_info: AuthorityOverloadInfo,

    /// Identifier of the chain this authority belongs to.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,

    pub(crate) consensus_gasless_counter: Arc<ConsensusGaslessCounter>,

    /// Optional traffic controller; admin reconfiguration fails when this is `None`.
    pub traffic_controller: Option<Arc<TrafficController>>,

    /// Optional effects-digest overrides applied before immediate execution.
    fork_recovery_state: Option<ForkRecoveryState>,

    /// Watch-channel sender carrying an epoch id.
    notify_epoch: tokio::sync::watch::Sender<EpochId>,

    /// Optional object-funds checker, swappable at runtime.
    pub(crate) object_funds_checker: ArcSwapOption<ObjectFundsChecker>,
    object_funds_checker_metrics: Arc<ObjectFundsCheckerMetrics>,

    /// Receivers for post-processing output, keyed by transaction digest.
    pending_post_processing:
        Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>>,

    /// Semaphore associated with post-processing.
    // NOTE(review): presumably bounds concurrent post-processing tasks — confirm.
    post_processing_semaphore: Arc<tokio::sync::Semaphore>,
}
967
968impl AuthorityState {
/// Returns the node role reported by the given epoch store.
pub fn node_role(&self, epoch_store: &AuthorityPerEpochStore) -> NodeRole {
    epoch_store.node_role()
}
978
979 pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
980 epoch_store.node_role().is_validator()
981 }
982
983 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
984 epoch_store.node_role().is_fullnode()
985 }
986
987 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
988 &self.committee_store
989 }
990
991 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
992 self.committee_store.clone()
993 }
994
995 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
996 &self.config.authority_overload_config
997 }
998
999 pub fn get_epoch_state_commitments(
1000 &self,
1001 epoch: EpochId,
1002 ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
1003 self.checkpoint_store.get_epoch_state_commitments(epoch)
1004 }
1005
1006 fn pre_object_load_checks(
1009 &self,
1010 tx_data: &TransactionData,
1011 tx_signatures: &[GenericSignature],
1012 input_object_kinds: &[InputObjectKind],
1013 receiving_objects_refs: &[ObjectRef],
1014 protocol_config: &ProtocolConfig,
1015 ) -> SuiResult<BTreeMap<AccumulatorObjId, (u64, TypeTag)>> {
1016 sui_transaction_checks::deny::check_transaction_for_signing(
1020 tx_data,
1021 tx_signatures,
1022 input_object_kinds,
1023 receiving_objects_refs,
1024 &self.config.transaction_deny_config,
1025 self.get_backing_package_store().as_ref(),
1026 )?;
1027
1028 let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1029 self.chain_identifier,
1030 self.coin_reservation_resolver.as_ref(),
1031 )?;
1032
1033 self.execution_cache_trait_pointers
1034 .account_funds_read
1035 .check_amounts_available(&declared_withdrawals)?;
1036
1037 if protocol_config.gasless_verify_remaining_balance() && tx_data.is_gasless_transaction() {
1038 let min_amounts =
1039 sui_types::transaction::get_gasless_allowed_token_types(protocol_config);
1040 self.execution_cache_trait_pointers
1041 .account_funds_read
1042 .check_remaining_amounts_after_withdrawal(&declared_withdrawals, &min_amounts)?;
1043 }
1044
1045 Ok(declared_withdrawals)
1046 }
1047
1048 fn handle_transaction_deny_checks(
1049 &self,
1050 transaction: &VerifiedTransaction,
1051 epoch_store: &Arc<AuthorityPerEpochStore>,
1052 ) -> SuiResult<CheckedInputObjects> {
1053 let tx_digest = transaction.digest();
1054 let tx_data = transaction.data().transaction_data();
1055
1056 let input_object_kinds = tx_data.input_objects()?;
1057 let receiving_objects_refs = tx_data.receiving_objects();
1058
1059 self.pre_object_load_checks(
1060 tx_data,
1061 transaction.tx_signatures(),
1062 &input_object_kinds,
1063 &receiving_objects_refs,
1064 epoch_store.protocol_config(),
1065 )?;
1066
1067 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1068 Some(tx_digest),
1069 &input_object_kinds,
1070 &receiving_objects_refs,
1071 epoch_store.epoch(),
1072 )?;
1073
1074 let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
1075 epoch_store.protocol_config(),
1076 epoch_store.reference_gas_price(),
1077 tx_data,
1078 input_objects,
1079 &receiving_objects,
1080 &self.metrics.bytecode_verifier_metrics,
1081 &self.config.verifier_signing_config,
1082 )?;
1083
1084 self.handle_coin_deny_list_checks(
1085 tx_data,
1086 &checked_input_objects,
1087 &receiving_objects,
1088 epoch_store,
1089 )?;
1090
1091 Ok(checked_input_objects)
1092 }
1093
1094 fn handle_coin_deny_list_checks(
1095 &self,
1096 tx_data: &TransactionData,
1097 checked_input_objects: &CheckedInputObjects,
1098 receiving_objects: &ReceivingObjects,
1099 epoch_store: &Arc<AuthorityPerEpochStore>,
1100 ) -> SuiResult<()> {
1101 use sui_types::balance::Balance;
1102
1103 let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1104 self.chain_identifier,
1105 self.coin_reservation_resolver.as_ref(),
1106 )?;
1107
1108 let funds_withdraw_types = declared_withdrawals
1109 .values()
1110 .filter_map(|(_, type_tag)| {
1111 Balance::maybe_get_balance_type_param(type_tag)
1112 .map(|ty| ty.to_canonical_string(false))
1113 })
1114 .collect::<BTreeSet<_>>();
1115
1116 if epoch_store.coin_deny_list_v1_enabled() {
1117 check_coin_deny_list_v1(
1118 tx_data.sender(),
1119 checked_input_objects,
1120 receiving_objects,
1121 funds_withdraw_types.clone(),
1122 &self.get_object_store(),
1123 )?;
1124 }
1125
1126 if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1127 check_coin_deny_list_v2_during_signing(
1128 tx_data.sender(),
1129 checked_input_objects,
1130 receiving_objects,
1131 funds_withdraw_types.clone(),
1132 &self.get_object_store(),
1133 )?;
1134 }
1135
1136 Ok(())
1137 }
1138
1139 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1149 pub fn handle_vote_transaction(
1150 &self,
1151 epoch_store: &Arc<AuthorityPerEpochStore>,
1152 transaction: VerifiedTransaction,
1153 ) -> SuiResult<()> {
1154 debug!("handle_vote_transaction");
1155
1156 let _metrics_guard = self
1157 .metrics
1158 .authority_state_handle_vote_transaction_latency
1159 .start_timer();
1160 self.metrics.tx_orders.inc();
1161
1162 if !epoch_store
1166 .get_reconfig_state_read_lock_guard()
1167 .should_accept_user_certs()
1168 {
1169 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1170 }
1171
1172 let tx_digest = *transaction.digest();
1177 if epoch_store.is_recently_finalized(&tx_digest)
1178 || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
1179 {
1180 assert_reachable!("transaction recently executed");
1181 return Ok(());
1182 }
1183
1184 if self
1185 .get_transaction_cache_reader()
1186 .transaction_executed_in_last_epoch(transaction.digest(), epoch_store.epoch())
1187 {
1188 return Err(SuiErrorKind::TransactionAlreadyExecuted {
1189 digest: (*transaction.digest()),
1190 }
1191 .into());
1192 }
1193
1194 let _execution_lock = self.execution_lock_for_validation()?;
1196
1197 let checked_input_objects =
1198 self.handle_transaction_deny_checks(&transaction, epoch_store)?;
1199
1200 let owned_objects = checked_input_objects.inner().filter_owned_objects();
1201
1202 self.get_cache_writer()
1206 .validate_owned_object_versions(&owned_objects)
1207 }
1208
1209 pub fn check_transaction_validity(
1218 &self,
1219 epoch_store: &Arc<AuthorityPerEpochStore>,
1220 transaction: &VerifiedTransaction,
1221 enforce_live_input_objects: bool,
1222 ) -> SuiResult<()> {
1223 if !epoch_store
1224 .get_reconfig_state_read_lock_guard()
1225 .should_accept_user_certs()
1226 {
1227 return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1228 }
1229
1230 transaction.validity_check(&epoch_store.tx_validity_check_context())?;
1231
1232 let checked_input_objects =
1233 self.handle_transaction_deny_checks(transaction, epoch_store)?;
1234
1235 let owned_objects = checked_input_objects.inner().filter_owned_objects();
1237 let cache_reader = self.get_object_cache_reader();
1238
1239 for obj_ref in &owned_objects {
1240 if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
1241 if obj_ref.1 < live_object.version() {
1244 return Err(SuiErrorKind::UserInputError {
1245 error: UserInputError::ObjectVersionUnavailableForConsumption {
1246 provided_obj_ref: *obj_ref,
1247 current_version: live_object.version(),
1248 },
1249 }
1250 .into());
1251 }
1252
1253 if enforce_live_input_objects && live_object.version() < obj_ref.1 {
1254 return Err(SuiErrorKind::UserInputError {
1256 error: UserInputError::ObjectVersionUnavailableForConsumption {
1257 provided_obj_ref: *obj_ref,
1258 current_version: live_object.version(),
1259 },
1260 }
1261 .into());
1262 }
1263
1264 if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
1266 return Err(SuiErrorKind::UserInputError {
1267 error: UserInputError::InvalidObjectDigest {
1268 object_id: obj_ref.0,
1269 expected_digest: live_object.digest(),
1270 },
1271 }
1272 .into());
1273 }
1274 }
1275 }
1276
1277 Ok(())
1278 }
1279
1280 pub fn check_system_overload_at_signing(&self) -> bool {
1281 self.config
1282 .authority_overload_config
1283 .check_system_overload_at_signing
1284 }
1285
1286 pub fn check_system_overload_at_execution(&self) -> bool {
1287 self.config
1288 .authority_overload_config
1289 .check_system_overload_at_execution
1290 }
1291
1292 pub(crate) fn check_system_overload(
1293 &self,
1294 tx_data: &SenderSignedData,
1295 do_authority_overload_check: bool,
1296 ) -> SuiResult {
1297 if do_authority_overload_check {
1298 self.check_authority_overload(tx_data).tap_err(|_| {
1299 self.update_overload_metrics("execution_queue");
1300 })?;
1301 }
1302 self.execution_scheduler
1303 .check_execution_overload(self.overload_config(), tx_data)
1304 .tap_err(|_| {
1305 self.update_overload_metrics("execution_pending");
1306 })?;
1307 let pending_tx_count = self
1310 .get_cache_commit()
1311 .approximate_pending_transaction_count();
1312 if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1313 return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1314 retry_after_secs: 10,
1315 }
1316 .into());
1317 }
1318
1319 Ok(())
1320 }
1321
1322 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1323 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1324 return Ok(());
1325 }
1326
1327 let load_shedding_percentage = self
1328 .overload_info
1329 .load_shedding_percentage
1330 .load(Ordering::Relaxed);
1331 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1332 }
1333
1334 pub(crate) fn update_overload_metrics(&self, source: &str) {
1335 self.metrics
1336 .transaction_overload_sources
1337 .with_label_values(&[source])
1338 .inc();
1339 }
1340
1341 pub(crate) async fn wait_for_fastpath_dependency_objects(
1345 &self,
1346 transaction: &VerifiedTransaction,
1347 epoch: EpochId,
1348 ) -> SuiResult<bool> {
1349 let txn_data = transaction.data().transaction_data();
1350 let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;
1351
1352 let fastpath_dependency_objects: Vec<_> = move_objects
1354 .into_iter()
1355 .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
1356 .chain(
1357 packages
1358 .into_iter()
1359 .map(|package_id| InputKey::Package { id: package_id }),
1360 )
1361 .collect();
1362 let receiving_keys: HashSet<_> = receiving_objects
1363 .into_iter()
1364 .filter_map(|receiving_obj_ref| {
1365 self.should_wait_for_dependency_object(receiving_obj_ref)
1366 })
1367 .collect();
1368 if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
1369 return Ok(true);
1370 }
1371
1372 let max_wait = if cfg!(msim) {
1375 Duration::from_millis(200)
1376 } else {
1377 WAIT_FOR_FASTPATH_INPUT_TIMEOUT
1378 };
1379
1380 match timeout(
1381 max_wait,
1382 self.get_object_cache_reader().notify_read_input_objects(
1383 &fastpath_dependency_objects,
1384 &receiving_keys,
1385 epoch,
1386 ),
1387 )
1388 .await
1389 {
1390 Ok(()) => Ok(true),
1391 Err(_) => Ok(false),
1394 }
1395 }
1396
1397 fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1404 let (obj_id, cur_version, _digest) = obj_ref;
1405 let Some(latest_obj_ref) = self
1406 .get_object_cache_reader()
1407 .get_latest_object_ref_or_tombstone(obj_id)
1408 else {
1409 return Some(InputKey::VersionedObject {
1411 id: FullObjectID::new(obj_id, None),
1412 version: cur_version,
1413 });
1414 };
1415 let latest_digest = latest_obj_ref.2;
1416 if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1417 return None;
1419 }
1420 let latest_version = latest_obj_ref.1;
1421 if cur_version <= latest_version {
1422 return None;
1425 }
1426 Some(InputKey::VersionedObject {
1428 id: FullObjectID::new(obj_id, None),
1429 version: cur_version,
1430 })
1431 }
1432
1433 #[instrument(level = "trace", skip_all)]
1438 pub async fn wait_for_transaction_execution_for_testing(
1439 &self,
1440 transaction: &VerifiedExecutableTransaction,
1441 ) -> TransactionEffects {
1442 self.notify_read_effects_for_testing(
1443 "AuthorityState::wait_for_transaction_execution_for_testing",
1444 *transaction.digest(),
1445 )
1446 .await
1447 }
1448
#[instrument(level = "trace", skip_all)]
/// Executes `certificate` right away and commits its outputs.
///
/// Returns the effects together with the optional execution error on success.
/// Returns `ExecutionOutput::EpochEnded` when the execution lock cannot be
/// obtained or its epoch differs from `epoch_store`'s, `ExecutionOutput::Fatal`
/// when committing fails, and otherwise propagates any non-success output of
/// `process_certificate`.
///
/// # Panics
///
/// Panics if already-executed effects do not match the expected effects digest,
/// or if an accumulator barrier settlement transaction has no assigned
/// accumulator version while an object funds checker is installed.
pub async fn try_execute_immediately(
    &self,
    certificate: &VerifiedExecutableTransaction,
    mut execution_env: ExecutionEnv,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
    let _scope = monitored_scope("Execution::try_execute_immediately");
    let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

    let tx_digest = certificate.digest();

    // A configured fork-recovery override replaces whatever digest the caller expected.
    if let Some(fork_recovery) = &self.fork_recovery_state
        && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
    {
        warn!(
            ?tx_digest,
            original_digest = ?execution_env.expected_effects_digest,
            override_digest = ?override_digest,
            "Applying fork recovery override for transaction effects digest"
        );
        execution_env.expected_effects_digest = Some(override_digest);
    }

    let tx_guard = epoch_store.acquire_tx_guard(certificate);

    // Already executed: verify the digest (if one is expected) and return the
    // stored effects.
    let tx_cache_reader = self.get_transaction_cache_reader();
    if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
        if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
            assert_eq!(
                effects.digest(),
                expected_effects_digest,
                "Unexpected effects digest for transaction {:?}",
                tx_digest
            );
        }
        tx_guard.release();
        return ExecutionOutput::Success((effects, None));
    }

    let execution_start_time = Instant::now();

    let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
    else {
        tx_guard.release();
        return ExecutionOutput::EpochEnded;
    };
    // The lock's epoch must agree with the epoch store we were given.
    if *execution_guard != epoch_store.epoch() {
        tx_guard.release();
        info!("The epoch of the execution_guard doesn't match the epoch store");
        return ExecutionOutput::EpochEnded;
    }

    // Read before `execution_env` is moved into `process_certificate`.
    let accumulator_version = execution_env.assigned_versions.accumulator_version;

    let (transaction_outputs, timings, execution_error_opt) = match self.process_certificate(
        &tx_guard,
        &execution_guard,
        certificate,
        execution_env,
        epoch_store,
    ) {
        ExecutionOutput::Success(result) => result,
        output => return output.unwrap_err(),
    };
    let transaction_outputs = Arc::new(transaction_outputs);

    fail_point!("crash");

    let effects = transaction_outputs.effects.clone();
    debug!(
        ?tx_digest,
        fx_digest=?effects.digest(),
        "process_certificate succeeded in {:.3}ms",
        (execution_start_time.elapsed().as_micros() as f64) / 1000.0
    );

    let commit_result = self.commit_certificate(certificate, transaction_outputs, epoch_store);
    if let Err(err) = commit_result {
        error!(?tx_digest, "Error committing transaction: {err}");
        tx_guard.release();
        return ExecutionOutput::Fatal(err);
    }

    if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
        certificate.data().transaction_data().kind()
    {
        if let Some(err) = &execution_error_opt {
            debug_fatal!("Authenticator state update failed: {:?}", err);
        }
        epoch_store.update_authenticator_state(auth_state);

        // Debug builds cross-check the stored authenticator state's JWKs against
        // the signature verifier's.
        if cfg!(debug_assertions) {
            let authenticator_state = get_authenticator_state(self.get_object_store())
                .expect("Read cannot fail")
                .expect("Authenticator state must exist");

            let mut sys_jwks: Vec<_> = authenticator_state
                .active_jwks
                .into_iter()
                .map(|jwk| (jwk.jwk_id, jwk.jwk))
                .collect();
            let mut active_jwks: Vec<_> = epoch_store
                .signature_verifier
                .get_jwks()
                .into_iter()
                .collect();
            sys_jwks.sort();
            active_jwks.sort();

            assert_eq!(sys_jwks, active_jwks);
        }
    }

    // After a barrier settlement transaction, tell the object funds checker (if
    // installed) that the next accumulator version is settled.
    if certificate
        .transaction_data()
        .kind()
        .is_accumulator_barrier_settle_tx()
    {
        let object_funds_checker = self.object_funds_checker.load();
        if let Some(object_funds_checker) = object_funds_checker.as_ref() {
            // NOTE(review): panics if no accumulator version was assigned — confirm
            // that settlement transactions always carry one.
            let next_accumulator_version = accumulator_version.unwrap().next();
            object_funds_checker.settle_accumulator_version(next_accumulator_version);
        }
    }

    tx_guard.commit_tx();

    let elapsed = execution_start_time.elapsed();
    epoch_store.record_local_execution_time(
        certificate.data().transaction_data(),
        &effects,
        timings,
        elapsed,
    );

    // Skip the ratio when no time was measured, to avoid dividing by zero.
    let elapsed_us = elapsed.as_micros() as f64;
    if elapsed_us > 0.0 {
        self.metrics
            .execution_gas_latency_ratio
            .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
    };
    ExecutionOutput::Success((effects, execution_error_opt))
}
1616
1617 pub fn read_objects_for_execution(
1618 &self,
1619 tx_lock: &CertLockGuard,
1620 certificate: &VerifiedExecutableTransaction,
1621 assigned_shared_object_versions: &AssignedVersions,
1622 epoch_store: &Arc<AuthorityPerEpochStore>,
1623 ) -> SuiResult<InputObjects> {
1624 let _scope = monitored_scope("Execution::load_input_objects");
1625 let _metrics_guard = self
1626 .metrics
1627 .execution_load_input_objects_latency
1628 .start_timer();
1629 let input_objects = &certificate.data().transaction_data().input_objects()?;
1630 self.input_loader.read_objects_for_execution(
1631 &certificate.key(),
1632 tx_lock,
1633 input_objects,
1634 assigned_shared_object_versions,
1635 epoch_store.epoch(),
1636 )
1637 }
1638
1639 pub async fn try_execute_for_test(
1642 &self,
1643 certificate: &VerifiedCertificate,
1644 execution_env: ExecutionEnv,
1645 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1646 let epoch_store = self.epoch_store_for_testing();
1647 let (effects, execution_error_opt) = self
1648 .try_execute_immediately(
1649 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1650 execution_env,
1651 &epoch_store,
1652 )
1653 .await
1654 .unwrap();
1655 let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1656 (signed_effects, execution_error_opt)
1657 }
1658
1659 pub async fn try_execute_executable_for_test(
1660 &self,
1661 executable: &VerifiedExecutableTransaction,
1662 execution_env: ExecutionEnv,
1663 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1664 let epoch_store = self.epoch_store_for_testing();
1665 let (effects, execution_error_opt) = self
1666 .try_execute_immediately(executable, execution_env, &epoch_store)
1667 .await
1668 .unwrap();
1669 self.flush_post_processing(executable.digest()).await;
1670 let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1671 (signed_effects, execution_error_opt)
1672 }
1673
1674 pub async fn notify_read_effects_for_testing(
1679 &self,
1680 task_name: &'static str,
1681 digest: TransactionDigest,
1682 ) -> TransactionEffects {
1683 self.get_transaction_cache_reader()
1684 .notify_read_executed_effects(task_name, &[digest])
1685 .await
1686 .pop()
1687 .expect("must return correct number of effects")
1688 }
1689
1690 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1691 self.get_object_cache_reader()
1692 .check_owned_objects_are_live(owned_object_refs)
1693 }
1694
1695 pub(crate) fn debug_dump_transaction_state(
1700 &self,
1701 tx_digest: &TransactionDigest,
1702 effects: &TransactionEffects,
1703 expected_effects_digest: TransactionEffectsDigest,
1704 inner_temporary_store: &InnerTemporaryStore,
1705 certificate: &VerifiedExecutableTransaction,
1706 debug_dump_config: &StateDebugDumpConfig,
1707 ) -> SuiResult<PathBuf> {
1708 let dump_dir = debug_dump_config
1709 .dump_file_directory
1710 .as_ref()
1711 .cloned()
1712 .unwrap_or(std::env::temp_dir());
1713 let epoch_store = self.load_epoch_store_one_call_per_task();
1714
1715 NodeStateDump::new(
1716 tx_digest,
1717 effects,
1718 expected_effects_digest,
1719 self.get_object_store().as_ref(),
1720 &epoch_store,
1721 inner_temporary_store,
1722 certificate,
1723 )?
1724 .write_to_file(&dump_dir)
1725 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1726 }
1727
1728 #[instrument(level = "trace", skip_all)]
1729 pub(crate) fn process_certificate(
1730 &self,
1731 tx_guard: &CertTxGuard,
1732 execution_guard: &ExecutionLockReadGuard<'_>,
1733 certificate: &VerifiedExecutableTransaction,
1734 execution_env: ExecutionEnv,
1735 epoch_store: &Arc<AuthorityPerEpochStore>,
1736 ) -> ExecutionOutput<(
1737 TransactionOutputs,
1738 Vec<ExecutionTiming>,
1739 Option<ExecutionError>,
1740 )> {
1741 let _scope = monitored_scope("Execution::process_certificate");
1742 let tx_digest = *certificate.digest();
1743
1744 let input_objects = match self.read_objects_for_execution(
1745 tx_guard.as_lock_guard(),
1746 certificate,
1747 &execution_env.assigned_versions,
1748 epoch_store,
1749 ) {
1750 Ok(objects) => objects,
1751 Err(e) => return ExecutionOutput::Fatal(e),
1752 };
1753
1754 let expected_effects_digest = match execution_env.expected_effects_digest {
1755 Some(expected_effects_digest) => Some(expected_effects_digest),
1756 None => {
1757 match epoch_store.get_signed_effects_digest(&tx_digest) {
1761 Ok(digest) => digest,
1762 Err(e) => return ExecutionOutput::Fatal(e),
1763 }
1764 }
1765 };
1766
1767 fail_point_if!("correlated-crash-process-certificate", || {
1768 if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
1769 sui_simulator::task::kill_current_node(None);
1770 }
1771 });
1772
1773 self.execute_certificate(
1778 execution_guard,
1779 certificate,
1780 input_objects,
1781 expected_effects_digest,
1782 execution_env,
1783 epoch_store,
1784 )
1785 }
1786
1787 pub async fn reconfigure_traffic_control(
1788 &self,
1789 params: TrafficControlReconfigParams,
1790 ) -> Result<TrafficControlReconfigParams, SuiError> {
1791 if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1792 traffic_controller.admin_reconfigure(params).await
1793 } else {
1794 Err(SuiErrorKind::InvalidAdminRequest(
1795 "Traffic controller is not configured on this node".to_string(),
1796 )
1797 .into())
1798 }
1799 }
1800
#[instrument(level = "trace", skip_all)]
/// Records `certificate` as executed in the epoch store and hands its outputs
/// to the cache writer.
///
/// # Errors
///
/// Fails only if inserting the non-digest transaction key into the epoch store
/// fails.
fn commit_certificate(
    &self,
    certificate: &VerifiedExecutableTransaction,
    transaction_outputs: Arc<TransactionOutputs>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult {
    let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
        monitored_scope("Execution::commit_certificate");
    let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

    let tx_digest = certificate.digest();

    // Mark the transaction as executed in this epoch before its outputs are written.
    epoch_store.insert_executed_in_epoch(tx_digest);
    // Keys that are not plain digests are additionally mapped to the digest.
    let key = certificate.key();
    if !matches!(key, TransactionKey::Digest(_)) {
        epoch_store.insert_tx_key(key, *tx_digest)?;
    }

    fail_point!("crash");

    self.get_cache_writer()
        .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);

    // Accumulator barrier settlement transactions notify the epoch store once
    // their outputs have been handed to the cache writer.
    if let Some(settlement_key) = certificate
        .transaction_data()
        .kind()
        .accumulator_barrier_settlement_key()
    {
        epoch_store.notify_barrier_executed(settlement_key, *tx_digest);
    }

    // End-of-epoch transactions force a reload of all built-in framework packages.
    if certificate.transaction_data().is_end_of_epoch_tx() {
        self.get_object_cache_reader()
            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
    }

    Ok(())
}
1849
1850 fn update_metrics(
1851 &self,
1852 certificate: &VerifiedExecutableTransaction,
1853 inner_temporary_store: &InnerTemporaryStore,
1854 effects: &TransactionEffects,
1855 ) {
1856 if certificate.has_zklogin_sig() {
1858 self.metrics.zklogin_sig_count.inc();
1859 } else if certificate.has_upgraded_multisig() {
1860 self.metrics.multisig_sig_count.inc();
1861 }
1862
1863 self.metrics.total_effects.inc();
1864 self.metrics.total_certs.inc();
1865
1866 let consensus_object_count = effects.input_consensus_objects().len();
1867 if consensus_object_count > 0 {
1868 self.metrics.shared_obj_tx.inc();
1869 }
1870
1871 if certificate.is_sponsored_tx() {
1872 self.metrics.sponsored_tx.inc();
1873 }
1874
1875 let input_object_count = inner_temporary_store.input_objects.len();
1876 self.metrics
1877 .num_input_objs
1878 .observe(input_object_count as f64);
1879 self.metrics
1880 .num_shared_objects
1881 .observe(consensus_object_count as f64);
1882 self.metrics.batch_size.observe(
1883 certificate
1884 .data()
1885 .intent_message()
1886 .value
1887 .kind()
1888 .num_commands() as f64,
1889 );
1890 }
1891
1892 fn execute_transaction_to_effects(
1895 &self,
1896 executor: &dyn Executor,
1897 store: &dyn BackingStore,
1898 protocol_config: &ProtocolConfig,
1899 enable_expensive_checks: bool,
1900 execution_params: ExecutionOrEarlyError,
1901 epoch_id: &EpochId,
1902 epoch_timestamp_ms: u64,
1903 input_objects: CheckedInputObjects,
1904 gas_data: GasData,
1905 gas_status: SuiGasStatus,
1906 kind: TransactionKind,
1907 rewritten_inputs: Option<Vec<bool>>,
1908 signer: SuiAddress,
1909 tx_digest: TransactionDigest,
1910 ) -> (
1911 InnerTemporaryStore,
1912 SuiGasStatus,
1913 TransactionEffects,
1914 Vec<ExecutionTiming>,
1915 Result<(), ExecutionError>,
1916 ) {
1917 let (inner_temp_store, gas_status, effects, timings, execution_error) = executor
1918 .execute_transaction_to_effects_and_execution_error(
1920 store,
1921 protocol_config,
1922 self.metrics.execution_metrics.clone(),
1923 enable_expensive_checks,
1924 execution_params,
1925 epoch_id,
1926 epoch_timestamp_ms,
1927 input_objects,
1928 gas_data,
1929 gas_status,
1930 kind,
1931 rewritten_inputs,
1932 signer,
1933 tx_digest,
1934 &mut None,
1935 );
1936
1937 (
1938 inner_temp_store,
1939 gas_status,
1940 effects,
1941 timings,
1942 execution_error,
1943 )
1944 }
1945
1946 #[instrument(level = "trace", skip_all)]
1955 fn execute_certificate(
1956 &self,
1957 _execution_guard: &ExecutionLockReadGuard<'_>,
1958 certificate: &VerifiedExecutableTransaction,
1959 input_objects: InputObjects,
1960 expected_effects_digest: Option<TransactionEffectsDigest>,
1961 execution_env: ExecutionEnv,
1962 epoch_store: &Arc<AuthorityPerEpochStore>,
1963 ) -> ExecutionOutput<(
1964 TransactionOutputs,
1965 Vec<ExecutionTiming>,
1966 Option<ExecutionError>,
1967 )> {
1968 let _scope = monitored_scope("Execution::prepare_certificate");
1969 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1970 let prepare_certificate_start_time = tokio::time::Instant::now();
1971
1972 let tx_data = certificate.data().transaction_data();
1974
1975 if let Err(e) = tx_data.validity_check(&epoch_store.tx_validity_check_context()) {
1976 return ExecutionOutput::Fatal(e);
1977 }
1978
1979 let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
1983 certificate,
1984 input_objects,
1985 epoch_store.protocol_config(),
1986 epoch_store.reference_gas_price(),
1987 ) {
1988 Ok(result) => result,
1989 Err(e) => return ExecutionOutput::Fatal(e),
1990 };
1991
1992 let owned_object_refs = input_objects.inner().filter_owned_objects();
1993 if let Err(e) = self.check_owned_locks(&owned_object_refs) {
1994 return ExecutionOutput::Fatal(e);
1995 }
1996 let tx_digest = *certificate.digest();
1997 let protocol_config = epoch_store.protocol_config();
1998 let transaction_data = &certificate.data().intent_message().value;
1999 let sender = transaction_data.sender();
2000 let (mut kind, signer, gas_data) = transaction_data.execution_parts();
2001 let early_execution_error = get_early_execution_error(
2002 &tx_digest,
2003 &input_objects,
2004 self.config.certificate_deny_config.certificate_deny_set(),
2005 &execution_env.funds_withdraw_status,
2006 );
2007 let execution_params = match early_execution_error {
2008 Some(error) => ExecutionOrEarlyError::Err(error),
2009 None => ExecutionOrEarlyError::Ok(()),
2010 };
2011
2012 let rewritten_inputs = if execution_params.is_ok() {
2015 rewrite_transaction_for_coin_reservations(
2016 self.chain_identifier,
2017 &*self.coin_reservation_resolver,
2018 sender,
2019 &mut kind,
2020 execution_env.assigned_versions.accumulator_version,
2021 )
2022 .expect("rewriting must succeed for a certified transaction")
2023 } else {
2024 None
2025 };
2026
2027 let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2028
2029 #[allow(unused_mut)]
2030 let (inner_temp_store, _, mut effects, timings, execution_error_opt) = self
2031 .execute_transaction_to_effects(
2032 &**epoch_store.executor(),
2033 &tracking_store,
2034 protocol_config,
2035 self.config
2038 .expensive_safety_check_config
2039 .enable_deep_per_tx_sui_conservation_check(),
2040 execution_params,
2041 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2042 epoch_store
2043 .epoch_start_config()
2044 .epoch_data()
2045 .epoch_start_timestamp(),
2046 input_objects,
2047 gas_data,
2048 gas_status,
2049 kind,
2050 rewritten_inputs,
2051 signer,
2052 tx_digest,
2053 );
2054
2055 let object_funds_checker = self.object_funds_checker.load();
2056 if let Some(object_funds_checker) = object_funds_checker.as_ref()
2057 && !object_funds_checker.should_commit_object_funds_withdraws(
2058 certificate,
2059 effects.status(),
2060 &inner_temp_store.accumulator_running_max_withdraws,
2061 &execution_env,
2062 self.get_account_funds_read(),
2063 &self.execution_scheduler,
2064 epoch_store,
2065 )
2066 {
2067 assert_reachable!("retry object withdraw later");
2068 return ExecutionOutput::RetryLater;
2069 }
2070
2071 if let Some(expected_effects_digest) = expected_effects_digest
2072 && effects.digest() != expected_effects_digest
2073 {
2074 match self.debug_dump_transaction_state(
2076 &tx_digest,
2077 &effects,
2078 expected_effects_digest,
2079 &inner_temp_store,
2080 certificate,
2081 &self.config.state_debug_dump_config,
2082 ) {
2083 Ok(out_path) => {
2084 info!(
2085 "Dumped node state for transaction {} to {}",
2086 tx_digest,
2087 out_path.as_path().display().to_string()
2088 );
2089 }
2090 Err(e) => {
2091 error!("Error dumping state for transaction {}: {e}", tx_digest);
2092 }
2093 }
2094 let expected_effects = self
2095 .get_transaction_cache_reader()
2096 .get_effects(&expected_effects_digest);
2097 error!(
2098 ?tx_digest,
2099 ?expected_effects_digest,
2100 actual_effects = ?effects,
2101 expected_effects = ?expected_effects,
2102 "fork detected!"
2103 );
2104 if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
2105 tx_digest,
2106 expected_effects_digest,
2107 effects.digest(),
2108 ) {
2109 error!("Failed to record transaction fork: {e}");
2110 }
2111
2112 fail_point_if!("kill_transaction_fork_node", || {
2113 #[cfg(msim)]
2114 {
2115 tracing::error!(
2116 fatal = true,
2117 "Fork recovery test: killing node due to transaction effects fork for digest: {}",
2118 tx_digest
2119 );
2120 sui_simulator::task::shutdown_current_node();
2121 }
2122 });
2123
2124 fatal!(
2125 "Transaction {} is expected to have effects digest {}, but got {}!",
2126 tx_digest,
2127 expected_effects_digest,
2128 effects.digest()
2129 );
2130 }
2131
2132 fail_point_arg!("simulate_fork_during_execution", |(
2133 forked_validators,
2134 full_halt,
2135 effects_overrides,
2136 fork_probability,
2137 ): (
2138 std::sync::Arc<
2139 std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2140 >,
2141 bool,
2142 std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
2143 f32,
2144 )| {
2145 #[cfg(msim)]
2146 self.simulate_fork_during_execution(
2147 certificate,
2148 epoch_store,
2149 &mut effects,
2150 forked_validators,
2151 full_halt,
2152 effects_overrides,
2153 fork_probability,
2154 );
2155 });
2156
2157 let unchanged_loaded_runtime_objects =
2158 crate::transaction_outputs::unchanged_loaded_runtime_objects(
2159 certificate.transaction_data(),
2160 &effects,
2161 &tracking_store.into_read_objects(),
2162 );
2163
2164 let _ = self
2166 .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
2167 .tap_err(|e| {
2168 self.metrics.post_processing_total_failures.inc();
2169 error!(?tx_digest, "tx post processing failed: {e}");
2170 });
2171
2172 self.update_metrics(certificate, &inner_temp_store, &effects);
2173
2174 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
2175 certificate.clone().into_unsigned(),
2176 effects,
2177 inner_temp_store,
2178 unchanged_loaded_runtime_objects,
2179 );
2180
2181 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
2182 if elapsed > 0.0 {
2183 self.metrics.prepare_cert_gas_latency_ratio.observe(
2184 transaction_outputs
2185 .effects
2186 .gas_cost_summary()
2187 .computation_cost as f64
2188 / elapsed,
2189 );
2190 }
2191
2192 ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
2193 }
2194
2195 pub fn prepare_certificate_for_benchmark(
2196 &self,
2197 certificate: &VerifiedExecutableTransaction,
2198 input_objects: InputObjects,
2199 epoch_store: &Arc<AuthorityPerEpochStore>,
2200 ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2201 let lock = RwLock::new(epoch_store.epoch());
2202 let execution_guard = lock.try_read().unwrap();
2203
2204 let (transaction_outputs, _timings, execution_error_opt) = self
2205 .execute_certificate(
2206 &execution_guard,
2207 certificate,
2208 input_objects,
2209 None,
2210 ExecutionEnv::default(),
2211 epoch_store,
2212 )
2213 .unwrap();
2214 Ok((transaction_outputs, execution_error_opt))
2215 }
2216
2217 #[instrument(skip_all)]
2218 #[allow(clippy::type_complexity)]
2219 pub async fn dry_exec_transaction(
2220 &self,
2221 transaction: TransactionData,
2222 transaction_digest: TransactionDigest,
2223 ) -> SuiResult<(
2224 DryRunTransactionBlockResponse,
2225 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2226 TransactionEffects,
2227 Option<ObjectID>,
2228 )> {
2229 let epoch_store = self.load_epoch_store_one_call_per_task();
2230 if !self.is_fullnode(&epoch_store) {
2231 return Err(SuiErrorKind::UnsupportedFeatureError {
2232 error: "dry-exec is only supported on fullnodes".to_string(),
2233 }
2234 .into());
2235 }
2236
2237 if transaction.kind().is_system_tx() {
2238 return Err(SuiErrorKind::UnsupportedFeatureError {
2239 error: "dry-exec does not support system transactions".to_string(),
2240 }
2241 .into());
2242 }
2243
2244 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2245 }
2246
2247 #[allow(clippy::type_complexity)]
2248 pub fn dry_exec_transaction_for_benchmark(
2249 &self,
2250 transaction: TransactionData,
2251 transaction_digest: TransactionDigest,
2252 ) -> SuiResult<(
2253 DryRunTransactionBlockResponse,
2254 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2255 TransactionEffects,
2256 Option<ObjectID>,
2257 )> {
2258 let epoch_store = self.load_epoch_store_one_call_per_task();
2259 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2260 }
2261
2262 #[allow(clippy::type_complexity)]
2263 fn dry_exec_transaction_impl(
2264 &self,
2265 epoch_store: &AuthorityPerEpochStore,
2266 transaction: TransactionData,
2267 transaction_digest: TransactionDigest,
2268 ) -> SuiResult<(
2269 DryRunTransactionBlockResponse,
2270 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2271 TransactionEffects,
2272 Option<ObjectID>,
2273 )> {
2274 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2276
2277 let input_object_kinds = transaction.input_objects()?;
2278 let receiving_object_refs = transaction.receiving_objects();
2279
2280 sui_transaction_checks::deny::check_transaction_for_signing(
2281 &transaction,
2282 &[],
2283 &input_object_kinds,
2284 &receiving_object_refs,
2285 &self.config.transaction_deny_config,
2286 self.get_backing_package_store().as_ref(),
2287 )?;
2288
2289 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2290 None,
2292 &input_object_kinds,
2293 &receiving_object_refs,
2294 epoch_store.epoch(),
2295 )?;
2296
2297 let mut gas_data = transaction.gas_data().clone();
2299 let is_gasless =
2300 epoch_store.protocol_config().enable_gasless() && transaction.is_gasless_transaction();
2301 let ((gas_status, checked_input_objects), mock_gas) = if is_gasless {
2302 (
2304 sui_transaction_checks::check_transaction_input(
2305 epoch_store.protocol_config(),
2306 epoch_store.reference_gas_price(),
2307 &transaction,
2308 input_objects,
2309 &receiving_objects,
2310 &self.metrics.bytecode_verifier_metrics,
2311 &self.config.verifier_signing_config,
2312 )?,
2313 None,
2314 )
2315 } else if transaction.gas().is_empty() {
2316 let sender = transaction.sender();
2317 const MIST_TO_SUI: u64 = 1_000_000_000;
2319 const DRY_RUN_SUI: u64 = 1_000_000_000;
2320 let max_coin_value = MIST_TO_SUI * DRY_RUN_SUI;
2321 let gas_object_id = ObjectID::random();
2322 let gas_object = Object::new_move(
2323 MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
2324 Owner::AddressOwner(sender),
2325 TransactionDigest::genesis_marker(),
2326 );
2327 let gas_object_ref = gas_object.compute_object_reference();
2328 gas_data.payment = vec![gas_object_ref];
2329 (
2330 sui_transaction_checks::check_transaction_input_with_given_gas(
2331 epoch_store.protocol_config(),
2332 epoch_store.reference_gas_price(),
2333 &transaction,
2334 input_objects,
2335 receiving_objects,
2336 gas_object,
2337 &self.metrics.bytecode_verifier_metrics,
2338 &self.config.verifier_signing_config,
2339 )?,
2340 Some(gas_object_id),
2341 )
2342 } else {
2343 (
2344 sui_transaction_checks::check_transaction_input(
2345 epoch_store.protocol_config(),
2346 epoch_store.reference_gas_price(),
2347 &transaction,
2348 input_objects,
2349 &receiving_objects,
2350 &self.metrics.bytecode_verifier_metrics,
2351 &self.config.verifier_signing_config,
2352 )?,
2353 None,
2354 )
2355 };
2356
2357 let protocol_config = epoch_store.protocol_config();
2358 let (mut kind, signer, _) = transaction.execution_parts();
2359
2360 let silent = true;
2361 let executor = sui_execution::executor(protocol_config, silent)
2362 .expect("Creating an executor should not fail here");
2363
2364 let expensive_checks = false;
2365 let early_execution_error = get_early_execution_error(
2366 &transaction_digest,
2367 &checked_input_objects,
2368 self.config.certificate_deny_config.certificate_deny_set(),
2369 &FundsWithdrawStatus::MaybeSufficient,
2375 );
2376 let execution_params = match early_execution_error {
2377 Some(error) => ExecutionOrEarlyError::Err(error),
2378 None => ExecutionOrEarlyError::Ok(()),
2379 };
2380
2381 let rewritten_inputs = if execution_params.is_ok() {
2383 rewrite_transaction_for_coin_reservations(
2384 self.chain_identifier,
2385 &*self.coin_reservation_resolver,
2386 transaction.sender(),
2387 &mut kind,
2388 None,
2390 )?
2391 } else {
2392 None
2393 };
2394
2395 let (inner_temp_store, _, effects, _timings, execution_error) = self
2396 .execute_transaction_to_effects(
2397 executor.as_ref(),
2398 self.get_backing_store().as_ref(),
2399 protocol_config,
2400 expensive_checks,
2401 execution_params,
2402 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2403 epoch_store
2404 .epoch_start_config()
2405 .epoch_data()
2406 .epoch_start_timestamp(),
2407 checked_input_objects,
2408 gas_data,
2409 gas_status,
2410 kind,
2411 rewritten_inputs,
2412 signer,
2413 transaction_digest,
2414 );
2415
2416 let tx_digest = *effects.transaction_digest();
2417
2418 let module_cache =
2419 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2420
2421 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2422 epoch_store.protocol_config(),
2423 Box::new(PackageStoreWithFallback::new(
2424 &inner_temp_store,
2425 self.get_backing_package_store(),
2426 )),
2427 );
2428 let object_changes = Vec::new();
2430
2431 let balance_changes = Vec::new();
2433
2434 let written_with_kind = effects
2435 .created()
2436 .into_iter()
2437 .map(|(oref, _)| (oref, WriteKind::Create))
2438 .chain(
2439 effects
2440 .unwrapped()
2441 .into_iter()
2442 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2443 )
2444 .chain(
2445 effects
2446 .mutated()
2447 .into_iter()
2448 .map(|(oref, _)| (oref, WriteKind::Mutate)),
2449 )
2450 .map(|(oref, kind)| {
2451 let obj = inner_temp_store.written.get(&oref.0).unwrap();
2452 (oref.0, (oref, obj.clone(), kind))
2454 })
2455 .collect();
2456
2457 let execution_error_source = execution_error
2458 .as_ref()
2459 .err()
2460 .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2461
2462 Ok((
2463 DryRunTransactionBlockResponse {
2464 suggested_gas_price: self
2465 .congestion_tracker
2466 .get_suggested_gas_prices(&transaction),
2467 input: SuiTransactionBlockData::try_from_with_module_cache(
2468 transaction,
2469 &module_cache,
2470 )
2471 .map_err(|e| SuiErrorKind::TransactionSerializationError {
2472 error: format!(
2473 "Failed to convert transaction to SuiTransactionBlockData: {}",
2474 e
2475 ),
2476 })?, effects: effects.clone().try_into()?,
2478 events: SuiTransactionBlockEvents::try_from(
2479 inner_temp_store.events.clone(),
2480 tx_digest,
2481 None,
2482 layout_resolver.as_mut(),
2483 )?,
2484 object_changes,
2485 balance_changes,
2486 execution_error_source,
2487 },
2488 written_with_kind,
2489 effects,
2490 mock_gas,
2491 ))
2492 }
2493
2494 pub fn simulate_transaction(
2495 &self,
2496 mut transaction: TransactionData,
2497 checks: TransactionChecks,
2498 allow_mock_gas_coin: bool,
2499 ) -> SuiResult<SimulateTransactionResult> {
2500 if transaction.kind().is_system_tx() {
2501 return Err(SuiErrorKind::UnsupportedFeatureError {
2502 error: "simulate does not support system transactions".to_string(),
2503 }
2504 .into());
2505 }
2506
2507 let epoch_store = self.load_epoch_store_one_call_per_task();
2508 if !self.is_fullnode(&epoch_store) {
2509 return Err(SuiErrorKind::UnsupportedFeatureError {
2510 error: "simulate is only supported on fullnodes".to_string(),
2511 }
2512 .into());
2513 }
2514
2515 let dev_inspect = checks.disabled();
2516 if dev_inspect && self.config.dev_inspect_disabled {
2517 return Err(SuiErrorKind::UnsupportedFeatureError {
2518 error: "simulate with checks disabled is not allowed on this node".to_string(),
2519 }
2520 .into());
2521 }
2522
2523 let protocol_config = epoch_store.protocol_config();
2526 if !protocol_config.enable_coin_reservation_obj_refs()
2527 && transaction.gas().iter().any(|obj_ref| {
2528 sui_types::coin_reservation::ParsedDigest::is_coin_reservation_digest(&obj_ref.2)
2529 })
2530 {
2531 return Err(SuiErrorKind::UnsupportedFeatureError {
2532 error:
2533 "coin reservations in gas payment are not supported at this protocol version"
2534 .to_string(),
2535 }
2536 .into());
2537 }
2538
2539 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2541
2542 let input_object_kinds = transaction.input_objects()?;
2543 let receiving_object_refs = transaction.receiving_objects();
2544
2545 let is_gasless = protocol_config.enable_gasless() && transaction.is_gasless_transaction();
2550 let mock_gas_object = if allow_mock_gas_coin && transaction.gas().is_empty() && !is_gasless
2551 {
2552 let obj = Object::new_move(
2553 MoveObject::new_gas_coin(
2554 OBJECT_START_VERSION,
2555 ObjectID::MAX,
2556 DEV_INSPECT_GAS_COIN_VALUE,
2557 ),
2558 Owner::AddressOwner(transaction.gas_data().owner),
2559 TransactionDigest::genesis_marker(),
2560 );
2561 transaction.gas_data_mut().payment = vec![obj.compute_object_reference()];
2562 Some(obj)
2563 } else {
2564 None
2565 };
2566
2567 let declared_withdrawals = self.pre_object_load_checks(
2568 &transaction,
2569 &[],
2570 &input_object_kinds,
2571 &receiving_object_refs,
2572 epoch_store.protocol_config(),
2573 )?;
2574 let address_funds: BTreeSet<_> = declared_withdrawals.keys().cloned().collect();
2575
2576 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2577 None,
2579 &input_object_kinds,
2580 &receiving_object_refs,
2581 epoch_store.epoch(),
2582 )?;
2583
2584 let mock_gas_id = mock_gas_object.map(|obj| {
2586 let id = obj.id();
2587 input_objects.push(ObjectReadResult::new_from_gas_object(&obj));
2588 id
2589 });
2590
2591 let protocol_config = epoch_store.protocol_config();
2592
2593 let (gas_status, checked_input_objects) = if dev_inspect {
2594 sui_transaction_checks::check_dev_inspect_input(
2595 protocol_config,
2596 &transaction,
2597 input_objects,
2598 receiving_objects,
2599 epoch_store.reference_gas_price(),
2600 )?
2601 } else {
2602 sui_transaction_checks::check_transaction_input(
2603 epoch_store.protocol_config(),
2604 epoch_store.reference_gas_price(),
2605 &transaction,
2606 input_objects,
2607 &receiving_objects,
2608 &self.metrics.bytecode_verifier_metrics,
2609 &self.config.verifier_signing_config,
2610 )?
2611 };
2612
2613 let executor = sui_execution::executor(
2615 protocol_config,
2616 true, )
2618 .expect("Creating an executor should not fail here");
2619
2620 let (mut kind, signer, gas_data) = transaction.execution_parts();
2621 let rewritten_inputs = rewrite_transaction_for_coin_reservations(
2622 self.chain_identifier,
2623 &*self.coin_reservation_resolver,
2624 signer,
2625 &mut kind,
2626 None,
2627 )?;
2628 let early_execution_error = get_early_execution_error(
2629 &transaction.digest(),
2630 &checked_input_objects,
2631 self.config.certificate_deny_config.certificate_deny_set(),
2632 &FundsWithdrawStatus::MaybeSufficient,
2633 );
2634 let execution_params = match early_execution_error {
2635 Some(error) => ExecutionOrEarlyError::Err(error),
2636 None => ExecutionOrEarlyError::Ok(()),
2637 };
2638
2639 let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2640
2641 let cloned_input_objects = checked_input_objects.clone();
2643 let cloned_gas = gas_data.clone();
2644 let cloned_kind = kind.clone();
2645 let tx_digest = transaction.digest();
2646 let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
2647 let epoch_timestamp_ms = epoch_store
2648 .epoch_start_config()
2649 .epoch_data()
2650 .epoch_start_timestamp();
2651 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2652 &tracking_store,
2653 protocol_config,
2654 self.metrics.execution_metrics.clone(),
2655 false, execution_params,
2657 &epoch_id,
2658 epoch_timestamp_ms,
2659 checked_input_objects,
2660 gas_data,
2661 gas_status,
2662 kind,
2663 rewritten_inputs.clone(),
2664 signer,
2665 tx_digest,
2666 dev_inspect,
2667 );
2668
2669 let (inner_temp_store, effects, execution_result) = if execution_result.is_ok() {
2671 let has_insufficient_object_funds = inner_temp_store
2672 .accumulator_running_max_withdraws
2673 .iter()
2674 .filter(|(id, _)| !address_funds.contains(id))
2675 .any(|(id, max_withdraw)| {
2676 let (balance, _) = self.get_account_funds_read().get_latest_account_amount(id);
2677 balance < *max_withdraw
2678 });
2679
2680 if has_insufficient_object_funds {
2681 let retry_gas_status = SuiGasStatus::new(
2682 cloned_gas.budget,
2683 cloned_gas.price,
2684 epoch_store.reference_gas_price(),
2685 protocol_config,
2686 )?;
2687 let (store, _, effects, result) = executor.dev_inspect_transaction(
2688 &tracking_store,
2689 protocol_config,
2690 self.metrics.execution_metrics.clone(),
2691 false,
2692 ExecutionOrEarlyError::Err(ExecutionErrorKind::InsufficientFundsForWithdraw),
2693 &epoch_id,
2694 epoch_timestamp_ms,
2695 cloned_input_objects,
2696 cloned_gas,
2697 retry_gas_status,
2698 cloned_kind,
2699 rewritten_inputs,
2700 signer,
2701 tx_digest,
2702 dev_inspect,
2703 );
2704 (store, effects, result)
2705 } else {
2706 (inner_temp_store, effects, execution_result)
2707 }
2708 } else {
2709 (inner_temp_store, effects, execution_result)
2710 };
2711
2712 let loaded_runtime_objects = tracking_store.into_read_objects();
2713 let unchanged_loaded_runtime_objects =
2714 crate::transaction_outputs::unchanged_loaded_runtime_objects(
2715 &transaction,
2716 &effects,
2717 &loaded_runtime_objects,
2718 );
2719
2720 let object_set = {
2721 let objects = {
2722 let mut objects = loaded_runtime_objects;
2723
2724 for o in inner_temp_store
2725 .input_objects
2726 .into_values()
2727 .chain(inner_temp_store.written.into_values())
2728 {
2729 objects.insert(o);
2730 }
2731
2732 objects
2733 };
2734
2735 let object_keys = sui_types::storage::get_transaction_object_set(
2736 &transaction,
2737 &effects,
2738 &unchanged_loaded_runtime_objects,
2739 );
2740
2741 let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
2742 for k in object_keys {
2743 if let Some(o) = objects.get(&k) {
2744 set.insert(o.clone());
2745 }
2746 }
2747
2748 set
2749 };
2750
2751 Ok(SimulateTransactionResult {
2752 objects: object_set,
2753 events: effects.events_digest().map(|_| inner_temp_store.events),
2754 effects,
2755 execution_result,
2756 mock_gas_id,
2757 unchanged_loaded_runtime_objects,
2758 suggested_gas_price: self
2759 .congestion_tracker
2760 .get_suggested_gas_prices(&transaction),
2761 })
2762 }
2763
2764 #[allow(clippy::collapsible_else_if)]
2766 #[instrument(skip_all)]
2767 pub async fn dev_inspect_transaction_block(
2768 &self,
2769 sender: SuiAddress,
2770 transaction_kind: TransactionKind,
2771 gas_price: Option<u64>,
2772 gas_budget: Option<u64>,
2773 gas_sponsor: Option<SuiAddress>,
2774 gas_objects: Option<Vec<ObjectRef>>,
2775 show_raw_txn_data_and_effects: Option<bool>,
2776 skip_checks: Option<bool>,
2777 ) -> SuiResult<DevInspectResults> {
2778 let epoch_store = self.load_epoch_store_one_call_per_task();
2779
2780 if !self.is_fullnode(&epoch_store) {
2781 return Err(SuiErrorKind::UnsupportedFeatureError {
2782 error: "dev-inspect is only supported on fullnodes".to_string(),
2783 }
2784 .into());
2785 }
2786
2787 if transaction_kind.is_system_tx() {
2788 return Err(SuiErrorKind::UnsupportedFeatureError {
2789 error: "system transactions are not supported".to_string(),
2790 }
2791 .into());
2792 }
2793
2794 let skip_checks = skip_checks.unwrap_or(true);
2795 if skip_checks && self.config.dev_inspect_disabled {
2796 return Err(SuiErrorKind::UnsupportedFeatureError {
2797 error: "dev-inspect with skip_checks is not allowed on this node".to_string(),
2798 }
2799 .into());
2800 }
2801
2802 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2803 let reference_gas_price = epoch_store.reference_gas_price();
2804 let protocol_config = epoch_store.protocol_config();
2805 let max_tx_gas = protocol_config.max_tx_gas();
2806
2807 let price = gas_price.unwrap_or(reference_gas_price);
2808 let budget = gas_budget.unwrap_or(max_tx_gas);
2809 let owner = gas_sponsor.unwrap_or(sender);
2810 let payment = gas_objects.unwrap_or_default();
2812 let mut transaction = TransactionData::V1(TransactionDataV1 {
2813 kind: transaction_kind.clone(),
2814 sender,
2815 gas_data: GasData {
2816 payment,
2817 owner,
2818 price,
2819 budget,
2820 },
2821 expiration: TransactionExpiration::None,
2822 });
2823
2824 let raw_txn_data = if show_raw_txn_data_and_effects {
2825 bcs::to_bytes(&transaction).map_err(|_| {
2826 SuiErrorKind::TransactionSerializationError {
2827 error: "Failed to serialize transaction during dev inspect".to_string(),
2828 }
2829 })?
2830 } else {
2831 vec![]
2832 };
2833
2834 transaction.validity_check_no_gas_check(protocol_config)?;
2835
2836 let input_object_kinds = transaction.input_objects()?;
2837 let receiving_object_refs = transaction.receiving_objects();
2838
2839 sui_transaction_checks::deny::check_transaction_for_signing(
2840 &transaction,
2841 &[],
2842 &input_object_kinds,
2843 &receiving_object_refs,
2844 &self.config.transaction_deny_config,
2845 self.get_backing_package_store().as_ref(),
2846 )?;
2847
2848 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2849 None,
2851 &input_object_kinds,
2852 &receiving_object_refs,
2853 epoch_store.epoch(),
2854 )?;
2855
2856 let (gas_status, checked_input_objects) = if skip_checks {
2857 if transaction.gas().is_empty() {
2861 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2863 DEV_INSPECT_GAS_COIN_VALUE,
2864 transaction.gas_owner(),
2865 );
2866 let gas_object_ref = dummy_gas_object.compute_object_reference();
2867 transaction.gas_data_mut().payment = vec![gas_object_ref];
2868 input_objects.push(ObjectReadResult::new(
2869 InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2870 dummy_gas_object.into(),
2871 ));
2872 }
2873 sui_transaction_checks::check_dev_inspect_input(
2874 protocol_config,
2875 &transaction,
2876 input_objects,
2877 receiving_objects,
2878 reference_gas_price,
2879 )?
2880 } else {
2881 if transaction.gas().is_empty() {
2884 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2886 DEV_INSPECT_GAS_COIN_VALUE,
2887 transaction.gas_owner(),
2888 );
2889 let gas_object_ref = dummy_gas_object.compute_object_reference();
2890 transaction.gas_data_mut().payment = vec![gas_object_ref];
2891 sui_transaction_checks::check_transaction_input_with_given_gas(
2892 epoch_store.protocol_config(),
2893 epoch_store.reference_gas_price(),
2894 &transaction,
2895 input_objects,
2896 receiving_objects,
2897 dummy_gas_object,
2898 &self.metrics.bytecode_verifier_metrics,
2899 &self.config.verifier_signing_config,
2900 )?
2901 } else {
2902 sui_transaction_checks::check_transaction_input(
2903 epoch_store.protocol_config(),
2904 epoch_store.reference_gas_price(),
2905 &transaction,
2906 input_objects,
2907 &receiving_objects,
2908 &self.metrics.bytecode_verifier_metrics,
2909 &self.config.verifier_signing_config,
2910 )?
2911 }
2912 };
2913
2914 let executor = sui_execution::executor(protocol_config, true)
2915 .expect("Creating an executor should not fail here");
2916 let gas_data = transaction.gas_data().clone();
2917 let intent_msg = IntentMessage::new(
2918 Intent {
2919 version: IntentVersion::V0,
2920 scope: IntentScope::TransactionData,
2921 app_id: AppId::Sui,
2922 },
2923 transaction,
2924 );
2925 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2926 let early_execution_error = get_early_execution_error(
2927 &transaction_digest,
2928 &checked_input_objects,
2929 self.config.certificate_deny_config.certificate_deny_set(),
2930 &FundsWithdrawStatus::MaybeSufficient,
2932 );
2933 let execution_params = match early_execution_error {
2934 Some(error) => ExecutionOrEarlyError::Err(error),
2935 None => ExecutionOrEarlyError::Ok(()),
2936 };
2937
2938 let mut transaction_kind = transaction_kind;
2939 let rewritten_inputs = rewrite_transaction_for_coin_reservations(
2940 self.chain_identifier,
2941 &*self.coin_reservation_resolver,
2942 sender,
2943 &mut transaction_kind,
2944 None,
2945 )?;
2946 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2947 self.get_backing_store().as_ref(),
2948 protocol_config,
2949 self.metrics.execution_metrics.clone(),
2950 false,
2951 execution_params,
2952 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2953 epoch_store
2954 .epoch_start_config()
2955 .epoch_data()
2956 .epoch_start_timestamp(),
2957 checked_input_objects,
2958 gas_data,
2959 gas_status,
2960 transaction_kind,
2961 rewritten_inputs,
2962 sender,
2963 transaction_digest,
2964 skip_checks,
2965 );
2966
2967 let raw_effects = if show_raw_txn_data_and_effects {
2968 bcs::to_bytes(&effects).map_err(|_| SuiErrorKind::TransactionSerializationError {
2969 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2970 })?
2971 } else {
2972 vec![]
2973 };
2974
2975 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
2976 epoch_store.protocol_config(),
2977 Box::new(PackageStoreWithFallback::new(
2978 &inner_temp_store,
2979 self.get_backing_package_store(),
2980 )),
2981 );
2982
2983 DevInspectResults::new(
2984 effects,
2985 inner_temp_store.events.clone(),
2986 execution_result,
2987 raw_txn_data,
2988 raw_effects,
2989 layout_resolver.as_mut(),
2990 )
2991 }
2992
2993 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2995 let epoch_store = self.epoch_store_for_testing();
2996 Ok(epoch_store.reference_gas_price())
2997 }
2998
2999 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
3000 self.get_transaction_cache_reader()
3001 .is_tx_already_executed(digest)
3002 }
3003
3004 #[instrument(level = "debug", skip_all, err(level = "debug"))]
3005 fn index_tx(
3006 sequence: u64,
3007 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3008 object_store: &Arc<dyn ObjectStore + Send + Sync>,
3009 indexes: &IndexStore,
3010 digest: &TransactionDigest,
3011 cert: &VerifiedExecutableTransaction,
3013 effects: &TransactionEffects,
3014 events: &TransactionEvents,
3015 timestamp_ms: u64,
3016 tx_coins: Option<TxCoins>,
3017 written: &WrittenObjects,
3018 inner_temporary_store: &InnerTemporaryStore,
3019 epoch_store: &Arc<AuthorityPerEpochStore>,
3020 acquire_locks: bool,
3021 ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
3022 let changes = Self::process_object_index(backing_package_store, object_store, effects, written, inner_temporary_store, epoch_store)
3023 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
3024
3025 indexes.index_tx(
3026 sequence,
3027 cert.data().intent_message().value.sender(),
3028 cert.data()
3029 .intent_message()
3030 .value
3031 .input_objects()?
3032 .iter()
3033 .map(|o| o.object_id()),
3034 effects
3035 .all_changed_objects()
3036 .into_iter()
3037 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
3038 cert.data()
3039 .intent_message()
3040 .value
3041 .move_calls()
3042 .into_iter()
3043 .map(|(_cmd_idx, package, module, function)| {
3044 (*package, module.to_owned(), function.to_owned())
3045 }),
3046 events,
3047 changes,
3048 digest,
3049 timestamp_ms,
3050 tx_coins,
3051 effects.accumulator_events(),
3052 acquire_locks,
3053 )
3054 }
3055
/// Simulator-only hook that deliberately perturbs `effects` on a subset of validators
/// in order to provoke a fork.
///
/// Nothing happens for system transactions or when this authority has zero stake in
/// the committee. Otherwise:
///
/// 1. If this authority is not yet in `forked_validators`, it is added when the
///    thread-local running total of forked stake allows it: with `full_halt` the total
///    must be at most the validity threshold; without it the total plus this
///    authority's stake must stay strictly below the threshold.
/// 2. If this authority is in `forked_validators`, the transaction is forked when its
///    digest is already a key of `effects_overrides`, or when `effects_overrides` is
///    empty and `deterministic_probability(digest, fork_probability)` selects it. Forking
///    records the original effects digest under the transaction digest and bumps the
///    computation cost in the effects by one.
///
/// A lock that cannot be acquired (poisoned) is treated as "skip this step".
#[cfg(msim)]
fn simulate_fork_during_execution(
    &self,
    certificate: &VerifiedExecutableTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    effects: &mut TransactionEffects,
    forked_validators: std::sync::Arc<
        std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
    >,
    full_halt: bool,
    effects_overrides: std::sync::Arc<
        std::sync::Mutex<std::collections::BTreeMap<String, String>>,
    >,
    fork_probability: f32,
) {
    use std::cell::RefCell;
    thread_local! {
        static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
    }

    if certificate.data().intent_message().value.is_system_tx() {
        return;
    }

    let committee = epoch_store.committee();
    let cur_stake = (**committee).weight(&self.name);
    if cur_stake == 0 {
        return;
    }

    TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
        let already_forked = forked_validators
            .lock()
            .is_ok_and(|set| set.contains(&self.name));

        if !already_forked {
            let threshold = committee.validity_threshold();
            let should_fork = match full_halt {
                true => *total_stake <= threshold,
                false => *total_stake + cur_stake < threshold,
            };

            if should_fork {
                *total_stake += cur_stake;

                if let Ok(mut external_set) = forked_validators.lock() {
                    external_set.insert(self.name);
                    info!("forked_validators: {:?}", external_set);
                }
            }
        }

        // From here on, only authorities registered as forked do anything. The guard
        // on `forked_validators` stays held while `effects_overrides` is locked.
        let Ok(external_set) = forked_validators.lock() else {
            return;
        };
        if !external_set.contains(&self.name) {
            return;
        }

        let tx_digest = certificate.digest().to_string();
        let Ok(mut overrides) = effects_overrides.lock() else {
            return;
        };

        let fork_this_tx = overrides.contains_key(&tx_digest)
            || (overrides.is_empty()
                && sui_simulator::random::deterministic_probability(
                    &tx_digest,
                    fork_probability,
                ));
        if !fork_this_tx {
            return;
        }

        let original_effects_digest = effects.digest().to_string();
        overrides.insert(tx_digest.clone(), original_effects_digest.clone());
        info!(
            ?tx_digest,
            ?original_effects_digest,
            "Captured forked effects digest for transaction"
        );
        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
    });
}
3139
3140 fn process_object_index(
3141 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3142 object_store: &Arc<dyn ObjectStore + Send + Sync>,
3143 effects: &TransactionEffects,
3144 written: &WrittenObjects,
3145 inner_temporary_store: &InnerTemporaryStore,
3146 epoch_store: &Arc<AuthorityPerEpochStore>,
3147 ) -> SuiResult<ObjectIndexChanges> {
3148 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
3149 epoch_store.protocol_config(),
3150 Box::new(PackageStoreWithFallback::new(
3151 inner_temporary_store,
3152 backing_package_store,
3153 )),
3154 );
3155
3156 let modified_at_version = effects
3157 .modified_at_versions()
3158 .into_iter()
3159 .collect::<HashMap<_, _>>();
3160
3161 let tx_digest = effects.transaction_digest();
3162 let mut deleted_owners = vec![];
3163 let mut deleted_dynamic_fields = vec![];
3164 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
3165 let old_version = modified_at_version.get(&id).unwrap();
3166 match Self::get_owner_at_version(object_store, &id, *old_version).unwrap_or_else(
3169 |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
3170 ) {
3171 Owner::AddressOwner(addr)
3172 | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
3173 Owner::ObjectOwner(object_id) => {
3174 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
3175 }
3176 _ => {}
3177 }
3178 }
3179
3180 let mut new_owners = vec![];
3181 let mut new_dynamic_fields = vec![];
3182
3183 for (oref, owner, kind) in effects.all_changed_objects() {
3184 let id = &oref.0;
3185 if let WriteKind::Mutate = kind {
3187 let Some(old_version) = modified_at_version.get(id) else {
3188 panic!(
3189 "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
3190 tx_digest
3191 );
3192 };
3193 let Some(old_object) = object_store.get_object_by_key(id, *old_version) else {
3196 panic!(
3197 "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
3198 tx_digest, id, old_version
3199 );
3200 };
3201 if old_object.owner != owner {
3202 match old_object.owner {
3203 Owner::AddressOwner(addr)
3204 | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3205 deleted_owners.push((addr, *id));
3206 }
3207 Owner::ObjectOwner(object_id) => {
3208 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
3209 }
3210 _ => {}
3211 }
3212 }
3213 }
3214
3215 match owner {
3216 Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3217 let new_object = written.get(id).unwrap_or_else(
3219 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3220 );
3221 assert_eq!(
3222 new_object.version(),
3223 oref.1,
3224 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3225 tx_digest,
3226 id,
3227 new_object.version(),
3228 oref.1
3229 );
3230
3231 let type_ = new_object
3232 .type_()
3233 .map(|type_| ObjectType::Struct(type_.clone()))
3234 .unwrap_or(ObjectType::Package);
3235
3236 new_owners.push((
3237 (addr, *id),
3238 ObjectInfo {
3239 object_id: *id,
3240 version: oref.1,
3241 digest: oref.2,
3242 type_,
3243 owner,
3244 previous_transaction: *effects.transaction_digest(),
3245 },
3246 ));
3247 }
3248 Owner::ObjectOwner(owner) => {
3249 let new_object = written.get(id).unwrap_or_else(
3250 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3251 );
3252 assert_eq!(
3253 new_object.version(),
3254 oref.1,
3255 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3256 tx_digest,
3257 id,
3258 new_object.version(),
3259 oref.1
3260 );
3261
3262 let Some(df_info) = Self::try_create_dynamic_field_info(
3263 object_store,
3264 new_object,
3265 written,
3266 layout_resolver.as_mut(),
3267 )
3268 .unwrap_or_else(|e| {
3269 error!(
3270 "try_create_dynamic_field_info should not fail, {}, new_object={:?}",
3271 e, new_object
3272 );
3273 None
3274 }) else {
3275 continue;
3277 };
3278 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
3279 }
3280 _ => {}
3281 }
3282 }
3283
3284 Ok(ObjectIndexChanges {
3285 deleted_owners,
3286 deleted_dynamic_fields,
3287 new_owners,
3288 new_dynamic_fields,
3289 })
3290 }
3291
3292 fn try_create_dynamic_field_info(
3293 object_store: &Arc<dyn ObjectStore + Send + Sync>,
3294 o: &Object,
3295 written: &WrittenObjects,
3296 resolver: &mut dyn LayoutResolver,
3297 ) -> SuiResult<Option<DynamicFieldInfo>> {
3298 let Some(move_object) = o.data.try_as_move().cloned() else {
3300 return Ok(None);
3301 };
3302
3303 if !move_object.type_().is_dynamic_field() {
3305 return Ok(None);
3306 }
3307
3308 let layout = resolver
3309 .get_annotated_layout(&move_object.type_().clone().into())?
3310 .into_layout();
3311
3312 let field =
3313 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
3314 SuiErrorKind::ObjectDeserializationError {
3315 error: e.to_string(),
3316 }
3317 })?;
3318
3319 let type_ = field.kind;
3320 let name_type: TypeTag = field.name_layout.into();
3321 let bcs_name = field.name_bytes.to_owned();
3322
3323 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
3324 .map_err(|e| {
3325 warn!("{e}");
3326 SuiErrorKind::ObjectDeserializationError {
3327 error: e.to_string(),
3328 }
3329 })?;
3330
3331 let name = DynamicFieldName {
3332 type_: name_type,
3333 value: SuiMoveValue::from(name_value).to_json_value(),
3334 };
3335
3336 let value_metadata = field.value_metadata().map_err(|e| {
3337 warn!("{e}");
3338 SuiErrorKind::ObjectDeserializationError {
3339 error: e.to_string(),
3340 }
3341 })?;
3342
3343 Ok(Some(match value_metadata {
3344 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
3345 name,
3346 bcs_name,
3347 type_,
3348 object_type: object_type.to_canonical_string(true),
3349 object_id: o.id(),
3350 version: o.version(),
3351 digest: o.digest(),
3352 },
3353
3354 DFV::ValueMetadata::DynamicObjectField(object_id) => {
3355 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
3359 let version = object.version();
3360 let digest = object.digest();
3361 let object_type = object.data.type_().unwrap().clone();
3362 (version, digest, object_type)
3363 } else {
3364 let object = object_store
3366 .get_object_by_key(&object_id, o.version())
3367 .ok_or_else(|| UserInputError::ObjectNotFound {
3368 object_id,
3369 version: Some(o.version()),
3370 })?;
3371 let version = object.version();
3372 let digest = object.digest();
3373 let object_type = object.data.type_().unwrap().clone();
3374 (version, digest, object_type)
3375 };
3376
3377 DynamicFieldInfo {
3378 name,
3379 bcs_name,
3380 type_,
3381 object_type: object_type.to_string(),
3382 object_id,
3383 version,
3384 digest,
3385 }
3386 }
3387 }))
3388 }
3389
/// Runs post-execution processing (indexing and subscription events) for one
/// executed transaction.
///
/// Returns immediately with `Ok(())` when this node has no index store.
///
/// Two modes, selected by `self.config.sync_post_process_one_tx`:
/// * **Synchronous** – the work runs inline, the resulting batch and cache
///   updates are committed to the index store before returning, and a
///   processing error is returned to the caller.
/// * **Asynchronous** – a oneshot receiver is registered in
///   `pending_post_processing` under the transaction digest, and the work is
///   run on a blocking task gated by `post_processing_semaphore`. The result
///   is delivered through the oneshot channel and is NOT committed here; a
///   processing error is only logged and counted, and this function still
///   returns `Ok(())`.
#[instrument(level = "trace", skip_all, err(level = "debug"))]
fn post_process_one_tx(
    &self,
    certificate: &VerifiedExecutableTransaction,
    effects: &TransactionEffects,
    inner_temporary_store: &InnerTemporaryStore,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult {
    let Some(indexes) = &self.indexes else {
        return Ok(());
    };

    let tx_digest = *certificate.digest();

    // The sequence number is allocated on the calling thread, before any work
    // is handed to a background task.
    let sequence = indexes.allocate_sequence_number();

    if self.config.sync_post_process_one_tx {
        let result = Self::post_process_one_tx_impl(
            sequence,
            indexes,
            &self.subscription_handler,
            &self.metrics,
            self.name,
            self.get_backing_package_store(),
            self.get_object_store(),
            certificate,
            effects,
            inner_temporary_store,
            epoch_store,
            true, // acquire_locks
        );

        match result {
            Ok((raw_batch, cache_updates_with_locks)) => {
                let mut db_batch = indexes.new_db_batch();
                db_batch
                    .concat(vec![raw_batch])
                    .expect("failed to absorb raw index batch");
                // `_locks` stays bound (and therefore alive) until the end of
                // this arm, i.e. across the commit below.
                let IndexStoreCacheUpdatesWithLocks { _locks, inner } =
                    cache_updates_with_locks;
                indexes
                    .commit_index_batch(db_batch, vec![inner])
                    .expect("failed to commit index batch");
            }
            Err(e) => {
                self.metrics.post_processing_total_failures.inc();
                error!(?tx_digest, "tx post processing failed: {e}");
                return Err(e);
            }
        }

        return Ok(());
    }

    // Register the receiver before spawning so the pending entry exists by the
    // time this function returns.
    let (done_tx, done_rx) = tokio::sync::oneshot::channel::<PostProcessingOutput>();
    self.pending_post_processing.insert(tx_digest, done_rx);

    // Everything the background task needs is cloned so it can be moved into
    // the spawned future.
    let indexes = indexes.clone();
    let subscription_handler = self.subscription_handler.clone();
    let metrics = self.metrics.clone();
    let name = self.name;
    let backing_package_store = self.get_backing_package_store().clone();
    let object_store = self.get_object_store().clone();
    let semaphore = self.post_processing_semaphore.clone();

    let certificate = certificate.clone();
    let effects = effects.clone();
    let inner_temporary_store = inner_temporary_store.clone();
    let epoch_store = epoch_store.clone();

    tokio::spawn(async move {
        // Wait for a permit on the async side, before occupying a blocking thread.
        let permit = {
            let _scope = monitored_scope("Execution::post_process_one_tx::semaphore_acquire");
            semaphore
                .acquire_owned()
                .await
                .expect("post-processing semaphore should not be closed")
        };

        let _ = tokio::task::spawn_blocking(move || {
            // Keep the permit for the duration of the blocking work.
            let _permit = permit;

            let result = Self::post_process_one_tx_impl(
                sequence,
                &indexes,
                &subscription_handler,
                &metrics,
                name,
                &backing_package_store,
                &object_store,
                &certificate,
                &effects,
                &inner_temporary_store,
                &epoch_store,
                false, // acquire_locks
            );

            match result {
                Ok((raw_batch, cache_updates_with_locks)) => {
                    fail_point!("crash-after-post-process-one-tx");
                    let output = (raw_batch, cache_updates_with_locks.into_inner());
                    // A send error only means the receiver is gone; ignore it.
                    let _ = done_tx.send(output);
                }
                Err(e) => {
                    // `done_tx` is dropped without sending, so the receiver
                    // observes a closed channel.
                    metrics.post_processing_total_failures.inc();
                    error!(?tx_digest, "tx post processing failed: {e}");
                }
            }
        })
        .await;
    });

    Ok(())
}
3511
3512 fn post_process_one_tx_impl(
3513 sequence: u64,
3514 indexes: &Arc<IndexStore>,
3515 subscription_handler: &Arc<SubscriptionHandler>,
3516 metrics: &Arc<AuthorityMetrics>,
3517 name: AuthorityName,
3518 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3519 object_store: &Arc<dyn ObjectStore + Send + Sync>,
3520 certificate: &VerifiedExecutableTransaction,
3521 effects: &TransactionEffects,
3522 inner_temporary_store: &InnerTemporaryStore,
3523 epoch_store: &Arc<AuthorityPerEpochStore>,
3524 acquire_locks: bool,
3525 ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
3526 let _scope = monitored_scope("Execution::post_process_one_tx");
3527
3528 let tx_digest = certificate.digest();
3529 let timestamp_ms = Self::unixtime_now_ms();
3530 let events = &inner_temporary_store.events;
3531 let written = &inner_temporary_store.written;
3532 let tx_coins = Self::fullnode_only_get_tx_coins_for_indexing(
3533 name,
3534 object_store,
3535 effects,
3536 inner_temporary_store,
3537 epoch_store,
3538 );
3539
3540 let (raw_batch, cache_updates) = Self::index_tx(
3541 sequence,
3542 backing_package_store,
3543 object_store,
3544 indexes,
3545 tx_digest,
3546 certificate,
3547 effects,
3548 events,
3549 timestamp_ms,
3550 tx_coins,
3551 written,
3552 inner_temporary_store,
3553 epoch_store,
3554 acquire_locks,
3555 )
3556 .tap_ok(|_| metrics.post_processing_total_tx_indexed.inc())
3557 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3558 .expect("Indexing tx should not fail");
3559
3560 let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
3561 let events = Self::make_transaction_block_events(
3562 backing_package_store,
3563 events.clone(),
3564 *tx_digest,
3565 timestamp_ms,
3566 epoch_store,
3567 inner_temporary_store,
3568 )?;
3569 subscription_handler
3571 .process_tx(certificate.data().transaction_data(), &effects, &events)
3572 .tap_ok(|_| metrics.post_processing_total_tx_had_event_processed.inc())
3573 .tap_err(|e| {
3574 warn!(
3575 ?tx_digest,
3576 "Post processing - Couldn't process events for tx: {}", e
3577 )
3578 })?;
3579
3580 metrics
3581 .post_processing_total_events_emitted
3582 .inc_by(events.data.len() as u64);
3583
3584 Ok((raw_batch, cache_updates))
3585 }
3586
3587 fn make_transaction_block_events(
3588 backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3589 transaction_events: TransactionEvents,
3590 digest: TransactionDigest,
3591 timestamp_ms: u64,
3592 epoch_store: &Arc<AuthorityPerEpochStore>,
3593 inner_temporary_store: &InnerTemporaryStore,
3594 ) -> SuiResult<SuiTransactionBlockEvents> {
3595 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
3596 epoch_store.protocol_config(),
3597 Box::new(PackageStoreWithFallback::new(
3598 inner_temporary_store,
3599 backing_package_store,
3600 )),
3601 );
3602 SuiTransactionBlockEvents::try_from(
3603 transaction_events,
3604 digest,
3605 Some(timestamp_ms),
3606 layout_resolver.as_mut(),
3607 )
3608 }
3609
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock is before the Unix epoch, or if the millisecond
/// count does not fit in a `u64`.
pub fn unixtime_now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let millis = since_epoch.as_millis();
    u64::try_from(millis).expect("Travelling in time machine")
}
3617
3618 #[instrument(level = "trace", skip_all)]
3622 pub async fn handle_transaction_info_request(
3623 &self,
3624 request: TransactionInfoRequest,
3625 ) -> SuiResult<TransactionInfoResponse> {
3626 let epoch_store = self.load_epoch_store_one_call_per_task();
3627 let (transaction, status) = self
3628 .get_transaction_status(&request.transaction_digest, &epoch_store)?
3629 .ok_or(SuiErrorKind::TransactionNotFound {
3630 digest: request.transaction_digest,
3631 })?;
3632 Ok(TransactionInfoResponse {
3633 transaction,
3634 status,
3635 })
3636 }
3637
3638 #[instrument(level = "trace", skip_all)]
3639 pub async fn handle_object_info_request(
3640 &self,
3641 request: ObjectInfoRequest,
3642 ) -> SuiResult<ObjectInfoResponse> {
3643 let epoch_store = self.load_epoch_store_one_call_per_task();
3644
3645 let requested_object_seq = match request.request_kind {
3646 ObjectInfoRequestKind::LatestObjectInfo => {
3647 let (_, seq, _) = self
3648 .get_object_or_tombstone(request.object_id)
3649 .await
3650 .ok_or_else(|| {
3651 SuiError::from(UserInputError::ObjectNotFound {
3652 object_id: request.object_id,
3653 version: None,
3654 })
3655 })?;
3656 seq
3657 }
3658 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3659 };
3660
3661 let object = self
3662 .get_object_store()
3663 .get_object_by_key(&request.object_id, requested_object_seq)
3664 .ok_or_else(|| {
3665 SuiError::from(UserInputError::ObjectNotFound {
3666 object_id: request.object_id,
3667 version: Some(requested_object_seq),
3668 })
3669 })?;
3670
3671 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3672 (request.generate_layout, object.data.try_as_move())
3673 {
3674 let epoch_store = self.load_epoch_store_one_call_per_task();
3675 Some(into_struct_layout(
3676 epoch_store
3677 .executor()
3678 .type_layout_resolver(
3679 epoch_store.protocol_config(),
3680 Box::new(self.get_backing_package_store().as_ref()),
3681 )
3682 .get_annotated_layout(&move_obj.type_().clone().into())?,
3683 )?)
3684 } else {
3685 None
3686 };
3687
3688 let lock = if !object.is_address_owned() {
3689 None
3691 } else {
3692 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3693 .await?
3694 .map(|s| s.into_inner())
3695 };
3696
3697 Ok(ObjectInfoResponse {
3698 object,
3699 layout,
3700 lock_for_debugging: lock,
3701 })
3702 }
3703
3704 #[instrument(level = "trace", skip_all)]
3705 pub fn handle_checkpoint_request(
3706 &self,
3707 request: &CheckpointRequest,
3708 ) -> SuiResult<CheckpointResponse> {
3709 let summary = match request.sequence_number {
3710 Some(seq) => self
3711 .checkpoint_store
3712 .get_checkpoint_by_sequence_number(seq)?,
3713 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3714 }
3715 .map(|v| v.into_inner());
3716 let contents = match &summary {
3717 Some(s) => self
3718 .checkpoint_store
3719 .get_checkpoint_contents(&s.content_digest)?,
3720 None => None,
3721 };
3722 Ok(CheckpointResponse {
3723 checkpoint: summary,
3724 contents,
3725 })
3726 }
3727
3728 #[instrument(level = "trace", skip_all)]
3729 pub fn handle_checkpoint_request_v2(
3730 &self,
3731 request: &CheckpointRequestV2,
3732 ) -> SuiResult<CheckpointResponseV2> {
3733 let summary = if request.certified {
3734 let summary = match request.sequence_number {
3735 Some(seq) => self
3736 .checkpoint_store
3737 .get_checkpoint_by_sequence_number(seq)?,
3738 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3739 }
3740 .map(|v| v.into_inner());
3741 summary.map(CheckpointSummaryResponse::Certified)
3742 } else {
3743 let summary = match request.sequence_number {
3744 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3745 None => self
3746 .checkpoint_store
3747 .get_latest_locally_computed_checkpoint()?,
3748 };
3749 summary.map(CheckpointSummaryResponse::Pending)
3750 };
3751 let contents = match &summary {
3752 Some(s) => self
3753 .checkpoint_store
3754 .get_checkpoint_contents(&s.content_digest())?,
3755 None => None,
3756 };
3757 Ok(CheckpointResponseV2 {
3758 checkpoint: summary,
3759 contents,
3760 })
3761 }
3762
3763 fn check_protocol_version(
3764 supported_protocol_versions: SupportedProtocolVersions,
3765 current_version: ProtocolVersion,
3766 ) {
3767 info!("current protocol version is now {:?}", current_version);
3768 info!("supported versions are: {:?}", supported_protocol_versions);
3769 if !supported_protocol_versions.is_version_supported(current_version) {
3770 let msg = format!(
3771 "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3772 current_version, supported_protocol_versions,
3773 );
3774
3775 error!("{}", msg);
3776 eprintln!("{}", msg);
3777
3778 #[cfg(not(msim))]
3779 std::process::exit(1);
3780
3781 #[cfg(msim)]
3782 sui_simulator::task::shutdown_current_node();
3783 }
3784 }
3785
/// Constructs the `AuthorityState`, starts its background tasks, and indexes
/// the genesis objects if the index store is empty.
///
/// Side effects visible in this function:
/// * Calls `check_protocol_version`, which shuts the node down when the epoch's
///   protocol version is unsupported.
/// * Creates the execution scheduler and the two pruners (kept alive by being
///   stored in the returned state).
/// * Spawns `fix_indexes` and `execution_process`, each holding only a `Weak`
///   reference to the state.
///
/// # Panics
///
/// * If fork recovery state cannot be initialised from `config.fork_recovery`.
/// * If indexing the genesis objects fails.
/// * If `enable_multi_epoch_transaction_expiration` is on, the epoch is > 0, and
///   `executed_transaction_digests` has no entry for the previous epoch.
// NOTE(review): `disallowed_methods` is presumably allowed because of the
// `unbounded_channel()` call below — confirm against the clippy config.
#[allow(clippy::disallowed_methods)]
#[allow(clippy::too_many_arguments)]
pub async fn new(
    name: AuthorityName,
    secret: StableSyncAuthoritySigner,
    supported_protocol_versions: SupportedProtocolVersions,
    store: Arc<AuthorityStore>,
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    epoch_store: Arc<AuthorityPerEpochStore>,
    committee_store: Arc<CommitteeStore>,
    indexes: Option<Arc<IndexStore>>,
    rpc_index: Option<Arc<RpcIndexStore>>,
    checkpoint_store: Arc<CheckpointStore>,
    prometheus_registry: &Registry,
    genesis_objects: &[Object],
    db_checkpoint_config: &DBCheckpointConfig,
    config: NodeConfig,
    chain_identifier: ChainIdentifier,
    policy_config: Option<PolicyConfig>,
    firewall_config: Option<RemoteFirewallConfig>,
    pruner_watermarks: Arc<PrunerWatermarks>,
) -> Arc<Self> {
    Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

    let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
    // The sender goes to the scheduler; the receiver is handed to the
    // `execution_process` task spawned below.
    let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
    let execution_scheduler = Arc::new(ExecutionScheduler::new(
        execution_cache_trait_pointers.object_cache_reader.clone(),
        execution_cache_trait_pointers.account_funds_read.clone(),
        execution_cache_trait_pointers
            .transaction_cache_reader
            .clone(),
        tx_ready_certificates,
        &epoch_store,
        config.funds_withdraw_scheduler_type,
        metrics.clone(),
        prometheus_registry,
    ));
    // The sender is stored in the state; the receiver goes to `execution_process`.
    let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

    let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
        epoch_store.get_parent_path(),
        &config.authority_store_pruning_config,
    );
    let _pruner = AuthorityStorePruner::new(
        store.perpetual_tables.clone(),
        checkpoint_store.clone(),
        rpc_index.clone(),
        indexes.clone(),
        config.authority_store_pruning_config.clone(),
        epoch_store.committee().authority_exists(&name),
        epoch_store.epoch_start_state().epoch_duration_ms(),
        prometheus_registry,
        pruner_watermarks,
    );
    let input_loader =
        TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
    let epoch = epoch_store.epoch();
    let traffic_controller_metrics =
        Arc::new(TrafficControllerMetrics::new(prometheus_registry));
    // A traffic controller is only created when a policy is configured.
    let traffic_controller = if let Some(policy_config) = policy_config {
        Some(Arc::new(
            TrafficController::init(
                policy_config,
                traffic_controller_metrics,
                firewall_config.clone(),
            )
            .await,
        ))
    } else {
        None
    };

    let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
        ForkRecoveryState::new(Some(fork_config))
            .expect("Failed to initialize fork recovery state")
    });

    let coin_reservation_resolver = Arc::new(CachingCoinReservationResolver::new(
        execution_cache_trait_pointers.child_object_resolver.clone(),
    ));

    let object_funds_checker_metrics =
        Arc::new(ObjectFundsCheckerMetrics::new(prometheus_registry));
    let state = Arc::new(AuthorityState {
        name,
        secret,
        // The execution lock stores the current epoch id.
        execution_lock: RwLock::new(epoch),
        epoch_store: ArcSwap::new(epoch_store.clone()),
        input_loader,
        execution_cache_trait_pointers,
        coin_reservation_resolver,
        indexes,
        rpc_index,
        subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
        checkpoint_store,
        committee_store,
        execution_scheduler,
        tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
        metrics,
        _pruner,
        _authority_per_epoch_pruner,
        db_checkpoint_config: db_checkpoint_config.clone(),
        config,
        overload_info: AuthorityOverloadInfo::default(),
        chain_identifier,
        congestion_tracker: Arc::new(CongestionTracker::new()),
        consensus_gasless_counter: Arc::new(ConsensusGaslessCounter::default()),
        traffic_controller,
        fork_recovery_state,
        // Only the sender is kept; receivers are created on demand via `subscribe()`.
        notify_epoch: tokio::sync::watch::channel(epoch).0,
        // Starts empty; populated by `init_object_funds_checker` below.
        object_funds_checker: ArcSwapOption::empty(),
        object_funds_checker_metrics,
        pending_post_processing: Arc::new(DashMap::new()),
        // One post-processing permit per CPU reported by `num_cpus`.
        post_processing_semaphore: Arc::new(tokio::sync::Semaphore::new(num_cpus::get())),
    });
    state.init_object_funds_checker().await;

    // Background tasks get weak references so they do not keep the state alive.
    let state_clone = Arc::downgrade(&state);
    spawn_monitored_task!(fix_indexes(state_clone));
    let authority_state = Arc::downgrade(&state);
    spawn_monitored_task!(execution_process(
        authority_state,
        rx_ready_certificates,
        rx_execution_shutdown,
    ));
    state
        .create_owner_index_if_empty(genesis_objects, &epoch_store)
        .expect("Error indexing genesis objects.");

    // Sanity check: with multi-epoch transaction expiration enabled, the
    // previous epoch must have at least one executed-transaction entry.
    if epoch_store
        .protocol_config()
        .enable_multi_epoch_transaction_expiration()
        && epoch_store.epoch() > 0
    {
        use typed_store::Map;
        let previous_epoch = epoch_store.epoch() - 1;
        // Half-open key range covering every digest of `previous_epoch`.
        let start_key = (previous_epoch, TransactionDigest::ZERO);
        let end_key = (previous_epoch + 1, TransactionDigest::ZERO);
        let has_previous_epoch_data = store
            .perpetual_tables
            .executed_transaction_digests
            .safe_range_iter(start_key..end_key)
            .next()
            .is_some();

        if !has_previous_epoch_data {
            panic!(
                "enable_multi_epoch_transaction_expiration is enabled but no transaction data found for previous epoch {}. \
                This indicates the node was restored using an old version of sui-tool that does not backfill the table. \
                Please restore from a snapshot using the latest version of sui-tool.",
                previous_epoch
            );
        }
    }

    state
}
3946
3947 async fn init_object_funds_checker(&self) {
3948 let epoch_store = self.epoch_store.load();
3949 if self.is_validator(&epoch_store)
3950 && epoch_store.protocol_config().enable_object_funds_withdraw()
3951 {
3952 if self.object_funds_checker.load().is_none() {
3953 let inner = self
3954 .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
3955 .await
3956 .map(|o| {
3957 Arc::new(ObjectFundsChecker::new(
3958 o.version(),
3959 self.object_funds_checker_metrics.clone(),
3960 ))
3961 });
3962 self.object_funds_checker.store(inner);
3963 }
3964 } else {
3965 self.object_funds_checker.store(None);
3966 }
3967 }
3968
3969 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3971 &self.execution_cache_trait_pointers.object_cache_reader
3972 }
3973
3974 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3975 &self.execution_cache_trait_pointers.transaction_cache_reader
3976 }
3977
3978 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3979 &self.execution_cache_trait_pointers.cache_writer
3980 }
3981
3982 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3983 &self.execution_cache_trait_pointers.backing_store
3984 }
3985
3986 pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
3987 &self.execution_cache_trait_pointers.child_object_resolver
3988 }
3989
3990 pub(crate) fn get_account_funds_read(&self) -> &Arc<dyn AccountFundsRead> {
3991 &self.execution_cache_trait_pointers.account_funds_read
3992 }
3993
3994 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3995 &self.execution_cache_trait_pointers.backing_package_store
3996 }
3997
3998 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3999 &self.execution_cache_trait_pointers.object_store
4000 }
4001
4002 pub fn pending_post_processing(
4003 &self,
4004 ) -> &Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>> {
4005 &self.pending_post_processing
4006 }
4007
4008 pub async fn await_post_processing(
4009 &self,
4010 tx_digest: &TransactionDigest,
4011 ) -> Option<PostProcessingOutput> {
4012 if let Some((_, rx)) = self.pending_post_processing.remove(tx_digest) {
4013 rx.await.ok()
4015 } else {
4016 None
4018 }
4019 }
4020
4021 pub async fn flush_post_processing(&self, tx_digest: &TransactionDigest) {
4025 if let Some(indexes) = &self.indexes
4026 && let Some((raw_batch, cache_updates)) = self.await_post_processing(tx_digest).await
4027 {
4028 let mut db_batch = indexes.new_db_batch();
4029 db_batch
4030 .concat(vec![raw_batch])
4031 .expect("failed to build index batch");
4032 indexes
4033 .commit_index_batch(db_batch, vec![cache_updates])
4034 .expect("failed to commit index batch");
4035 }
4036 }
4037
4038 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
4039 &self.execution_cache_trait_pointers.reconfig_api
4040 }
4041
4042 pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
4043 &self.execution_cache_trait_pointers.global_state_hash_store
4044 }
4045
4046 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
4047 &self.execution_cache_trait_pointers.checkpoint_cache
4048 }
4049
4050 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
4051 &self.execution_cache_trait_pointers.state_sync_store
4052 }
4053
4054 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
4055 &self.execution_cache_trait_pointers.cache_commit
4056 }
4057
4058 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
4059 self.execution_cache_trait_pointers
4060 .testing_api
4061 .database_for_testing()
4062 }
4063
4064 pub fn cache_for_testing(&self) -> &WritebackCache {
4065 self.execution_cache_trait_pointers
4066 .testing_api
4067 .cache_for_testing()
4068 }
4069
4070 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
4071 &self,
4072 config: NodeConfig,
4073 metrics: Arc<AuthorityStorePruningMetrics>,
4074 ) -> anyhow::Result<()> {
4075 use crate::authority::authority_store_pruner::PrunerWatermarks;
4076 let watermarks = Arc::new(PrunerWatermarks::default());
4077 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
4078 &self.database_for_testing().perpetual_tables,
4079 &self.checkpoint_store,
4080 self.rpc_index.as_deref(),
4081 config.authority_store_pruning_config,
4082 metrics,
4083 EPOCH_DURATION_MS_FOR_TESTING,
4084 &watermarks,
4085 )
4086 .await
4087 }
4088
4089 pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
4090 &self.execution_scheduler
4091 }
4092
4093 fn create_owner_index_if_empty(
4094 &self,
4095 genesis_objects: &[Object],
4096 epoch_store: &Arc<AuthorityPerEpochStore>,
4097 ) -> SuiResult {
4098 let Some(index_store) = &self.indexes else {
4099 return Ok(());
4100 };
4101 if !index_store.is_empty() {
4102 return Ok(());
4103 }
4104
4105 let mut new_owners = vec![];
4106 let mut new_dynamic_fields = vec![];
4107 let mut layout_resolver = epoch_store.executor().type_layout_resolver(
4108 epoch_store.protocol_config(),
4109 Box::new(self.get_backing_package_store().as_ref()),
4110 );
4111 for o in genesis_objects.iter() {
4112 match o.owner {
4113 Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
4114 new_owners.push((
4115 (addr, o.id()),
4116 ObjectInfo::new(&o.compute_object_reference(), o),
4117 ))
4118 }
4119 Owner::ObjectOwner(object_id) => {
4120 let id = o.id();
4121 let Some(info) = Self::try_create_dynamic_field_info(
4122 self.get_object_store(),
4123 o,
4124 &BTreeMap::new(),
4125 layout_resolver.as_mut(),
4126 )?
4127 else {
4128 continue;
4129 };
4130 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
4131 }
4132 _ => {}
4133 }
4134 }
4135
4136 index_store.insert_genesis_objects(ObjectIndexChanges {
4137 deleted_owners: vec![],
4138 deleted_dynamic_fields: vec![],
4139 new_owners,
4140 new_dynamic_fields,
4141 })
4142 }
4143
4144 pub fn execution_lock_for_executable_transaction(
4148 &self,
4149 transaction: &VerifiedExecutableTransaction,
4150 ) -> Option<ExecutionLockReadGuard<'_>> {
4151 let lock = self.execution_lock.try_read().ok()?;
4152 if *lock == transaction.auth_sig().epoch() {
4153 Some(lock)
4154 } else {
4155 None
4157 }
4158 }
4159
4160 fn execution_lock_for_validation(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
4163 self.execution_lock
4164 .try_read()
4165 .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
4166 }
4167
4168 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
4169 self.execution_lock.write().await
4170 }
4171
/// Moves this authority from the current epoch to the next one and returns the
/// new per-epoch store.
///
/// The steps below run in order while the execution write lock is held (from
/// just after the committee is persisted until the function returns).
///
/// # Errors
///
/// Propagates failures from persisting the committee, taking DB checkpoints,
/// and reopening the epoch database.
#[instrument(level = "error", skip_all)]
pub async fn reconfigure(
    &self,
    cur_epoch_store: &AuthorityPerEpochStore,
    supported_protocol_versions: SupportedProtocolVersions,
    new_committee: Committee,
    epoch_start_configuration: EpochStartConfiguration,
    state_hasher: Arc<GlobalStateHasher>,
    expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
    epoch_last_checkpoint: CheckpointSequenceNumber,
) -> SuiResult<Arc<AuthorityPerEpochStore>> {
    // Shuts the node down if the next epoch's protocol version is unsupported.
    Self::check_protocol_version(
        supported_protocol_versions,
        epoch_start_configuration
            .epoch_start_state()
            .protocol_version(),
    );

    self.committee_store.insert_new_committee(&new_committee)?;

    // Exclusive access: the `try_read` callers on this lock fail while the
    // write guard is held.
    let mut execution_lock = self.execution_lock_for_reconfiguration().await;

    cur_epoch_store.epoch_terminated().await;

    {
        // Close user certs if that has not happened yet. The guard is either
        // consumed by `close_user_certs` or dropped at the end of this scope.
        let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
        if state.should_accept_user_certs() {
            cur_epoch_store.close_user_certs(state);
        }
    }

    self.get_reconfig_api()
        .clear_state_end_of_epoch(&execution_lock);
    self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
    self.maybe_reaccumulate_state_hash(
        cur_epoch_store,
        epoch_start_configuration
            .epoch_start_state()
            .protocol_version(),
    );
    self.get_reconfig_api()
        .set_epoch_start_configuration(&epoch_start_configuration);
    // Optional end-of-epoch DB checkpoint, written under `<checkpoint_path>/epoch_<N>`.
    if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
        && self
            .db_checkpoint_config
            .perform_db_checkpoints_at_epoch_end
    {
        let checkpoint_indexes = self
            .db_checkpoint_config
            .perform_index_db_checkpoints_at_epoch_end
            .unwrap_or(false);
        let current_epoch = cur_epoch_store.epoch();
        let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
        self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
    }

    self.get_reconfig_api()
        .reconfigure_cache(&epoch_start_configuration)
        .await;

    // Captured before `new_committee` is moved into `reopen_epoch_db`.
    let new_epoch = new_committee.epoch;
    let new_epoch_store = self
        .reopen_epoch_db(
            cur_epoch_store,
            new_committee,
            epoch_start_configuration,
            expensive_safety_check_config,
            epoch_last_checkpoint,
        )
        .await?;
    assert_eq!(new_epoch_store.epoch(), new_epoch);
    self.execution_scheduler
        .reconfigure(&new_epoch_store, self.get_account_funds_read());
    self.init_object_funds_checker().await;
    // Publish the new epoch through the lock's value.
    *execution_lock = new_epoch;

    // Wake tasks blocked in `wait_for_epoch`.
    self.notify_epoch(new_epoch);
    Ok(new_epoch_store)
}
4266
    /// Publishes `new_epoch` through the `notify_epoch` sender by overwriting the value
    /// it holds; `wait_for_epoch` observes that value through a subscription.
    fn notify_epoch(&self, new_epoch: EpochId) {
        self.notify_epoch.send_modify(|epoch| *epoch = new_epoch);
    }
4270
4271 pub async fn wait_for_epoch(&self, target_epoch: EpochId) -> Result<EpochId, RecvError> {
4272 let mut rx = self.notify_epoch.subscribe();
4273 loop {
4274 let epoch = *rx.borrow();
4275 if epoch >= target_epoch {
4276 return Ok(epoch);
4277 }
4278 rx.changed().await?;
4279 }
4280 }
4281
    /// Test helper: builds and executes the accumulator settlement transactions for
    /// `effects`, then the accumulator barrier transaction, and finally reports the
    /// resulting funds changes to the execution scheduler.
    ///
    /// `checkpoint_seq` defaults to the current version value of the accumulator root
    /// object when `None`.
    ///
    /// Returns every executed transaction paired with the `ExecutionEnv` it ran under
    /// (settlements first, barrier last), in a form accepted by
    /// `replay_settlement_for_testing`.
    ///
    /// # Panics
    /// Panics if the accumulator root object is missing, if version assignment or
    /// execution fails, or if any executed transaction has a non-ok status.
    pub async fn settle_accumulator_for_testing(
        &self,
        effects: &[TransactionEffects],
        checkpoint_seq: Option<u64>,
    ) -> Vec<(VerifiedExecutableTransaction, ExecutionEnv)> {
        let accumulator_version = self
            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
            .await
            .unwrap()
            .version();
        let ckpt_seq = checkpoint_seq.unwrap_or_else(|| accumulator_version.value());
        let builder = AccumulatorSettlementTxBuilder::new(
            Some(self.get_transaction_cache_reader().as_ref()),
            effects,
            ckpt_seq,
            0,
        );
        // Collected before `build_tx`, which takes the builder by value here.
        let balance_changes = builder.collect_funds_changes();
        let epoch_store = self.epoch_store_for_testing();
        let epoch = epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .unwrap();
        let settlements = builder.build_tx(
            epoch_store.protocol_config(),
            epoch,
            accumulator_root_obj_initial_shared_version,
            ckpt_seq,
            ckpt_seq,
        );

        // Wrap each settlement as a verified, executable system transaction.
        let settlements: Vec<_> = settlements
            .into_iter()
            .map(|tx| {
                VerifiedExecutableTransaction::new_system(
                    VerifiedTransaction::new_system_transaction(tx),
                    epoch,
                )
            })
            .collect();

        let assigned_versions = epoch_store
            .assign_shared_object_versions_for_tests(
                self.get_object_cache_reader().as_ref(),
                &settlements,
            )
            .unwrap();
        let version_map = assigned_versions.into_map();

        let mut replay_txns = Vec::new();
        let mut settlement_effects = Vec::with_capacity(settlements.len());
        // Execute the settlements one by one, keeping their effects for the barrier.
        for tx in settlements {
            let assigned = version_map.get(&tx.key()).unwrap().clone();
            let env = ExecutionEnv::new().with_assigned_versions(assigned);
            let (effects, _) = self
                .try_execute_immediately(&tx.clone(), env.clone(), &epoch_store)
                .await
                .unwrap();
            assert!(effects.status().is_ok());
            replay_txns.push((tx, env));
            settlement_effects.push(effects);
        }

        // The barrier transaction is built from the effects of all settlements.
        let barrier = accumulators::build_accumulator_barrier_tx(
            epoch,
            accumulator_root_obj_initial_shared_version,
            ckpt_seq,
            &settlement_effects,
        );
        let barrier = VerifiedExecutableTransaction::new_system(
            VerifiedTransaction::new_system_transaction(barrier),
            epoch,
        );

        let assigned_versions = epoch_store
            .assign_shared_object_versions_for_tests(
                self.get_object_cache_reader().as_ref(),
                std::slice::from_ref(&barrier),
            )
            .unwrap();
        let version_map = assigned_versions.into_map();

        let barrier_assigned = version_map.get(&barrier.key()).unwrap().clone();
        let env = ExecutionEnv::new().with_assigned_versions(barrier_assigned);
        let (effects, _) = self
            .try_execute_immediately(&barrier.clone(), env.clone(), &epoch_store)
            .await
            .unwrap();
        assert!(effects.status().is_ok());
        replay_txns.push((barrier, env));

        // Hand the funds changes to the scheduler together with the version that
        // follows the accumulator version read at the top of this function.
        let next_accumulator_version = accumulator_version.next();
        self.execution_scheduler
            .settle_address_funds(FundsSettlement {
                funds_changes: balance_changes,
                next_accumulator_version,
            });
        replay_txns
    }
4388
4389 pub async fn replay_settlement_for_testing(
4392 &self,
4393 txns: &[(VerifiedExecutableTransaction, ExecutionEnv)],
4394 ) {
4395 let epoch_store = self.epoch_store_for_testing();
4396 for (tx, env) in txns {
4397 let (effects, _) = self
4398 .try_execute_immediately(tx, env.clone(), &epoch_store)
4399 .await
4400 .unwrap();
4401 assert!(effects.status().is_ok());
4402 }
4403 }
4404
4405 pub async fn reconfigure_for_testing(&self) {
4409 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
4410 let epoch_store = self.epoch_store_for_testing().clone();
4411 let protocol_config = epoch_store.protocol_config().clone();
4412 let _guard =
4419 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
4420 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
4421 self.get_backing_package_store().clone(),
4422 self.get_object_store().clone(),
4423 &self.config.expensive_safety_check_config,
4424 self.checkpoint_store
4425 .get_epoch_last_checkpoint(epoch_store.epoch())
4426 .unwrap()
4427 .map(|c| *c.sequence_number())
4428 .unwrap_or_default(),
4429 );
4430 self.execution_scheduler
4431 .reconfigure(&new_epoch_store, self.get_account_funds_read());
4432 let new_epoch = new_epoch_store.epoch();
4433 self.epoch_store.store(new_epoch_store);
4434 epoch_store.epoch_terminated().await;
4435
4436 *execution_lock = new_epoch;
4437 }
4438
4439 #[instrument(level = "error", skip_all)]
4442 fn maybe_reaccumulate_state_hash(
4443 &self,
4444 cur_epoch_store: &AuthorityPerEpochStore,
4445 new_protocol_version: ProtocolVersion,
4446 ) {
4447 self.get_reconfig_api()
4448 .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
4449 }
4450
4451 #[instrument(level = "error", skip_all)]
4452 fn check_system_consistency(
4453 &self,
4454 cur_epoch_store: &AuthorityPerEpochStore,
4455 state_hasher: Arc<GlobalStateHasher>,
4456 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4457 ) {
4458 info!(
4459 "Performing sui conservation consistency check for epoch {}",
4460 cur_epoch_store.epoch()
4461 );
4462
4463 if let Err(err) = self
4464 .get_reconfig_api()
4465 .expensive_check_sui_conservation(cur_epoch_store)
4466 {
4467 if cfg!(debug_assertions) {
4468 panic!("{}", err);
4469 } else {
4470 warn!("Sui conservation consistency check failed: {}", err);
4473 }
4474 } else {
4475 info!("Sui conservation consistency check passed");
4476 }
4477
4478 if expensive_safety_check_config.enable_state_consistency_check() {
4480 info!(
4481 "Performing state consistency check for epoch {}",
4482 cur_epoch_store.epoch()
4483 );
4484 self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
4485 }
4486
4487 if expensive_safety_check_config.enable_secondary_index_checks()
4488 && let Some(indexes) = self.indexes.clone()
4489 {
4490 verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
4491 .expect("secondary indexes are inconsistent");
4492 }
4493
4494 if expensive_safety_check_config.enable_secondary_index_checks()
4498 && let Some(indexes) = self.indexes.clone()
4499 {
4500 let epoch = cur_epoch_store.epoch();
4501 let first_checkpoint = if epoch == 0 {
4505 0
4506 } else {
4507 self.checkpoint_store
4508 .get_epoch_last_checkpoint_seq_number(epoch - 1)
4509 .expect("Failed to get previous epoch's last checkpoint")
4510 .expect("Previous epoch's last checkpoint missing")
4511 + 1
4512 };
4513 let highest_executed = self
4514 .checkpoint_store
4515 .get_highest_executed_checkpoint_seq_number()
4516 .expect("Failed to get highest executed checkpoint")
4517 .expect("No executed checkpoints");
4518
4519 info!(
4520 "Verifying checkpointed transactions are in transactions_seq \
4521 (checkpoints {first_checkpoint}..={highest_executed})"
4522 );
4523 for seq in first_checkpoint..=highest_executed {
4524 let checkpoint = self
4525 .checkpoint_store
4526 .get_checkpoint_by_sequence_number(seq)
4527 .expect("Failed to get checkpoint")
4528 .expect("Checkpoint missing");
4529 let contents = self
4530 .checkpoint_store
4531 .get_checkpoint_contents(&checkpoint.content_digest)
4532 .expect("Failed to get checkpoint contents")
4533 .expect("Checkpoint contents missing");
4534 for digests in contents.iter() {
4535 let tx_digest = digests.transaction;
4536 assert!(
4537 indexes
4538 .get_transaction_seq(&tx_digest)
4539 .expect("Failed to read transactions_seq")
4540 .is_some(),
4541 "Transaction {tx_digest} from checkpoint {seq} missing from transactions_seq"
4542 );
4543 }
4544 }
4545 info!("All checkpointed transactions verified in transactions_seq");
4546 }
4547 }
4548
4549 fn expensive_check_is_consistent_state(
4550 &self,
4551 state_hasher: Arc<GlobalStateHasher>,
4552 cur_epoch_store: &AuthorityPerEpochStore,
4553 ) {
4554 let live_object_set_hash = state_hasher.digest_live_object_set(
4555 !cur_epoch_store
4556 .protocol_config()
4557 .simplified_unwrap_then_delete(),
4558 );
4559
4560 let root_state_hash: ECMHLiveObjectSetDigest = self
4561 .get_global_state_hash_store()
4562 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
4563 .expect("Retrieving root state hash cannot fail")
4564 .expect("Root state hash for epoch must exist")
4565 .1
4566 .digest()
4567 .into();
4568
4569 let is_inconsistent = root_state_hash != live_object_set_hash;
4570 if is_inconsistent {
4571 debug_fatal!(
4572 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4573 root_state_hash,
4574 live_object_set_hash
4575 );
4576 } else {
4577 info!("State consistency check passed");
4578 }
4579
4580 state_hasher.set_inconsistent_state(is_inconsistent);
4581 }
4582
4583 pub fn current_epoch_for_testing(&self) -> EpochId {
4584 self.epoch_store_for_testing().epoch()
4585 }
4586
4587 #[instrument(level = "error", skip_all)]
4588 pub fn checkpoint_all_dbs(
4589 &self,
4590 checkpoint_path: &Path,
4591 cur_epoch_store: &AuthorityPerEpochStore,
4592 checkpoint_indexes: bool,
4593 ) -> SuiResult {
4594 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
4595 let current_epoch = cur_epoch_store.epoch();
4596
4597 if checkpoint_path.exists() {
4598 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
4599 return Ok(());
4600 }
4601
4602 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
4603 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
4604
4605 if checkpoint_path_tmp.exists() {
4606 fs::remove_dir_all(&checkpoint_path_tmp)
4607 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4608 }
4609
4610 fs::create_dir_all(&checkpoint_path_tmp)
4611 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4612 fs::create_dir(&store_checkpoint_path_tmp)
4613 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4614
4615 self.checkpoint_store
4618 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
4619
4620 self.get_reconfig_api()
4621 .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
4622
4623 self.committee_store
4624 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
4625
4626 if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
4627 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
4628 }
4629
4630 fs::rename(checkpoint_path_tmp, checkpoint_path)
4631 .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4632 Ok(())
4633 }
4634
    /// Loads the current per-epoch store and returns the load guard.
    ///
    /// NOTE(review): the name suggests each task should call this once and reuse the
    /// guard rather than reloading; confirm the intended usage contract.
    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.epoch_store.load()
    }
4643
    /// Test helper: returns the currently loaded per-epoch store by delegating to
    /// `load_epoch_store_one_call_per_task`.
    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.load_epoch_store_one_call_per_task()
    }
4648
4649 pub fn clone_committee_for_testing(&self) -> Committee {
4650 Committee::clone(self.epoch_store_for_testing().committee())
4651 }
4652
4653 #[instrument(level = "trace", skip_all)]
4654 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
4655 self.get_object_store().get_object(object_id)
4656 }
4657
4658 pub async fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4659 Ok(self
4660 .get_object(&SUI_SYSTEM_ADDRESS.into())
4661 .await
4662 .expect("framework object should always exist")
4663 .compute_object_reference())
4664 }
4665
4666 pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
4668 self.get_object_cache_reader()
4669 .get_sui_system_state_object_unsafe()
4670 }
4671
    /// Returns the sequence number of the checkpoint that `epoch_store` records for the
    /// transaction `digest`, or `None` when it records none.
    #[instrument(level = "trace", skip_all)]
    fn get_transaction_checkpoint_sequence(
        &self,
        digest: &TransactionDigest,
        epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
        epoch_store.get_transaction_checkpoint(digest)
    }
4680
4681 #[instrument(level = "trace", skip_all)]
4682 pub fn get_checkpoint_by_sequence_number(
4683 &self,
4684 sequence_number: CheckpointSequenceNumber,
4685 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4686 Ok(self
4687 .checkpoint_store
4688 .get_checkpoint_by_sequence_number(sequence_number)?)
4689 }
4690
4691 #[instrument(level = "trace", skip_all)]
4692 pub fn get_transaction_checkpoint_for_tests(
4693 &self,
4694 digest: &TransactionDigest,
4695 epoch_store: &AuthorityPerEpochStore,
4696 ) -> SuiResult<Option<VerifiedCheckpoint>> {
4697 let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4698 let Some(checkpoint) = checkpoint else {
4699 return Ok(None);
4700 };
4701 let checkpoint = self
4702 .checkpoint_store
4703 .get_checkpoint_by_sequence_number(checkpoint)?;
4704 Ok(checkpoint)
4705 }
4706
4707 #[instrument(level = "trace", skip_all)]
4708 pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4709 Ok(
4710 match self
4711 .get_object_cache_reader()
4712 .get_latest_object_or_tombstone(*object_id)
4713 {
4714 Some((_, ObjectOrTombstone::Object(object))) => {
4715 let layout = self.get_object_layout(&object)?;
4716 ObjectRead::Exists(object.compute_object_reference(), object, layout)
4717 }
4718 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4719 None => ObjectRead::NotExists(*object_id),
4720 },
4721 )
4722 }
4723
    /// Returns the chain identifier stored on this authority state.
    pub fn get_chain_identifier(&self) -> ChainIdentifier {
        self.chain_identifier
    }
4728
    /// Returns a reference to the fork recovery state, or `None` when none is set.
    pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
        self.fork_recovery_state.as_ref()
    }
4732
4733 #[instrument(level = "trace", skip_all)]
4734 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4735 where
4736 T: DeserializeOwned,
4737 {
4738 let o = self.get_object_read(object_id)?.into_object()?;
4739 if let Some(move_object) = o.data.try_as_move() {
4740 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4741 SuiErrorKind::ObjectDeserializationError {
4742 error: format!("{e}"),
4743 }
4744 })?)
4745 } else {
4746 Err(SuiErrorKind::ObjectDeserializationError {
4747 error: format!("Provided object : [{object_id}] is not a Move object."),
4748 }
4749 .into())
4750 }
4751 }
4752
4753 #[instrument(level = "trace", skip_all)]
4759 pub fn get_past_object_read(
4760 &self,
4761 object_id: &ObjectID,
4762 version: SequenceNumber,
4763 ) -> SuiResult<PastObjectRead> {
4764 let Some(obj_ref) = self
4766 .get_object_cache_reader()
4767 .get_latest_object_ref_or_tombstone(*object_id)
4768 else {
4769 return Ok(PastObjectRead::ObjectNotExists(*object_id));
4770 };
4771
4772 if version > obj_ref.1 {
4773 return Ok(PastObjectRead::VersionTooHigh {
4774 object_id: *object_id,
4775 asked_version: version,
4776 latest_version: obj_ref.1,
4777 });
4778 }
4779
4780 if version < obj_ref.1 {
4781 return Ok(match self.read_object_at_version(object_id, version)? {
4783 Some((object, layout)) => {
4784 let obj_ref = object.compute_object_reference();
4785 PastObjectRead::VersionFound(obj_ref, object, layout)
4786 }
4787
4788 None => PastObjectRead::VersionNotFound(*object_id, version),
4789 });
4790 }
4791
4792 if !obj_ref.2.is_alive() {
4793 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4794 }
4795
4796 match self.read_object_at_version(object_id, obj_ref.1)? {
4797 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4798 None => {
4799 debug_fatal!(
4800 "Object with in parent_entry is missing from object store, datastore is \
4801 inconsistent"
4802 );
4803 Err(UserInputError::ObjectNotFound {
4804 object_id: *object_id,
4805 version: Some(obj_ref.1),
4806 }
4807 .into())
4808 }
4809 }
4810 }
4811
4812 #[instrument(level = "trace", skip_all)]
4813 fn read_object_at_version(
4814 &self,
4815 object_id: &ObjectID,
4816 version: SequenceNumber,
4817 ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4818 let Some(object) = self
4819 .get_object_cache_reader()
4820 .get_object_by_key(object_id, version)
4821 else {
4822 return Ok(None);
4823 };
4824
4825 let layout = self.get_object_layout(&object)?;
4826 Ok(Some((object, layout)))
4827 }
4828
4829 pub fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
4830 let layout = object
4831 .data
4832 .try_as_move()
4833 .map(|object| {
4834 let epoch_store = self.load_epoch_store_one_call_per_task();
4835 into_struct_layout(
4836 epoch_store
4837 .executor()
4838 .type_layout_resolver(
4840 epoch_store.protocol_config(),
4841 Box::new(self.get_backing_package_store().as_ref()),
4842 )
4843 .get_annotated_layout(&object.type_().clone().into())?,
4844 )
4845 })
4846 .transpose()?;
4847 Ok(layout)
4848 }
4849
4850 #[instrument(level = "trace", skip_all)]
4854 pub fn get_address_balance_coin_info(
4855 &self,
4856 owner: SuiAddress,
4857 balance_type: TypeTag,
4858 ) -> SuiResult<Option<(ObjectRef, u64, TransactionDigest)>> {
4859 let accumulator_id = AccumulatorValue::get_field_id(owner, &balance_type)?;
4860 let accumulator_obj = AccumulatorValue::load_object_by_id(
4861 self.get_child_object_resolver().as_ref(),
4862 None,
4863 *accumulator_id.inner(),
4864 )?;
4865
4866 let Some(accumulator_obj) = accumulator_obj else {
4867 return Ok(None);
4868 };
4869
4870 let currency_type =
4873 Balance::maybe_get_balance_type_param(&balance_type).unwrap_or(balance_type);
4874
4875 let balance = crate::accumulators::balances::get_balance(
4876 owner,
4877 self.get_child_object_resolver().as_ref(),
4878 currency_type,
4879 )?;
4880
4881 if balance == 0 {
4882 return Ok(None);
4883 };
4884
4885 let object_ref = coin_reservation::encode_object_ref(
4886 accumulator_obj.id(),
4887 accumulator_obj.version(),
4888 self.load_epoch_store_one_call_per_task().epoch(),
4889 balance,
4890 self.get_chain_identifier(),
4891 );
4892
4893 Ok(Some((
4894 object_ref,
4895 balance,
4896 accumulator_obj.previous_transaction,
4897 )))
4898 }
4899
4900 #[instrument(level = "trace", skip_all)]
4903 pub fn get_all_address_balance_coin_infos(
4904 &self,
4905 owner: SuiAddress,
4906 ) -> SuiResult<std::collections::HashMap<String, (ObjectRef, u64, TransactionDigest)>> {
4907 let indexes = self
4908 .indexes
4909 .as_ref()
4910 .ok_or(SuiErrorKind::IndexStoreNotAvailable)?;
4911
4912 let mut result = std::collections::HashMap::new();
4913 for currency_type in indexes.get_address_balance_coin_types_iter(owner) {
4914 let balance_type = sui_types::balance::Balance::type_tag(currency_type.clone());
4915 if let Some((obj_ref, balance, prev_tx)) =
4916 self.get_address_balance_coin_info(owner, balance_type)?
4917 {
4918 result.insert(currency_type.to_string(), (obj_ref, balance, prev_tx));
4921 }
4922 }
4923 Ok(result)
4924 }
4925
4926 fn get_owner_at_version(
4927 object_store: &Arc<dyn ObjectStore + Send + Sync>,
4928 object_id: &ObjectID,
4929 version: SequenceNumber,
4930 ) -> SuiResult<Owner> {
4931 object_store
4932 .get_object_by_key(object_id, version)
4933 .ok_or_else(|| {
4934 SuiError::from(UserInputError::ObjectNotFound {
4935 object_id: *object_id,
4936 version: Some(version),
4937 })
4938 })
4939 .map(|o| o.owner.clone())
4940 }
4941
4942 #[instrument(level = "trace", skip_all)]
4943 pub fn get_owner_objects(
4944 &self,
4945 owner: SuiAddress,
4946 cursor: Option<ObjectID>,
4948 limit: usize,
4949 filter: Option<SuiObjectDataFilter>,
4950 ) -> SuiResult<Vec<ObjectInfo>> {
4951 if let Some(indexes) = &self.indexes {
4952 indexes.get_owner_objects(owner, cursor, limit, filter)
4953 } else {
4954 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4955 }
4956 }
4957
4958 #[instrument(level = "trace", skip_all)]
4959 pub fn get_owned_coins_iterator_with_cursor(
4960 &self,
4961 owner: SuiAddress,
4962 cursor: (String, u64, ObjectID),
4964 limit: usize,
4965 one_coin_type_only: bool,
4966 ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4967 if let Some(indexes) = &self.indexes {
4968 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4969 } else {
4970 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4971 }
4972 }
4973
4974 #[instrument(level = "trace", skip_all)]
4975 pub fn get_owner_objects_iterator(
4976 &self,
4977 owner: SuiAddress,
4978 cursor: Option<ObjectID>,
4980 filter: Option<SuiObjectDataFilter>,
4981 ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4982 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4983 if let Some(indexes) = &self.indexes {
4984 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4985 } else {
4986 Err(SuiErrorKind::IndexStoreNotAvailable.into())
4987 }
4988 }
4989
4990 #[instrument(level = "trace", skip_all)]
4991 pub async fn get_move_objects<T>(
4992 &self,
4993 owner: SuiAddress,
4994 type_: MoveObjectType,
4995 ) -> SuiResult<Vec<T>>
4996 where
4997 T: DeserializeOwned,
4998 {
4999 let object_ids = self
5000 .get_owner_objects_iterator(owner, None, None)?
5001 .filter(|o| match &o.type_ {
5002 ObjectType::Struct(s) => &type_ == s,
5003 ObjectType::Package => false,
5004 })
5005 .map(|info| ObjectKey(info.object_id, info.version))
5006 .collect::<Vec<_>>();
5007 let mut move_objects = vec![];
5008
5009 let objects = self
5010 .get_object_store()
5011 .multi_get_objects_by_key(&object_ids);
5012
5013 for (o, id) in objects.into_iter().zip_debug_eq(object_ids) {
5014 let object = o.ok_or_else(|| {
5015 SuiError::from(UserInputError::ObjectNotFound {
5016 object_id: id.0,
5017 version: Some(id.1),
5018 })
5019 })?;
5020 let move_object = object.data.try_as_move().ok_or_else(|| {
5021 SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
5022 })?;
5023 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
5024 SuiErrorKind::ObjectDeserializationError {
5025 error: format!("{e}"),
5026 }
5027 })?);
5028 }
5029 Ok(move_objects)
5030 }
5031
5032 #[instrument(level = "trace", skip_all)]
5033 pub fn get_dynamic_fields(
5034 &self,
5035 owner: ObjectID,
5036 cursor: Option<ObjectID>,
5038 limit: usize,
5039 ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
5040 Ok(self
5041 .get_dynamic_fields_iterator(owner, cursor)?
5042 .take(limit)
5043 .collect::<Result<Vec<_>, _>>()?)
5044 }
5045
5046 fn get_dynamic_fields_iterator(
5047 &self,
5048 owner: ObjectID,
5049 cursor: Option<ObjectID>,
5051 ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
5052 {
5053 if let Some(indexes) = &self.indexes {
5054 indexes.get_dynamic_fields_iterator(owner, cursor)
5055 } else {
5056 Err(SuiErrorKind::IndexStoreNotAvailable.into())
5057 }
5058 }
5059
5060 #[instrument(level = "trace", skip_all)]
5061 pub fn get_dynamic_field_object_id(
5062 &self,
5063 owner: ObjectID,
5064 name_type: TypeTag,
5065 name_bcs_bytes: &[u8],
5066 ) -> SuiResult<Option<ObjectID>> {
5067 if let Some(indexes) = &self.indexes {
5068 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
5069 } else {
5070 Err(SuiErrorKind::IndexStoreNotAvailable.into())
5071 }
5072 }
5073
5074 #[instrument(level = "trace", skip_all)]
5075 pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
5076 Ok(self.get_indexes()?.next_sequence_number())
5077 }
5078
5079 #[instrument(level = "trace", skip_all)]
5080 pub async fn get_executed_transaction_and_effects(
5081 &self,
5082 digest: TransactionDigest,
5083 kv_store: Arc<TransactionKeyValueStore>,
5084 ) -> SuiResult<(Transaction, TransactionEffects)> {
5085 let transaction = kv_store.get_tx(digest).await?;
5086 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
5087 Ok((transaction, effects))
5088 }
5089
5090 #[instrument(level = "trace", skip_all)]
5091 pub fn multi_get_checkpoint_by_sequence_number(
5092 &self,
5093 sequence_numbers: &[CheckpointSequenceNumber],
5094 ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
5095 Ok(self
5096 .checkpoint_store
5097 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
5098 }
5099
5100 #[instrument(level = "trace", skip_all)]
5101 pub fn get_transaction_events(
5102 &self,
5103 digest: &TransactionDigest,
5104 ) -> SuiResult<TransactionEvents> {
5105 self.get_transaction_cache_reader()
5106 .get_events(digest)
5107 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
5108 }
5109
5110 pub fn get_transaction_input_objects(
5111 &self,
5112 effects: &TransactionEffects,
5113 ) -> SuiResult<Vec<Object>> {
5114 sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
5115 .map_err(Into::into)
5116 }
5117
5118 pub fn get_transaction_output_objects(
5119 &self,
5120 effects: &TransactionEffects,
5121 ) -> SuiResult<Vec<Object>> {
5122 sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
5123 .map_err(Into::into)
5124 }
5125
5126 fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
5127 match &self.indexes {
5128 Some(i) => Ok(i.clone()),
5129 None => Err(SuiErrorKind::UnsupportedFeatureError {
5130 error: "extended object indexing is not enabled on this server".into(),
5131 }
5132 .into()),
5133 }
5134 }
5135
5136 pub async fn get_transactions_for_tests(
5137 self: &Arc<Self>,
5138 filter: Option<TransactionFilter>,
5139 cursor: Option<TransactionDigest>,
5140 limit: Option<usize>,
5141 reverse: bool,
5142 ) -> SuiResult<Vec<TransactionDigest>> {
5143 let metrics = KeyValueStoreMetrics::new_for_tests();
5144 let kv_store = Arc::new(TransactionKeyValueStore::new(
5145 "rocksdb",
5146 metrics,
5147 self.clone(),
5148 ));
5149 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
5150 .await
5151 }
5152
5153 #[instrument(level = "trace", skip_all)]
5154 pub async fn get_transactions(
5155 &self,
5156 kv_store: &Arc<TransactionKeyValueStore>,
5157 filter: Option<TransactionFilter>,
5158 cursor: Option<TransactionDigest>,
5160 limit: Option<usize>,
5161 reverse: bool,
5162 ) -> SuiResult<Vec<TransactionDigest>> {
5163 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
5164 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
5165 let iter = checkpoint_contents.iter().map(|c| c.transaction);
5166 if reverse {
5167 let iter = iter
5168 .rev()
5169 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
5170 .skip(usize::from(cursor.is_some()));
5171 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
5172 } else {
5173 let iter = iter
5174 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
5175 .skip(usize::from(cursor.is_some()));
5176 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
5177 }
5178 }
5179 self.get_indexes()?
5180 .get_transactions(filter, cursor, limit, reverse)
5181 }
5182
    /// Returns a reference to the shared checkpoint store handle.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
5186
5187 pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
5188 self.get_checkpoint_store()
5189 .get_highest_executed_checkpoint_seq_number()?
5190 .ok_or(
5191 SuiErrorKind::UserInputError {
5192 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
5193 }
5194 .into(),
5195 )
5196 }
5197
5198 #[cfg(msim)]
5199 pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
5200 self.database_for_testing()
5201 .perpetual_tables
5202 .get_highest_pruned_checkpoint()
5203 .map(|c| c.unwrap_or(0))
5204 .map_err(Into::into)
5205 }
5206
5207 #[instrument(level = "trace", skip_all)]
5208 pub fn get_checkpoint_summary_by_sequence_number(
5209 &self,
5210 sequence_number: CheckpointSequenceNumber,
5211 ) -> SuiResult<CheckpointSummary> {
5212 let verified_checkpoint = self
5213 .get_checkpoint_store()
5214 .get_checkpoint_by_sequence_number(sequence_number)?;
5215 match verified_checkpoint {
5216 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
5217 None => Err(SuiErrorKind::UserInputError {
5218 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5219 }
5220 .into()),
5221 }
5222 }
5223
5224 #[instrument(level = "trace", skip_all)]
5225 pub fn get_checkpoint_summary_by_digest(
5226 &self,
5227 digest: CheckpointDigest,
5228 ) -> SuiResult<CheckpointSummary> {
5229 let verified_checkpoint = self
5230 .get_checkpoint_store()
5231 .get_checkpoint_by_digest(&digest)?;
5232 match verified_checkpoint {
5233 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
5234 None => Err(SuiErrorKind::UserInputError {
5235 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5236 }
5237 .into()),
5238 }
5239 }
5240
5241 #[instrument(level = "trace", skip_all)]
5242 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
5243 if is_system_package(package_id) {
5244 return self.find_genesis_txn_digest();
5245 }
5246 Ok(self
5247 .get_object_read(&package_id)?
5248 .into_object()?
5249 .previous_transaction)
5250 }
5251
5252 #[instrument(level = "trace", skip_all)]
5253 pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
5254 let summary = self
5255 .get_verified_checkpoint_by_sequence_number(0)?
5256 .into_message();
5257 let content = self.get_checkpoint_contents(summary.content_digest)?;
5258 let genesis_transaction = content.enumerate_transactions(&summary).next();
5259 Ok(genesis_transaction
5260 .ok_or(SuiErrorKind::UserInputError {
5261 error: UserInputError::GenesisTransactionNotFound,
5262 })?
5263 .1
5264 .transaction)
5265 }
5266
5267 #[instrument(level = "trace", skip_all)]
5268 pub fn get_verified_checkpoint_by_sequence_number(
5269 &self,
5270 sequence_number: CheckpointSequenceNumber,
5271 ) -> SuiResult<VerifiedCheckpoint> {
5272 let verified_checkpoint = self
5273 .get_checkpoint_store()
5274 .get_checkpoint_by_sequence_number(sequence_number)?;
5275 match verified_checkpoint {
5276 Some(verified_checkpoint) => Ok(verified_checkpoint),
5277 None => Err(SuiErrorKind::UserInputError {
5278 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5279 }
5280 .into()),
5281 }
5282 }
5283
5284 #[instrument(level = "trace", skip_all)]
5285 pub fn get_verified_checkpoint_summary_by_digest(
5286 &self,
5287 digest: CheckpointDigest,
5288 ) -> SuiResult<VerifiedCheckpoint> {
5289 let verified_checkpoint = self
5290 .get_checkpoint_store()
5291 .get_checkpoint_by_digest(&digest)?;
5292 match verified_checkpoint {
5293 Some(verified_checkpoint) => Ok(verified_checkpoint),
5294 None => Err(SuiErrorKind::UserInputError {
5295 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5296 }
5297 .into()),
5298 }
5299 }
5300
5301 #[instrument(level = "trace", skip_all)]
5302 pub fn get_checkpoint_contents(
5303 &self,
5304 digest: CheckpointContentsDigest,
5305 ) -> SuiResult<CheckpointContents> {
5306 self.get_checkpoint_store()
5307 .get_checkpoint_contents(&digest)?
5308 .ok_or(
5309 SuiErrorKind::UserInputError {
5310 error: UserInputError::CheckpointContentsNotFound(digest),
5311 }
5312 .into(),
5313 )
5314 }
5315
5316 #[instrument(level = "trace", skip_all)]
5317 pub fn get_checkpoint_contents_by_sequence_number(
5318 &self,
5319 sequence_number: CheckpointSequenceNumber,
5320 ) -> SuiResult<CheckpointContents> {
5321 let verified_checkpoint = self
5322 .get_checkpoint_store()
5323 .get_checkpoint_by_sequence_number(sequence_number)?;
5324 match verified_checkpoint {
5325 Some(verified_checkpoint) => {
5326 let content_digest = verified_checkpoint.into_inner().content_digest;
5327 self.get_checkpoint_contents(content_digest)
5328 }
5329 None => Err(SuiErrorKind::UserInputError {
5330 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5331 }
5332 .into()),
5333 }
5334 }
5335
5336 #[instrument(level = "trace", skip_all)]
5337 pub async fn query_events(
5338 &self,
5339 kv_store: &Arc<TransactionKeyValueStore>,
5340 query: EventFilter,
5341 cursor: Option<EventID>,
5343 limit: usize,
5344 descending: bool,
5345 ) -> SuiResult<Vec<SuiEvent>> {
5346 let index_store = self.get_indexes()?;
5347
5348 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
5350 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
5351 SuiErrorKind::TransactionNotFound {
5352 digest: cursor.tx_digest,
5353 },
5354 )?;
5355 (tx_seq, cursor.event_seq as usize)
5356 } else if descending {
5357 (u64::MAX, usize::MAX)
5358 } else {
5359 (0, 0)
5360 };
5361
5362 let limit = limit + 1;
5363 let mut event_keys = match query {
5364 EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
5365 EventFilter::Transaction(digest) => {
5366 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
5367 }
5368 EventFilter::MoveModule { package, module } => {
5369 let module_id = ModuleId::new(package.into(), module);
5370 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
5371 }
5372 EventFilter::MoveEventType(struct_name) => index_store
5373 .events_by_move_event_struct_name(
5374 &struct_name,
5375 tx_num,
5376 event_num,
5377 limit,
5378 descending,
5379 )?,
5380 EventFilter::Sender(sender) => {
5381 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
5382 }
5383 EventFilter::TimeRange {
5384 start_time,
5385 end_time,
5386 } => index_store
5387 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
5388 EventFilter::MoveEventModule { package, module } => index_store
5389 .events_by_move_event_module(
5390 &ModuleId::new(package.into(), module),
5391 tx_num,
5392 event_num,
5393 limit,
5394 descending,
5395 )?,
5396 EventFilter::Any(_) => {
5398 return Err(SuiErrorKind::UserInputError {
5399 error: UserInputError::Unsupported(
5400 "'Any' queries are not supported by the fullnode.".to_string(),
5401 ),
5402 }
5403 .into());
5404 }
5405 };
5406
5407 if cursor.is_some() {
5410 if !event_keys.is_empty() {
5411 event_keys.remove(0);
5412 }
5413 } else {
5414 event_keys.truncate(limit - 1);
5415 }
5416
5417 let transaction_digests = event_keys
5419 .iter()
5420 .map(|(_, digest, _, _)| *digest)
5421 .collect::<HashSet<_>>()
5422 .into_iter()
5423 .collect::<Vec<_>>();
5424
5425 let events = kv_store
5426 .multi_get_events_by_tx_digests(&transaction_digests)
5427 .await?;
5428
5429 let events_map: HashMap<_, _> = transaction_digests
5430 .iter()
5431 .zip_debug_eq(events.into_iter())
5432 .collect();
5433
5434 let stored_events = event_keys
5435 .into_iter()
5436 .map(|k| {
5437 (
5438 k,
5439 events_map
5440 .get(&k.1)
5441 .expect("fetched digest is missing")
5442 .clone()
5443 .and_then(|e| e.data.get(k.2).cloned()),
5444 )
5445 })
5446 .map(
5447 |((_event_digest, tx_digest, event_seq, timestamp), event)| {
5448 event
5449 .map(|e| (e, tx_digest, event_seq, timestamp))
5450 .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
5451 },
5452 )
5453 .collect::<Result<Vec<_>, _>>()?;
5454
5455 let epoch_store = self.load_epoch_store_one_call_per_task();
5456 let backing_store = self.get_backing_package_store().as_ref();
5457 let mut layout_resolver = epoch_store
5458 .executor()
5459 .type_layout_resolver(epoch_store.protocol_config(), Box::new(backing_store));
5460 let mut events = vec![];
5461 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
5462 events.push(SuiEvent::try_from(
5463 e.clone(),
5464 tx_digest,
5465 event_seq as u64,
5466 Some(timestamp),
5467 layout_resolver.get_annotated_layout(&e.type_)?,
5468 )?)
5469 }
5470 Ok(events)
5471 }
5472
5473 pub async fn insert_genesis_object(&self, object: Object) {
5474 self.get_reconfig_api().insert_genesis_object(object);
5475 }
5476
5477 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
5478 futures::future::join_all(
5479 objects
5480 .iter()
5481 .map(|o| self.insert_genesis_object(o.clone())),
5482 )
5483 .await;
5484 }
5485
5486 #[instrument(level = "trace", skip_all)]
5488 pub fn get_transaction_status(
5489 &self,
5490 transaction_digest: &TransactionDigest,
5491 epoch_store: &Arc<AuthorityPerEpochStore>,
5492 ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
5493 if let Some(effects) =
5495 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
5496 {
5497 if let Some(transaction) = self
5498 .get_transaction_cache_reader()
5499 .get_transaction_block(transaction_digest)
5500 {
5501 let events = if effects.events_digest().is_some() {
5502 self.get_transaction_events(effects.transaction_digest())?
5503 } else {
5504 TransactionEvents::default()
5505 };
5506 return Ok(Some((
5510 (*transaction).clone().into_message(),
5511 TransactionStatus::Executed(None, effects.into_inner(), events),
5512 )));
5513 } else {
5514 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
5518 }
5519 }
5520 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
5521 self.metrics.tx_already_processed.inc();
5522 let (transaction, sig) = signed.into_inner().into_data_and_sig();
5523 Ok(Some((transaction, TransactionStatus::Signed(sig))))
5524 } else {
5525 Ok(None)
5526 }
5527 }
5528
5529 #[instrument(level = "trace", skip_all)]
5533 pub fn get_signed_effects_and_maybe_resign(
5534 &self,
5535 transaction_digest: &TransactionDigest,
5536 epoch_store: &Arc<AuthorityPerEpochStore>,
5537 ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
5538 let effects = self
5539 .get_transaction_cache_reader()
5540 .get_executed_effects(transaction_digest);
5541 match effects {
5542 Some(effects) => {
5543 if effects.executed_epoch() != epoch_store.epoch() {
5563 debug!(
5564 tx_digest=?transaction_digest,
5565 effects_epoch=?effects.executed_epoch(),
5566 epoch=?epoch_store.epoch(),
5567 "Re-signing the effects with the current epoch"
5568 );
5569 }
5570 Ok(Some(self.sign_effects(effects, epoch_store)?))
5571 }
5572 None => Ok(None),
5573 }
5574 }
5575
5576 #[instrument(level = "trace", skip_all)]
5577 pub(crate) fn sign_effects(
5578 &self,
5579 effects: TransactionEffects,
5580 epoch_store: &Arc<AuthorityPerEpochStore>,
5581 ) -> SuiResult<VerifiedSignedTransactionEffects> {
5582 let tx_digest = *effects.transaction_digest();
5583 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
5584 Some(sig) => {
5585 debug_assert!(sig.epoch == epoch_store.epoch());
5586 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
5587 }
5588 _ => {
5589 let sig = AuthoritySignInfo::new(
5590 epoch_store.epoch(),
5591 &effects,
5592 Intent::sui_app(IntentScope::TransactionEffects),
5593 self.name,
5594 &*self.secret,
5595 );
5596
5597 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
5598
5599 epoch_store.insert_effects_digest_and_signature(
5600 &tx_digest,
5601 effects.digest(),
5602 &sig,
5603 )?;
5604
5605 effects
5606 }
5607 };
5608
5609 Ok(VerifiedSignedTransactionEffects::new_unchecked(
5610 signed_effects,
5611 ))
5612 }
5613
5614 #[instrument(level = "trace", skip_all)]
5616 fn fullnode_only_get_tx_coins_for_indexing(
5617 name: AuthorityName,
5618 object_store: &Arc<dyn ObjectStore + Send + Sync>,
5619 effects: &TransactionEffects,
5620 inner_temporary_store: &InnerTemporaryStore,
5621 epoch_store: &Arc<AuthorityPerEpochStore>,
5622 ) -> Option<TxCoins> {
5623 if epoch_store.committee().authority_exists(&name) {
5624 return None;
5625 }
5626 let written_coin_objects = inner_temporary_store
5627 .written
5628 .iter()
5629 .filter_map(|(k, v)| {
5630 if v.is_coin() {
5631 Some((*k, v.clone()))
5632 } else {
5633 None
5634 }
5635 })
5636 .collect();
5637 let mut input_coin_objects = inner_temporary_store
5638 .input_objects
5639 .iter()
5640 .filter_map(|(k, v)| {
5641 if v.is_coin() {
5642 Some((*k, v.clone()))
5643 } else {
5644 None
5645 }
5646 })
5647 .collect::<ObjectMap>();
5648
5649 for (object_id, version) in effects.modified_at_versions() {
5653 if inner_temporary_store
5654 .loaded_runtime_objects
5655 .contains_key(&object_id)
5656 && let Some(object) = object_store.get_object_by_key(&object_id, version)
5657 && object.is_coin()
5658 {
5659 input_coin_objects.insert(object_id, object);
5660 }
5661 }
5662
5663 Some((input_coin_objects, written_coin_objects))
5664 }
5665
5666 #[instrument(level = "trace", skip_all)]
5674 pub async fn get_transaction_lock(
5675 &self,
5676 object_ref: &ObjectRef,
5677 epoch_store: &AuthorityPerEpochStore,
5678 ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5679 let lock_info = self
5680 .get_object_cache_reader()
5681 .get_lock(*object_ref, epoch_store)?;
5682 let lock_info = match lock_info {
5683 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5684 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5685 provided_obj_ref: *object_ref,
5686 current_version: locked_ref.1,
5687 }
5688 .into());
5689 }
5690 ObjectLockStatus::Initialized => {
5691 return Ok(None);
5692 }
5693 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5694 };
5695
5696 epoch_store.get_signed_transaction(&lock_info.tx_digest)
5697 }
5698
5699 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
5700 self.get_object_cache_reader().get_objects(objects)
5701 }
5702
5703 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
5704 self.get_object_cache_reader()
5705 .get_latest_object_ref_or_tombstone(object_id)
5706 }
5707
5708 pub fn set_override_protocol_upgrade_buffer_stake(
5717 &self,
5718 expected_epoch: EpochId,
5719 buffer_stake_bps: u64,
5720 ) -> SuiResult {
5721 let epoch_store = self.load_epoch_store_one_call_per_task();
5722 let actual_epoch = epoch_store.epoch();
5723 if actual_epoch != expected_epoch {
5724 return Err(SuiErrorKind::WrongEpoch {
5725 expected_epoch,
5726 actual_epoch,
5727 }
5728 .into());
5729 }
5730
5731 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5732 }
5733
5734 pub fn clear_override_protocol_upgrade_buffer_stake(
5735 &self,
5736 expected_epoch: EpochId,
5737 ) -> SuiResult {
5738 let epoch_store = self.load_epoch_store_one_call_per_task();
5739 let actual_epoch = epoch_store.epoch();
5740 if actual_epoch != expected_epoch {
5741 return Err(SuiErrorKind::WrongEpoch {
5742 expected_epoch,
5743 actual_epoch,
5744 }
5745 .into());
5746 }
5747
5748 epoch_store.clear_override_protocol_upgrade_buffer_stake()
5749 }
5750
    /// Returns an object reference for every built-in system package, as produced
    /// by `sui_framework::compare_system_package` against this node's object store.
    ///
    /// The result is all-or-nothing: if the comparison yields `None` for any single
    /// package, an empty vector is returned instead of a partial list.
    ///
    /// Under `cfg(msim)` (simulator builds) the package list is extended with
    /// injected extra packages, and per-package module overrides are honoured; an
    /// empty override causes that package to be skipped entirely.
    pub async fn get_available_system_packages(
        &self,
        binary_config: &BinaryConfig,
    ) -> Vec<ObjectRef> {
        let mut results = vec![];

        let system_packages = BuiltInFramework::iter_system_packages();

        // Simulator only: append packages injected for this authority.
        #[cfg(msim)]
        let extra_packages = framework_injection::get_extra_packages(self.name);
        #[cfg(msim)]
        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());

        for system_package in system_packages {
            let modules = system_package.modules().to_vec();
            // Simulator only: replace the modules with an injected override, or skip
            // the package when the override is empty.
            #[cfg(msim)]
            let modules =
                match framework_injection::get_override_modules(&system_package.id, self.name) {
                    Some(overrides) if overrides.is_empty() => continue,
                    Some(overrides) => overrides,
                    None => modules,
                };

            // A `None` here aborts the whole computation with an empty result.
            let Some(obj_ref) = sui_framework::compare_system_package(
                &self.get_object_store(),
                &system_package.id,
                &modules,
                system_package.dependencies.to_vec(),
                binary_config,
            )
            .await
            else {
                return vec![];
            };
            results.push(obj_ref);
        }

        results
    }
5794
    /// Resolves the given system package references to the module bytes and
    /// dependencies of the packages bundled with this binary.
    ///
    /// For each reference in `system_packages`:
    /// * if the currently stored object already has exactly that reference, the
    ///   package is skipped (nothing to upgrade);
    /// * otherwise the bundled package with that ID is loaded, rebuilt as a system
    ///   package object at the requested version, and its computed reference is
    ///   compared against the requested one.
    ///
    /// Returns `Some` with one `(version, module bytes, dependencies)` entry per
    /// package that needs updating, or `None` as soon as a rebuilt package's
    /// reference does not match the requested reference.
    ///
    /// # Panics
    /// Panics if a bundled module fails to deserialize with `binary_config`
    /// (`unwrap`). Other helpers called here (e.g. `get_package_by_id`) may also
    /// panic on unknown IDs — not visible from this block.
    async fn get_system_package_bytes(
        &self,
        system_packages: Vec<ObjectRef>,
        binary_config: &BinaryConfig,
    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
        let objects = self.get_objects(&ids).await;

        let mut res = Vec::with_capacity(system_packages.len());
        for (system_package_ref, object) in system_packages.into_iter().zip_debug_eq(objects.iter())
        {
            // Determine the previous-transaction digest to stamp on the rebuilt
            // package: the stored object's, or the genesis marker if none is stored.
            let prev_transaction = match object {
                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
                    info!("Framework {} does not need updating", system_package_ref.0);
                    continue;
                }

                Some(cur_object) => cur_object.previous_transaction,
                None => TransactionDigest::genesis_marker(),
            };

            // Simulator builds prefer an injected override package when present.
            #[cfg(msim)]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
                .unwrap_or_else(|| {
                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
                });

            #[cfg(not(msim))]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();

            let modules: Vec<_> = bytes
                .iter()
                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
                .collect();

            let new_object = Object::new_system_package(
                &modules,
                system_package_ref.1,
                dependencies.clone(),
                prev_transaction,
            );

            // The locally rebuilt package must hash to the reference that was asked
            // for; otherwise this binary's framework differs from the requested one.
            let new_ref = new_object.compute_object_reference();
            if new_ref != system_package_ref {
                if cfg!(msim) {
                    debug_fatal!(
                        "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
                    );
                } else {
                    error!(
                        "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
                    );
                }
                return None;
            }

            res.push((system_package_ref.1, bytes, dependencies));
        }

        Some(res)
    }
5879
5880 fn is_protocol_version_supported_v2(
5881 current_protocol_version: ProtocolVersion,
5882 proposed_protocol_version: ProtocolVersion,
5883 protocol_config: &ProtocolConfig,
5884 committee: &Committee,
5885 capabilities: Vec<AuthorityCapabilitiesV2>,
5886 mut buffer_stake_bps: u64,
5887 ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5888 if proposed_protocol_version > current_protocol_version + 1
5889 && !protocol_config.advance_to_highest_supported_protocol_version()
5890 {
5891 return None;
5892 }
5893
5894 if buffer_stake_bps > 10000 {
5895 warn!("clamping buffer_stake_bps to 10000");
5896 buffer_stake_bps = 10000;
5897 }
5898
5899 let mut desired_upgrades: Vec<_> = capabilities
5902 .into_iter()
5903 .filter_map(|mut cap| {
5904 if cap.available_system_packages.is_empty() {
5906 return None;
5907 }
5908
5909 cap.available_system_packages.sort();
5910
5911 info!(
5912 "validator {:?} supports {:?} with system packages: {:?}",
5913 cap.authority.concise(),
5914 cap.supported_protocol_versions,
5915 cap.available_system_packages,
5916 );
5917
5918 cap.supported_protocol_versions
5922 .get_version_digest(proposed_protocol_version)
5923 .map(|digest| (digest, cap.available_system_packages, cap.authority))
5924 })
5925 .collect();
5926
5927 desired_upgrades.sort();
5929 desired_upgrades
5930 .into_iter()
5931 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5932 .into_iter()
5933 .find_map(|((digest, packages), group)| {
5934 assert!(!packages.is_empty());
5936
5937 let mut stake_aggregator: StakeAggregator<(), true> =
5938 StakeAggregator::new(Arc::new(committee.clone()));
5939
5940 for (_, _, authority) in group {
5941 stake_aggregator.insert_generic(authority, ());
5942 }
5943
5944 let total_votes = stake_aggregator.total_votes();
5945 let quorum_threshold = committee.quorum_threshold();
5946 let f = committee.total_votes() - committee.quorum_threshold();
5947
5948 let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5950 let effective_threshold = quorum_threshold + buffer_stake;
5951
5952 info!(
5953 protocol_config_digest = ?digest,
5954 ?total_votes,
5955 ?quorum_threshold,
5956 ?buffer_stake_bps,
5957 ?effective_threshold,
5958 ?proposed_protocol_version,
5959 ?packages,
5960 "support for upgrade"
5961 );
5962
5963 let has_support = total_votes >= effective_threshold;
5964 has_support.then_some((proposed_protocol_version, packages))
5965 })
5966 }
5967
5968 fn choose_protocol_version_and_system_packages_v2(
5969 current_protocol_version: ProtocolVersion,
5970 protocol_config: &ProtocolConfig,
5971 committee: &Committee,
5972 capabilities: Vec<AuthorityCapabilitiesV2>,
5973 buffer_stake_bps: u64,
5974 ) -> (ProtocolVersion, Vec<ObjectRef>) {
5975 assert!(protocol_config.authority_capabilities_v2());
5976 let mut next_protocol_version = current_protocol_version;
5977 let mut system_packages = vec![];
5978
5979 while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5980 current_protocol_version,
5981 next_protocol_version + 1,
5982 protocol_config,
5983 committee,
5984 capabilities.clone(),
5985 buffer_stake_bps,
5986 ) {
5987 next_protocol_version = version;
5988 system_packages = packages;
5989 }
5990
5991 (next_protocol_version, system_packages)
5992 }
5993
5994 #[instrument(level = "debug", skip_all)]
5995 fn create_authenticator_state_tx(
5996 &self,
5997 epoch_store: &Arc<AuthorityPerEpochStore>,
5998 ) -> Option<EndOfEpochTransactionKind> {
5999 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
6000 info!("authenticator state transactions not enabled");
6001 return None;
6002 }
6003
6004 let authenticator_state_exists = epoch_store.authenticator_state_exists();
6005 let tx = if authenticator_state_exists {
6006 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
6007 let min_epoch =
6008 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
6009 let authenticator_obj_initial_shared_version = epoch_store
6010 .epoch_start_config()
6011 .authenticator_obj_initial_shared_version()
6012 .expect("initial version must exist");
6013
6014 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
6015 min_epoch,
6016 authenticator_obj_initial_shared_version,
6017 );
6018
6019 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
6020
6021 tx
6022 } else {
6023 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
6024 info!("Creating AuthenticatorStateCreate tx");
6025 tx
6026 };
6027 Some(tx)
6028 }
6029
6030 #[instrument(level = "debug", skip_all)]
6031 fn create_randomness_state_tx(
6032 &self,
6033 epoch_store: &Arc<AuthorityPerEpochStore>,
6034 ) -> Option<EndOfEpochTransactionKind> {
6035 if !epoch_store.protocol_config().random_beacon() {
6036 info!("randomness state transactions not enabled");
6037 return None;
6038 }
6039
6040 if epoch_store.randomness_state_exists() {
6041 return None;
6042 }
6043
6044 let tx = EndOfEpochTransactionKind::new_randomness_state_create();
6045 info!("Creating RandomnessStateCreate tx");
6046 Some(tx)
6047 }
6048
6049 #[instrument(level = "debug", skip_all)]
6050 fn create_accumulator_root_tx(
6051 &self,
6052 epoch_store: &Arc<AuthorityPerEpochStore>,
6053 ) -> Option<EndOfEpochTransactionKind> {
6054 if !epoch_store
6055 .protocol_config()
6056 .create_root_accumulator_object()
6057 {
6058 info!("accumulator root creation not enabled");
6059 return None;
6060 }
6061
6062 if epoch_store.accumulator_root_exists() {
6063 return None;
6064 }
6065
6066 let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
6067 info!("Creating AccumulatorRootCreate tx");
6068 Some(tx)
6069 }
6070
6071 #[instrument(level = "debug", skip_all)]
6072 fn create_write_accumulator_storage_cost_tx(
6073 &self,
6074 epoch_store: &Arc<AuthorityPerEpochStore>,
6075 ) -> Option<EndOfEpochTransactionKind> {
6076 if !epoch_store.accumulator_root_exists() {
6077 info!("accumulator root does not exist yet");
6078 return None;
6079 }
6080 if !epoch_store.protocol_config().enable_accumulators() {
6081 info!("accumulators not enabled");
6082 return None;
6083 }
6084
6085 let object_store = self.get_object_store();
6086 let object_count =
6087 match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
6088 Ok(Some(count)) => count,
6089 Ok(None) => return None,
6090 Err(e) => {
6091 fatal!("failed to read accumulator object count: {e}");
6092 }
6093 };
6094
6095 let storage_cost = object_count.saturating_mul(
6096 epoch_store
6097 .protocol_config()
6098 .accumulator_object_storage_cost(),
6099 );
6100
6101 let tx = EndOfEpochTransactionKind::new_write_accumulator_storage_cost(storage_cost);
6102 info!(
6103 object_count,
6104 storage_cost, "Creating WriteAccumulatorStorageCost tx"
6105 );
6106 Some(tx)
6107 }
6108
6109 #[instrument(level = "debug", skip_all)]
6110 fn create_coin_registry_tx(
6111 &self,
6112 epoch_store: &Arc<AuthorityPerEpochStore>,
6113 ) -> Option<EndOfEpochTransactionKind> {
6114 if !epoch_store.protocol_config().enable_coin_registry() {
6115 info!("coin registry not enabled");
6116 return None;
6117 }
6118
6119 if epoch_store.coin_registry_exists() {
6120 return None;
6121 }
6122
6123 let tx = EndOfEpochTransactionKind::new_coin_registry_create();
6124 info!("Creating CoinRegistryCreate tx");
6125 Some(tx)
6126 }
6127
6128 #[instrument(level = "debug", skip_all)]
6129 fn create_display_registry_tx(
6130 &self,
6131 epoch_store: &Arc<AuthorityPerEpochStore>,
6132 ) -> Option<EndOfEpochTransactionKind> {
6133 if !epoch_store.protocol_config().enable_display_registry() {
6134 info!("display registry not enabled");
6135 return None;
6136 }
6137
6138 if epoch_store.display_registry_exists() {
6139 return None;
6140 }
6141
6142 let tx = EndOfEpochTransactionKind::new_display_registry_create();
6143 info!("Creating DisplayRegistryCreate tx");
6144 Some(tx)
6145 }
6146
6147 #[instrument(level = "debug", skip_all)]
6148 fn create_bridge_tx(
6149 &self,
6150 epoch_store: &Arc<AuthorityPerEpochStore>,
6151 ) -> Option<EndOfEpochTransactionKind> {
6152 if !epoch_store.protocol_config().enable_bridge() {
6153 info!("bridge not enabled");
6154 return None;
6155 }
6156 if epoch_store.bridge_exists() {
6157 return None;
6158 }
6159 let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
6160 info!("Creating Bridge Create tx");
6161 Some(tx)
6162 }
6163
6164 #[instrument(level = "debug", skip_all)]
6165 fn init_bridge_committee_tx(
6166 &self,
6167 epoch_store: &Arc<AuthorityPerEpochStore>,
6168 ) -> Option<EndOfEpochTransactionKind> {
6169 if !epoch_store.protocol_config().enable_bridge() {
6170 info!("bridge not enabled");
6171 return None;
6172 }
6173 if !epoch_store
6174 .protocol_config()
6175 .should_try_to_finalize_bridge_committee()
6176 {
6177 info!("should not try to finalize bridge committee yet");
6178 return None;
6179 }
6180 if !epoch_store.bridge_exists() {
6182 return None;
6183 }
6184
6185 if epoch_store.bridge_committee_initiated() {
6186 return None;
6187 }
6188
6189 let bridge_initial_shared_version = epoch_store
6190 .epoch_start_config()
6191 .bridge_obj_initial_shared_version()
6192 .expect("initial version must exist");
6193 let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
6194 info!("Init Bridge committee tx");
6195 Some(tx)
6196 }
6197
6198 #[instrument(level = "debug", skip_all)]
6199 fn create_deny_list_state_tx(
6200 &self,
6201 epoch_store: &Arc<AuthorityPerEpochStore>,
6202 ) -> Option<EndOfEpochTransactionKind> {
6203 if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
6204 return None;
6205 }
6206
6207 if epoch_store.coin_deny_list_state_exists() {
6208 return None;
6209 }
6210
6211 let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
6212 info!("Creating DenyListStateCreate tx");
6213 Some(tx)
6214 }
6215
6216 #[instrument(level = "debug", skip_all)]
6217 fn create_address_alias_state_tx(
6218 &self,
6219 epoch_store: &Arc<AuthorityPerEpochStore>,
6220 ) -> Option<EndOfEpochTransactionKind> {
6221 if !epoch_store.protocol_config().address_aliases() {
6222 info!("address aliases not enabled");
6223 return None;
6224 }
6225
6226 if epoch_store.address_alias_state_exists() {
6227 return None;
6228 }
6229
6230 let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
6231 info!("Creating AddressAliasStateCreate tx");
6232 Some(tx)
6233 }
6234
    /// Builds the end-of-epoch transaction kind that stores execution time
    /// observations.
    ///
    /// Returns `None` unless the per-object congestion control mode is
    /// `ExecutionTimeEstimate`. Otherwise:
    /// 1. Chooses a checkpoint range ending at `last_checkpoint_before_end_of_epoch`
    ///    and reaching back at most `stored_observations_num_included_checkpoints`
    ///    checkpoints, but never before the first checkpoint after the previous
    ///    epoch's last checkpoint (or checkpoint 1 in epoch 0).
    /// 2. Collects the observation keys of every command of every programmable
    ///    transaction in those checkpoints, plus `end_of_epoch_observation_keys`.
    /// 3. Filters the epoch store's end-of-epoch observations down to those keys,
    ///    limited to `stored_observations_limit` entries.
    ///
    /// # Panics
    /// Panics (or aborts via `fatal!`) on checkpoint-store errors, on missing
    /// checkpoints / contents / transactions in the selected range, if the previous
    /// epoch's last checkpoint sequence number is not recorded, or if
    /// `stored_observations_limit` does not fit the target integer type.
    #[instrument(level = "debug", skip_all)]
    fn create_execution_time_observations_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
        last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
    ) -> Option<EndOfEpochTransactionKind> {
        let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
            .protocol_config()
            .per_object_congestion_control_mode()
        else {
            return None;
        };

        // Start of the range: the later of "N checkpoints back from the last one"
        // and "first checkpoint after the previous epoch's final checkpoint".
        // NOTE(review): `stored_observations_num_included_checkpoints - 1` assumes
        // the parameter is at least 1 — confirm against the protocol config.
        let start_checkpoint = std::cmp::max(
            last_checkpoint_before_end_of_epoch
                .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
            epoch_store
                .epoch()
                .checked_sub(1)
                .map(|prev_epoch| {
                    self.checkpoint_store
                        .get_epoch_last_checkpoint_seq_number(prev_epoch)
                        .expect("typed store must not fail")
                        .expect(
                            "sequence number of last checkpoint of preceding epoch must be saved",
                        )
                        + 1
                })
                // Epoch 0 has no previous epoch; start from checkpoint 1.
                .unwrap_or(1),
        );
        info!(
            "reading checkpoint range {:?}..={:?}",
            start_checkpoint, last_checkpoint_before_end_of_epoch
        );
        let sequence_numbers =
            (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
        // Resolve each sequence number to a content digest, preferring the locally
        // computed checkpoint and falling back to the stored checkpoint.
        let contents_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_locally_computed_checkpoints(&sequence_numbers)
            .expect("typed store must not fail")
            .into_iter()
            .zip_debug_eq(sequence_numbers)
            .map(|(maybe_checkpoint, sequence_number)| {
                if let Some(checkpoint) = maybe_checkpoint {
                    checkpoint.content_digest
                } else {
                    self.checkpoint_store
                        .get_checkpoint_by_sequence_number(sequence_number)
                        .expect("typed store must not fail")
                        .unwrap_or_else(|| {
                            fatal!("preceding checkpoints must exist by end of epoch")
                        })
                        .data()
                        .content_digest
                }
            })
            .collect();
        // Flatten the checkpoint contents into the list of transaction digests.
        let tx_digests: Vec<_> = self
            .checkpoint_store
            .multi_get_checkpoint_content(&contents_digests)
            .expect("typed store must not fail")
            .into_iter()
            .flat_map(|maybe_contents| {
                maybe_contents
                    .expect("preceding checkpoint contents must exist by end of epoch")
                    .into_inner()
                    .into_iter()
                    .map(|digests| digests.transaction)
            })
            .collect();
        // Keys for every command of every programmable transaction in the range;
        // other transaction kinds contribute nothing.
        let included_execution_time_observations: HashSet<_> = self
            .get_transaction_cache_reader()
            .multi_get_transaction_blocks(&tx_digests)
            .into_iter()
            .flat_map(|maybe_tx| {
                if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
                    .expect("preceding transaction must exist by end of epoch")
                    .transaction_data()
                    .kind()
                {
                    // The commands are copied so the returned iterator does not
                    // borrow from the transaction, which is a temporary here.
                    #[allow(clippy::unnecessary_to_owned)]
                    itertools::Either::Left(
                        ptb.commands
                            .to_owned()
                            .into_iter()
                            .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
                    )
                } else {
                    itertools::Either::Right(std::iter::empty())
                }
            })
            .chain(end_of_epoch_observation_keys.into_iter())
            .collect();

        let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
            epoch_store
                .get_end_of_epoch_execution_time_observations()
                .filter_and_sort_v1(
                    |(key, _)| included_execution_time_observations.contains(key),
                    params.stored_observations_limit.try_into().unwrap(),
                ),
        );
        info!("Creating StoreExecutionTimeObservations tx");
        Some(tx)
    }
6346
6347 #[instrument(level = "error", skip_all)]
6358 pub async fn create_and_execute_advance_epoch_tx(
6359 &self,
6360 epoch_store: &Arc<AuthorityPerEpochStore>,
6361 gas_cost_summary: &GasCostSummary,
6362 checkpoint: CheckpointSequenceNumber,
6363 epoch_start_timestamp_ms: CheckpointTimestamp,
6364 end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
6365 last_checkpoint: CheckpointSequenceNumber,
6368 ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
6369 let mut txns = Vec::new();
6370
6371 if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
6372 txns.push(tx);
6373 }
6374 if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
6375 txns.push(tx);
6376 }
6377 if let Some(tx) = self.create_bridge_tx(epoch_store) {
6378 txns.push(tx);
6379 }
6380 if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
6381 txns.push(tx);
6382 }
6383 if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
6384 txns.push(tx);
6385 }
6386 if let Some(tx) = self.create_execution_time_observations_tx(
6387 epoch_store,
6388 end_of_epoch_observation_keys,
6389 last_checkpoint,
6390 ) {
6391 txns.push(tx);
6392 }
6393 if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
6394 txns.push(tx);
6395 }
6396
6397 if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
6398 txns.push(tx);
6399 }
6400 if let Some(tx) = self.create_display_registry_tx(epoch_store) {
6401 txns.push(tx);
6402 }
6403 if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
6404 txns.push(tx);
6405 }
6406 if let Some(tx) = self.create_write_accumulator_storage_cost_tx(epoch_store) {
6407 txns.push(tx);
6408 }
6409
6410 let next_epoch = epoch_store.epoch() + 1;
6411
6412 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
6413
6414 let (next_epoch_protocol_version, next_epoch_system_packages) =
6415 Self::choose_protocol_version_and_system_packages_v2(
6416 epoch_store.protocol_version(),
6417 epoch_store.protocol_config(),
6418 epoch_store.committee(),
6419 epoch_store
6420 .get_capabilities_v2()
6421 .expect("read capabilities from db cannot fail"),
6422 buffer_stake_bps,
6423 );
6424
6425 let config = epoch_store.protocol_config();
6428 let binary_config = config.binary_config(None);
6429 let Some(next_epoch_system_package_bytes) = self
6430 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
6431 .await
6432 else {
6433 if next_epoch_protocol_version <= ProtocolVersion::MAX {
6434 debug_fatal!(
6438 "upgraded system packages {:?} are not locally available, cannot create \
6439 ChangeEpochTx. validator binary must be upgraded to the correct version!",
6440 next_epoch_system_packages
6441 );
6442 } else {
6443 error!(
6444 "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
6445 next_epoch_protocol_version
6446 );
6447 }
6448 return Err(CheckpointBuilderError::SystemPackagesMissing);
6457 };
6458
6459 let tx = if epoch_store
6460 .protocol_config()
6461 .end_of_epoch_transaction_supported()
6462 {
6463 txns.push(EndOfEpochTransactionKind::new_change_epoch(
6464 next_epoch,
6465 next_epoch_protocol_version,
6466 gas_cost_summary.storage_cost,
6467 gas_cost_summary.computation_cost,
6468 gas_cost_summary.storage_rebate,
6469 gas_cost_summary.non_refundable_storage_fee,
6470 epoch_start_timestamp_ms,
6471 next_epoch_system_package_bytes,
6472 ));
6473
6474 VerifiedTransaction::new_end_of_epoch_transaction(txns)
6475 } else {
6476 VerifiedTransaction::new_change_epoch(
6477 next_epoch,
6478 next_epoch_protocol_version,
6479 gas_cost_summary.storage_cost,
6480 gas_cost_summary.computation_cost,
6481 gas_cost_summary.storage_rebate,
6482 gas_cost_summary.non_refundable_storage_fee,
6483 epoch_start_timestamp_ms,
6484 next_epoch_system_package_bytes,
6485 )
6486 };
6487
6488 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
6489 tx.clone(),
6490 epoch_store.epoch(),
6491 checkpoint,
6492 );
6493
6494 let tx_digest = executable_tx.digest();
6495
6496 info!(
6497 ?next_epoch,
6498 ?next_epoch_protocol_version,
6499 ?next_epoch_system_packages,
6500 computation_cost=?gas_cost_summary.computation_cost,
6501 storage_cost=?gas_cost_summary.storage_cost,
6502 storage_rebate=?gas_cost_summary.storage_rebate,
6503 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
6504 ?tx_digest,
6505 "Creating advance epoch transaction"
6506 );
6507
6508 fail_point_async!("change_epoch_tx_delay");
6509 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
6510
6511 if self
6514 .get_transaction_cache_reader()
6515 .is_tx_already_executed(tx_digest)
6516 {
6517 warn!("change epoch tx has already been executed via state sync");
6518 return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6519 }
6520
6521 let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
6522 else {
6523 return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6524 };
6525
6526 let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
6529 self.get_object_cache_reader().as_ref(),
6530 std::iter::once(&Schedulable::Transaction(&executable_tx)),
6531 )?;
6532
6533 assert_eq!(assigned_versions.0.len(), 1);
6534 let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;
6535
6536 let input_objects = self.read_objects_for_execution(
6537 &tx_lock,
6538 &executable_tx,
6539 &assigned_versions,
6540 epoch_store,
6541 )?;
6542
6543 let (transaction_outputs, _timings, _execution_error_opt) = self
6544 .execute_certificate(
6545 &execution_guard,
6546 &executable_tx,
6547 input_objects,
6548 None,
6549 ExecutionEnv::default(),
6550 epoch_store,
6551 )
6552 .unwrap();
6553 let system_obj = get_sui_system_state(&transaction_outputs.written)
6554 .expect("change epoch tx must write to system object");
6555
6556 let effects = transaction_outputs.effects;
6557 self.get_state_sync_store()
6561 .insert_transaction_and_effects(&tx, &effects);
6562
6563 info!(
6564 "Effects summary of the change epoch transaction: {:?}",
6565 effects.summary_for_debug()
6566 );
6567 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
6568 assert!(effects.status().is_ok());
6570 Ok((system_obj, effects))
6571 }
6572
6573 #[instrument(level = "error", skip_all)]
6574 async fn reopen_epoch_db(
6575 &self,
6576 cur_epoch_store: &AuthorityPerEpochStore,
6577 new_committee: Committee,
6578 epoch_start_configuration: EpochStartConfiguration,
6579 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
6580 epoch_last_checkpoint: CheckpointSequenceNumber,
6581 ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
6582 let new_epoch = new_committee.epoch;
6583 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
6584 assert_eq!(
6585 epoch_start_configuration.epoch_start_state().epoch(),
6586 new_committee.epoch
6587 );
6588 fail_point!("before-open-new-epoch-store");
6589 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
6590 self.name,
6591 new_committee,
6592 epoch_start_configuration,
6593 self.get_backing_package_store().clone(),
6594 self.get_object_store().clone(),
6595 expensive_safety_check_config,
6596 epoch_last_checkpoint,
6597 self.config.fullnode_sync_mode,
6598 )?;
6599 self.epoch_store.store(new_epoch_store.clone());
6600 Ok(new_epoch_store)
6601 }
6602
6603 #[cfg(test)]
6604 pub(crate) fn iter_live_object_set_for_testing(
6605 &self,
6606 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6607 let include_wrapped_object = !self
6608 .epoch_store_for_testing()
6609 .protocol_config()
6610 .simplified_unwrap_then_delete();
6611 self.get_global_state_hash_store()
6612 .iter_cached_live_object_set_for_testing(include_wrapped_object)
6613 }
6614
6615 #[cfg(test)]
6616 pub(crate) fn shutdown_execution_for_test(&self) {
6617 self.tx_execution_shutdown
6618 .lock()
6619 .take()
6620 .unwrap()
6621 .send(())
6622 .unwrap();
6623 }
6624
6625 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6627 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6628 self.get_object_cache_reader()
6629 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6630 self.get_reconfig_api()
6631 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6632 Ok(())
6633 }
6634}
6635
/// State for the background task that turns received randomness rounds into
/// randomness state update transactions.
pub struct RandomnessRoundReceiver {
    /// Authority state used to load the epoch store, persist the generated
    /// transaction, and wait for its executed effects.
    authority_state: Arc<AuthorityState>,
    /// Incoming `(epoch, round, bytes)` tuples; the event loop ends once this
    /// channel yields `None`.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
6640
6641impl RandomnessRoundReceiver {
6642 pub fn spawn(
6643 authority_state: Arc<AuthorityState>,
6644 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6645 ) -> JoinHandle<()> {
6646 let rrr = RandomnessRoundReceiver {
6647 authority_state,
6648 randomness_rx,
6649 };
6650 spawn_monitored_task!(rrr.run())
6651 }
6652
6653 async fn run(mut self) {
6654 info!("RandomnessRoundReceiver event loop started");
6655
6656 loop {
6657 tokio::select! {
6658 maybe_recv = self.randomness_rx.recv() => {
6659 if let Some((epoch, round, bytes)) = maybe_recv {
6660 self.handle_new_randomness(epoch, round, bytes).await;
6661 } else {
6662 break;
6663 }
6664 },
6665 }
6666 }
6667
6668 info!("RandomnessRoundReceiver event loop ended");
6669 }
6670
6671 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
6672 async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
6673 fail_point_async!("randomness-delay");
6674
6675 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
6676 if epoch_store.epoch() != epoch {
6677 warn!(
6678 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
6679 epoch_store.epoch()
6680 );
6681 return;
6682 }
6683 let key = TransactionKey::RandomnessRound(epoch, round);
6684 let transaction = VerifiedTransaction::new_randomness_state_update(
6685 epoch,
6686 round,
6687 bytes,
6688 epoch_store
6689 .epoch_start_config()
6690 .randomness_obj_initial_shared_version()
6691 .expect("randomness state obj must exist"),
6692 );
6693 debug!(
6694 "created randomness state update transaction with digest: {:?}",
6695 transaction.digest()
6696 );
6697 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
6698 let digest = *transaction.digest();
6699
6700 self.authority_state
6705 .get_cache_commit()
6706 .persist_transaction(&transaction);
6707
6708 if epoch_store.insert_tx_key(key, digest).is_err() {
6710 warn!("epoch ended while handling new randomness");
6711 }
6712
6713 let authority_state = self.authority_state.clone();
6714 spawn_monitored_task!(async move {
6715 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
6722 let result = tokio::time::timeout(
6723 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
6724 authority_state
6725 .get_transaction_cache_reader()
6726 .notify_read_executed_effects(
6727 "RandomnessRoundReceiver::notify_read_executed_effects_first",
6728 &[digest],
6729 ),
6730 )
6731 .await;
6732 let mut effects = match result {
6733 Ok(result) => result,
6734 Err(_) => {
6735 debug_fatal_no_invariant!(
6737 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6738 );
6739 authority_state
6741 .get_transaction_cache_reader()
6742 .notify_read_executed_effects(
6743 "RandomnessRoundReceiver::notify_read_executed_effects_second",
6744 &[digest],
6745 )
6746 .await
6747 }
6748 };
6749
6750 let effects = effects.pop().expect("should return effects");
6751 if *effects.status() != ExecutionStatus::Success {
6752 fatal!(
6753 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
6754 );
6755 }
6756 debug!(
6757 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
6758 );
6759 });
6760 }
6761}
6762
6763#[async_trait]
6764impl TransactionKeyValueStoreTrait for AuthorityState {
6765 #[instrument(skip(self))]
6766 async fn multi_get(
6767 &self,
6768 transactions: &[TransactionDigest],
6769 effects: &[TransactionDigest],
6770 ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6771 let txns = if !transactions.is_empty() {
6772 self.get_transaction_cache_reader()
6773 .multi_get_transaction_blocks(transactions)
6774 .into_iter()
6775 .map(|t| t.map(|t| (*t).clone().into_inner()))
6776 .collect()
6777 } else {
6778 vec![]
6779 };
6780
6781 let fx = if !effects.is_empty() {
6782 self.get_transaction_cache_reader()
6783 .multi_get_executed_effects(effects)
6784 } else {
6785 vec![]
6786 };
6787
6788 Ok((txns, fx))
6789 }
6790
6791 #[instrument(skip(self))]
6792 async fn multi_get_checkpoints(
6793 &self,
6794 checkpoint_summaries: &[CheckpointSequenceNumber],
6795 checkpoint_contents: &[CheckpointSequenceNumber],
6796 checkpoint_summaries_by_digest: &[CheckpointDigest],
6797 ) -> SuiResult<(
6798 Vec<Option<CertifiedCheckpointSummary>>,
6799 Vec<Option<CheckpointContents>>,
6800 Vec<Option<CertifiedCheckpointSummary>>,
6801 )> {
6802 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6804 let store = self.get_checkpoint_store();
6805 for seq in checkpoint_summaries {
6806 let checkpoint = store
6807 .get_checkpoint_by_sequence_number(*seq)?
6808 .map(|c| c.into_inner());
6809
6810 summaries.push(checkpoint);
6811 }
6812
6813 let mut contents = Vec::with_capacity(checkpoint_contents.len());
6814 for seq in checkpoint_contents {
6815 let checkpoint = store
6816 .get_checkpoint_by_sequence_number(*seq)?
6817 .and_then(|summary| {
6818 store
6819 .get_checkpoint_contents(&summary.content_digest)
6820 .expect("db read cannot fail")
6821 });
6822 contents.push(checkpoint);
6823 }
6824
6825 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6826 for digest in checkpoint_summaries_by_digest {
6827 let checkpoint = store
6828 .get_checkpoint_by_digest(digest)?
6829 .map(|c| c.into_inner());
6830 summaries_by_digest.push(checkpoint);
6831 }
6832 Ok((summaries, contents, summaries_by_digest))
6833 }
6834
6835 #[instrument(skip(self))]
6836 async fn deprecated_get_transaction_checkpoint(
6837 &self,
6838 digest: TransactionDigest,
6839 ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6840 Ok(self
6841 .get_checkpoint_cache()
6842 .deprecated_get_transaction_checkpoint(&digest)
6843 .map(|(_epoch, checkpoint)| checkpoint))
6844 }
6845
6846 #[instrument(skip(self))]
6847 async fn get_object(
6848 &self,
6849 object_id: ObjectID,
6850 version: VersionNumber,
6851 ) -> SuiResult<Option<Object>> {
6852 Ok(self
6853 .get_object_cache_reader()
6854 .get_object_by_key(&object_id, version))
6855 }
6856
6857 #[instrument(skip_all)]
6858 async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6859 Ok(self
6860 .get_object_cache_reader()
6861 .multi_get_objects_by_key(object_keys))
6862 }
6863
6864 #[instrument(skip(self))]
6865 async fn multi_get_transaction_checkpoint(
6866 &self,
6867 digests: &[TransactionDigest],
6868 ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6869 let res = self
6870 .get_checkpoint_cache()
6871 .deprecated_multi_get_transaction_checkpoint(digests);
6872
6873 Ok(res
6874 .into_iter()
6875 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6876 .collect())
6877 }
6878
6879 #[instrument(skip(self))]
6880 async fn multi_get_events_by_tx_digests(
6881 &self,
6882 digests: &[TransactionDigest],
6883 ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6884 if digests.is_empty() {
6885 return Ok(vec![]);
6886 }
6887
6888 Ok(self
6889 .get_transaction_cache_reader()
6890 .multi_get_events(digests))
6891 }
6892}
6893
/// Simulator-only (`cfg(msim)`) hooks for overriding the contents of framework
/// packages. Overrides are kept in thread-local storage, keyed by package id.
#[cfg(msim)]
pub mod framework_injection {
    use move_binary_format::CompiledModule;
    use std::collections::BTreeMap;
    use std::{cell::RefCell, collections::BTreeSet};
    use sui_framework::{BuiltInFramework, SystemPackage};
    use sui_types::base_types::{AuthorityName, ObjectID};
    use sui_types::is_system_package;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per authority, which modules (if any) override a package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a package override is resolved.
    enum PackageOverrideConfig {
        /// The same modules are returned for every authority.
        Global(Framework),
        /// The callback is asked for the modules of each authority.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module using its own `version`.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Registers `modules` as the override of `package_id` for all authorities,
    /// replacing any previous entry.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        let entry = PackageOverrideConfig::Global(modules);
        let _previous = OVERRIDE.with(|cfg| cfg.borrow_mut().insert(package_id, entry));
    }

    /// Registers `func` as the per-authority override of `package_id`,
    /// replacing any previous entry.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        let entry = PackageOverrideConfig::PerValidator(func);
        let _previous = OVERRIDE.with(|cfg| cfg.borrow_mut().insert(package_id, entry));
    }

    /// Installs a global override for every package in `packages`, and an empty
    /// global override for every built-in package id not present in `packages`.
    pub fn set_system_packages(packages: Vec<SystemPackage>) {
        OVERRIDE.with(|cfg| {
            // Built-in ids that the caller did not supply.
            let mut omitted: BTreeSet<_> =
                BuiltInFramework::all_package_ids().into_iter().collect();
            for pkg in &packages {
                omitted.remove(&pkg.id);
            }

            let mut overrides = cfg.borrow_mut();
            for pkg in packages {
                overrides.insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
            }
            for missing_id in omitted {
                overrides.insert(missing_id, PackageOverrideConfig::Global(Vec::new()));
            }
        });
    }

    /// Returns the serialized override modules of `package_id` for authority
    /// `name`, or `None` when no override applies.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
            }
        })
    }

    /// Returns the override modules of `package_id` for authority `name`, or
    /// `None` when no override applies.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            }
        })
    }

    /// Builds a `SystemPackage` from the override of `package_id`, if any.
    ///
    /// System packages reuse the dependency list of the built-in package with
    /// the same id; any other package depends on all built-in package ids.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        get_override_bytes(package_id, name).map(|bytes| {
            let dependencies = if !is_system_package(*package_id) {
                BuiltInFramework::all_package_ids()
            } else {
                BuiltInFramework::get_package_by_id(package_id)
                    .dependencies
                    .to_vec()
            };
            SystemPackage {
                id: *package_id,
                bytes,
                dependencies,
            }
        })
    }

    /// Returns a `SystemPackage` for every overridden package id that is not a
    /// built-in package id, each depending on all built-in package ids.
    ///
    /// # Panics
    /// Panics if such a package has no override bytes for authority `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<_> = BuiltInFramework::all_package_ids().into_iter().collect();

        // Collect the ids first so the borrow of OVERRIDE is released before
        // `get_override_bytes` borrows it again.
        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter(|id| !built_in.contains(*id))
                .copied()
                .collect()
        });

        let mut packages = Vec::with_capacity(extra.len());
        for package in extra {
            packages.push(SystemPackage {
                id: package,
                bytes: get_override_bytes(&package, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        packages
    }
}
7027
/// An object together with the components of its object reference, in the
/// serializable form used inside a `NodeStateDump`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// First component of the object's computed reference.
    pub id: ObjectID,
    /// Second component of the object's computed reference.
    pub version: VersionNumber,
    /// Third component of the object's computed reference.
    pub digest: ObjectDigest,
    /// The object itself.
    pub object: Object,
}
7035
7036impl ObjDumpFormat {
7037 fn new(object: Object) -> Self {
7038 let oref = object.compute_object_reference();
7039 Self {
7040 id: oref.0,
7041 version: oref.1,
7042 digest: oref.2,
7043 object,
7044 }
7045 }
7046}
7047
/// Serializable snapshot of a transaction, its effects, and the objects
/// gathered for it by `NodeStateDump::new`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the transaction the dump describes.
    pub tx_digest: TransactionDigest,
    /// The message extracted from the executable transaction.
    pub sender_signed_data: SenderSignedData,
    /// Epoch reported by the epoch store when the dump was built.
    pub executed_epoch: u64,
    /// Reference gas price reported by the epoch store.
    pub reference_gas_price: u64,
    /// Protocol version of the epoch store, as a `u64`.
    pub protocol_version: u64,
    /// Epoch start timestamp taken from the epoch start configuration.
    pub epoch_start_timestamp_ms: u64,
    /// The effects passed to `NodeStateDump::new`.
    pub computed_effects: TransactionEffects,
    /// The effects digest the caller expected.
    pub expected_effects_digest: TransactionEffectsDigest,
    /// Built-in framework packages found in the object store.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Objects for the `Mutate` and `ReadOnly` consensus inputs of the effects.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Objects listed in the temporary store's `loaded_runtime_objects`, at the recorded version.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects at the `(id, version)` pairs returned by the effects' `modified_at_versions()`.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Objects from the temporary store's `runtime_packages_loaded_from_db`.
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// Objects from the temporary store's `input_objects`.
    pub input_objects: Vec<ObjDumpFormat>,
}
7065
7066impl NodeStateDump {
7067 pub fn new(
7068 tx_digest: &TransactionDigest,
7069 effects: &TransactionEffects,
7070 expected_effects_digest: TransactionEffectsDigest,
7071 object_store: &dyn ObjectStore,
7072 epoch_store: &Arc<AuthorityPerEpochStore>,
7073 inner_temporary_store: &InnerTemporaryStore,
7074 certificate: &VerifiedExecutableTransaction,
7075 ) -> SuiResult<Self> {
7076 let executed_epoch = epoch_store.epoch();
7078 let reference_gas_price = epoch_store.reference_gas_price();
7079 let epoch_start_config = epoch_store.epoch_start_config();
7080 let protocol_version = epoch_store.protocol_version().as_u64();
7081 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
7082
7083 let mut relevant_system_packages = Vec::new();
7085 for sys_package_id in BuiltInFramework::all_package_ids() {
7086 if let Some(w) = object_store.get_object(&sys_package_id) {
7087 relevant_system_packages.push(ObjDumpFormat::new(w))
7088 }
7089 }
7090
7091 let mut shared_objects = Vec::new();
7093 for kind in effects.input_consensus_objects() {
7094 match kind {
7095 InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
7096 if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
7097 shared_objects.push(ObjDumpFormat::new(w))
7098 }
7099 }
7100 InputConsensusObject::ReadConsensusStreamEnded(..)
7101 | InputConsensusObject::MutateConsensusStreamEnded(..)
7102 | InputConsensusObject::Cancelled(..) => (), }
7104 }
7105
7106 let mut loaded_child_objects = Vec::new();
7109 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
7110 if let Some(w) = object_store.get_object_by_key(id, meta.version) {
7111 loaded_child_objects.push(ObjDumpFormat::new(w))
7112 }
7113 }
7114
7115 let mut modified_at_versions = Vec::new();
7117 for (id, ver) in effects.modified_at_versions() {
7118 if let Some(w) = object_store.get_object_by_key(&id, ver) {
7119 modified_at_versions.push(ObjDumpFormat::new(w))
7120 }
7121 }
7122
7123 let mut runtime_reads = Vec::new();
7126 for obj in inner_temporary_store
7127 .runtime_packages_loaded_from_db
7128 .values()
7129 {
7130 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
7131 }
7132
7133 Ok(Self {
7136 tx_digest: *tx_digest,
7137 executed_epoch,
7138 reference_gas_price,
7139 epoch_start_timestamp_ms,
7140 protocol_version,
7141 relevant_system_packages,
7142 shared_objects,
7143 loaded_child_objects,
7144 modified_at_versions,
7145 runtime_reads,
7146 sender_signed_data: certificate.clone().into_message(),
7147 input_objects: inner_temporary_store
7148 .input_objects
7149 .values()
7150 .map(|o| ObjDumpFormat::new(o.clone()))
7151 .collect(),
7152 computed_effects: effects.clone(),
7153 expected_effects_digest,
7154 })
7155 }
7156
7157 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
7158 let mut objects = Vec::new();
7159 objects.extend(self.relevant_system_packages.clone());
7160 objects.extend(self.shared_objects.clone());
7161 objects.extend(self.loaded_child_objects.clone());
7162 objects.extend(self.modified_at_versions.clone());
7163 objects.extend(self.runtime_reads.clone());
7164 objects.extend(self.input_objects.clone());
7165 objects
7166 }
7167
7168 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
7169 let file_name = format!(
7170 "{}_{}_NODE_DUMP.json",
7171 self.tx_digest,
7172 AuthorityState::unixtime_now_ms()
7173 );
7174 let mut path = path.to_path_buf();
7175 path.push(&file_name);
7176 let mut file = File::create(path.clone())?;
7177 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
7178 Ok(path)
7179 }
7180
7181 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
7182 let file = File::open(path)?;
7183 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
7184 }
7185}