sui_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use crate::accumulators::coin_reservations::CoinReservationResolver;
6use crate::accumulators::funds_read::AccountFundsRead;
7use crate::accumulators::object_funds_checker::ObjectFundsChecker;
8use crate::accumulators::object_funds_checker::metrics::ObjectFundsCheckerMetrics;
9use crate::accumulators::{self, AccumulatorSettlementTxBuilder};
10use crate::checkpoints::CheckpointBuilderError;
11use crate::checkpoints::CheckpointBuilderResult;
12use crate::congestion_tracker::CongestionTracker;
13use crate::consensus_adapter::ConsensusOverloadChecker;
14use crate::execution_cache::ExecutionCacheTraitPointers;
15use crate::execution_cache::TransactionCacheRead;
16use crate::execution_cache::writeback_cache::WritebackCache;
17use crate::execution_scheduler::ExecutionScheduler;
18use crate::execution_scheduler::funds_withdraw_scheduler::FundsSettlement;
19use crate::jsonrpc_index::CoinIndexKey2;
20use crate::rpc_index::RpcIndexStore;
21use crate::traffic_controller::TrafficController;
22use crate::traffic_controller::metrics::TrafficControllerMetrics;
23use crate::transaction_outputs::TransactionOutputs;
24use crate::verify_indexes::{fix_indexes, verify_indexes};
25use arc_swap::{ArcSwap, ArcSwapOption, Guard};
26use async_trait::async_trait;
27use authority_per_epoch_store::CertLockGuard;
28use dashmap::DashMap;
29use fastcrypto::encoding::Base58;
30use fastcrypto::encoding::Encoding;
31use fastcrypto::hash::MultisetHash;
32use itertools::Itertools;
33use move_binary_format::CompiledModule;
34use move_binary_format::binary_config::BinaryConfig;
35use move_core_types::annotated_value::MoveStructLayout;
36use move_core_types::language_storage::ModuleId;
37use mysten_common::{assert_reachable, fatal};
38use parking_lot::Mutex;
39use prometheus::{
40    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
41    register_histogram_vec_with_registry, register_histogram_with_registry,
42    register_int_counter_vec_with_registry, register_int_counter_with_registry,
43    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
44};
45use serde::de::DeserializeOwned;
46use serde::{Deserialize, Serialize};
47use shared_object_version_manager::AssignedVersions;
48use shared_object_version_manager::Schedulable;
49use std::collections::BTreeMap;
50use std::collections::BTreeSet;
51use std::fs::File;
52use std::io::Write;
53use std::path::{Path, PathBuf};
54use std::sync::atomic::Ordering;
55use std::time::Duration;
56use std::time::Instant;
57use std::time::SystemTime;
58use std::time::UNIX_EPOCH;
59use std::{
60    collections::{HashMap, HashSet},
61    fs,
62    pin::Pin,
63    str::FromStr,
64    sync::Arc,
65    vec,
66};
67use sui_config::NodeConfig;
68use sui_config::node::{AuthorityOverloadConfig, StateDebugDumpConfig};
69use sui_protocol_config::PerObjectCongestionControlMode;
70use sui_types::accumulator_root::AccumulatorObjId;
71use sui_types::crypto::RandomnessRound;
72use sui_types::dynamic_field::visitor as DFV;
73use sui_types::execution::ExecutionOutput;
74use sui_types::execution::ExecutionTimeObservationKey;
75use sui_types::execution::ExecutionTiming;
76use sui_types::execution_params::ExecutionOrEarlyError;
77use sui_types::execution_params::FundsWithdrawStatus;
78use sui_types::execution_params::get_early_execution_error;
79use sui_types::execution_status::ExecutionStatus;
80use sui_types::inner_temporary_store::PackageStoreWithFallback;
81use sui_types::layout_resolver::LayoutResolver;
82use sui_types::layout_resolver::into_struct_layout;
83use sui_types::messages_consensus::AuthorityCapabilitiesV2;
84use sui_types::object::bounded_visitor::BoundedVisitor;
85use sui_types::storage::ChildObjectResolver;
86use sui_types::storage::InputKey;
87use sui_types::storage::TrackingBackingStore;
88use sui_types::traffic_control::{
89    PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams,
90};
91use sui_types::transaction_executor::SimulateTransactionResult;
92use sui_types::transaction_executor::TransactionChecks;
93use sui_types::{SUI_ACCUMULATOR_ROOT_OBJECT_ID, accumulator_metadata};
94use tap::TapFallible;
95use tokio::sync::RwLock;
96use tokio::sync::mpsc::unbounded_channel;
97use tokio::sync::watch::error::RecvError;
98use tokio::sync::{mpsc, oneshot};
99use tokio::task::JoinHandle;
100use tokio::time::timeout;
101use tracing::{debug, error, info, instrument, warn};
102
103use self::authority_store::ExecutionLockWriteGuard;
104use self::authority_store_pruner::{AuthorityStorePruningMetrics, PrunerWatermarks};
105pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
106use mysten_metrics::{monitored_scope, spawn_monitored_task};
107
108use crate::jsonrpc_index::IndexStore;
109use crate::jsonrpc_index::{
110    CoinInfo, IndexStoreCacheUpdates, IndexStoreCacheUpdatesWithLocks, ObjectIndexChanges,
111};
112use mysten_common::debug_fatal;
113use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
114use sui_config::genesis::Genesis;
115use sui_config::node::{DBCheckpointConfig, ExpensiveSafetyCheckConfig};
116use sui_framework::{BuiltInFramework, SystemPackage};
117use sui_json_rpc_types::{
118    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, SuiEvent, SuiMoveValue,
119    SuiObjectDataFilter, SuiTransactionBlockData, SuiTransactionBlockEffects,
120    SuiTransactionBlockEvents, TransactionFilter,
121};
122use sui_macros::{fail_point, fail_point_arg, fail_point_async, fail_point_if};
123use sui_storage::key_value_store::{TransactionKeyValueStore, TransactionKeyValueStoreTrait};
124use sui_storage::key_value_store_metrics::KeyValueStoreMetrics;
125use sui_types::authenticator_state::get_authenticator_state;
126use sui_types::committee::{EpochId, ProtocolVersion};
127use sui_types::crypto::{AuthoritySignInfo, Signer, default_hash};
128use sui_types::deny_list_v1::check_coin_deny_list_v1;
129use sui_types::digests::ChainIdentifier;
130use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldName};
131use sui_types::effects::{
132    InputConsensusObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
133    TransactionEvents, VerifiedSignedTransactionEffects,
134};
135use sui_types::error::{ExecutionError, ExecutionErrorKind, SuiErrorKind, UserInputError};
136use sui_types::event::{Event, EventID};
137use sui_types::executable_transaction::VerifiedExecutableTransaction;
138use sui_types::gas::{GasCostSummary, SuiGasStatus};
139use sui_types::inner_temporary_store::{
140    InnerTemporaryStore, ObjectMap, TemporaryModuleResolver, TxCoins, WrittenObjects,
141};
142use sui_types::message_envelope::Message;
143use sui_types::messages_checkpoint::{
144    CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointContentsDigest,
145    CheckpointDigest, CheckpointRequest, CheckpointRequestV2, CheckpointResponse,
146    CheckpointResponseV2, CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
147    CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
148};
149use sui_types::messages_grpc::{
150    LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
151    TransactionInfoRequest, TransactionInfoResponse, TransactionStatus,
152};
153use sui_types::metrics::{BytecodeVerifierMetrics, LimitsMetrics};
154use sui_types::object::{MoveObject, OBJECT_START_VERSION, Owner, PastObjectRead};
155use sui_types::signature::GenericSignature;
156use sui_types::storage::{
157    BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
158};
159use sui_types::sui_system_state::SuiSystemStateTrait;
160use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
161use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
162use sui_types::supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions};
163use sui_types::{
164    SUI_SYSTEM_ADDRESS,
165    base_types::*,
166    committee::Committee,
167    crypto::AuthoritySignature,
168    error::{SuiError, SuiResult},
169    object::{Object, ObjectRead},
170    transaction::*,
171};
172use sui_types::{TypeTag, is_system_package};
173use typed_store::TypedStoreError;
174use typed_store::rocks::StagedBatch;
175
176use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
177use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
178use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
179use crate::authority::authority_store_pruner::{
180    AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING,
181};
182use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
183use crate::authority::epoch_start_configuration::EpochStartConfiguration;
184use crate::checkpoints::CheckpointStore;
185use crate::epoch::committee_store::CommitteeStore;
186use crate::execution_cache::{
187    CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
188    ObjectCacheRead, StateSyncAPI,
189};
190use crate::execution_driver::execution_process;
191use crate::global_state_hasher::{GlobalStateHashStore, GlobalStateHasher, WrappedObject};
192use crate::metrics::LatencyObserver;
193use crate::metrics::RateTracker;
194use crate::module_cache_metrics::ResolverMetrics;
195use crate::overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx};
196use crate::stake_aggregator::StakeAggregator;
197use crate::subscription_handler::SubscriptionHandler;
198use crate::transaction_input_loader::TransactionInputLoader;
199
200#[cfg(msim)]
201pub use crate::checkpoints::checkpoint_executor::utils::{
202    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
203};
204
205#[cfg(msim)]
206use sui_types::committee::CommitteeTrait;
207use sui_types::deny_list_v2::check_coin_deny_list_v2_during_signing;
208
209#[cfg(test)]
210#[path = "unit_tests/authority_tests.rs"]
211pub mod authority_tests;
212
213#[cfg(test)]
214#[path = "unit_tests/transaction_tests.rs"]
215pub mod transaction_tests;
216
217#[cfg(test)]
218#[path = "unit_tests/batch_transaction_tests.rs"]
219mod batch_transaction_tests;
220
221#[cfg(test)]
222#[path = "unit_tests/move_integration_tests.rs"]
223pub mod move_integration_tests;
224
225#[cfg(test)]
226#[path = "unit_tests/gas_tests.rs"]
227mod gas_tests;
228
229#[cfg(test)]
230#[path = "unit_tests/batch_verification_tests.rs"]
231mod batch_verification_tests;
232
233#[cfg(test)]
234#[path = "unit_tests/coin_deny_list_tests.rs"]
235mod coin_deny_list_tests;
236
237#[cfg(test)]
238#[path = "unit_tests/auth_unit_test_utils.rs"]
239pub mod auth_unit_test_utils;
240
241pub mod authority_test_utils;
242
243pub mod authority_per_epoch_store;
244pub mod authority_per_epoch_store_pruner;
245
246pub mod authority_store_pruner;
247pub mod authority_store_tables;
248pub mod authority_store_types;
249pub mod congestion_log;
250pub mod consensus_tx_status_cache;
251pub mod epoch_start_configuration;
252pub mod execution_time_estimator;
253pub mod shared_object_congestion_tracker;
254pub mod shared_object_version_manager;
255pub mod submitted_transaction_cache;
256pub mod test_authority_builder;
257pub mod transaction_deferral;
258pub mod transaction_reject_reason_cache;
259mod weighted_moving_average;
260
261pub(crate) mod authority_store;
262pub mod backpressure;
263
264/// Prometheus metrics for the authority, which can be displayed in Grafana, queried and alerted on.
265pub struct AuthorityMetrics { // Trailing notes summarize how each field is registered in `AuthorityMetrics::new`.
266    tx_orders: IntCounter, // `total_transaction_orders`: total number of transaction orders.
267    total_certs: IntCounter, // `total_transaction_certificates`: transaction certificates handled.
268    total_effects: IntCounter, // `total_transaction_effects`: effects produced, i.e. transactions finished.
269    // TODO: this tracks consensus object tx, not just shared. Consider renaming.
270    pub shared_obj_tx: IntCounter, // `num_shared_obj_tx`: transactions involving shared objects.
271    sponsored_tx: IntCounter, // `num_sponsored_tx`: sponsored transactions.
272    tx_already_processed: IntCounter, // `num_tx_already_processed`: orders already processed previously.
273    num_input_objs: Histogram, // `num_input_objects`: input objects per transaction.
274    // TODO: this tracks consensus object count, not just shared. Consider renaming.
275    num_shared_objects: Histogram, // `num_shared_objects`: shared input objects per transaction.
276    batch_size: Histogram, // `batch_size`: size of a transaction batch.
277
278    authority_state_handle_vote_transaction_latency: Histogram, // Latency of voting on transactions without signing.
279
280    internal_execution_latency: Histogram, // `authority_state_internal_execution_latency`: actual certificate execution.
281    execution_load_input_objects_latency: Histogram, // Loading input objects for execution (uses the low-latency buckets).
282    prepare_certificate_latency: Histogram, // Executing a certificate, before committing the results.
283    commit_certificate_latency: Histogram, // Committing certificate execution results.
284    db_checkpoint_latency: Histogram, // `db_checkpoint_latency`: checkpointing dbs.
285
286    // TODO: Rename these metrics.
287    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec, // Labelled by `result`.
288    pub(crate) transaction_manager_num_pending_certificates: IntGauge, // Pending, with at least 1 missing input object.
289    pub(crate) transaction_manager_num_executing_certificates: IntGauge, // Queued plus actually running certificates.
290    pub(crate) transaction_manager_transaction_queue_age_s: Histogram, // Time a transaction spends waiting in the queue.
291
292    pub(crate) execution_driver_executed_transactions: IntCounter, // Cumulative count executed by the execution driver.
293    pub(crate) execution_driver_paused_transactions: IntCounter, // Cumulative count paused by the execution driver.
294    pub(crate) execution_driver_dispatch_queue: IntGauge, // Transactions pending in the dispatch queue.
295    pub(crate) execution_queueing_delay_s: Histogram, // Delay from ready-for-execution until execution starts.
296    pub(crate) prepare_cert_gas_latency_ratio: Histogram, // Computation gas divided by VM execution latency.
297    pub(crate) execution_gas_latency_ratio: Histogram, // Computation gas divided by execution latency, commit included.
298
299    pub(crate) skipped_consensus_txns: IntCounter, // Total consensus transactions skipped.
300    pub(crate) skipped_consensus_txns_cache_hit: IntCounter, // Skipped because of a local cache hit.
301    pub(crate) consensus_handler_duplicate_tx_count: Histogram, // Times each transaction appears in its first consensus commit.
302
303    pub(crate) authority_overload_status: IntGauge, // Whether the authority is overloaded and in load shedding mode.
304    pub(crate) authority_load_shedding_percentage: IntGauge, // Percentage of transactions shed while load shedding.
305
306    pub(crate) transaction_overload_sources: IntCounterVec, // Labelled by `source`.
307
308    /// Post processing metrics
309    post_processing_total_events_emitted: IntCounter, // Events emitted in post processing.
310    post_processing_total_tx_indexed: IntCounter, // Transactions indexed in post processing.
311    post_processing_total_tx_had_event_processed: IntCounter, // Transactions that finished event processing.
312    post_processing_total_failures: IntCounter, // Failures in post processing.
313
314    /// Consensus commit and transaction handler metrics
315    pub consensus_handler_processed: IntCounterVec, // Labelled by `class`.
316    pub consensus_handler_transaction_sizes: HistogramVec, // Labelled by `class`.
317    pub consensus_handler_num_low_scoring_authorities: IntGauge, // Based on reputation scores from consensus.
318    pub consensus_handler_scores: IntGaugeVec, // Labelled by `authority`.
319    pub consensus_handler_deferred_transactions: IntCounter, // Transactions deferred by the consensus handler.
320    pub consensus_handler_congested_transactions: IntCounter, // Deferred due to congestion.
321    pub consensus_handler_unpaid_amplification_deferrals: IntCounter, // Deferred due to unpaid consensus amplification.
322    pub consensus_handler_cancelled_transactions: IntCounter, // Transactions cancelled by the consensus handler.
323    pub consensus_handler_max_object_costs: IntGaugeVec, // Registered as `consensus_handler_max_congestion_control_object_costs`; labelled by `commit_type`.
324    pub consensus_committed_subdags: IntCounterVec, // Labelled by `authority` (the author).
325    pub consensus_committed_messages: IntGaugeVec, // Labelled by `authority` (the author).
326    pub consensus_committed_user_transactions: IntGaugeVec, // Labelled by `authority` (the submitter); persisted across restarts within each epoch.
327    pub consensus_finalized_user_transactions: IntGaugeVec, // Labelled by `authority` (the submitter).
328    pub consensus_rejected_user_transactions: IntGaugeVec, // Labelled by `authority` (the submitter).
329    pub consensus_calculated_throughput: IntGauge,
330    pub consensus_calculated_throughput_profile: IntGauge,
331    pub consensus_block_handler_block_processed: IntCounter,
332    pub consensus_block_handler_txn_processed: IntCounterVec,
333    pub consensus_block_handler_fastpath_executions: IntCounter,
334    pub consensus_timestamp_bias: Histogram,
335
336    pub limits_metrics: Arc<LimitsMetrics>,
337
338    /// bytecode verifier metrics for tracking timeouts
339    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
340
341    /// Count of zklogin signatures
342    pub zklogin_sig_count: IntCounter,
343    /// Count of multisig signatures
344    pub multisig_sig_count: IntCounter,
345
346    // Tracks recent average txn queueing delay between when it is ready for execution
347    // until it starts executing.
348    pub execution_queueing_latency: LatencyObserver,
349
350    // Tracks the rate at which transactions become ready for execution in transaction manager.
351    // The need for the Mutex is that the tracker is updated in transaction manager and read
352    // in the overload_monitor. There should be low mutex contention because
353    // transaction manager is single threaded and the read rate in overload_monitor is
354    // low. In the case where transaction manager becomes multi-threaded, we can
355    // create one rate tracker per thread.
356    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
357
358    // Tracks the rate at which transactions start execution in execution driver.
359    // A Mutex is used here for the same reason as for `txn_ready_rate_tracker`.
360    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
361}
362
363// Override default Prom buckets for positive numbers in 0-10M range; used by count/size histograms such as `num_input_objects` and `batch_size`.
364const POSITIVE_INT_BUCKETS: &[f64] = &[ // Upper bounds follow a 1-2-5-7 pattern per decade, from 1 to 10,000,000.
365    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
366    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
367    7000000., 10000000.,
368];
369// Latency buckets in seconds, from 0.5ms up to 90s; used by latency histograms such as `authority_state_internal_execution_latency`.
370const LATENCY_SEC_BUCKETS: &[f64] = &[
371    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
372    10., 20., 30., 60., 90.,
373];
374
375// Buckets (in seconds) for low latency samples. Starts from 10us and goes up to 100s.
376const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[ // Upper bounds follow a 1-2-5 pattern per decade.
377    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
378    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
379];
380
381// Buckets for consensus timestamp bias in seconds.
382// We expect 200-300ms, so we cluster the buckets more tightly in that range.
383const TIMESTAMP_BIAS_SEC_BUCKETS: &[f64] = &[ // 20ms steps between 0.20s and 0.40s; includes one negative bound (-2.0).
384    -2.0, 0.0, 0.2, 0.22, 0.24, 0.26, 0.28, 0.30, 0.32, 0.34, 0.36, 0.38, 0.40, 0.45, 0.50, 0.80,
385    2.0, 10.0, 30.0,
386];
387// Buckets for the gas/latency ratio histograms (`prepare_cert_gas_latency_ratio`, `execution_gas_latency_ratio`), from 10 to 1,000,000.
388const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
389    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
390    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
391];
392/// Gas coin value of 10^18. NOTE(review): presumably the balance of the gas coin used for dev-inspect, in MIST; confirm against usage.
393pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000_000_000;
394
395// The transaction author should already have observed the input objects as finalized output,
396// so the wait usually does not need to be long.
397// When the transaction is submitted by TransactionDriver, the driver also retries quickly if this validator does not respond.
398pub const WAIT_FOR_FASTPATH_INPUT_TIMEOUT: Duration = Duration::from_secs(2); // 2 seconds
399
400impl AuthorityMetrics {
401    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
402        Self {
403            tx_orders: register_int_counter_with_registry!(
404                "total_transaction_orders",
405                "Total number of transaction orders",
406                registry,
407            )
408            .unwrap(),
409            total_certs: register_int_counter_with_registry!(
410                "total_transaction_certificates",
411                "Total number of transaction certificates handled",
412                registry,
413            )
414            .unwrap(),
415            // total_effects == total transactions finished
416            total_effects: register_int_counter_with_registry!(
417                "total_transaction_effects",
418                "Total number of transaction effects produced",
419                registry,
420            )
421            .unwrap(),
422
423            shared_obj_tx: register_int_counter_with_registry!(
424                "num_shared_obj_tx",
425                "Number of transactions involving shared objects",
426                registry,
427            )
428            .unwrap(),
429
430            sponsored_tx: register_int_counter_with_registry!(
431                "num_sponsored_tx",
432                "Number of sponsored transactions",
433                registry,
434            )
435            .unwrap(),
436
437            tx_already_processed: register_int_counter_with_registry!(
438                "num_tx_already_processed",
439                "Number of transaction orders already processed previously",
440                registry,
441            )
442            .unwrap(),
443            num_input_objs: register_histogram_with_registry!(
444                "num_input_objects",
445                "Distribution of number of input TX objects per TX",
446                POSITIVE_INT_BUCKETS.to_vec(),
447                registry,
448            )
449            .unwrap(),
450            num_shared_objects: register_histogram_with_registry!(
451                "num_shared_objects",
452                "Number of shared input objects per TX",
453                POSITIVE_INT_BUCKETS.to_vec(),
454                registry,
455            )
456            .unwrap(),
457            batch_size: register_histogram_with_registry!(
458                "batch_size",
459                "Distribution of size of transaction batch",
460                POSITIVE_INT_BUCKETS.to_vec(),
461                registry,
462            )
463            .unwrap(),
464            authority_state_handle_vote_transaction_latency: register_histogram_with_registry!(
465                "authority_state_handle_vote_transaction_latency",
466                "Latency of voting on transactions without signing",
467                LATENCY_SEC_BUCKETS.to_vec(),
468                registry,
469            )
470            .unwrap(),
471            internal_execution_latency: register_histogram_with_registry!(
472                "authority_state_internal_execution_latency",
473                "Latency of actual certificate executions",
474                LATENCY_SEC_BUCKETS.to_vec(),
475                registry,
476            )
477            .unwrap(),
478            execution_load_input_objects_latency: register_histogram_with_registry!(
479                "authority_state_execution_load_input_objects_latency",
480                "Latency of loading input objects for execution",
481                LOW_LATENCY_SEC_BUCKETS.to_vec(),
482                registry,
483            )
484            .unwrap(),
485            prepare_certificate_latency: register_histogram_with_registry!(
486                "authority_state_prepare_certificate_latency",
487                "Latency of executing certificates, before committing the results",
488                LATENCY_SEC_BUCKETS.to_vec(),
489                registry,
490            )
491            .unwrap(),
492            commit_certificate_latency: register_histogram_with_registry!(
493                "authority_state_commit_certificate_latency",
494                "Latency of committing certificate execution results",
495                LATENCY_SEC_BUCKETS.to_vec(),
496                registry,
497            )
498            .unwrap(),
499            db_checkpoint_latency: register_histogram_with_registry!(
500                "db_checkpoint_latency",
501                "Latency of checkpointing dbs",
502                LATENCY_SEC_BUCKETS.to_vec(),
503                registry,
504            ).unwrap(),
505            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
506                "transaction_manager_num_enqueued_certificates",
507                "Current number of certificates enqueued to ExecutionScheduler",
508                &["result"],
509                registry,
510            )
511            .unwrap(),
512            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
513                "transaction_manager_num_pending_certificates",
514                "Number of certificates pending in ExecutionScheduler, with at least 1 missing input object",
515                registry,
516            )
517            .unwrap(),
518            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
519                "transaction_manager_num_executing_certificates",
520                "Number of executing certificates, including queued and actually running certificates",
521                registry,
522            )
523            .unwrap(),
524            authority_overload_status: register_int_gauge_with_registry!(
525                "authority_overload_status",
526                "Whether authority is current experiencing overload and enters load shedding mode.",
527                registry)
528            .unwrap(),
529            authority_load_shedding_percentage: register_int_gauge_with_registry!(
530                "authority_load_shedding_percentage",
531                "The percentage of transactions is shed when the authority is in load shedding mode.",
532                registry)
533            .unwrap(),
534            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
535                "transaction_manager_transaction_queue_age_s",
536                "Time spent in waiting for transaction in the queue",
537                LATENCY_SEC_BUCKETS.to_vec(),
538                registry,
539            )
540            .unwrap(),
541            transaction_overload_sources: register_int_counter_vec_with_registry!(
542                "transaction_overload_sources",
543                "Number of times each source indicates transaction overload.",
544                &["source"],
545                registry)
546            .unwrap(),
547            execution_driver_executed_transactions: register_int_counter_with_registry!(
548                "execution_driver_executed_transactions",
549                "Cumulative number of transaction executed by execution driver",
550                registry,
551            )
552            .unwrap(),
553            execution_driver_paused_transactions: register_int_counter_with_registry!(
554                "execution_driver_paused_transactions",
555                "Cumulative number of transactions paused by execution driver",
556                registry,
557            )
558            .unwrap(),
559            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
560                "execution_driver_dispatch_queue",
561                "Number of transaction pending in execution driver dispatch queue",
562                registry,
563            )
564            .unwrap(),
565            execution_queueing_delay_s: register_histogram_with_registry!(
566                "execution_queueing_delay_s",
567                "Queueing delay between a transaction is ready for execution until it starts executing.",
568                LATENCY_SEC_BUCKETS.to_vec(),
569                registry
570            )
571            .unwrap(),
572            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
573                "prepare_cert_gas_latency_ratio",
574                "The ratio of computation gas divided by VM execution latency.",
575                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
576                registry
577            )
578            .unwrap(),
579            execution_gas_latency_ratio: register_histogram_with_registry!(
580                "execution_gas_latency_ratio",
581                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
582                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
583                registry
584            )
585            .unwrap(),
586            skipped_consensus_txns: register_int_counter_with_registry!(
587                "skipped_consensus_txns",
588                "Total number of consensus transactions skipped",
589                registry,
590            )
591            .unwrap(),
592            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
593                "skipped_consensus_txns_cache_hit",
594                "Total number of consensus transactions skipped because of local cache hit",
595                registry,
596            )
597            .unwrap(),
598            consensus_handler_duplicate_tx_count: register_histogram_with_registry!(
599                "consensus_handler_duplicate_tx_count",
600                "Number of times each transaction appears in its first consensus commit",
601                POSITIVE_INT_BUCKETS.to_vec(),
602                registry,
603            )
604            .unwrap(),
605            post_processing_total_events_emitted: register_int_counter_with_registry!(
606                "post_processing_total_events_emitted",
607                "Total number of events emitted in post processing",
608                registry,
609            )
610            .unwrap(),
611            post_processing_total_tx_indexed: register_int_counter_with_registry!(
612                "post_processing_total_tx_indexed",
613                "Total number of txes indexed in post processing",
614                registry,
615            )
616            .unwrap(),
617            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
618                "post_processing_total_tx_had_event_processed",
619                "Total number of txes finished event processing in post processing",
620                registry,
621            )
622            .unwrap(),
623            post_processing_total_failures: register_int_counter_with_registry!(
624                "post_processing_total_failures",
625                "Total number of failure in post processing",
626                registry,
627            )
628            .unwrap(),
629            consensus_handler_processed: register_int_counter_vec_with_registry!(
630                "consensus_handler_processed",
631                "Number of transactions processed by consensus handler",
632                &["class"],
633                registry
634            ).unwrap(),
635            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
636                "consensus_handler_transaction_sizes",
637                "Sizes of each type of transactions processed by consensus handler",
638                &["class"],
639                POSITIVE_INT_BUCKETS.to_vec(),
640                registry
641            ).unwrap(),
642            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
643                "consensus_handler_num_low_scoring_authorities",
644                "Number of low scoring authorities based on reputation scores from consensus",
645                registry
646            ).unwrap(),
647            consensus_handler_scores: register_int_gauge_vec_with_registry!(
648                "consensus_handler_scores",
649                "scores from consensus for each authority",
650                &["authority"],
651                registry,
652            ).unwrap(),
653            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
654                "consensus_handler_deferred_transactions",
655                "Number of transactions deferred by consensus handler",
656                registry,
657            ).unwrap(),
658            consensus_handler_congested_transactions: register_int_counter_with_registry!(
659                "consensus_handler_congested_transactions",
660                "Number of transactions deferred by consensus handler due to congestion",
661                registry,
662            ).unwrap(),
663            consensus_handler_unpaid_amplification_deferrals: register_int_counter_with_registry!(
664                "consensus_handler_unpaid_amplification_deferrals",
665                "Number of transactions deferred due to unpaid consensus amplification",
666                registry,
667            ).unwrap(),
668            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
669                "consensus_handler_cancelled_transactions",
670                "Number of transactions cancelled by consensus handler",
671                registry,
672            ).unwrap(),
673            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
674                "consensus_handler_max_congestion_control_object_costs",
675                "Max object costs for congestion control in the current consensus commit",
676                &["commit_type"],
677                registry,
678            ).unwrap(),
679            consensus_committed_subdags: register_int_counter_vec_with_registry!(
680                "consensus_committed_subdags",
681                "Number of committed subdags, sliced by author",
682                &["authority"],
683                registry,
684            ).unwrap(),
685            consensus_committed_messages: register_int_gauge_vec_with_registry!(
686                "consensus_committed_messages",
687                "Total number of committed consensus messages, sliced by author",
688                &["authority"],
689                registry,
690            ).unwrap(),
691            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
692                "consensus_committed_user_transactions",
693                "Number of certified & user transactions committed, sliced by submitter and persisted across restarts within each epoch",
694                &["authority"],
695                registry,
696            ).unwrap(),
697            consensus_finalized_user_transactions: register_int_gauge_vec_with_registry!(
698                "consensus_finalized_user_transactions",
699                "Number of user transactions finalized, sliced by submitter",
700                &["authority"],
701                registry,
702            ).unwrap(),
703            consensus_rejected_user_transactions: register_int_gauge_vec_with_registry!(
704                "consensus_rejected_user_transactions",
705                "Number of user transactions rejected, sliced by submitter",
706                &["authority"],
707                registry,
708            ).unwrap(),
709            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
710            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
711            zklogin_sig_count: register_int_counter_with_registry!(
712                "zklogin_sig_count",
713                "Count of zkLogin signatures",
714                registry,
715            )
716            .unwrap(),
717            multisig_sig_count: register_int_counter_with_registry!(
718                "multisig_sig_count",
719                "Count of zkLogin signatures",
720                registry,
721            )
722            .unwrap(),
723            consensus_calculated_throughput: register_int_gauge_with_registry!(
724                "consensus_calculated_throughput",
725                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
726                registry,
727            ).unwrap(),
728            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
729                "consensus_calculated_throughput_profile",
730                "The current active calculated throughput profile",
731                registry
732            ).unwrap(),
733            consensus_block_handler_block_processed: register_int_counter_with_registry!(
734                "consensus_block_handler_block_processed",
735                "Number of blocks processed by consensus block handler.",
736                registry
737            ).unwrap(),
738            consensus_block_handler_txn_processed: register_int_counter_vec_with_registry!(
739                "consensus_block_handler_txn_processed",
740                "Number of transactions processed by consensus block handler, by whether they are certified or rejected.",
741                &["outcome"],
742                registry
743            ).unwrap(),
744            consensus_block_handler_fastpath_executions: register_int_counter_with_registry!(
745                "consensus_block_handler_fastpath_executions",
746                "Number of fastpath transactions sent for execution by consensus transaction handler",
747                registry,
748            ).unwrap(),
749            consensus_timestamp_bias: register_histogram_with_registry!(
750                "consensus_timestamp_bias",
751                "Bias/delay from consensus timestamp to system time",
752                TIMESTAMP_BIAS_SEC_BUCKETS.to_vec(),
753                registry
754            ).unwrap(),
755            execution_queueing_latency: LatencyObserver::new(),
756            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
758        }
759    }
760}
761
/// A trait object for `Signer` that is:
/// - Pin, i.e. confined to one place in memory (we don't want to copy private keys).
/// - Send + Sync, i.e. can be safely moved to and shared between threads.
///
/// The signer is held behind a pinned `Arc` (not a `Box`), so it is typically
/// instantiated with `Arc::pin(keypair)` where `keypair` is a `KeyPair`.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
769
/// Execution env contains the "environment" for the transaction to be executed in, that is,
/// all the information necessary for execution that is not specified by the transaction itself.
///
/// Values are built from `ExecutionEnv::new()` / `Default::default()` and then refined with
/// the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct ExecutionEnv {
    /// The assigned version of each shared object for the transaction.
    /// Defaults to `AssignedVersions::default()`.
    pub assigned_versions: AssignedVersions,
    /// The expected digest of the effects of the transaction, if executing from checkpoint or
    /// other sources where the effects are known in advance. `None` when nothing is expected.
    pub expected_effects_digest: Option<TransactionEffectsDigest>,
    /// Status of the funds withdraw scheduling of the transaction,
    /// covering both address and object funds withdraws.
    /// Defaults to `FundsWithdrawStatus::MaybeSufficient`.
    pub funds_withdraw_status: FundsWithdrawStatus,
    /// Transactions that must finish before this transaction can be executed.
    /// Used to schedule barrier transactions after non-exclusive writes.
    pub barrier_dependencies: Vec<TransactionDigest>,
}
786
787impl Default for ExecutionEnv {
788    fn default() -> Self {
789        Self {
790            assigned_versions: Default::default(),
791            expected_effects_digest: None,
792            funds_withdraw_status: FundsWithdrawStatus::MaybeSufficient,
793            barrier_dependencies: Default::default(),
794        }
795    }
796}
797
798impl ExecutionEnv {
799    pub fn new() -> Self {
800        Default::default()
801    }
802
803    pub fn with_expected_effects_digest(
804        mut self,
805        expected_effects_digest: TransactionEffectsDigest,
806    ) -> Self {
807        self.expected_effects_digest = Some(expected_effects_digest);
808        self
809    }
810
811    pub fn with_assigned_versions(mut self, assigned_versions: AssignedVersions) -> Self {
812        self.assigned_versions = assigned_versions;
813        self
814    }
815
816    pub fn with_insufficient_funds(mut self) -> Self {
817        self.funds_withdraw_status = FundsWithdrawStatus::Insufficient;
818        self
819    }
820
821    pub fn with_barrier_dependencies(
822        mut self,
823        barrier_dependencies: BTreeSet<TransactionDigest>,
824    ) -> Self {
825        self.barrier_dependencies = barrier_dependencies.into_iter().collect();
826        self
827    }
828}
829
/// Holds per-transaction effects-digest overrides used for fork recovery.
///
/// The map is built once from the node's `ForkRecoveryConfig` (see `ForkRecoveryState::new`)
/// and is only read through `get_transaction_override` in the code visible here.
#[derive(Debug)]
pub struct ForkRecoveryState {
    /// Transaction digest to effects digest overrides, guarded by a `parking_lot` read-write lock.
    transaction_overrides:
        parking_lot::RwLock<HashMap<TransactionDigest, TransactionEffectsDigest>>,
}
836
837impl Default for ForkRecoveryState {
838    fn default() -> Self {
839        Self {
840            transaction_overrides: parking_lot::RwLock::new(HashMap::new()),
841        }
842    }
843}
844
845impl ForkRecoveryState {
846    pub fn new(config: Option<&sui_config::node::ForkRecoveryConfig>) -> Result<Self, SuiError> {
847        let Some(config) = config else {
848            return Ok(Self::default());
849        };
850
851        let mut transaction_overrides = HashMap::new();
852        for (tx_digest_str, effects_digest_str) in &config.transaction_overrides {
853            let tx_digest = TransactionDigest::from_str(tx_digest_str).map_err(|_| {
854                SuiErrorKind::Unknown(format!("Invalid transaction digest: {}", tx_digest_str))
855            })?;
856            let effects_digest =
857                TransactionEffectsDigest::from_str(effects_digest_str).map_err(|_| {
858                    SuiErrorKind::Unknown(format!("Invalid effects digest: {}", effects_digest_str))
859                })?;
860            transaction_overrides.insert(tx_digest, effects_digest);
861        }
862
863        Ok(Self {
864            transaction_overrides: parking_lot::RwLock::new(transaction_overrides),
865        })
866    }
867
868    pub fn get_transaction_override(
869        &self,
870        tx_digest: &TransactionDigest,
871    ) -> Option<TransactionEffectsDigest> {
872        self.transaction_overrides.read().get(tx_digest).copied()
873    }
874}
875
/// Result delivered by a transaction's post-processing task: a staged batch together with
/// index store cache updates. It is the payload of the receivers stored in
/// `AuthorityState::pending_post_processing`.
pub type PostProcessingOutput = (StagedBatch, IndexStoreCacheUpdates);
877
/// State owned by a single authority (validator or fullnode): its identity, handles to
/// storage and caches, the execution scheduler, metrics, and node configuration.
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    // Storage access.
    /// Loader used to read a transaction's input and receiving objects (e.g. for signing).
    input_loader: TransactionInputLoader,
    /// Handles to the execution cache traits (includes the account funds reader).
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    /// Resolver handed to funds-withdrawal processing when checking a transaction for signing.
    coin_reservation_resolver: Arc<CoinReservationResolver>,

    /// The per-epoch store, held in an `ArcSwap`.
    // NOTE(review): presumably swapped on reconfiguration — confirm against the reconfigure path.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes current 'execution epoch'.
    /// Execution acquires read lock, checks certificate epoch and holds it until all writes are complete.
    /// Reconfiguration acquires write lock, changes the epoch and reverts all transactions
    /// from the previous epoch that were executed but did not make it into a checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional RPC index store.
    pub rpc_index: Option<Arc<RpcIndexStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint store; also serves epoch state commitments.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Committee store, exposed through `committee_store()` / `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    /// Schedules transaction execution.
    execution_scheduler: Arc<ExecutionScheduler>,

    /// Shuts down the execution task. Used only in testing.
    #[allow(unused)]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Metrics recorded by this authority.
    pub metrics: Arc<AuthorityMetrics>,
    // Underscore-prefixed: not read through these fields in the code shown here.
    // NOTE(review): presumably kept only so the pruners live as long as the state — confirm.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    /// Node configuration (deny config, verifier signing config, overload config, ...).
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// The chain identifier is derived from the digest of the genesis checkpoint.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,

    /// Traffic controller for Sui core servers (json-rpc, validator service)
    pub traffic_controller: Option<Arc<TrafficController>>,

    /// Fork recovery state for handling equivocation after forks
    fork_recovery_state: Option<ForkRecoveryState>,

    /// Notification channel for reconfiguration
    notify_epoch: tokio::sync::watch::Sender<EpochId>,

    /// Optional checker for object funds, swappable at runtime through `ArcSwapOption`.
    pub(crate) object_funds_checker: ArcSwapOption<ObjectFundsChecker>,
    /// Metrics for the object funds checker.
    object_funds_checker_metrics: Arc<ObjectFundsCheckerMetrics>,

    /// Tracks transactions whose post-processing (indexing/events) is still in flight.
    /// CheckpointExecutor removes entries and collects the index batches before committing
    /// them atomically at checkpoint boundaries.
    pending_post_processing:
        Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>>,

    /// Limits the number of concurrent post-processing tasks to avoid overwhelming
    /// the blocking thread pool. Defaults to the number of available CPUs.
    post_processing_semaphore: Arc<tokio::sync::Semaphore>,
}
952
953/// The authority state encapsulates all state, drives execution, and ensures safety.
954///
955/// Note the authority operations can be accessed through a read ref (&) and do not
956/// require &mut. Internally a database is synchronized through a mutex lock.
957///
958/// Repeating valid commands should produce no changes and return no error.
959impl AuthorityState {
960    pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
961        epoch_store.committee().authority_exists(&self.name)
962    }
963
964    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
965        !self.is_validator(epoch_store)
966    }
967
968    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
969        &self.committee_store
970    }
971
972    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
973        self.committee_store.clone()
974    }
975
976    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
977        &self.config.authority_overload_config
978    }
979
980    pub fn get_epoch_state_commitments(
981        &self,
982        epoch: EpochId,
983    ) -> SuiResult<Option<Vec<CheckpointCommitment>>> {
984        self.checkpoint_store.get_epoch_state_commitments(epoch)
985    }
986
987    /// Runs deny list checks and processes funds withdrawals. Called before loading input
988    /// objects, since these checks don't depend on object state.
989    fn pre_object_load_checks(
990        &self,
991        tx_data: &TransactionData,
992        tx_signatures: &[GenericSignature],
993        input_object_kinds: &[InputObjectKind],
994        receiving_objects_refs: &[ObjectRef],
995    ) -> SuiResult<BTreeMap<AccumulatorObjId, u64>> {
996        // Note: the deny checks may do redundant package loads but:
997        // - they only load packages when there is an active package deny map
998        // - the loads are cached anyway
999        sui_transaction_checks::deny::check_transaction_for_signing(
1000            tx_data,
1001            tx_signatures,
1002            input_object_kinds,
1003            receiving_objects_refs,
1004            &self.config.transaction_deny_config,
1005            self.get_backing_package_store().as_ref(),
1006        )?;
1007
1008        let declared_withdrawals = tx_data.process_funds_withdrawals_for_signing(
1009            self.chain_identifier,
1010            self.coin_reservation_resolver.as_ref(),
1011        )?;
1012
1013        self.execution_cache_trait_pointers
1014            .account_funds_read
1015            .check_amounts_available(&declared_withdrawals)?;
1016
1017        Ok(declared_withdrawals)
1018    }
1019
1020    fn handle_transaction_deny_checks(
1021        &self,
1022        transaction: &VerifiedTransaction,
1023        epoch_store: &Arc<AuthorityPerEpochStore>,
1024    ) -> SuiResult<CheckedInputObjects> {
1025        let tx_digest = transaction.digest();
1026        let tx_data = transaction.data().transaction_data();
1027
1028        let input_object_kinds = tx_data.input_objects()?;
1029        let receiving_objects_refs = tx_data.receiving_objects();
1030
1031        self.pre_object_load_checks(
1032            tx_data,
1033            transaction.tx_signatures(),
1034            &input_object_kinds,
1035            &receiving_objects_refs,
1036        )?;
1037
1038        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1039            Some(tx_digest),
1040            &input_object_kinds,
1041            &receiving_objects_refs,
1042            epoch_store.epoch(),
1043        )?;
1044
1045        let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
1046            epoch_store.protocol_config(),
1047            epoch_store.reference_gas_price(),
1048            tx_data,
1049            input_objects,
1050            &receiving_objects,
1051            &self.metrics.bytecode_verifier_metrics,
1052            &self.config.verifier_signing_config,
1053        )?;
1054
1055        self.handle_coin_deny_list_checks(
1056            tx_data,
1057            &checked_input_objects,
1058            &receiving_objects,
1059            epoch_store,
1060        )?;
1061
1062        Ok(checked_input_objects)
1063    }
1064
1065    fn handle_coin_deny_list_checks(
1066        &self,
1067        tx_data: &TransactionData,
1068        checked_input_objects: &CheckedInputObjects,
1069        receiving_objects: &ReceivingObjects,
1070        epoch_store: &Arc<AuthorityPerEpochStore>,
1071    ) -> SuiResult<()> {
1072        let funds_withdraw_types = tx_data
1073            .get_funds_withdrawals()
1074            .into_iter()
1075            .filter_map(|withdraw| {
1076                withdraw
1077                    .type_arg
1078                    .get_balance_type_param()
1079                    .map(|ty| ty.to_canonical_string(false))
1080            })
1081            .collect::<BTreeSet<_>>();
1082
1083        if epoch_store.coin_deny_list_v1_enabled() {
1084            check_coin_deny_list_v1(
1085                tx_data.sender(),
1086                checked_input_objects,
1087                receiving_objects,
1088                funds_withdraw_types.clone(),
1089                &self.get_object_store(),
1090            )?;
1091        }
1092
1093        if epoch_store.protocol_config().enable_coin_deny_list_v2() {
1094            check_coin_deny_list_v2_during_signing(
1095                tx_data.sender(),
1096                checked_input_objects,
1097                receiving_objects,
1098                funds_withdraw_types.clone(),
1099                &self.get_object_store(),
1100            )?;
1101        }
1102
1103        Ok(())
1104    }
1105
1106    /// Vote for a transaction, either when validator receives a submit_transaction request,
1107    /// or sees a transaction from consensus. Performs the same types of checks as
1108    /// transaction signing, but does not explicitly sign the transaction.
1109    /// Note that if the transaction has been executed, we still go through the
1110    /// same checks. If the transaction has only been executed through mysticeti fastpath,
1111    /// but not yet finalized, the signing will still work since the objects are still available.
1112    /// But if the transaction has been finalized, the signing will fail.
1113    /// TODO(mysticeti-fastpath): Assess whether we want to optimize the case when the transaction
1114    /// has already been finalized executed.
1115    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1116    pub fn handle_vote_transaction(
1117        &self,
1118        epoch_store: &Arc<AuthorityPerEpochStore>,
1119        transaction: VerifiedTransaction,
1120    ) -> SuiResult<()> {
1121        debug!("handle_vote_transaction");
1122
1123        let _metrics_guard = self
1124            .metrics
1125            .authority_state_handle_vote_transaction_latency
1126            .start_timer();
1127        self.metrics.tx_orders.inc();
1128
1129        // The should_accept_user_certs check here is best effort, because
1130        // between a validator signs a tx and a cert is formed, the validator
1131        // could close the window.
1132        if !epoch_store
1133            .get_reconfig_state_read_lock_guard()
1134            .should_accept_user_certs()
1135        {
1136            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1137        }
1138
1139        // Accept finalized transactions, instead of voting to reject them.
1140        // Checking executed transactions is limited to the current epoch.
1141        // Otherwise there can be a race where the transaction is accepted because it has executed,
1142        // but the executed effects are pruned post consensus, leading to failures.
1143        let tx_digest = *transaction.digest();
1144        if epoch_store.is_recently_finalized(&tx_digest)
1145            || epoch_store.transactions_executed_in_cur_epoch(&[tx_digest])?[0]
1146        {
1147            assert_reachable!("transaction recently executed");
1148            return Ok(());
1149        }
1150
1151        if self
1152            .get_transaction_cache_reader()
1153            .transaction_executed_in_last_epoch(transaction.digest(), epoch_store.epoch())
1154        {
1155            assert_reachable!("transaction executed in last epoch");
1156            return Err(SuiErrorKind::TransactionAlreadyExecuted {
1157                digest: (*transaction.digest()),
1158            }
1159            .into());
1160        }
1161
1162        // Ensure that validator cannot reconfigure while we are validating the transaction.
1163        let _execution_lock = self.execution_lock_for_validation()?;
1164
1165        let checked_input_objects =
1166            self.handle_transaction_deny_checks(&transaction, epoch_store)?;
1167
1168        let owned_objects = checked_input_objects.inner().filter_owned_objects();
1169
1170        // Validate owned object versions without acquiring locks. Locking happens post-consensus
1171        // in the consensus handler. Validation still runs to prevent spam transactions with
1172        // invalid object versions, and is necessary to handle recently created objects.
1173        self.get_cache_writer()
1174            .validate_owned_object_versions(&owned_objects)
1175    }
1176
1177    /// Used for early client validation check for transactions before submission to server.
1178    /// Performs the same validation checks as handle_vote_transaction without acquiring locks.
1179    /// This allows for fast failure feedback to clients for non-retriable errors.
1180    ///
1181    /// The key addition is checking that owned object versions match live object versions.
1182    /// This is necessary because handle_transaction_deny_checks fetches objects at their
1183    /// requested version (which may exist in storage as historical versions), whereas
1184    /// validators check against the live/current version during locking (verify_live_object).
1185    pub fn check_transaction_validity(
1186        &self,
1187        epoch_store: &Arc<AuthorityPerEpochStore>,
1188        transaction: &VerifiedTransaction,
1189        enforce_live_input_objects: bool,
1190    ) -> SuiResult<()> {
1191        if !epoch_store
1192            .get_reconfig_state_read_lock_guard()
1193            .should_accept_user_certs()
1194        {
1195            return Err(SuiErrorKind::ValidatorHaltedAtEpochEnd.into());
1196        }
1197
1198        transaction.validity_check(&epoch_store.tx_validity_check_context())?;
1199
1200        let checked_input_objects =
1201            self.handle_transaction_deny_checks(transaction, epoch_store)?;
1202
1203        // Check that owned object versions match live objects
1204        let owned_objects = checked_input_objects.inner().filter_owned_objects();
1205        let cache_reader = self.get_object_cache_reader();
1206
1207        for obj_ref in &owned_objects {
1208            if let Some(live_object) = cache_reader.get_object(&obj_ref.0) {
1209                // Only reject if transaction references an old version. Allow newer
1210                // versions that may exist on validators but not yet synced to fullnode.
1211                if obj_ref.1 < live_object.version() {
1212                    return Err(SuiErrorKind::UserInputError {
1213                        error: UserInputError::ObjectVersionUnavailableForConsumption {
1214                            provided_obj_ref: *obj_ref,
1215                            current_version: live_object.version(),
1216                        },
1217                    }
1218                    .into());
1219                }
1220
1221                if enforce_live_input_objects && live_object.version() < obj_ref.1 {
1222                    // Returns a non-retriable error when the input object version is required to be live.
1223                    return Err(SuiErrorKind::UserInputError {
1224                        error: UserInputError::ObjectVersionUnavailableForConsumption {
1225                            provided_obj_ref: *obj_ref,
1226                            current_version: live_object.version(),
1227                        },
1228                    }
1229                    .into());
1230                }
1231
1232                // If version matches, verify digest also matches
1233                if obj_ref.1 == live_object.version() && obj_ref.2 != live_object.digest() {
1234                    return Err(SuiErrorKind::UserInputError {
1235                        error: UserInputError::InvalidObjectDigest {
1236                            object_id: obj_ref.0,
1237                            expected_digest: live_object.digest(),
1238                        },
1239                    }
1240                    .into());
1241                }
1242            }
1243        }
1244
1245        Ok(())
1246    }
1247
1248    pub fn check_system_overload_at_signing(&self) -> bool {
1249        self.config
1250            .authority_overload_config
1251            .check_system_overload_at_signing
1252    }
1253
1254    pub fn check_system_overload_at_execution(&self) -> bool {
1255        self.config
1256            .authority_overload_config
1257            .check_system_overload_at_execution
1258    }
1259
1260    pub(crate) fn check_system_overload(
1261        &self,
1262        consensus_overload_checker: &(impl ConsensusOverloadChecker + ?Sized),
1263        tx_data: &SenderSignedData,
1264        do_authority_overload_check: bool,
1265    ) -> SuiResult {
1266        if do_authority_overload_check {
1267            self.check_authority_overload(tx_data).tap_err(|_| {
1268                self.update_overload_metrics("execution_queue");
1269            })?;
1270        }
1271        self.execution_scheduler
1272            .check_execution_overload(self.overload_config(), tx_data)
1273            .tap_err(|_| {
1274                self.update_overload_metrics("execution_pending");
1275            })?;
1276        consensus_overload_checker
1277            .check_consensus_overload()
1278            .tap_err(|_| {
1279                self.update_overload_metrics("consensus");
1280            })?;
1281
1282        let pending_tx_count = self
1283            .get_cache_commit()
1284            .approximate_pending_transaction_count();
1285        if pending_tx_count > self.config.execution_cache.backpressure_threshold_for_rpc() {
1286            return Err(SuiErrorKind::ValidatorOverloadedRetryAfter {
1287                retry_after_secs: 10,
1288            }
1289            .into());
1290        }
1291
1292        Ok(())
1293    }
1294
1295    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
1296        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1297            return Ok(());
1298        }
1299
1300        let load_shedding_percentage = self
1301            .overload_info
1302            .load_shedding_percentage
1303            .load(Ordering::Relaxed);
1304        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1305    }
1306
1307    fn update_overload_metrics(&self, source: &str) {
1308        self.metrics
1309            .transaction_overload_sources
1310            .with_label_values(&[source])
1311            .inc();
1312    }
1313
1314    /// Waits for fastpath (owned, package) dependency objects to become available.
1315    /// Returns true if a chosen set of fastpath dependency objects are available,
1316    /// returns false otherwise after an internal timeout.
1317    pub(crate) async fn wait_for_fastpath_dependency_objects(
1318        &self,
1319        transaction: &VerifiedTransaction,
1320        epoch: EpochId,
1321    ) -> SuiResult<bool> {
1322        let txn_data = transaction.data().transaction_data();
1323        let (move_objects, packages, receiving_objects) = txn_data.fastpath_dependency_objects()?;
1324
1325        // Gather and filter input objects to wait for.
1326        let fastpath_dependency_objects: Vec<_> = move_objects
1327            .into_iter()
1328            .filter_map(|obj_ref| self.should_wait_for_dependency_object(obj_ref))
1329            .chain(
1330                packages
1331                    .into_iter()
1332                    .map(|package_id| InputKey::Package { id: package_id }),
1333            )
1334            .collect();
1335        let receiving_keys: HashSet<_> = receiving_objects
1336            .into_iter()
1337            .filter_map(|receiving_obj_ref| {
1338                self.should_wait_for_dependency_object(receiving_obj_ref)
1339            })
1340            .collect();
1341        if fastpath_dependency_objects.is_empty() && receiving_keys.is_empty() {
1342            return Ok(true);
1343        }
1344
1345        // Use shorter wait timeout in simtests to exercise server-side error paths and
1346        // client-side retry logic.
1347        let max_wait = if cfg!(msim) {
1348            Duration::from_millis(200)
1349        } else {
1350            WAIT_FOR_FASTPATH_INPUT_TIMEOUT
1351        };
1352
1353        match timeout(
1354            max_wait,
1355            self.get_object_cache_reader().notify_read_input_objects(
1356                &fastpath_dependency_objects,
1357                &receiving_keys,
1358                epoch,
1359            ),
1360        )
1361        .await
1362        {
1363            Ok(()) => Ok(true),
1364            // Maybe return an error for unavailable input objects,
1365            // and allow the caller to skip the rest of input checks?
1366            Err(_) => Ok(false),
1367        }
1368    }
1369
1370    /// Returns Some(inputKey) if the object reference should be waited on until it is
1371    /// finalized, before proceeding to input checks.
1372    ///
1373    /// Incorrect decisions here should only affect user experience, not safety:
1374    /// - Waiting unnecessarily adds latency to transaction signing and submission.
1375    /// - Not waiting when needed may cause the transaction to be rejected because input object is unavailable.
1376    fn should_wait_for_dependency_object(&self, obj_ref: ObjectRef) -> Option<InputKey> {
1377        let (obj_id, cur_version, _digest) = obj_ref;
1378        let Some(latest_obj_ref) = self
1379            .get_object_cache_reader()
1380            .get_latest_object_ref_or_tombstone(obj_id)
1381        else {
1382            // Object might not have been created.
1383            return Some(InputKey::VersionedObject {
1384                id: FullObjectID::new(obj_id, None),
1385                version: cur_version,
1386            });
1387        };
1388        let latest_digest = latest_obj_ref.2;
1389        if latest_digest == ObjectDigest::OBJECT_DIGEST_DELETED {
1390            // Do not wait for deleted object and rely on input check instead.
1391            return None;
1392        }
1393        let latest_version = latest_obj_ref.1;
1394        if cur_version <= latest_version {
1395            // Do not wait for version that already exists or has been consumed.
1396            // Let the input check to handle them and return the proper error.
1397            return None;
1398        }
1399        // Wait for the object version to become available.
1400        Some(InputKey::VersionedObject {
1401            id: FullObjectID::new(obj_id, None),
1402            version: cur_version,
1403        })
1404    }
1405
1406    /// Wait for a transaction to be executed.
1407    /// For consensus transactions, it needs to be sequenced by the consensus.
1408    /// For owned object transactions, this function will enqueue the transaction for execution.
1409    ///
1410    /// Only use this in tests.
1411    #[instrument(level = "trace", skip_all)]
1412    pub async fn wait_for_transaction_execution_for_testing(
1413        &self,
1414        transaction: &VerifiedExecutableTransaction,
1415        epoch_store: &Arc<AuthorityPerEpochStore>,
1416    ) -> TransactionEffects {
1417        if !transaction.is_consensus_tx()
1418            && !epoch_store.protocol_config().disable_preconsensus_locking()
1419        {
1420            // Shared object transactions need to be sequenced by the consensus before enqueueing
1421            // for execution, done in AuthorityPerEpochStore::handle_consensus_transaction().
1422            //
1423            // For owned object transactions, they can be enqueued for execution immediately
1424            // ONLY when disable_preconsensus_locking is false (QD/original fastpath mode).
1425            // When disable_preconsensus_locking is true (MFP mode), all transactions including
1426            // owned object transactions must go through consensus before enqueuing for execution.
1427            self.execution_scheduler.enqueue(
1428                vec![(
1429                    Schedulable::Transaction(transaction.clone()),
1430                    ExecutionEnv::new(),
1431                )],
1432                epoch_store,
1433            );
1434        }
1435
1436        self.notify_read_effects_for_testing(
1437            "AuthorityState::wait_for_transaction_execution_for_testing",
1438            *transaction.digest(),
1439        )
1440        .await
1441    }
1442
    /// Internal logic to execute a certificate.
    ///
    /// Guarantees that
    /// - If input objects are available, return no permanent failure.
    /// - Execution and output commit are atomic, i.e. outputs are only written to storage
    ///   on successful execution; crashed execution has no observable effect and can be retried.
    ///
    /// It is caller's responsibility to ensure input objects are available and locks are set.
    /// If this cannot be satisfied by the caller, execute_certificate() should be called instead.
    /// NOTE(review): the private `execute_certificate` method in this file is reached from here
    /// via `process_certificate`, so the sentence above presumably refers to a different entry
    /// point — confirm which one is meant.
    ///
    /// Should only be called within sui-core.
    ///
    /// # Outcomes
    /// - `Success((effects, None))` is returned early when the transaction cache already holds
    ///   executed effects for this digest.
    /// - `EpochEnded` is returned when no execution lock can be obtained for the certificate,
    ///   or when the epoch held by the lock differs from `epoch_store.epoch()`.
    /// - Any non-`Success` output of `process_certificate` is returned after conversion with
    ///   `unwrap_err()`.
    /// - `Fatal(err)` is returned when `commit_certificate` fails.
    /// - Otherwise `Success((effects, execution_error_opt))`.
    ///
    /// # Panics
    /// - If effects were already executed and their digest differs from
    ///   `execution_env.expected_effects_digest` (after any fork recovery override).
    /// - If the transaction is an accumulator barrier settle transaction, an object funds
    ///   checker is installed, and no accumulator version was assigned.
    /// - In builds with debug assertions, if the signature verifier's JWKs differ from the
    ///   on-chain authenticator state after an `AuthenticatorStateUpdate`.
    #[instrument(level = "trace", skip_all)]
    pub async fn try_execute_immediately(
        &self,
        certificate: &VerifiedExecutableTransaction,
        mut execution_env: ExecutionEnv,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ExecutionOutput<(TransactionEffects, Option<ExecutionError>)> {
        let _scope = monitored_scope("Execution::try_execute_immediately");
        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

        let tx_digest = certificate.digest();

        // When fork recovery supplies an override for this transaction, it replaces whatever
        // effects digest the caller expected.
        if let Some(fork_recovery) = &self.fork_recovery_state
            && let Some(override_digest) = fork_recovery.get_transaction_override(tx_digest)
        {
            warn!(
                ?tx_digest,
                original_digest = ?execution_env.expected_effects_digest,
                override_digest = ?override_digest,
                "Applying fork recovery override for transaction effects digest"
            );
            execution_env.expected_effects_digest = Some(override_digest);
        }

        // prevent concurrent executions of the same tx.
        let tx_guard = epoch_store.acquire_tx_guard(certificate);

        // Already executed: verify the digest (if one is expected) and return the cached effects.
        let tx_cache_reader = self.get_transaction_cache_reader();
        if let Some(effects) = tx_cache_reader.get_executed_effects(tx_digest) {
            if let Some(expected_effects_digest) = execution_env.expected_effects_digest {
                assert_eq!(
                    effects.digest(),
                    expected_effects_digest,
                    "Unexpected effects digest for transaction {:?}",
                    tx_digest
                );
            }
            tx_guard.release();
            return ExecutionOutput::Success((effects, None));
        }

        let execution_start_time = Instant::now();

        // Any caller that verifies the signatures on the certificate will have already checked the
        // epoch. But paths that don't verify sigs (e.g. execution from checkpoint, reading from db)
        // present the possibility of an epoch mismatch. If this cert is not finalized in previous
        // epoch, then it's invalid.
        let Some(execution_guard) = self.execution_lock_for_executable_transaction(certificate)
        else {
            tx_guard.release();
            return ExecutionOutput::EpochEnded;
        };
        // Since we obtain a reference to the epoch store before taking the execution lock, it's
        // possible that reconfiguration has happened and they no longer match.
        // TODO: We may not need the following check anymore since the scheduler
        // should have checked that the certificate is from the same epoch as epoch_store.
        if *execution_guard != epoch_store.epoch() {
            tx_guard.release();
            info!("The epoch of the execution_guard doesn't match the epoch store");
            return ExecutionOutput::EpochEnded;
        }

        // Read this before `execution_env` is moved into `process_certificate`; it is needed
        // again below when settling the accumulator version.
        let accumulator_version = execution_env.assigned_versions.accumulator_version;

        let (transaction_outputs, timings, execution_error_opt) = match self.process_certificate(
            &tx_guard,
            &execution_guard,
            certificate,
            execution_env,
            epoch_store,
        ) {
            ExecutionOutput::Success(result) => result,
            // Every other outcome is handed back to the caller unchanged in meaning.
            output => return output.unwrap_err(),
        };
        let transaction_outputs = Arc::new(transaction_outputs);

        // Allow testing a crash after execution but before the outputs are committed.
        fail_point!("crash");

        // Keep a copy of the effects: `transaction_outputs` is moved into `commit_certificate`.
        let effects = transaction_outputs.effects.clone();
        debug!(
            ?tx_digest,
            fx_digest=?effects.digest(),
            "process_certificate succeeded in {:.3}ms",
            (execution_start_time.elapsed().as_micros() as f64) / 1000.0
        );

        let commit_result = self.commit_certificate(certificate, transaction_outputs, epoch_store);
        if let Err(err) = commit_result {
            error!(?tx_digest, "Error committing transaction: {err}");
            tx_guard.release();
            return ExecutionOutput::Fatal(err);
        }

        if let TransactionKind::AuthenticatorStateUpdate(auth_state) =
            certificate.data().transaction_data().kind()
        {
            if let Some(err) = &execution_error_opt {
                debug_fatal!("Authenticator state update failed: {:?}", err);
            }
            epoch_store.update_authenticator_state(auth_state);

            // double check that the signature verifier always matches the authenticator state
            if cfg!(debug_assertions) {
                let authenticator_state = get_authenticator_state(self.get_object_store())
                    .expect("Read cannot fail")
                    .expect("Authenticator state must exist");

                let mut sys_jwks: Vec<_> = authenticator_state
                    .active_jwks
                    .into_iter()
                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
                    .collect();
                let mut active_jwks: Vec<_> = epoch_store
                    .signature_verifier
                    .get_jwks()
                    .into_iter()
                    .collect();
                // Sort both sides so the comparison does not depend on iteration order.
                sys_jwks.sort();
                active_jwks.sort();

                assert_eq!(sys_jwks, active_jwks);
            }
        }

        // TODO: We should also settle address funds during execution,
        // instead of from checkpoint builder. It will improve performance
        // since we will be settling as soon as we can.
        if certificate
            .transaction_data()
            .kind()
            .is_accumulator_barrier_settle_tx()
        {
            let object_funds_checker = self.object_funds_checker.load();
            if let Some(object_funds_checker) = object_funds_checker.as_ref() {
                // unwrap safe because we assign accumulator version for every transaction
                // when accumulator is enabled.
                let next_accumulator_version = accumulator_version.unwrap().next();
                object_funds_checker.settle_accumulator_version(next_accumulator_version);
            }
        }

        tx_guard.commit_tx();

        let elapsed = execution_start_time.elapsed();
        epoch_store.record_local_execution_time(
            certificate.data().transaction_data(),
            &effects,
            timings,
            elapsed,
        );

        // Skip the ratio when no time was measured, to avoid dividing by zero.
        let elapsed_us = elapsed.as_micros() as f64;
        if elapsed_us > 0.0 {
            self.metrics
                .execution_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed_us);
        };
        ExecutionOutput::Success((effects, execution_error_opt))
    }
1613
1614    pub fn read_objects_for_execution(
1615        &self,
1616        tx_lock: &CertLockGuard,
1617        certificate: &VerifiedExecutableTransaction,
1618        assigned_shared_object_versions: &AssignedVersions,
1619        epoch_store: &Arc<AuthorityPerEpochStore>,
1620    ) -> SuiResult<InputObjects> {
1621        let _scope = monitored_scope("Execution::load_input_objects");
1622        let _metrics_guard = self
1623            .metrics
1624            .execution_load_input_objects_latency
1625            .start_timer();
1626        let input_objects = &certificate.data().transaction_data().input_objects()?;
1627        self.input_loader.read_objects_for_execution(
1628            &certificate.key(),
1629            tx_lock,
1630            input_objects,
1631            assigned_shared_object_versions,
1632            epoch_store.epoch(),
1633        )
1634    }
1635
1636    /// Test only wrapper for `try_execute_immediately()` above, useful for executing change epoch
1637    /// transactions. Assumes execution will always succeed.
1638    pub async fn try_execute_for_test(
1639        &self,
1640        certificate: &VerifiedCertificate,
1641        execution_env: ExecutionEnv,
1642    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1643        let epoch_store = self.epoch_store_for_testing();
1644        let (effects, execution_error_opt) = self
1645            .try_execute_immediately(
1646                &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1647                execution_env,
1648                &epoch_store,
1649            )
1650            .await
1651            .unwrap();
1652        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1653        (signed_effects, execution_error_opt)
1654    }
1655
1656    pub async fn try_execute_executable_for_test(
1657        &self,
1658        executable: &VerifiedExecutableTransaction,
1659        execution_env: ExecutionEnv,
1660    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1661        let epoch_store = self.epoch_store_for_testing();
1662        let (effects, execution_error_opt) = self
1663            .try_execute_immediately(executable, execution_env, &epoch_store)
1664            .await
1665            .unwrap();
1666        self.flush_post_processing(executable.digest()).await;
1667        let signed_effects = self.sign_effects(effects, &epoch_store).unwrap();
1668        (signed_effects, execution_error_opt)
1669    }
1670
1671    /// Wait until the effects of the given transaction are available and return them.
1672    /// Panics if the effects are not found.
1673    ///
1674    /// Only use this in tests where effects are expected to exist.
1675    pub async fn notify_read_effects_for_testing(
1676        &self,
1677        task_name: &'static str,
1678        digest: TransactionDigest,
1679    ) -> TransactionEffects {
1680        self.get_transaction_cache_reader()
1681            .notify_read_executed_effects(task_name, &[digest])
1682            .await
1683            .pop()
1684            .expect("must return correct number of effects")
1685    }
1686
1687    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1688        self.get_object_cache_reader()
1689            .check_owned_objects_are_live(owned_object_refs)
1690    }
1691
1692    /// This function captures the required state to debug a forked transaction.
1693    /// The dump is written to a file in dir `path`, with name prefixed by the transaction digest.
1694    /// NOTE: Since this info escapes the validator context,
1695    /// make sure not to leak any private info here
1696    pub(crate) fn debug_dump_transaction_state(
1697        &self,
1698        tx_digest: &TransactionDigest,
1699        effects: &TransactionEffects,
1700        expected_effects_digest: TransactionEffectsDigest,
1701        inner_temporary_store: &InnerTemporaryStore,
1702        certificate: &VerifiedExecutableTransaction,
1703        debug_dump_config: &StateDebugDumpConfig,
1704    ) -> SuiResult<PathBuf> {
1705        let dump_dir = debug_dump_config
1706            .dump_file_directory
1707            .as_ref()
1708            .cloned()
1709            .unwrap_or(std::env::temp_dir());
1710        let epoch_store = self.load_epoch_store_one_call_per_task();
1711
1712        NodeStateDump::new(
1713            tx_digest,
1714            effects,
1715            expected_effects_digest,
1716            self.get_object_store().as_ref(),
1717            &epoch_store,
1718            inner_temporary_store,
1719            certificate,
1720        )?
1721        .write_to_file(&dump_dir)
1722        .map_err(|e| SuiErrorKind::FileIOError(e.to_string()).into())
1723    }
1724
1725    #[instrument(level = "trace", skip_all)]
1726    pub(crate) fn process_certificate(
1727        &self,
1728        tx_guard: &CertTxGuard,
1729        execution_guard: &ExecutionLockReadGuard<'_>,
1730        certificate: &VerifiedExecutableTransaction,
1731        execution_env: ExecutionEnv,
1732        epoch_store: &Arc<AuthorityPerEpochStore>,
1733    ) -> ExecutionOutput<(
1734        TransactionOutputs,
1735        Vec<ExecutionTiming>,
1736        Option<ExecutionError>,
1737    )> {
1738        let _scope = monitored_scope("Execution::process_certificate");
1739        let tx_digest = *certificate.digest();
1740
1741        let input_objects = match self.read_objects_for_execution(
1742            tx_guard.as_lock_guard(),
1743            certificate,
1744            &execution_env.assigned_versions,
1745            epoch_store,
1746        ) {
1747            Ok(objects) => objects,
1748            Err(e) => return ExecutionOutput::Fatal(e),
1749        };
1750
1751        let expected_effects_digest = match execution_env.expected_effects_digest {
1752            Some(expected_effects_digest) => Some(expected_effects_digest),
1753            None => {
1754                // We could be re-executing a previously executed but uncommitted transaction, perhaps after
1755                // restarting with a new binary. In this situation, if we have published an effects signature,
1756                // we must be sure not to equivocate.
1757                // TODO: read from cache instead of DB
1758                match epoch_store.get_signed_effects_digest(&tx_digest) {
1759                    Ok(digest) => digest,
1760                    Err(e) => return ExecutionOutput::Fatal(e),
1761                }
1762            }
1763        };
1764
1765        fail_point_if!("correlated-crash-process-certificate", || {
1766            if sui_simulator::random::deterministic_probability_once(&tx_digest, 0.01) {
1767                sui_simulator::task::kill_current_node(None);
1768            }
1769        });
1770
1771        // Errors originating from prepare_certificate may be transient (failure to read locks) or
1772        // non-transient (transaction input is invalid, move vm errors). However, all errors from
1773        // this function occur before we have written anything to the db, so we commit the tx
1774        // guard and rely on the client to retry the tx (if it was transient).
1775        self.execute_certificate(
1776            execution_guard,
1777            certificate,
1778            input_objects,
1779            expected_effects_digest,
1780            execution_env,
1781            epoch_store,
1782        )
1783    }
1784
1785    pub async fn reconfigure_traffic_control(
1786        &self,
1787        params: TrafficControlReconfigParams,
1788    ) -> Result<TrafficControlReconfigParams, SuiError> {
1789        if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1790            traffic_controller.admin_reconfigure(params).await
1791        } else {
1792            Err(SuiErrorKind::InvalidAdminRequest(
1793                "Traffic controller is not configured on this node".to_string(),
1794            )
1795            .into())
1796        }
1797    }
1798
    /// Commits the outputs of an executed certificate.
    ///
    /// Steps, in order:
    /// 1. Record the transaction as executed in the per-epoch store, plus its transaction
    ///    key when that key is not a plain digest.
    /// 2. Hand `transaction_outputs` to the cache writer for the current epoch.
    /// 3. Notify the epoch store if the transaction carries an accumulator barrier
    ///    settlement key.
    /// 4. Force a reload of the system packages if this is an end-of-epoch transaction.
    ///
    /// The only error propagated here comes from `insert_tx_key`; the time spent is
    /// recorded in the `commit_certificate_latency` metric.
    #[instrument(level = "trace", skip_all)]
    fn commit_certificate(
        &self,
        certificate: &VerifiedExecutableTransaction,
        transaction_outputs: Arc<TransactionOutputs>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult {
        let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        let tx_digest = certificate.digest();

        // The insertion to epoch_store is not atomic with the insertion to the perpetual store. This is OK because
        // we insert to the epoch store first. And during lookups we always look up in the perpetual store first.
        epoch_store.insert_executed_in_epoch(tx_digest);
        // A key that is just the digest adds no extra lookup path, so only other kinds of
        // key are mapped to the digest.
        let key = certificate.key();
        if !matches!(key, TransactionKey::Digest(_)) {
            epoch_store.insert_tx_key(key, *tx_digest)?;
        }

        // Allow testing what happens if we crash here.
        fail_point!("crash");

        self.get_cache_writer()
            .write_transaction_outputs(epoch_store.epoch(), transaction_outputs);

        // Fire an in-memory-only notification so the scheduler can detect that this
        // barrier transaction has been executed (e.g. by the checkpoint executor) and
        // stop waiting for the checkpoint builder.  We intentionally do NOT persist
        // this to the DB to avoid stale entries surviving a crash while effects may not.
        if let Some(settlement_key) = certificate
            .transaction_data()
            .kind()
            .accumulator_barrier_settlement_key()
        {
            epoch_store.notify_barrier_executed(settlement_key, *tx_digest);
        }

        if certificate.transaction_data().is_end_of_epoch_tx() {
            // At the end of epoch, since system packages may have been upgraded, force
            // reload them in the cache.
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        Ok(())
    }
1847
1848    fn update_metrics(
1849        &self,
1850        certificate: &VerifiedExecutableTransaction,
1851        inner_temporary_store: &InnerTemporaryStore,
1852        effects: &TransactionEffects,
1853    ) {
1854        // count signature by scheme, for zklogin and multisig
1855        if certificate.has_zklogin_sig() {
1856            self.metrics.zklogin_sig_count.inc();
1857        } else if certificate.has_upgraded_multisig() {
1858            self.metrics.multisig_sig_count.inc();
1859        }
1860
1861        self.metrics.total_effects.inc();
1862        self.metrics.total_certs.inc();
1863
1864        let consensus_object_count = effects.input_consensus_objects().len();
1865        if consensus_object_count > 0 {
1866            self.metrics.shared_obj_tx.inc();
1867        }
1868
1869        if certificate.is_sponsored_tx() {
1870            self.metrics.sponsored_tx.inc();
1871        }
1872
1873        let input_object_count = inner_temporary_store.input_objects.len();
1874        self.metrics
1875            .num_input_objs
1876            .observe(input_object_count as f64);
1877        self.metrics
1878            .num_shared_objects
1879            .observe(consensus_object_count as f64);
1880        self.metrics.batch_size.observe(
1881            certificate
1882                .data()
1883                .intent_message()
1884                .value
1885                .kind()
1886                .num_commands() as f64,
1887        );
1888    }
1889
    /// execute_certificate validates the transaction input, and executes the certificate,
    /// returning transaction outputs.
    ///
    /// The outputs are built and returned here but not committed; committing is left to
    /// the caller. The function is not free of side effects, though: it updates metrics,
    /// runs `post_process_one_tx`, and on an effects digest mismatch it dumps node state
    /// and records the fork in the checkpoint store before aborting via `fatal!`.
    ///
    /// Outcomes:
    /// - `ExecutionOutput::Fatal(e)` if the transaction validity check, the certificate
    ///   input check, or the owned-object liveness check fails.
    /// - `ExecutionOutput::RetryLater` if an object funds checker is installed and reports
    ///   that object funds withdraws should not be committed yet.
    /// - `ExecutionOutput::Success((outputs, timings, execution_error))` otherwise, where
    ///   `execution_error` is the error (if any) reported by the executor.
    ///
    /// `_execution_guard` is not read in the body; taking it by reference requires the
    /// caller to hold an execution lock read guard for the duration of the call.
    #[instrument(level = "trace", skip_all)]
    fn execute_certificate(
        &self,
        _execution_guard: &ExecutionLockReadGuard<'_>,
        certificate: &VerifiedExecutableTransaction,
        input_objects: InputObjects,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        execution_env: ExecutionEnv,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> ExecutionOutput<(
        TransactionOutputs,
        Vec<ExecutionTiming>,
        Option<ExecutionError>,
    )> {
        // NOTE(review): the scope and metric names below say "prepare_certificate",
        // presumably an earlier name of this function — confirm before renaming, since
        // these strings are emitted at runtime.
        let _scope = monitored_scope("Execution::prepare_certificate");
        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
        let prepare_certificate_start_time = tokio::time::Instant::now();

        // TODO: We need to move this to a more appropriate place to avoid redundant checks.
        let tx_data = certificate.data().transaction_data();

        if let Err(e) = tx_data.validity_check(&epoch_store.tx_validity_check_context()) {
            return ExecutionOutput::Fatal(e);
        }

        // The cost of partially re-auditing a transaction before execution is tolerated.
        // This step is required for correctness because, for example, ConsensusAddressOwner
        // object owner may have changed between signing and execution.
        let (gas_status, input_objects) = match sui_transaction_checks::check_certificate_input(
            certificate,
            input_objects,
            epoch_store.protocol_config(),
            epoch_store.reference_gas_price(),
        ) {
            Ok(result) => result,
            Err(e) => return ExecutionOutput::Fatal(e),
        };

        // All owned inputs must still be live before we execute.
        let owned_object_refs = input_objects.inner().filter_owned_objects();
        if let Err(e) = self.check_owned_locks(&owned_object_refs) {
            return ExecutionOutput::Fatal(e);
        }
        let tx_digest = *certificate.digest();
        let protocol_config = epoch_store.protocol_config();
        let transaction_data = &certificate.data().intent_message().value;
        let (kind, signer, gas_data) = transaction_data.execution_parts();
        // An early error (if any) is passed to the executor in place of `Ok(())`; the
        // executor is still invoked and still produces effects.
        let early_execution_error = get_early_execution_error(
            &tx_digest,
            &input_objects,
            self.config.certificate_deny_config.certificate_deny_set(),
            &execution_env.funds_withdraw_status,
        );
        let execution_params = match early_execution_error {
            Some(error) => ExecutionOrEarlyError::Err(error),
            None => ExecutionOrEarlyError::Ok(()),
        };

        // Wraps the backing store so the objects read during execution can be collected
        // afterwards (see `into_read_objects` below).
        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());

        // `effects` is only mutated by the `simulate_fork_during_execution` hook further
        // down, which is compiled under `cfg(msim)`; elsewhere `mut` is unused.
        #[allow(unused_mut)]
        let (inner_temp_store, _, mut effects, timings, execution_error_opt) =
            epoch_store.executor().execute_transaction_to_effects(
                &tracking_store,
                protocol_config,
                self.metrics.limits_metrics.clone(),
                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
                // cyclic dependency w/ sui-adapter
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_sui_conservation_check(),
                execution_params,
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                input_objects,
                gas_data,
                gas_status,
                kind,
                signer,
                tx_digest,
                &mut None,
            );

        // If the object funds checker says the withdraws cannot be committed yet, drop
        // this execution's results and ask the caller to retry later.
        let object_funds_checker = self.object_funds_checker.load();
        if let Some(object_funds_checker) = object_funds_checker.as_ref()
            && !object_funds_checker.should_commit_object_funds_withdraws(
                certificate,
                effects.status(),
                &inner_temp_store.accumulator_running_max_withdraws,
                &execution_env,
                self.get_account_funds_read(),
                &self.execution_scheduler,
                epoch_store,
            )
        {
            assert_reachable!("retry object withdraw later");
            return ExecutionOutput::RetryLater;
        }

        // Fork detection: the effects we just computed must match the expected digest.
        if let Some(expected_effects_digest) = expected_effects_digest
            && effects.digest() != expected_effects_digest
        {
            // We don't want to mask the original error, so we log it and continue.
            match self.debug_dump_transaction_state(
                &tx_digest,
                &effects,
                expected_effects_digest,
                &inner_temp_store,
                certificate,
                &self.config.state_debug_dump_config,
            ) {
                Ok(out_path) => {
                    info!(
                        "Dumped node state for transaction {} to {}",
                        tx_digest,
                        out_path.as_path().display().to_string()
                    );
                }
                Err(e) => {
                    error!("Error dumping state for transaction {}: {e}", tx_digest);
                }
            }
            let expected_effects = self
                .get_transaction_cache_reader()
                .get_effects(&expected_effects_digest);
            error!(
                ?tx_digest,
                ?expected_effects_digest,
                actual_effects = ?effects,
                expected_effects = ?expected_effects,
                "fork detected!"
            );
            // Recording the fork is best effort: a failure is logged, and we still reach
            // the `fatal!` below.
            if let Err(e) = self.checkpoint_store.record_transaction_fork_detected(
                tx_digest,
                expected_effects_digest,
                effects.digest(),
            ) {
                error!("Failed to record transaction fork: {e}");
            }

            fail_point_if!("kill_transaction_fork_node", || {
                #[cfg(msim)]
                {
                    tracing::error!(
                        fatal = true,
                        "Fork recovery test: killing node due to transaction effects fork for digest: {}",
                        tx_digest
                    );
                    sui_simulator::task::shutdown_current_node();
                }
            });

            fatal!(
                "Transaction {} is expected to have effects digest {}, but got {}!",
                tx_digest,
                expected_effects_digest,
                effects.digest()
            );
        }

        // Test hook: under `cfg(msim)` this may modify `effects` to simulate a fork.
        fail_point_arg!("simulate_fork_during_execution", |(
            forked_validators,
            full_halt,
            effects_overrides,
            fork_probability,
        ): (
            std::sync::Arc<
                std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
            >,
            bool,
            std::sync::Arc<std::sync::Mutex<std::collections::BTreeMap<String, String>>>,
            f32,
        )| {
            #[cfg(msim)]
            self.simulate_fork_during_execution(
                certificate,
                epoch_store,
                &mut effects,
                forked_validators,
                full_halt,
                effects_overrides,
                fork_probability,
            );
        });

        // Consumes the tracking store: the objects it saw being read are used to work out
        // which loaded runtime objects were left unchanged.
        let unchanged_loaded_runtime_objects =
            crate::transaction_outputs::unchanged_loaded_runtime_objects(
                certificate.transaction_data(),
                &effects,
                &tracking_store.into_read_objects(),
            );

        // index certificate
        // A post-processing failure is counted and logged but does not fail execution.
        let _ = self
            .post_process_one_tx(certificate, &effects, &inner_temp_store, epoch_store)
            .tap_err(|e| {
                self.metrics.post_processing_total_failures.inc();
                error!(?tx_digest, "tx post processing failed: {e}");
            });

        self.update_metrics(certificate, &inner_temp_store, &effects);

        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
            certificate.clone().into_unsigned(),
            effects,
            inner_temp_store,
            unchanged_loaded_runtime_objects,
        );

        // Skip the ratio when no time was measured, to avoid dividing by zero.
        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics.prepare_cert_gas_latency_ratio.observe(
                transaction_outputs
                    .effects
                    .gas_cost_summary()
                    .computation_cost as f64
                    / elapsed,
            );
        }

        ExecutionOutput::Success((transaction_outputs, timings, execution_error_opt.err()))
    }
2122
2123    pub fn prepare_certificate_for_benchmark(
2124        &self,
2125        certificate: &VerifiedExecutableTransaction,
2126        input_objects: InputObjects,
2127        epoch_store: &Arc<AuthorityPerEpochStore>,
2128    ) -> SuiResult<(TransactionOutputs, Option<ExecutionError>)> {
2129        let lock = RwLock::new(epoch_store.epoch());
2130        let execution_guard = lock.try_read().unwrap();
2131
2132        let (transaction_outputs, _timings, execution_error_opt) = self
2133            .execute_certificate(
2134                &execution_guard,
2135                certificate,
2136                input_objects,
2137                None,
2138                ExecutionEnv::default(),
2139                epoch_store,
2140            )
2141            .unwrap();
2142        Ok((transaction_outputs, execution_error_opt))
2143    }
2144
2145    #[instrument(skip_all)]
2146    #[allow(clippy::type_complexity)]
2147    pub async fn dry_exec_transaction(
2148        &self,
2149        transaction: TransactionData,
2150        transaction_digest: TransactionDigest,
2151    ) -> SuiResult<(
2152        DryRunTransactionBlockResponse,
2153        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2154        TransactionEffects,
2155        Option<ObjectID>,
2156    )> {
2157        let epoch_store = self.load_epoch_store_one_call_per_task();
2158        if !self.is_fullnode(&epoch_store) {
2159            return Err(SuiErrorKind::UnsupportedFeatureError {
2160                error: "dry-exec is only supported on fullnodes".to_string(),
2161            }
2162            .into());
2163        }
2164
2165        if transaction.kind().is_system_tx() {
2166            return Err(SuiErrorKind::UnsupportedFeatureError {
2167                error: "dry-exec does not support system transactions".to_string(),
2168            }
2169            .into());
2170        }
2171
2172        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2173    }
2174
2175    #[allow(clippy::type_complexity)]
2176    pub fn dry_exec_transaction_for_benchmark(
2177        &self,
2178        transaction: TransactionData,
2179        transaction_digest: TransactionDigest,
2180    ) -> SuiResult<(
2181        DryRunTransactionBlockResponse,
2182        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2183        TransactionEffects,
2184        Option<ObjectID>,
2185    )> {
2186        let epoch_store = self.load_epoch_store_one_call_per_task();
2187        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2188    }
2189
2190    #[allow(clippy::type_complexity)]
2191    fn dry_exec_transaction_impl(
2192        &self,
2193        epoch_store: &AuthorityPerEpochStore,
2194        transaction: TransactionData,
2195        transaction_digest: TransactionDigest,
2196    ) -> SuiResult<(
2197        DryRunTransactionBlockResponse,
2198        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
2199        TransactionEffects,
2200        Option<ObjectID>,
2201    )> {
2202        // Cheap validity checks for a transaction, including input size limits.
2203        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2204
2205        let input_object_kinds = transaction.input_objects()?;
2206        let receiving_object_refs = transaction.receiving_objects();
2207
2208        sui_transaction_checks::deny::check_transaction_for_signing(
2209            &transaction,
2210            &[],
2211            &input_object_kinds,
2212            &receiving_object_refs,
2213            &self.config.transaction_deny_config,
2214            self.get_backing_package_store().as_ref(),
2215        )?;
2216
2217        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2218            // We don't want to cache this transaction since it's a dry run.
2219            None,
2220            &input_object_kinds,
2221            &receiving_object_refs,
2222            epoch_store.epoch(),
2223        )?;
2224
2225        // make a gas object if one was not provided
2226        let mut gas_data = transaction.gas_data().clone();
2227        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
2228            let sender = transaction.sender();
2229            // use a 1B sui coin
2230            const MIST_TO_SUI: u64 = 1_000_000_000;
2231            const DRY_RUN_SUI: u64 = 1_000_000_000;
2232            let max_coin_value = MIST_TO_SUI * DRY_RUN_SUI;
2233            let gas_object_id = ObjectID::random();
2234            let gas_object = Object::new_move(
2235                MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
2236                Owner::AddressOwner(sender),
2237                TransactionDigest::genesis_marker(),
2238            );
2239            let gas_object_ref = gas_object.compute_object_reference();
2240            gas_data.payment = vec![gas_object_ref];
2241            (
2242                sui_transaction_checks::check_transaction_input_with_given_gas(
2243                    epoch_store.protocol_config(),
2244                    epoch_store.reference_gas_price(),
2245                    &transaction,
2246                    input_objects,
2247                    receiving_objects,
2248                    gas_object,
2249                    &self.metrics.bytecode_verifier_metrics,
2250                    &self.config.verifier_signing_config,
2251                )?,
2252                Some(gas_object_id),
2253            )
2254        } else {
2255            (
2256                sui_transaction_checks::check_transaction_input(
2257                    epoch_store.protocol_config(),
2258                    epoch_store.reference_gas_price(),
2259                    &transaction,
2260                    input_objects,
2261                    &receiving_objects,
2262                    &self.metrics.bytecode_verifier_metrics,
2263                    &self.config.verifier_signing_config,
2264                )?,
2265                None,
2266            )
2267        };
2268
2269        let protocol_config = epoch_store.protocol_config();
2270        let (kind, signer, _) = transaction.execution_parts();
2271
2272        let silent = true;
2273        let executor = sui_execution::executor(protocol_config, silent)
2274            .expect("Creating an executor should not fail here");
2275
2276        let expensive_checks = false;
2277        let early_execution_error = get_early_execution_error(
2278            &transaction_digest,
2279            &checked_input_objects,
2280            self.config.certificate_deny_config.certificate_deny_set(),
2281            // TODO(address-balances): This does not currently support balance withdraws properly.
2282            // For address balance withdraws, this cannot detect insufficient balance. We need to
2283            // first check if the balance is sufficient, similar to how we schedule withdraws.
2284            // For object balance withdraws, we need to handle the case where object balance is
2285            // insufficient in the post-execution.
2286            &FundsWithdrawStatus::MaybeSufficient,
2287        );
2288        let execution_params = match early_execution_error {
2289            Some(error) => ExecutionOrEarlyError::Err(error),
2290            None => ExecutionOrEarlyError::Ok(()),
2291        };
2292        let (inner_temp_store, _, effects, _timings, execution_error) = executor
2293            .execute_transaction_to_effects(
2294                self.get_backing_store().as_ref(),
2295                protocol_config,
2296                self.metrics.limits_metrics.clone(),
2297                expensive_checks,
2298                execution_params,
2299                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2300                epoch_store
2301                    .epoch_start_config()
2302                    .epoch_data()
2303                    .epoch_start_timestamp(),
2304                checked_input_objects,
2305                gas_data,
2306                gas_status,
2307                kind,
2308                signer,
2309                transaction_digest,
2310                &mut None,
2311            );
2312        let tx_digest = *effects.transaction_digest();
2313
2314        let module_cache =
2315            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2316
2317        let mut layout_resolver =
2318            epoch_store
2319                .executor()
2320                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2321                    &inner_temp_store,
2322                    self.get_backing_package_store(),
2323                )));
2324        // Returning empty vector here because we recalculate changes in the rpc layer.
2325        let object_changes = Vec::new();
2326
2327        // Returning empty vector here because we recalculate changes in the rpc layer.
2328        let balance_changes = Vec::new();
2329
2330        let written_with_kind = effects
2331            .created()
2332            .into_iter()
2333            .map(|(oref, _)| (oref, WriteKind::Create))
2334            .chain(
2335                effects
2336                    .unwrapped()
2337                    .into_iter()
2338                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2339            )
2340            .chain(
2341                effects
2342                    .mutated()
2343                    .into_iter()
2344                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
2345            )
2346            .map(|(oref, kind)| {
2347                let obj = inner_temp_store.written.get(&oref.0).unwrap();
2348                // TODO: Avoid clones.
2349                (oref.0, (oref, obj.clone(), kind))
2350            })
2351            .collect();
2352
2353        let execution_error_source = execution_error
2354            .as_ref()
2355            .err()
2356            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2357
2358        Ok((
2359            DryRunTransactionBlockResponse {
2360                suggested_gas_price: self
2361                    .congestion_tracker
2362                    .get_suggested_gas_prices(&transaction),
2363                input: SuiTransactionBlockData::try_from_with_module_cache(
2364                    transaction,
2365                    &module_cache,
2366                )
2367                .map_err(|e| SuiErrorKind::TransactionSerializationError {
2368                    error: format!(
2369                        "Failed to convert transaction to SuiTransactionBlockData: {}",
2370                        e
2371                    ),
2372                })?, // TODO: replace the underlying try_from to SuiError. This one goes deep
2373                effects: effects.clone().try_into()?,
2374                events: SuiTransactionBlockEvents::try_from(
2375                    inner_temp_store.events.clone(),
2376                    tx_digest,
2377                    None,
2378                    layout_resolver.as_mut(),
2379                )?,
2380                object_changes,
2381                balance_changes,
2382                execution_error_source,
2383            },
2384            written_with_kind,
2385            effects,
2386            mock_gas,
2387        ))
2388    }
2389
2390    pub fn simulate_transaction(
2391        &self,
2392        mut transaction: TransactionData,
2393        checks: TransactionChecks,
2394        allow_mock_gas_coin: bool,
2395    ) -> SuiResult<SimulateTransactionResult> {
2396        if transaction.kind().is_system_tx() {
2397            return Err(SuiErrorKind::UnsupportedFeatureError {
2398                error: "simulate does not support system transactions".to_string(),
2399            }
2400            .into());
2401        }
2402
2403        let epoch_store = self.load_epoch_store_one_call_per_task();
2404        if !self.is_fullnode(&epoch_store) {
2405            return Err(SuiErrorKind::UnsupportedFeatureError {
2406                error: "simulate is only supported on fullnodes".to_string(),
2407            }
2408            .into());
2409        }
2410
2411        let dev_inspect = checks.disabled();
2412        if dev_inspect && self.config.dev_inspect_disabled {
2413            return Err(SuiErrorKind::UnsupportedFeatureError {
2414                error: "simulate with checks disabled is not allowed on this node".to_string(),
2415            }
2416            .into());
2417        }
2418
2419        // Cheap validity checks for a transaction, including input size limits.
2420        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2421
2422        let input_object_kinds = transaction.input_objects()?;
2423        let receiving_object_refs = transaction.receiving_objects();
2424
2425        // Create and inject mock gas coin before pre_object_load_checks so that
2426        // funds withdrawal processing sees non-empty payment and doesn't incorrectly
2427        // create an address balance withdrawal for gas.
2428        let mock_gas_object = if allow_mock_gas_coin && transaction.gas().is_empty() {
2429            let obj = Object::new_move(
2430                MoveObject::new_gas_coin(
2431                    OBJECT_START_VERSION,
2432                    ObjectID::MAX,
2433                    DEV_INSPECT_GAS_COIN_VALUE,
2434                ),
2435                Owner::AddressOwner(transaction.gas_data().owner),
2436                TransactionDigest::genesis_marker(),
2437            );
2438            transaction.gas_data_mut().payment = vec![obj.compute_object_reference()];
2439            Some(obj)
2440        } else {
2441            None
2442        };
2443
2444        let declared_withdrawals = self.pre_object_load_checks(
2445            &transaction,
2446            &[],
2447            &input_object_kinds,
2448            &receiving_object_refs,
2449        )?;
2450        let address_funds: BTreeSet<_> = declared_withdrawals.keys().cloned().collect();
2451
2452        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2453            // We don't want to cache this transaction since it's a simulation.
2454            None,
2455            &input_object_kinds,
2456            &receiving_object_refs,
2457            epoch_store.epoch(),
2458        )?;
2459
2460        // Add mock gas to input objects after loading (it doesn't exist in the store).
2461        let mock_gas_id = mock_gas_object.map(|obj| {
2462            let id = obj.id();
2463            input_objects.push(ObjectReadResult::new_from_gas_object(&obj));
2464            id
2465        });
2466
2467        let protocol_config = epoch_store.protocol_config();
2468
2469        let (gas_status, checked_input_objects) = if dev_inspect {
2470            sui_transaction_checks::check_dev_inspect_input(
2471                protocol_config,
2472                &transaction,
2473                input_objects,
2474                receiving_objects,
2475                epoch_store.reference_gas_price(),
2476            )?
2477        } else {
2478            sui_transaction_checks::check_transaction_input(
2479                epoch_store.protocol_config(),
2480                epoch_store.reference_gas_price(),
2481                &transaction,
2482                input_objects,
2483                &receiving_objects,
2484                &self.metrics.bytecode_verifier_metrics,
2485                &self.config.verifier_signing_config,
2486            )?
2487        };
2488
2489        // TODO see if we can spin up a VM once and reuse it
2490        let executor = sui_execution::executor(
2491            protocol_config,
2492            true, // silent
2493        )
2494        .expect("Creating an executor should not fail here");
2495
2496        let (kind, signer, gas_data) = transaction.execution_parts();
2497        let early_execution_error = get_early_execution_error(
2498            &transaction.digest(),
2499            &checked_input_objects,
2500            self.config.certificate_deny_config.certificate_deny_set(),
2501            &FundsWithdrawStatus::MaybeSufficient,
2502        );
2503        let execution_params = match early_execution_error {
2504            Some(error) => ExecutionOrEarlyError::Err(error),
2505            None => ExecutionOrEarlyError::Ok(()),
2506        };
2507
2508        let tracking_store = TrackingBackingStore::new(self.get_backing_store().as_ref());
2509
2510        // Clone inputs for potential retry if object funds check fails post-execution.
2511        let cloned_input_objects = checked_input_objects.clone();
2512        let cloned_gas = gas_data.clone();
2513        let cloned_kind = kind.clone();
2514        let tx_digest = transaction.digest();
2515        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
2516        let epoch_timestamp_ms = epoch_store
2517            .epoch_start_config()
2518            .epoch_data()
2519            .epoch_start_timestamp();
2520        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2521            &tracking_store,
2522            protocol_config,
2523            self.metrics.limits_metrics.clone(),
2524            false, // expensive_checks
2525            execution_params,
2526            &epoch_id,
2527            epoch_timestamp_ms,
2528            checked_input_objects,
2529            gas_data,
2530            gas_status,
2531            kind,
2532            signer,
2533            tx_digest,
2534            dev_inspect,
2535        );
2536
2537        // Post-execution: check object funds (non-address withdrawals discovered during execution).
2538        let (inner_temp_store, effects, execution_result) = if execution_result.is_ok() {
2539            let has_insufficient_object_funds = inner_temp_store
2540                .accumulator_running_max_withdraws
2541                .iter()
2542                .filter(|(id, _)| !address_funds.contains(id))
2543                .any(|(id, max_withdraw)| {
2544                    let (balance, _) = self.get_account_funds_read().get_latest_account_amount(id);
2545                    balance < *max_withdraw
2546                });
2547
2548            if has_insufficient_object_funds {
2549                let retry_gas_status = SuiGasStatus::new(
2550                    cloned_gas.budget,
2551                    cloned_gas.price,
2552                    epoch_store.reference_gas_price(),
2553                    protocol_config,
2554                )?;
2555                let (store, _, effects, result) = executor.dev_inspect_transaction(
2556                    &tracking_store,
2557                    protocol_config,
2558                    self.metrics.limits_metrics.clone(),
2559                    false,
2560                    ExecutionOrEarlyError::Err(ExecutionErrorKind::InsufficientFundsForWithdraw),
2561                    &epoch_id,
2562                    epoch_timestamp_ms,
2563                    cloned_input_objects,
2564                    cloned_gas,
2565                    retry_gas_status,
2566                    cloned_kind,
2567                    signer,
2568                    tx_digest,
2569                    dev_inspect,
2570                );
2571                (store, effects, result)
2572            } else {
2573                (inner_temp_store, effects, execution_result)
2574            }
2575        } else {
2576            (inner_temp_store, effects, execution_result)
2577        };
2578
2579        let loaded_runtime_objects = tracking_store.into_read_objects();
2580        let unchanged_loaded_runtime_objects =
2581            crate::transaction_outputs::unchanged_loaded_runtime_objects(
2582                &transaction,
2583                &effects,
2584                &loaded_runtime_objects,
2585            );
2586
2587        let object_set = {
2588            let objects = {
2589                let mut objects = loaded_runtime_objects;
2590
2591                for o in inner_temp_store
2592                    .input_objects
2593                    .into_values()
2594                    .chain(inner_temp_store.written.into_values())
2595                {
2596                    objects.insert(o);
2597                }
2598
2599                objects
2600            };
2601
2602            let object_keys = sui_types::storage::get_transaction_object_set(
2603                &transaction,
2604                &effects,
2605                &unchanged_loaded_runtime_objects,
2606            );
2607
2608            let mut set = sui_types::full_checkpoint_content::ObjectSet::default();
2609            for k in object_keys {
2610                if let Some(o) = objects.get(&k) {
2611                    set.insert(o.clone());
2612                }
2613            }
2614
2615            set
2616        };
2617
2618        Ok(SimulateTransactionResult {
2619            objects: object_set,
2620            events: effects.events_digest().map(|_| inner_temp_store.events),
2621            effects,
2622            execution_result,
2623            mock_gas_id,
2624            unchanged_loaded_runtime_objects,
2625            suggested_gas_price: self
2626                .congestion_tracker
2627                .get_suggested_gas_prices(&transaction),
2628        })
2629    }
2630
2631    /// The object ID for gas can be any object ID, even for an uncreated object
2632    #[allow(clippy::collapsible_else_if)]
2633    #[instrument(skip_all)]
2634    pub async fn dev_inspect_transaction_block(
2635        &self,
2636        sender: SuiAddress,
2637        transaction_kind: TransactionKind,
2638        gas_price: Option<u64>,
2639        gas_budget: Option<u64>,
2640        gas_sponsor: Option<SuiAddress>,
2641        gas_objects: Option<Vec<ObjectRef>>,
2642        show_raw_txn_data_and_effects: Option<bool>,
2643        skip_checks: Option<bool>,
2644    ) -> SuiResult<DevInspectResults> {
2645        let epoch_store = self.load_epoch_store_one_call_per_task();
2646
2647        if !self.is_fullnode(&epoch_store) {
2648            return Err(SuiErrorKind::UnsupportedFeatureError {
2649                error: "dev-inspect is only supported on fullnodes".to_string(),
2650            }
2651            .into());
2652        }
2653
2654        if transaction_kind.is_system_tx() {
2655            return Err(SuiErrorKind::UnsupportedFeatureError {
2656                error: "system transactions are not supported".to_string(),
2657            }
2658            .into());
2659        }
2660
2661        let skip_checks = skip_checks.unwrap_or(true);
2662        if skip_checks && self.config.dev_inspect_disabled {
2663            return Err(SuiErrorKind::UnsupportedFeatureError {
2664                error: "dev-inspect with skip_checks is not allowed on this node".to_string(),
2665            }
2666            .into());
2667        }
2668
2669        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2670        let reference_gas_price = epoch_store.reference_gas_price();
2671        let protocol_config = epoch_store.protocol_config();
2672        let max_tx_gas = protocol_config.max_tx_gas();
2673
2674        let price = gas_price.unwrap_or(reference_gas_price);
2675        let budget = gas_budget.unwrap_or(max_tx_gas);
2676        let owner = gas_sponsor.unwrap_or(sender);
2677        // Payment might be empty here, but it's fine we'll have to deal with it later after reading all the input objects.
2678        let payment = gas_objects.unwrap_or_default();
2679        let mut transaction = TransactionData::V1(TransactionDataV1 {
2680            kind: transaction_kind.clone(),
2681            sender,
2682            gas_data: GasData {
2683                payment,
2684                owner,
2685                price,
2686                budget,
2687            },
2688            expiration: TransactionExpiration::None,
2689        });
2690
2691        let raw_txn_data = if show_raw_txn_data_and_effects {
2692            bcs::to_bytes(&transaction).map_err(|_| {
2693                SuiErrorKind::TransactionSerializationError {
2694                    error: "Failed to serialize transaction during dev inspect".to_string(),
2695                }
2696            })?
2697        } else {
2698            vec![]
2699        };
2700
2701        transaction.validity_check_no_gas_check(protocol_config)?;
2702
2703        let input_object_kinds = transaction.input_objects()?;
2704        let receiving_object_refs = transaction.receiving_objects();
2705
2706        sui_transaction_checks::deny::check_transaction_for_signing(
2707            &transaction,
2708            &[],
2709            &input_object_kinds,
2710            &receiving_object_refs,
2711            &self.config.transaction_deny_config,
2712            self.get_backing_package_store().as_ref(),
2713        )?;
2714
2715        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2716            // We don't want to cache this transaction since it's a dev inspect.
2717            None,
2718            &input_object_kinds,
2719            &receiving_object_refs,
2720            epoch_store.epoch(),
2721        )?;
2722
2723        let (gas_status, checked_input_objects) = if skip_checks {
2724            // If we are skipping checks, then we call the check_dev_inspect_input function which will perform
2725            // only lightweight checks on the transaction input. And if the gas field is empty, that means we will
2726            // use the dummy gas object so we need to add it to the input objects vector.
2727            if transaction.gas().is_empty() {
2728                // Create and use a dummy gas object if there is no gas object provided.
2729                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2730                    DEV_INSPECT_GAS_COIN_VALUE,
2731                    transaction.gas_owner(),
2732                );
2733                let gas_object_ref = dummy_gas_object.compute_object_reference();
2734                transaction.gas_data_mut().payment = vec![gas_object_ref];
2735                input_objects.push(ObjectReadResult::new(
2736                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2737                    dummy_gas_object.into(),
2738                ));
2739            }
2740            sui_transaction_checks::check_dev_inspect_input(
2741                protocol_config,
2742                &transaction,
2743                input_objects,
2744                receiving_objects,
2745                reference_gas_price,
2746            )?
2747        } else {
2748            // If we are not skipping checks, then we call the check_transaction_input function and its dummy gas
2749            // variant which will perform full fledged checks just like a real transaction execution.
2750            if transaction.gas().is_empty() {
2751                // Create and use a dummy gas object if there is no gas object provided.
2752                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2753                    DEV_INSPECT_GAS_COIN_VALUE,
2754                    transaction.gas_owner(),
2755                );
2756                let gas_object_ref = dummy_gas_object.compute_object_reference();
2757                transaction.gas_data_mut().payment = vec![gas_object_ref];
2758                sui_transaction_checks::check_transaction_input_with_given_gas(
2759                    epoch_store.protocol_config(),
2760                    epoch_store.reference_gas_price(),
2761                    &transaction,
2762                    input_objects,
2763                    receiving_objects,
2764                    dummy_gas_object,
2765                    &self.metrics.bytecode_verifier_metrics,
2766                    &self.config.verifier_signing_config,
2767                )?
2768            } else {
2769                sui_transaction_checks::check_transaction_input(
2770                    epoch_store.protocol_config(),
2771                    epoch_store.reference_gas_price(),
2772                    &transaction,
2773                    input_objects,
2774                    &receiving_objects,
2775                    &self.metrics.bytecode_verifier_metrics,
2776                    &self.config.verifier_signing_config,
2777                )?
2778            }
2779        };
2780
2781        let executor = sui_execution::executor(protocol_config, /* silent */ true)
2782            .expect("Creating an executor should not fail here");
2783        let gas_data = transaction.gas_data().clone();
2784        let intent_msg = IntentMessage::new(
2785            Intent {
2786                version: IntentVersion::V0,
2787                scope: IntentScope::TransactionData,
2788                app_id: AppId::Sui,
2789            },
2790            transaction,
2791        );
2792        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2793        let early_execution_error = get_early_execution_error(
2794            &transaction_digest,
2795            &checked_input_objects,
2796            self.config.certificate_deny_config.certificate_deny_set(),
2797            // TODO(address-balances): Mimic withdraw scheduling and pass the result.
2798            &FundsWithdrawStatus::MaybeSufficient,
2799        );
2800        let execution_params = match early_execution_error {
2801            Some(error) => ExecutionOrEarlyError::Err(error),
2802            None => ExecutionOrEarlyError::Ok(()),
2803        };
2804
2805        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2806            self.get_backing_store().as_ref(),
2807            protocol_config,
2808            self.metrics.limits_metrics.clone(),
2809            /* expensive checks */ false,
2810            execution_params,
2811            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2812            epoch_store
2813                .epoch_start_config()
2814                .epoch_data()
2815                .epoch_start_timestamp(),
2816            checked_input_objects,
2817            gas_data,
2818            gas_status,
2819            transaction_kind,
2820            sender,
2821            transaction_digest,
2822            skip_checks,
2823        );
2824
2825        let raw_effects = if show_raw_txn_data_and_effects {
2826            bcs::to_bytes(&effects).map_err(|_| SuiErrorKind::TransactionSerializationError {
2827                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2828            })?
2829        } else {
2830            vec![]
2831        };
2832
2833        let mut layout_resolver =
2834            epoch_store
2835                .executor()
2836                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2837                    &inner_temp_store,
2838                    self.get_backing_package_store(),
2839                )));
2840
2841        DevInspectResults::new(
2842            effects,
2843            inner_temp_store.events.clone(),
2844            execution_result,
2845            raw_txn_data,
2846            raw_effects,
2847            layout_resolver.as_mut(),
2848        )
2849    }
2850
2851    // Only used for testing because of how epoch store is loaded.
2852    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2853        let epoch_store = self.epoch_store_for_testing();
2854        Ok(epoch_store.reference_gas_price())
2855    }
2856
2857    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2858        self.get_transaction_cache_reader()
2859            .is_tx_already_executed(digest)
2860    }
2861
2862    #[instrument(level = "debug", skip_all, err(level = "debug"))]
2863    fn index_tx(
2864        sequence: u64,
2865        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
2866        object_store: &Arc<dyn ObjectStore + Send + Sync>,
2867        indexes: &IndexStore,
2868        digest: &TransactionDigest,
2869        // TODO: index_tx really just need the transaction data here.
2870        cert: &VerifiedExecutableTransaction,
2871        effects: &TransactionEffects,
2872        events: &TransactionEvents,
2873        timestamp_ms: u64,
2874        tx_coins: Option<TxCoins>,
2875        written: &WrittenObjects,
2876        inner_temporary_store: &InnerTemporaryStore,
2877        epoch_store: &Arc<AuthorityPerEpochStore>,
2878        acquire_locks: bool,
2879    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
2880        let changes = Self::process_object_index(backing_package_store, object_store, effects, written, inner_temporary_store, epoch_store)
2881            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2882
2883        indexes.index_tx(
2884            sequence,
2885            cert.data().intent_message().value.sender(),
2886            cert.data()
2887                .intent_message()
2888                .value
2889                .input_objects()?
2890                .iter()
2891                .map(|o| o.object_id()),
2892            effects
2893                .all_changed_objects()
2894                .into_iter()
2895                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2896            cert.data()
2897                .intent_message()
2898                .value
2899                .move_calls()
2900                .into_iter()
2901                .map(|(_cmd_idx, package, module, function)| {
2902                    (*package, module.to_owned(), function.to_owned())
2903                }),
2904            events,
2905            changes,
2906            digest,
2907            timestamp_ms,
2908            tx_coins,
2909            effects.accumulator_events(),
2910            acquire_locks,
2911        )
2912    }
2913
2914    #[cfg(msim)]
2915    fn simulate_fork_during_execution(
2916        &self,
2917        certificate: &VerifiedExecutableTransaction,
2918        epoch_store: &Arc<AuthorityPerEpochStore>,
2919        effects: &mut TransactionEffects,
2920        forked_validators: std::sync::Arc<
2921            std::sync::Mutex<std::collections::HashSet<sui_types::base_types::AuthorityName>>,
2922        >,
2923        full_halt: bool,
2924        effects_overrides: std::sync::Arc<
2925            std::sync::Mutex<std::collections::BTreeMap<String, String>>,
2926        >,
2927        fork_probability: f32,
2928    ) {
2929        use std::cell::RefCell;
2930        thread_local! {
2931            static TOTAL_FAILING_STAKE: RefCell<u64> = RefCell::new(0);
2932        }
2933        if !certificate.data().intent_message().value.is_system_tx() {
2934            let committee = epoch_store.committee();
2935            let cur_stake = (**committee).weight(&self.name);
2936            if cur_stake > 0 {
2937                TOTAL_FAILING_STAKE.with_borrow_mut(|total_stake| {
2938                    let already_forked = forked_validators
2939                        .lock()
2940                        .ok()
2941                        .map(|set| set.contains(&self.name))
2942                        .unwrap_or(false);
2943
2944                    if !already_forked {
2945                        let should_fork = if full_halt {
2946                            // For full halt, fork enough nodes to reach validity threshold
2947                            *total_stake <= committee.validity_threshold()
2948                        } else {
2949                            // For partial fork, stay strictly below validity threshold
2950                            *total_stake + cur_stake < committee.validity_threshold()
2951                        };
2952
2953                        if should_fork {
2954                            *total_stake += cur_stake;
2955
2956                            if let Ok(mut external_set) = forked_validators.lock() {
2957                                external_set.insert(self.name);
2958                                info!("forked_validators: {:?}", external_set);
2959                            }
2960                        }
2961                    }
2962
2963                    if let Ok(external_set) = forked_validators.lock() {
2964                        if external_set.contains(&self.name) {
2965                            // If effects_overrides is empty, deterministically select a tx_digest to fork with 1/100 probability.
2966                            // Fork this transaction and record the digest and the original effects to original_effects.
2967                            // If original_effects is nonempty and contains a key matching this transaction digest (i.e.
2968                            // the transaction was forked on a different validator), fork this txn as well.
2969
2970                            let tx_digest = certificate.digest().to_string();
2971                            if let Ok(mut overrides) = effects_overrides.lock() {
2972                                if overrides.contains_key(&tx_digest)
2973                                    || overrides.is_empty()
2974                                        && sui_simulator::random::deterministic_probability(
2975                                            &tx_digest,
2976                                            fork_probability,
2977                                        )
2978                                {
2979                                    let original_effects_digest = effects.digest().to_string();
2980                                    overrides
2981                                        .insert(tx_digest.clone(), original_effects_digest.clone());
2982                                    info!(
2983                                        ?tx_digest,
2984                                        ?original_effects_digest,
2985                                        "Captured forked effects digest for transaction"
2986                                    );
2987                                    effects.gas_cost_summary_mut_for_testing().computation_cost +=
2988                                        1;
2989                                }
2990                            }
2991                        }
2992                    }
2993                });
2994            }
2995        }
2996    }
2997
2998    fn process_object_index(
2999        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3000        object_store: &Arc<dyn ObjectStore + Send + Sync>,
3001        effects: &TransactionEffects,
3002        written: &WrittenObjects,
3003        inner_temporary_store: &InnerTemporaryStore,
3004        epoch_store: &Arc<AuthorityPerEpochStore>,
3005    ) -> SuiResult<ObjectIndexChanges> {
3006        let mut layout_resolver =
3007            epoch_store
3008                .executor()
3009                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3010                    inner_temporary_store,
3011                    backing_package_store,
3012                )));
3013
3014        let modified_at_version = effects
3015            .modified_at_versions()
3016            .into_iter()
3017            .collect::<HashMap<_, _>>();
3018
3019        let tx_digest = effects.transaction_digest();
3020        let mut deleted_owners = vec![];
3021        let mut deleted_dynamic_fields = vec![];
3022        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
3023            let old_version = modified_at_version.get(&id).unwrap();
3024            // When we process the index, the latest object hasn't been written yet so
3025            // the old object must be present.
3026            match Self::get_owner_at_version(object_store, &id, *old_version).unwrap_or_else(
3027                |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
3028            ) {
3029                Owner::AddressOwner(addr)
3030                | Owner::ConsensusAddressOwner { owner: addr, .. } => deleted_owners.push((addr, id)),
3031                Owner::ObjectOwner(object_id) => {
3032                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
3033                }
3034                _ => {}
3035            }
3036        }
3037
3038        let mut new_owners = vec![];
3039        let mut new_dynamic_fields = vec![];
3040
3041        for (oref, owner, kind) in effects.all_changed_objects() {
3042            let id = &oref.0;
3043            // For mutated objects, retrieve old owner and delete old index if there is a owner change.
3044            if let WriteKind::Mutate = kind {
3045                let Some(old_version) = modified_at_version.get(id) else {
3046                    panic!(
3047                        "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
3048                        tx_digest
3049                    );
3050                };
3051                // When we process the index, the latest object hasn't been written yet so
3052                // the old object must be present.
3053                let Some(old_object) = object_store.get_object_by_key(id, *old_version) else {
3054                    panic!(
3055                        "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
3056                        tx_digest, id, old_version
3057                    );
3058                };
3059                if old_object.owner != owner {
3060                    match old_object.owner {
3061                        Owner::AddressOwner(addr)
3062                        | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3063                            deleted_owners.push((addr, *id));
3064                        }
3065                        Owner::ObjectOwner(object_id) => {
3066                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
3067                        }
3068                        _ => {}
3069                    }
3070                }
3071            }
3072
3073            match owner {
3074                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3075                    // TODO: We can remove the object fetching after we added ObjectType to TransactionEffects
3076                    let new_object = written.get(id).unwrap_or_else(
3077                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3078                    );
3079                    assert_eq!(
3080                        new_object.version(),
3081                        oref.1,
3082                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3083                        tx_digest,
3084                        id,
3085                        new_object.version(),
3086                        oref.1
3087                    );
3088
3089                    let type_ = new_object
3090                        .type_()
3091                        .map(|type_| ObjectType::Struct(type_.clone()))
3092                        .unwrap_or(ObjectType::Package);
3093
3094                    new_owners.push((
3095                        (addr, *id),
3096                        ObjectInfo {
3097                            object_id: *id,
3098                            version: oref.1,
3099                            digest: oref.2,
3100                            type_,
3101                            owner,
3102                            previous_transaction: *effects.transaction_digest(),
3103                        },
3104                    ));
3105                }
3106                Owner::ObjectOwner(owner) => {
3107                    let new_object = written.get(id).unwrap_or_else(
3108                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
3109                    );
3110                    assert_eq!(
3111                        new_object.version(),
3112                        oref.1,
3113                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
3114                        tx_digest,
3115                        id,
3116                        new_object.version(),
3117                        oref.1
3118                    );
3119
3120                    let Some(df_info) = Self::try_create_dynamic_field_info(
3121                        object_store,
3122                        new_object,
3123                        written,
3124                        layout_resolver.as_mut(),
3125                    )
3126                    .unwrap_or_else(|e| {
3127                        error!(
3128                            "try_create_dynamic_field_info should not fail, {}, new_object={:?}",
3129                            e, new_object
3130                        );
3131                        None
3132                    }) else {
3133                        // Skip indexing for non dynamic field objects.
3134                        continue;
3135                    };
3136                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
3137                }
3138                _ => {}
3139            }
3140        }
3141
3142        Ok(ObjectIndexChanges {
3143            deleted_owners,
3144            deleted_dynamic_fields,
3145            new_owners,
3146            new_dynamic_fields,
3147        })
3148    }
3149
3150    fn try_create_dynamic_field_info(
3151        object_store: &Arc<dyn ObjectStore + Send + Sync>,
3152        o: &Object,
3153        written: &WrittenObjects,
3154        resolver: &mut dyn LayoutResolver,
3155    ) -> SuiResult<Option<DynamicFieldInfo>> {
3156        // Skip if not a move object
3157        let Some(move_object) = o.data.try_as_move().cloned() else {
3158            return Ok(None);
3159        };
3160
3161        // We only index dynamic field objects
3162        if !move_object.type_().is_dynamic_field() {
3163            return Ok(None);
3164        }
3165
3166        let layout = resolver
3167            .get_annotated_layout(&move_object.type_().clone().into())?
3168            .into_layout();
3169
3170        let field =
3171            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
3172                SuiErrorKind::ObjectDeserializationError {
3173                    error: e.to_string(),
3174                }
3175            })?;
3176
3177        let type_ = field.kind;
3178        let name_type: TypeTag = field.name_layout.into();
3179        let bcs_name = field.name_bytes.to_owned();
3180
3181        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
3182            .map_err(|e| {
3183                warn!("{e}");
3184                SuiErrorKind::ObjectDeserializationError {
3185                    error: e.to_string(),
3186                }
3187            })?;
3188
3189        let name = DynamicFieldName {
3190            type_: name_type,
3191            value: SuiMoveValue::from(name_value).to_json_value(),
3192        };
3193
3194        let value_metadata = field.value_metadata().map_err(|e| {
3195            warn!("{e}");
3196            SuiErrorKind::ObjectDeserializationError {
3197                error: e.to_string(),
3198            }
3199        })?;
3200
3201        Ok(Some(match value_metadata {
3202            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
3203                name,
3204                bcs_name,
3205                type_,
3206                object_type: object_type.to_canonical_string(/* with_prefix */ true),
3207                object_id: o.id(),
3208                version: o.version(),
3209                digest: o.digest(),
3210            },
3211
3212            DFV::ValueMetadata::DynamicObjectField(object_id) => {
3213                // Find the actual object from storage using the object id obtained from the wrapper.
3214
3215                // Try to find the object in the written objects first.
3216                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
3217                    let version = object.version();
3218                    let digest = object.digest();
3219                    let object_type = object.data.type_().unwrap().clone();
3220                    (version, digest, object_type)
3221                } else {
3222                    // If not found, try to find it in the database.
3223                    let object = object_store
3224                        .get_object_by_key(&object_id, o.version())
3225                        .ok_or_else(|| UserInputError::ObjectNotFound {
3226                            object_id,
3227                            version: Some(o.version()),
3228                        })?;
3229                    let version = object.version();
3230                    let digest = object.digest();
3231                    let object_type = object.data.type_().unwrap().clone();
3232                    (version, digest, object_type)
3233                };
3234
3235                DynamicFieldInfo {
3236                    name,
3237                    bcs_name,
3238                    type_,
3239                    object_type: object_type.to_string(),
3240                    object_id,
3241                    version,
3242                    digest,
3243                }
3244            }
3245        }))
3246    }
3247
3248    #[instrument(level = "trace", skip_all, err(level = "debug"))]
3249    fn post_process_one_tx(
3250        &self,
3251        certificate: &VerifiedExecutableTransaction,
3252        effects: &TransactionEffects,
3253        inner_temporary_store: &InnerTemporaryStore,
3254        epoch_store: &Arc<AuthorityPerEpochStore>,
3255    ) -> SuiResult {
3256        let Some(indexes) = &self.indexes else {
3257            return Ok(());
3258        };
3259
3260        let tx_digest = *certificate.digest();
3261
3262        // Allocate sequence number on the calling thread to preserve execution order.
3263        let sequence = indexes.allocate_sequence_number();
3264
3265        if self.config.sync_post_process_one_tx {
3266            // Synchronous mode: run post-processing inline on the calling thread
3267            // and commit the index batch immediately with locks held.
3268            // Used as a rollback mechanism and for testing correctness against async mode.
3269            // TODO: delete this branch once async mode has shipped
3270            let result = Self::post_process_one_tx_impl(
3271                sequence,
3272                indexes,
3273                &self.subscription_handler,
3274                &self.metrics,
3275                self.name,
3276                self.get_backing_package_store(),
3277                self.get_object_store(),
3278                certificate,
3279                effects,
3280                inner_temporary_store,
3281                epoch_store,
3282                true, // acquire_locks
3283            );
3284
3285            match result {
3286                Ok((raw_batch, cache_updates_with_locks)) => {
3287                    let mut db_batch = indexes.new_db_batch();
3288                    db_batch
3289                        .concat(vec![raw_batch])
3290                        .expect("failed to absorb raw index batch");
3291                    // Destructure to keep _locks alive through commit_index_batch.
3292                    let IndexStoreCacheUpdatesWithLocks { _locks, inner } =
3293                        cache_updates_with_locks;
3294                    indexes
3295                        .commit_index_batch(db_batch, vec![inner])
3296                        .expect("failed to commit index batch");
3297                }
3298                Err(e) => {
3299                    self.metrics.post_processing_total_failures.inc();
3300                    error!(?tx_digest, "tx post processing failed: {e}");
3301                    return Err(e);
3302                }
3303            }
3304
3305            return Ok(());
3306        }
3307
3308        let (done_tx, done_rx) = tokio::sync::oneshot::channel::<PostProcessingOutput>();
3309        self.pending_post_processing.insert(tx_digest, done_rx);
3310
3311        let indexes = indexes.clone();
3312        let subscription_handler = self.subscription_handler.clone();
3313        let metrics = self.metrics.clone();
3314        let name = self.name;
3315        let backing_package_store = self.get_backing_package_store().clone();
3316        let object_store = self.get_object_store().clone();
3317        let semaphore = self.post_processing_semaphore.clone();
3318
3319        let certificate = certificate.clone();
3320        let effects = effects.clone();
3321        let inner_temporary_store = inner_temporary_store.clone();
3322        let epoch_store = epoch_store.clone();
3323
3324        // spawn post processing on a blocking thread
3325        tokio::spawn(async move {
3326            let permit = {
3327                let _scope = monitored_scope("Execution::post_process_one_tx::semaphore_acquire");
3328                semaphore
3329                    .acquire_owned()
3330                    .await
3331                    .expect("post-processing semaphore should not be closed")
3332            };
3333
3334            let _ = tokio::task::spawn_blocking(move || {
3335                let _permit = permit;
3336
3337                let result = Self::post_process_one_tx_impl(
3338                    sequence,
3339                    &indexes,
3340                    &subscription_handler,
3341                    &metrics,
3342                    name,
3343                    &backing_package_store,
3344                    &object_store,
3345                    &certificate,
3346                    &effects,
3347                    &inner_temporary_store,
3348                    &epoch_store,
3349                    false, // acquire_locks
3350                );
3351
3352                match result {
3353                    Ok((raw_batch, cache_updates_with_locks)) => {
3354                        fail_point!("crash-after-post-process-one-tx");
3355                        let output = (raw_batch, cache_updates_with_locks.into_inner());
3356                        let _ = done_tx.send(output);
3357                    }
3358                    Err(e) => {
3359                        metrics.post_processing_total_failures.inc();
3360                        error!(?tx_digest, "tx post processing failed: {e}");
3361                    }
3362                }
3363            })
3364            .await;
3365        });
3366
3367        Ok(())
3368    }
3369
3370    fn post_process_one_tx_impl(
3371        sequence: u64,
3372        indexes: &Arc<IndexStore>,
3373        subscription_handler: &Arc<SubscriptionHandler>,
3374        metrics: &Arc<AuthorityMetrics>,
3375        name: AuthorityName,
3376        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3377        object_store: &Arc<dyn ObjectStore + Send + Sync>,
3378        certificate: &VerifiedExecutableTransaction,
3379        effects: &TransactionEffects,
3380        inner_temporary_store: &InnerTemporaryStore,
3381        epoch_store: &Arc<AuthorityPerEpochStore>,
3382        acquire_locks: bool,
3383    ) -> SuiResult<(StagedBatch, IndexStoreCacheUpdatesWithLocks)> {
3384        let _scope = monitored_scope("Execution::post_process_one_tx");
3385
3386        let tx_digest = certificate.digest();
3387        let timestamp_ms = Self::unixtime_now_ms();
3388        let events = &inner_temporary_store.events;
3389        let written = &inner_temporary_store.written;
3390        let tx_coins = Self::fullnode_only_get_tx_coins_for_indexing(
3391            name,
3392            object_store,
3393            effects,
3394            inner_temporary_store,
3395            epoch_store,
3396        );
3397
3398        let (raw_batch, cache_updates) = Self::index_tx(
3399            sequence,
3400            backing_package_store,
3401            object_store,
3402            indexes,
3403            tx_digest,
3404            certificate,
3405            effects,
3406            events,
3407            timestamp_ms,
3408            tx_coins,
3409            written,
3410            inner_temporary_store,
3411            epoch_store,
3412            acquire_locks,
3413        )
3414        .tap_ok(|_| metrics.post_processing_total_tx_indexed.inc())
3415        .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3416        .expect("Indexing tx should not fail");
3417
3418        let effects: SuiTransactionBlockEffects = effects.clone().try_into()?;
3419        let events = Self::make_transaction_block_events(
3420            backing_package_store,
3421            events.clone(),
3422            *tx_digest,
3423            timestamp_ms,
3424            epoch_store,
3425            inner_temporary_store,
3426        )?;
3427        // Emit events
3428        subscription_handler
3429            .process_tx(certificate.data().transaction_data(), &effects, &events)
3430            .tap_ok(|_| metrics.post_processing_total_tx_had_event_processed.inc())
3431            .tap_err(|e| {
3432                warn!(
3433                    ?tx_digest,
3434                    "Post processing - Couldn't process events for tx: {}", e
3435                )
3436            })?;
3437
3438        metrics
3439            .post_processing_total_events_emitted
3440            .inc_by(events.data.len() as u64);
3441
3442        Ok((raw_batch, cache_updates))
3443    }
3444
3445    fn make_transaction_block_events(
3446        backing_package_store: &Arc<dyn BackingPackageStore + Send + Sync>,
3447        transaction_events: TransactionEvents,
3448        digest: TransactionDigest,
3449        timestamp_ms: u64,
3450        epoch_store: &Arc<AuthorityPerEpochStore>,
3451        inner_temporary_store: &InnerTemporaryStore,
3452    ) -> SuiResult<SuiTransactionBlockEvents> {
3453        let mut layout_resolver =
3454            epoch_store
3455                .executor()
3456                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3457                    inner_temporary_store,
3458                    backing_package_store,
3459                )));
3460        SuiTransactionBlockEvents::try_from(
3461            transaction_events,
3462            digest,
3463            Some(timestamp_ms),
3464            layout_resolver.as_mut(),
3465        )
3466    }
3467
3468    pub fn unixtime_now_ms() -> u64 {
3469        let now = SystemTime::now()
3470            .duration_since(UNIX_EPOCH)
3471            .expect("Time went backwards")
3472            .as_millis();
3473        u64::try_from(now).expect("Travelling in time machine")
3474    }
3475
3476    // TODO(fastpath): update this handler for Mysticeti fastpath.
3477    // There will no longer be validator quorum signed transactions or effects.
3478    // The proof of finality needs to come from checkpoints.
3479    #[instrument(level = "trace", skip_all)]
3480    pub async fn handle_transaction_info_request(
3481        &self,
3482        request: TransactionInfoRequest,
3483    ) -> SuiResult<TransactionInfoResponse> {
3484        let epoch_store = self.load_epoch_store_one_call_per_task();
3485        let (transaction, status) = self
3486            .get_transaction_status(&request.transaction_digest, &epoch_store)?
3487            .ok_or(SuiErrorKind::TransactionNotFound {
3488                digest: request.transaction_digest,
3489            })?;
3490        Ok(TransactionInfoResponse {
3491            transaction,
3492            status,
3493        })
3494    }
3495
3496    #[instrument(level = "trace", skip_all)]
3497    pub async fn handle_object_info_request(
3498        &self,
3499        request: ObjectInfoRequest,
3500    ) -> SuiResult<ObjectInfoResponse> {
3501        let epoch_store = self.load_epoch_store_one_call_per_task();
3502
3503        let requested_object_seq = match request.request_kind {
3504            ObjectInfoRequestKind::LatestObjectInfo => {
3505                let (_, seq, _) = self
3506                    .get_object_or_tombstone(request.object_id)
3507                    .await
3508                    .ok_or_else(|| {
3509                        SuiError::from(UserInputError::ObjectNotFound {
3510                            object_id: request.object_id,
3511                            version: None,
3512                        })
3513                    })?;
3514                seq
3515            }
3516            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3517        };
3518
3519        let object = self
3520            .get_object_store()
3521            .get_object_by_key(&request.object_id, requested_object_seq)
3522            .ok_or_else(|| {
3523                SuiError::from(UserInputError::ObjectNotFound {
3524                    object_id: request.object_id,
3525                    version: Some(requested_object_seq),
3526                })
3527            })?;
3528
3529        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3530            (request.generate_layout, object.data.try_as_move())
3531        {
3532            Some(into_struct_layout(
3533                self.load_epoch_store_one_call_per_task()
3534                    .executor()
3535                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3536                    .get_annotated_layout(&move_obj.type_().clone().into())?,
3537            )?)
3538        } else {
3539            None
3540        };
3541
3542        let lock = if !object.is_address_owned() {
3543            // Only address owned objects have locks.
3544            None
3545        } else {
3546            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3547                .await?
3548                .map(|s| s.into_inner())
3549        };
3550
3551        Ok(ObjectInfoResponse {
3552            object,
3553            layout,
3554            lock_for_debugging: lock,
3555        })
3556    }
3557
3558    #[instrument(level = "trace", skip_all)]
3559    pub fn handle_checkpoint_request(
3560        &self,
3561        request: &CheckpointRequest,
3562    ) -> SuiResult<CheckpointResponse> {
3563        let summary = match request.sequence_number {
3564            Some(seq) => self
3565                .checkpoint_store
3566                .get_checkpoint_by_sequence_number(seq)?,
3567            None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3568        }
3569        .map(|v| v.into_inner());
3570        let contents = match &summary {
3571            Some(s) => self
3572                .checkpoint_store
3573                .get_checkpoint_contents(&s.content_digest)?,
3574            None => None,
3575        };
3576        Ok(CheckpointResponse {
3577            checkpoint: summary,
3578            contents,
3579        })
3580    }
3581
3582    #[instrument(level = "trace", skip_all)]
3583    pub fn handle_checkpoint_request_v2(
3584        &self,
3585        request: &CheckpointRequestV2,
3586    ) -> SuiResult<CheckpointResponseV2> {
3587        let summary = if request.certified {
3588            let summary = match request.sequence_number {
3589                Some(seq) => self
3590                    .checkpoint_store
3591                    .get_checkpoint_by_sequence_number(seq)?,
3592                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3593            }
3594            .map(|v| v.into_inner());
3595            summary.map(CheckpointSummaryResponse::Certified)
3596        } else {
3597            let summary = match request.sequence_number {
3598                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3599                None => self
3600                    .checkpoint_store
3601                    .get_latest_locally_computed_checkpoint()?,
3602            };
3603            summary.map(CheckpointSummaryResponse::Pending)
3604        };
3605        let contents = match &summary {
3606            Some(s) => self
3607                .checkpoint_store
3608                .get_checkpoint_contents(&s.content_digest())?,
3609            None => None,
3610        };
3611        Ok(CheckpointResponseV2 {
3612            checkpoint: summary,
3613            contents,
3614        })
3615    }
3616
3617    fn check_protocol_version(
3618        supported_protocol_versions: SupportedProtocolVersions,
3619        current_version: ProtocolVersion,
3620    ) {
3621        info!("current protocol version is now {:?}", current_version);
3622        info!("supported versions are: {:?}", supported_protocol_versions);
3623        if !supported_protocol_versions.is_version_supported(current_version) {
3624            let msg = format!(
3625                "Unsupported protocol version. The network is at {:?}, but this SuiNode only supports: {:?}. Shutting down.",
3626                current_version, supported_protocol_versions,
3627            );
3628
3629            error!("{}", msg);
3630            eprintln!("{}", msg);
3631
3632            #[cfg(not(msim))]
3633            std::process::exit(1);
3634
3635            #[cfg(msim)]
3636            sui_simulator::task::shutdown_current_node();
3637        }
3638    }
3639
3640    #[allow(clippy::disallowed_methods)] // allow unbounded_channel()
3641    #[allow(clippy::too_many_arguments)]
3642    pub async fn new(
3643        name: AuthorityName,
3644        secret: StableSyncAuthoritySigner,
3645        supported_protocol_versions: SupportedProtocolVersions,
3646        store: Arc<AuthorityStore>,
3647        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3648        epoch_store: Arc<AuthorityPerEpochStore>,
3649        committee_store: Arc<CommitteeStore>,
3650        indexes: Option<Arc<IndexStore>>,
3651        rpc_index: Option<Arc<RpcIndexStore>>,
3652        checkpoint_store: Arc<CheckpointStore>,
3653        prometheus_registry: &Registry,
3654        genesis_objects: &[Object],
3655        db_checkpoint_config: &DBCheckpointConfig,
3656        config: NodeConfig,
3657        chain_identifier: ChainIdentifier,
3658        policy_config: Option<PolicyConfig>,
3659        firewall_config: Option<RemoteFirewallConfig>,
3660        pruner_watermarks: Arc<PrunerWatermarks>,
3661    ) -> Arc<Self> {
3662        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3663
3664        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3665        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3666        let execution_scheduler = Arc::new(ExecutionScheduler::new(
3667            execution_cache_trait_pointers.object_cache_reader.clone(),
3668            execution_cache_trait_pointers.account_funds_read.clone(),
3669            execution_cache_trait_pointers
3670                .transaction_cache_reader
3671                .clone(),
3672            tx_ready_certificates,
3673            &epoch_store,
3674            config.funds_withdraw_scheduler_type,
3675            metrics.clone(),
3676            prometheus_registry,
3677        ));
3678        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3679
3680        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3681            epoch_store.get_parent_path(),
3682            &config.authority_store_pruning_config,
3683        );
3684        let _pruner = AuthorityStorePruner::new(
3685            store.perpetual_tables.clone(),
3686            checkpoint_store.clone(),
3687            rpc_index.clone(),
3688            indexes.clone(),
3689            config.authority_store_pruning_config.clone(),
3690            epoch_store.committee().authority_exists(&name),
3691            epoch_store.epoch_start_state().epoch_duration_ms(),
3692            prometheus_registry,
3693            pruner_watermarks,
3694        );
3695        let input_loader =
3696            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3697        let epoch = epoch_store.epoch();
3698        let traffic_controller_metrics =
3699            Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3700        let traffic_controller = if let Some(policy_config) = policy_config {
3701            Some(Arc::new(
3702                TrafficController::init(
3703                    policy_config,
3704                    traffic_controller_metrics,
3705                    firewall_config.clone(),
3706                )
3707                .await,
3708            ))
3709        } else {
3710            None
3711        };
3712
3713        let fork_recovery_state = config.fork_recovery.as_ref().map(|fork_config| {
3714            ForkRecoveryState::new(Some(fork_config))
3715                .expect("Failed to initialize fork recovery state")
3716        });
3717
3718        let coin_reservation_resolver = Arc::new(CoinReservationResolver::new(
3719            execution_cache_trait_pointers.child_object_resolver.clone(),
3720        ));
3721
3722        let object_funds_checker_metrics =
3723            Arc::new(ObjectFundsCheckerMetrics::new(prometheus_registry));
3724        let state = Arc::new(AuthorityState {
3725            name,
3726            secret,
3727            execution_lock: RwLock::new(epoch),
3728            epoch_store: ArcSwap::new(epoch_store.clone()),
3729            input_loader,
3730            execution_cache_trait_pointers,
3731            coin_reservation_resolver,
3732            indexes,
3733            rpc_index,
3734            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3735            checkpoint_store,
3736            committee_store,
3737            execution_scheduler,
3738            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3739            metrics,
3740            _pruner,
3741            _authority_per_epoch_pruner,
3742            db_checkpoint_config: db_checkpoint_config.clone(),
3743            config,
3744            overload_info: AuthorityOverloadInfo::default(),
3745            chain_identifier,
3746            congestion_tracker: Arc::new(CongestionTracker::new()),
3747            traffic_controller,
3748            fork_recovery_state,
3749            notify_epoch: tokio::sync::watch::channel(epoch).0,
3750            object_funds_checker: ArcSwapOption::empty(),
3751            object_funds_checker_metrics,
3752            pending_post_processing: Arc::new(DashMap::new()),
3753            post_processing_semaphore: Arc::new(tokio::sync::Semaphore::new(num_cpus::get())),
3754        });
3755        state.init_object_funds_checker().await;
3756
3757        let state_clone = Arc::downgrade(&state);
3758        spawn_monitored_task!(fix_indexes(state_clone));
3759        // Start a task to execute ready certificates.
3760        let authority_state = Arc::downgrade(&state);
3761        spawn_monitored_task!(execution_process(
3762            authority_state,
3763            rx_ready_certificates,
3764            rx_execution_shutdown,
3765        ));
3766        // TODO: This doesn't belong to the constructor of AuthorityState.
3767        state
3768            .create_owner_index_if_empty(genesis_objects, &epoch_store)
3769            .expect("Error indexing genesis objects.");
3770
3771        if epoch_store
3772            .protocol_config()
3773            .enable_multi_epoch_transaction_expiration()
3774            && epoch_store.epoch() > 0
3775        {
3776            use typed_store::Map;
3777            let previous_epoch = epoch_store.epoch() - 1;
3778            let start_key = (previous_epoch, TransactionDigest::ZERO);
3779            let end_key = (previous_epoch + 1, TransactionDigest::ZERO);
3780            let has_previous_epoch_data = store
3781                .perpetual_tables
3782                .executed_transaction_digests
3783                .safe_range_iter(start_key..end_key)
3784                .next()
3785                .is_some();
3786
3787            if !has_previous_epoch_data {
3788                panic!(
3789                    "enable_multi_epoch_transaction_expiration is enabled but no transaction data found for previous epoch {}. \
3790                    This indicates the node was restored using an old version of sui-tool that does not backfill the table. \
3791                    Please restore from a snapshot using the latest version of sui-tool.",
3792                    previous_epoch
3793                );
3794            }
3795        }
3796
3797        state
3798    }
3799
3800    async fn init_object_funds_checker(&self) {
3801        let epoch_store = self.epoch_store.load();
3802        if self.is_validator(&epoch_store)
3803            && epoch_store.protocol_config().enable_object_funds_withdraw()
3804        {
3805            if self.object_funds_checker.load().is_none() {
3806                let inner = self
3807                    .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
3808                    .await
3809                    .map(|o| {
3810                        Arc::new(ObjectFundsChecker::new(
3811                            o.version(),
3812                            self.object_funds_checker_metrics.clone(),
3813                        ))
3814                    });
3815                self.object_funds_checker.store(inner);
3816            }
3817        } else {
3818            self.object_funds_checker.store(None);
3819        }
3820    }
3821
3822    // TODO: Consolidate our traits to reduce the number of methods here.
    /// Returns the `ObjectCacheRead` handle held in `execution_cache_trait_pointers`.
    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
        &self.execution_cache_trait_pointers.object_cache_reader
    }
3826
    /// Returns the `TransactionCacheRead` handle held in `execution_cache_trait_pointers`.
    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
        &self.execution_cache_trait_pointers.transaction_cache_reader
    }
3830
    /// Returns the `ExecutionCacheWrite` handle held in `execution_cache_trait_pointers`.
    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
        &self.execution_cache_trait_pointers.cache_writer
    }
3834
    /// Returns the `BackingStore` handle held in `execution_cache_trait_pointers`.
    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_store
    }
3838
    /// Returns the `ChildObjectResolver` handle held in `execution_cache_trait_pointers`.
    pub fn get_child_object_resolver(&self) -> &Arc<dyn ChildObjectResolver + Send + Sync> {
        &self.execution_cache_trait_pointers.child_object_resolver
    }
3842
    /// Returns the `AccountFundsRead` handle held in `execution_cache_trait_pointers`.
    /// Visible within this crate only.
    pub(crate) fn get_account_funds_read(&self) -> &Arc<dyn AccountFundsRead> {
        &self.execution_cache_trait_pointers.account_funds_read
    }
3846
    /// Returns the `BackingPackageStore` handle held in `execution_cache_trait_pointers`.
    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_package_store
    }
3850
    /// Returns the `ObjectStore` handle held in `execution_cache_trait_pointers`.
    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
        &self.execution_cache_trait_pointers.object_store
    }
3854
    /// Returns the map from transaction digest to the receiver on which that
    /// transaction's `PostProcessingOutput` will be delivered.
    pub fn pending_post_processing(
        &self,
    ) -> &Arc<DashMap<TransactionDigest, oneshot::Receiver<PostProcessingOutput>>> {
        &self.pending_post_processing
    }
3860
3861    pub async fn await_post_processing(
3862        &self,
3863        tx_digest: &TransactionDigest,
3864    ) -> Option<PostProcessingOutput> {
3865        if let Some((_, rx)) = self.pending_post_processing.remove(tx_digest) {
3866            // Tx was executed and post-processing is in flight.
3867            rx.await.ok()
3868        } else {
3869            // Tx was already persisted or post-processing already completed.
3870            None
3871        }
3872    }
3873
3874    /// Await post-processing for a transaction and commit the index batch immediately.
3875    /// Used in test helpers where there is no CheckpointExecutor to collect and commit
3876    /// index batches at checkpoint boundaries.
3877    pub async fn flush_post_processing(&self, tx_digest: &TransactionDigest) {
3878        if let Some(indexes) = &self.indexes
3879            && let Some((raw_batch, cache_updates)) = self.await_post_processing(tx_digest).await
3880        {
3881            let mut db_batch = indexes.new_db_batch();
3882            db_batch
3883                .concat(vec![raw_batch])
3884                .expect("failed to build index batch");
3885            indexes
3886                .commit_index_batch(db_batch, vec![cache_updates])
3887                .expect("failed to commit index batch");
3888        }
3889    }
3890
    /// Returns the `ExecutionCacheReconfigAPI` handle held in `execution_cache_trait_pointers`.
    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
        &self.execution_cache_trait_pointers.reconfig_api
    }
3894
    /// Returns the `GlobalStateHashStore` handle held in `execution_cache_trait_pointers`.
    pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
        &self.execution_cache_trait_pointers.global_state_hash_store
    }
3898
    /// Returns the `CheckpointCache` handle held in `execution_cache_trait_pointers`.
    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
        &self.execution_cache_trait_pointers.checkpoint_cache
    }
3902
    /// Returns the `StateSyncAPI` handle held in `execution_cache_trait_pointers`.
    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
        &self.execution_cache_trait_pointers.state_sync_store
    }
3906
    /// Returns the `ExecutionCacheCommit` handle held in `execution_cache_trait_pointers`.
    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
        &self.execution_cache_trait_pointers.cache_commit
    }
3910
    /// Returns the `AuthorityStore` exposed by the cache's testing API.
    /// Intended for tests, as the name indicates.
    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
        self.execution_cache_trait_pointers
            .testing_api
            .database_for_testing()
    }
3916
    /// Returns the `WritebackCache` exposed by the cache's testing API.
    /// Intended for tests, as the name indicates.
    pub fn cache_for_testing(&self) -> &WritebackCache {
        self.execution_cache_trait_pointers
            .testing_api
            .cache_for_testing()
    }
3922
3923    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3924        &self,
3925        config: NodeConfig,
3926        metrics: Arc<AuthorityStorePruningMetrics>,
3927    ) -> anyhow::Result<()> {
3928        use crate::authority::authority_store_pruner::PrunerWatermarks;
3929        let watermarks = Arc::new(PrunerWatermarks::default());
3930        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3931            &self.database_for_testing().perpetual_tables,
3932            &self.checkpoint_store,
3933            self.rpc_index.as_deref(),
3934            config.authority_store_pruning_config,
3935            metrics,
3936            EPOCH_DURATION_MS_FOR_TESTING,
3937            &watermarks,
3938        )
3939        .await
3940    }
3941
    /// Returns the `ExecutionScheduler` owned by this authority state.
    pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
        &self.execution_scheduler
    }
3945
3946    fn create_owner_index_if_empty(
3947        &self,
3948        genesis_objects: &[Object],
3949        epoch_store: &Arc<AuthorityPerEpochStore>,
3950    ) -> SuiResult {
3951        let Some(index_store) = &self.indexes else {
3952            return Ok(());
3953        };
3954        if !index_store.is_empty() {
3955            return Ok(());
3956        }
3957
3958        let mut new_owners = vec![];
3959        let mut new_dynamic_fields = vec![];
3960        let mut layout_resolver = epoch_store
3961            .executor()
3962            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3963        for o in genesis_objects.iter() {
3964            match o.owner {
3965                Owner::AddressOwner(addr) | Owner::ConsensusAddressOwner { owner: addr, .. } => {
3966                    new_owners.push((
3967                        (addr, o.id()),
3968                        ObjectInfo::new(&o.compute_object_reference(), o),
3969                    ))
3970                }
3971                Owner::ObjectOwner(object_id) => {
3972                    let id = o.id();
3973                    let Some(info) = Self::try_create_dynamic_field_info(
3974                        self.get_object_store(),
3975                        o,
3976                        &BTreeMap::new(),
3977                        layout_resolver.as_mut(),
3978                    )?
3979                    else {
3980                        continue;
3981                    };
3982                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3983                }
3984                _ => {}
3985            }
3986        }
3987
3988        index_store.insert_genesis_objects(ObjectIndexChanges {
3989            deleted_owners: vec![],
3990            deleted_dynamic_fields: vec![],
3991            new_owners,
3992            new_dynamic_fields,
3993        })
3994    }
3995
3996    /// Attempts to acquire execution lock for an executable transaction.
3997    /// Returns Some(lock) if the transaction is matching current executed epoch.
3998    /// Returns None if validator is halted at epoch end or epoch mismatch.
3999    pub fn execution_lock_for_executable_transaction(
4000        &self,
4001        transaction: &VerifiedExecutableTransaction,
4002    ) -> Option<ExecutionLockReadGuard<'_>> {
4003        let lock = self.execution_lock.try_read().ok()?;
4004        if *lock == transaction.auth_sig().epoch() {
4005            Some(lock)
4006        } else {
4007            // TODO: Can this still happen?
4008            None
4009        }
4010    }
4011
4012    /// Acquires the execution lock for the duration of transaction validation.
4013    /// This prevents reconfiguration from starting until we are finished validating the transaction.
4014    fn execution_lock_for_validation(&self) -> SuiResult<ExecutionLockReadGuard<'_>> {
4015        self.execution_lock
4016            .try_read()
4017            .map_err(|_| SuiErrorKind::ValidatorHaltedAtEpochEnd.into())
4018    }
4019
    /// Waits for and returns the write guard of the execution lock.
    /// `reconfigure` holds this guard while it switches epochs.
    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
        self.execution_lock.write().await
    }
4023
    /// Moves this authority from the current epoch to the one described by
    /// `new_committee` and `epoch_start_configuration`, returning the new epoch store.
    ///
    /// The steps below run while the execution write lock is held, so the order
    /// of operations matters:
    /// 1. Check that the new protocol version is supported and persist the new committee.
    /// 2. Take the execution write lock and terminate the current epoch's tasks.
    /// 3. Close user certificates on the current epoch store if still open.
    /// 4. Clear end-of-epoch cache state, run consistency checks, optionally
    ///    re-accumulate the state hash, and record the new epoch start configuration.
    /// 5. Optionally checkpoint the databases under `epoch_<current_epoch>`.
    /// 6. Reconfigure the cache, reopen the epoch DB, reconfigure the execution
    ///    scheduler and object funds checker.
    /// 7. Write the new epoch into the execution lock and notify epoch waiters.
    ///
    /// # Errors
    /// Propagates failures from inserting the committee, checkpointing the
    /// databases, and reopening the epoch DB.
    ///
    /// # Panics
    /// If the reopened epoch store's epoch differs from `new_committee.epoch`.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        state_hasher: Arc<GlobalStateHasher>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );

        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        // Safe to begin reconfiguration now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        {
            let state = cur_epoch_store.get_reconfig_state_write_lock_guard();
            if state.should_accept_user_certs() {
                // Need to change this so that consensus adapter do not accept certificates from user.
                // This can happen if our local validator did not initiate epoch change locally,
                // but 2f+1 nodes already concluded the epoch.
                //
                // This lock is essentially a barrier for
                // `epoch_store.pending_consensus_certificates` table we are reading on the line after this block
                cur_epoch_store.close_user_certs(state);
            }
            // lock is dropped here
        }

        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(cur_epoch_store, state_hasher, expensive_safety_check_config);
        self.maybe_reaccumulate_state_hash(
            cur_epoch_store,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.get_reconfig_api()
            .set_epoch_start_configuration(&epoch_start_configuration);
        // DB checkpoints are taken only when a checkpoint path is configured AND
        // end-of-epoch checkpoints are enabled.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path
            && self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
        {
            // Index DB checkpoints default to off when the setting is absent.
            let checkpoint_indexes = self
                .db_checkpoint_config
                .perform_index_db_checkpoints_at_epoch_end
                .unwrap_or(false);
            let current_epoch = cur_epoch_store.epoch();
            let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{}", current_epoch));
            self.checkpoint_all_dbs(&epoch_checkpoint_path, cur_epoch_store, checkpoint_indexes)?;
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        // Capture the epoch before `new_committee` is moved into `reopen_epoch_db`.
        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.execution_scheduler
            .reconfigure(&new_epoch_store, self.get_account_funds_read());
        self.init_object_funds_checker().await;
        // Publish the new epoch through the execution lock while still holding it.
        *execution_lock = new_epoch;

        self.notify_epoch(new_epoch);
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
4118
    /// Publishes `new_epoch` on the `notify_epoch` watch channel, which
    /// `wait_for_epoch` subscribes to.
    fn notify_epoch(&self, new_epoch: EpochId) {
        self.notify_epoch.send_modify(|epoch| *epoch = new_epoch);
    }
4122
4123    pub async fn wait_for_epoch(&self, target_epoch: EpochId) -> Result<EpochId, RecvError> {
4124        let mut rx = self.notify_epoch.subscribe();
4125        loop {
4126            let epoch = *rx.borrow();
4127            if epoch >= target_epoch {
4128                return Ok(epoch);
4129            }
4130            rx.changed().await?;
4131        }
4132    }
4133
    /// Test helper that settles the accumulator for the given transaction `effects`.
    ///
    /// It builds the settlement transactions from `effects`, executes each one
    /// immediately, then builds and executes the accumulator barrier transaction,
    /// and finally hands the collected funds changes to the execution scheduler
    /// together with the next accumulator version.
    ///
    /// # Panics
    /// Panics on any failure along the way: a missing accumulator root object, a
    /// missing initial shared version in the epoch start config, failed version
    /// assignment, failed execution, or a settlement/barrier transaction whose
    /// status is not ok.
    pub async fn settle_accumulator_for_testing(&self, effects: &[TransactionEffects]) {
        let accumulator_version = self
            .get_object(&SUI_ACCUMULATOR_ROOT_OBJECT_ID)
            .await
            .unwrap()
            .version();
        // Use the current accumulator version as the checkpoint sequence number.
        // This is a convenient hack to keep the checkpoint version unique for each settlement and incrementing.
        let ckpt_seq = accumulator_version.value();
        let builder = AccumulatorSettlementTxBuilder::new(
            Some(self.get_transaction_cache_reader().as_ref()),
            effects,
            ckpt_seq,
            0,
        );
        // Collected before `build_tx` below, which takes the builder by value.
        let balance_changes = builder.collect_funds_changes();
        let epoch_store = self.epoch_store_for_testing();
        let epoch = epoch_store.epoch();
        let accumulator_root_obj_initial_shared_version = epoch_store
            .epoch_start_config()
            .accumulator_root_obj_initial_shared_version()
            .unwrap();
        let settlements = builder.build_tx(
            epoch_store.protocol_config(),
            epoch,
            accumulator_root_obj_initial_shared_version,
            ckpt_seq,
            ckpt_seq,
        );

        // Wrap each settlement as a system transaction executable in this epoch.
        let settlements: Vec<_> = settlements
            .into_iter()
            .map(|tx| {
                VerifiedExecutableTransaction::new_system(
                    VerifiedTransaction::new_system_transaction(tx),
                    epoch,
                )
            })
            .collect();

        let assigned_versions = epoch_store
            .assign_shared_object_versions_for_tests(
                self.get_object_cache_reader().as_ref(),
                &settlements,
            )
            .unwrap();
        let version_map = assigned_versions.into_map();

        // Execute the settlements one by one, keeping their effects for the barrier.
        let mut settlement_effects = Vec::with_capacity(settlements.len());
        for tx in settlements {
            let env = ExecutionEnv::new()
                .with_assigned_versions(version_map.get(&tx.key()).unwrap().clone());
            let (effects, _) = self
                .try_execute_immediately(&tx, env, &epoch_store)
                .await
                .unwrap();
            assert!(effects.status().is_ok());
            settlement_effects.push(effects);
        }

        // The barrier transaction is built from the settlement effects.
        let barrier = accumulators::build_accumulator_barrier_tx(
            epoch,
            accumulator_root_obj_initial_shared_version,
            ckpt_seq,
            &settlement_effects,
        );
        let barrier = VerifiedExecutableTransaction::new_system(
            VerifiedTransaction::new_system_transaction(barrier),
            epoch,
        );

        let assigned_versions = epoch_store
            .assign_shared_object_versions_for_tests(
                self.get_object_cache_reader().as_ref(),
                std::slice::from_ref(&barrier),
            )
            .unwrap();
        let version_map = assigned_versions.into_map();

        let env = ExecutionEnv::new()
            .with_assigned_versions(version_map.get(&barrier.key()).unwrap().clone());
        let (effects, _) = self
            .try_execute_immediately(&barrier, env, &epoch_store)
            .await
            .unwrap();
        assert!(effects.status().is_ok());

        // Report the address funds changes to the scheduler at the next accumulator version.
        let next_accumulator_version = accumulator_version.next();
        self.execution_scheduler
            .settle_address_funds(FundsSettlement {
                funds_changes: balance_changes,
                next_accumulator_version,
            });
        // object funds are settled while executing the barrier transaction
    }
4229
    /// Advance the epoch store to the next epoch for testing only.
    /// This only manually sets all the places where we have the epoch number.
    /// It doesn't properly reconfigure the node, hence should be only used for testing.
    ///
    /// # Panics
    /// If reading the current epoch's last checkpoint from the checkpoint store fails.
    pub async fn reconfigure_for_testing(&self) {
        // Hold the execution write lock for the whole switch.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
        let epoch_store = self.epoch_store_for_testing().clone();
        let protocol_config = epoch_store.protocol_config().clone();
        // The current protocol config used in the epoch store may have been overridden and diverged from
        // the protocol config definitions. That override may have now been dropped when the initial guard was dropped.
        // We reapply the override before creating the new epoch store, to make sure that
        // the new epoch store has the same protocol config as the current one.
        // Since this is for testing only, we would mostly like to keep the protocol config the same
        // across epochs.
        let _guard =
            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
        // The last checkpoint of the current epoch falls back to the default
        // sequence number when none is recorded.
        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            &self.config.expensive_safety_check_config,
            self.checkpoint_store
                .get_epoch_last_checkpoint(epoch_store.epoch())
                .unwrap()
                .map(|c| *c.sequence_number())
                .unwrap_or_default(),
        );
        self.execution_scheduler
            .reconfigure(&new_epoch_store, self.get_account_funds_read());
        let new_epoch = new_epoch_store.epoch();
        // Swap in the new epoch store, then terminate the old epoch's tasks.
        self.epoch_store.store(new_epoch_store);
        epoch_store.epoch_terminated().await;

        *execution_lock = new_epoch;
    }
4263
    /// This is a temporary method to be used when we enable simplified_unwrap_then_delete.
    /// It re-accumulates state hash for the new epoch if simplified_unwrap_then_delete is enabled.
    ///
    /// This method only forwards to the reconfig API's method of the same name,
    /// passing the current epoch store and the new protocol version.
    #[instrument(level = "error", skip_all)]
    fn maybe_reaccumulate_state_hash(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        new_protocol_version: ProtocolVersion,
    ) {
        self.get_reconfig_api()
            .maybe_reaccumulate_state_hash(cur_epoch_store, new_protocol_version);
    }
4275
4276    #[instrument(level = "error", skip_all)]
4277    fn check_system_consistency(
4278        &self,
4279        cur_epoch_store: &AuthorityPerEpochStore,
4280        state_hasher: Arc<GlobalStateHasher>,
4281        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4282    ) {
4283        info!(
4284            "Performing sui conservation consistency check for epoch {}",
4285            cur_epoch_store.epoch()
4286        );
4287
4288        if let Err(err) = self
4289            .get_reconfig_api()
4290            .expensive_check_sui_conservation(cur_epoch_store)
4291        {
4292            if cfg!(debug_assertions) {
4293                panic!("{}", err);
4294            } else {
4295                // We cannot panic in production yet because it is known that there are some
4296                // inconsistencies in testnet. We will enable this once we make it balanced again in testnet.
4297                warn!("Sui conservation consistency check failed: {}", err);
4298            }
4299        } else {
4300            info!("Sui conservation consistency check passed");
4301        }
4302
4303        // check for root state hash consistency with live object set
4304        if expensive_safety_check_config.enable_state_consistency_check() {
4305            info!(
4306                "Performing state consistency check for epoch {}",
4307                cur_epoch_store.epoch()
4308            );
4309            self.expensive_check_is_consistent_state(state_hasher, cur_epoch_store);
4310        }
4311
4312        if expensive_safety_check_config.enable_secondary_index_checks()
4313            && let Some(indexes) = self.indexes.clone()
4314        {
4315            verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
4316                .expect("secondary indexes are inconsistent");
4317        }
4318
4319        // Verify all checkpointed transactions are present in transactions_seq.
4320        // This catches any post-processing gaps that could occur if async
4321        // post-processing failed to complete before persistence.
4322        // This runs whenever indexes are enabled (not gated on enable_secondary_index_checks)
4323        // because it is cheap and critical for async post-processing correctness.
4324        if let Some(indexes) = self.indexes.clone() {
4325            let epoch = cur_epoch_store.epoch();
4326            // Only verify the current epoch's checkpoints. Previous epoch contents
4327            // may have been pruned, and we only need to verify that this epoch's
4328            // async post-processing completed correctly.
4329            let first_checkpoint = if epoch == 0 {
4330                0
4331            } else {
4332                self.checkpoint_store
4333                    .get_epoch_last_checkpoint_seq_number(epoch - 1)
4334                    .expect("Failed to get previous epoch's last checkpoint")
4335                    .expect("Previous epoch's last checkpoint missing")
4336                    + 1
4337            };
4338            let highest_executed = self
4339                .checkpoint_store
4340                .get_highest_executed_checkpoint_seq_number()
4341                .expect("Failed to get highest executed checkpoint")
4342                .expect("No executed checkpoints");
4343
4344            info!(
4345                "Verifying checkpointed transactions are in transactions_seq \
4346                 (checkpoints {first_checkpoint}..={highest_executed})"
4347            );
4348            for seq in first_checkpoint..=highest_executed {
4349                let checkpoint = self
4350                    .checkpoint_store
4351                    .get_checkpoint_by_sequence_number(seq)
4352                    .expect("Failed to get checkpoint")
4353                    .expect("Checkpoint missing");
4354                let contents = self
4355                    .checkpoint_store
4356                    .get_checkpoint_contents(&checkpoint.content_digest)
4357                    .expect("Failed to get checkpoint contents")
4358                    .expect("Checkpoint contents missing");
4359                for digests in contents.iter() {
4360                    let tx_digest = digests.transaction;
4361                    assert!(
4362                        indexes
4363                            .get_transaction_seq(&tx_digest)
4364                            .expect("Failed to read transactions_seq")
4365                            .is_some(),
4366                        "Transaction {tx_digest} from checkpoint {seq} missing from transactions_seq"
4367                    );
4368                }
4369            }
4370            info!("All checkpointed transactions verified in transactions_seq");
4371        }
4372    }
4373
4374    fn expensive_check_is_consistent_state(
4375        &self,
4376        state_hasher: Arc<GlobalStateHasher>,
4377        cur_epoch_store: &AuthorityPerEpochStore,
4378    ) {
4379        let live_object_set_hash = state_hasher.digest_live_object_set(
4380            !cur_epoch_store
4381                .protocol_config()
4382                .simplified_unwrap_then_delete(),
4383        );
4384
4385        let root_state_hash: ECMHLiveObjectSetDigest = self
4386            .get_global_state_hash_store()
4387            .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
4388            .expect("Retrieving root state hash cannot fail")
4389            .expect("Root state hash for epoch must exist")
4390            .1
4391            .digest()
4392            .into();
4393
4394        let is_inconsistent = root_state_hash != live_object_set_hash;
4395        if is_inconsistent {
4396            debug_fatal!(
4397                "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
4398                root_state_hash,
4399                live_object_set_hash
4400            );
4401        } else {
4402            info!("State consistency check passed");
4403        }
4404
4405        state_hasher.set_inconsistent_state(is_inconsistent);
4406    }
4407
4408    pub fn current_epoch_for_testing(&self) -> EpochId {
4409        self.epoch_store_for_testing().epoch()
4410    }
4411
4412    #[instrument(level = "error", skip_all)]
4413    pub fn checkpoint_all_dbs(
4414        &self,
4415        checkpoint_path: &Path,
4416        cur_epoch_store: &AuthorityPerEpochStore,
4417        checkpoint_indexes: bool,
4418    ) -> SuiResult {
4419        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
4420        let current_epoch = cur_epoch_store.epoch();
4421
4422        if checkpoint_path.exists() {
4423            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
4424            return Ok(());
4425        }
4426
4427        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
4428        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
4429
4430        if checkpoint_path_tmp.exists() {
4431            fs::remove_dir_all(&checkpoint_path_tmp)
4432                .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4433        }
4434
4435        fs::create_dir_all(&checkpoint_path_tmp)
4436            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4437        fs::create_dir(&store_checkpoint_path_tmp)
4438            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4439
4440        // NOTE: Do not change the order of invoking these checkpoint calls
4441        // We want to snapshot checkpoint db first to not race with state sync
4442        self.checkpoint_store
4443            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
4444
4445        self.get_reconfig_api()
4446            .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
4447
4448        self.committee_store
4449            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
4450
4451        if checkpoint_indexes && let Some(indexes) = self.indexes.as_ref() {
4452            indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
4453        }
4454
4455        fs::rename(checkpoint_path_tmp, checkpoint_path)
4456            .map_err(|e| SuiErrorKind::FileIOError(e.to_string()))?;
4457        Ok(())
4458    }
4459
    /// Load the current epoch store. This can change during reconfiguration. To ensure that
    /// we never end up accessing different epoch stores in a single task, we need to make sure
    /// that this is called once per task. Each call needs to be carefully audited to ensure it is
    /// the case. This also means we should minimize the number of call-sites. Only call it when
    /// there is no way to obtain it from somewhere else.
    ///
    /// Returns the guard produced by `self.epoch_store.load()`.
    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.epoch_store.load()
    }
4468
    /// Loads the current epoch store. Should be used in tests only.
    ///
    /// Delegates to [`Self::load_epoch_store_one_call_per_task`].
    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.load_epoch_store_one_call_per_task()
    }
4473
4474    pub fn clone_committee_for_testing(&self) -> Committee {
4475        Committee::clone(self.epoch_store_for_testing().committee())
4476    }
4477
4478    #[instrument(level = "trace", skip_all)]
4479    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
4480        self.get_object_store().get_object(object_id)
4481    }
4482
4483    pub async fn get_sui_system_package_object_ref(&self) -> SuiResult<ObjectRef> {
4484        Ok(self
4485            .get_object(&SUI_SYSTEM_ADDRESS.into())
4486            .await
4487            .expect("framework object should always exist")
4488            .compute_object_reference())
4489    }
4490
4491    // This function is only used for testing.
4492    pub fn get_sui_system_state_object_for_testing(&self) -> SuiResult<SuiSystemState> {
4493        self.get_object_cache_reader()
4494            .get_sui_system_state_object_unsafe()
4495    }
4496
    /// Returns the checkpoint sequence number that `epoch_store` reports for the
    /// transaction `digest`, or `None` when it reports none.
    ///
    /// This is a thin wrapper around `epoch_store.get_transaction_checkpoint`.
    #[instrument(level = "trace", skip_all)]
    fn get_transaction_checkpoint_sequence(
        &self,
        digest: &TransactionDigest,
        epoch_store: &AuthorityPerEpochStore,
    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
        epoch_store.get_transaction_checkpoint(digest)
    }
4505
4506    #[instrument(level = "trace", skip_all)]
4507    pub fn get_checkpoint_by_sequence_number(
4508        &self,
4509        sequence_number: CheckpointSequenceNumber,
4510    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4511        Ok(self
4512            .checkpoint_store
4513            .get_checkpoint_by_sequence_number(sequence_number)?)
4514    }
4515
4516    #[instrument(level = "trace", skip_all)]
4517    pub fn get_transaction_checkpoint_for_tests(
4518        &self,
4519        digest: &TransactionDigest,
4520        epoch_store: &AuthorityPerEpochStore,
4521    ) -> SuiResult<Option<VerifiedCheckpoint>> {
4522        let checkpoint = self.get_transaction_checkpoint_sequence(digest, epoch_store)?;
4523        let Some(checkpoint) = checkpoint else {
4524            return Ok(None);
4525        };
4526        let checkpoint = self
4527            .checkpoint_store
4528            .get_checkpoint_by_sequence_number(checkpoint)?;
4529        Ok(checkpoint)
4530    }
4531
4532    #[instrument(level = "trace", skip_all)]
4533    pub fn get_object_read(&self, object_id: &ObjectID) -> SuiResult<ObjectRead> {
4534        Ok(
4535            match self
4536                .get_object_cache_reader()
4537                .get_latest_object_or_tombstone(*object_id)
4538            {
4539                Some((_, ObjectOrTombstone::Object(object))) => {
4540                    let layout = self.get_object_layout(&object)?;
4541                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
4542                }
4543                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
4544                None => ObjectRead::NotExists(*object_id),
4545            },
4546        )
4547    }
4548
    /// Returns the chain identifier stored on this authority state.
    ///
    /// Chain Identifier is the digest of the genesis checkpoint.
    pub fn get_chain_identifier(&self) -> ChainIdentifier {
        self.chain_identifier
    }
4553
    /// Returns a reference to the fork recovery state, or `None` when none is set
    /// on this authority state.
    pub fn get_fork_recovery_state(&self) -> Option<&ForkRecoveryState> {
        self.fork_recovery_state.as_ref()
    }
4557
4558    #[instrument(level = "trace", skip_all)]
4559    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> SuiResult<T>
4560    where
4561        T: DeserializeOwned,
4562    {
4563        let o = self.get_object_read(object_id)?.into_object()?;
4564        if let Some(move_object) = o.data.try_as_move() {
4565            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4566                SuiErrorKind::ObjectDeserializationError {
4567                    error: format!("{e}"),
4568                }
4569            })?)
4570        } else {
4571            Err(SuiErrorKind::ObjectDeserializationError {
4572                error: format!("Provided object : [{object_id}] is not a Move object."),
4573            }
4574            .into())
4575        }
4576    }
4577
4578    /// This function aims to serve rpc reads on past objects and
4579    /// we don't expect it to be called for other purposes.
4580    /// Depending on the object pruning policies that will be enforced in the
4581    /// future there is no software-level guarantee/SLA to retrieve an object
4582    /// with an old version even if it exists/existed.
4583    #[instrument(level = "trace", skip_all)]
4584    pub fn get_past_object_read(
4585        &self,
4586        object_id: &ObjectID,
4587        version: SequenceNumber,
4588    ) -> SuiResult<PastObjectRead> {
4589        // Firstly we see if the object ever existed by getting its latest data
4590        let Some(obj_ref) = self
4591            .get_object_cache_reader()
4592            .get_latest_object_ref_or_tombstone(*object_id)
4593        else {
4594            return Ok(PastObjectRead::ObjectNotExists(*object_id));
4595        };
4596
4597        if version > obj_ref.1 {
4598            return Ok(PastObjectRead::VersionTooHigh {
4599                object_id: *object_id,
4600                asked_version: version,
4601                latest_version: obj_ref.1,
4602            });
4603        }
4604
4605        if version < obj_ref.1 {
4606            // Read past objects
4607            return Ok(match self.read_object_at_version(object_id, version)? {
4608                Some((object, layout)) => {
4609                    let obj_ref = object.compute_object_reference();
4610                    PastObjectRead::VersionFound(obj_ref, object, layout)
4611                }
4612
4613                None => PastObjectRead::VersionNotFound(*object_id, version),
4614            });
4615        }
4616
4617        if !obj_ref.2.is_alive() {
4618            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4619        }
4620
4621        match self.read_object_at_version(object_id, obj_ref.1)? {
4622            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4623            None => {
4624                debug_fatal!(
4625                    "Object with in parent_entry is missing from object store, datastore is \
4626                     inconsistent"
4627                );
4628                Err(UserInputError::ObjectNotFound {
4629                    object_id: *object_id,
4630                    version: Some(obj_ref.1),
4631                }
4632                .into())
4633            }
4634        }
4635    }
4636
4637    #[instrument(level = "trace", skip_all)]
4638    fn read_object_at_version(
4639        &self,
4640        object_id: &ObjectID,
4641        version: SequenceNumber,
4642    ) -> SuiResult<Option<(Object, Option<MoveStructLayout>)>> {
4643        let Some(object) = self
4644            .get_object_cache_reader()
4645            .get_object_by_key(object_id, version)
4646        else {
4647            return Ok(None);
4648        };
4649
4650        let layout = self.get_object_layout(&object)?;
4651        Ok(Some((object, layout)))
4652    }
4653
    /// Resolves the annotated struct layout for `object`.
    ///
    /// Returns `Ok(None)` when `object.data.try_as_move()` yields `None`, i.e. the
    /// object carries no Move object data. Otherwise the layout is looked up via
    /// the current epoch store's executor, using the backing package store as the
    /// source for type resolution, and converted with `into_struct_layout`.
    ///
    /// # Errors
    ///
    /// Propagates failures from the layout lookup and from `into_struct_layout`.
    fn get_object_layout(&self, object: &Object) -> SuiResult<Option<MoveStructLayout>> {
        let layout = object
            .data
            .try_as_move()
            .map(|object| {
                into_struct_layout(
                    // NOTE(review): this loads the epoch store on every call; confirm
                    // callers satisfy the one-call-per-task rule.
                    self.load_epoch_store_one_call_per_task()
                        .executor()
                        // TODO(cache) - must read through cache
                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
                        .get_annotated_layout(&object.type_().clone().into())?,
                )
            })
            // Turn Option<Result<..>> into Result<Option<..>> so `?` can surface errors.
            .transpose()?;
        Ok(layout)
    }
4670
4671    fn get_owner_at_version(
4672        object_store: &Arc<dyn ObjectStore + Send + Sync>,
4673        object_id: &ObjectID,
4674        version: SequenceNumber,
4675    ) -> SuiResult<Owner> {
4676        object_store
4677            .get_object_by_key(object_id, version)
4678            .ok_or_else(|| {
4679                SuiError::from(UserInputError::ObjectNotFound {
4680                    object_id: *object_id,
4681                    version: Some(version),
4682                })
4683            })
4684            .map(|o| o.owner.clone())
4685    }
4686
4687    #[instrument(level = "trace", skip_all)]
4688    pub fn get_owner_objects(
4689        &self,
4690        owner: SuiAddress,
4691        // If `Some`, the query will start from the next item after the specified cursor
4692        cursor: Option<ObjectID>,
4693        limit: usize,
4694        filter: Option<SuiObjectDataFilter>,
4695    ) -> SuiResult<Vec<ObjectInfo>> {
4696        if let Some(indexes) = &self.indexes {
4697            indexes.get_owner_objects(owner, cursor, limit, filter)
4698        } else {
4699            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4700        }
4701    }
4702
4703    #[instrument(level = "trace", skip_all)]
4704    pub fn get_owned_coins_iterator_with_cursor(
4705        &self,
4706        owner: SuiAddress,
4707        // If `Some`, the query will start from the next item after the specified cursor
4708        cursor: (String, u64, ObjectID),
4709        limit: usize,
4710        one_coin_type_only: bool,
4711    ) -> SuiResult<impl Iterator<Item = (CoinIndexKey2, CoinInfo)> + '_> {
4712        if let Some(indexes) = &self.indexes {
4713            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4714        } else {
4715            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4716        }
4717    }
4718
4719    #[instrument(level = "trace", skip_all)]
4720    pub fn get_owner_objects_iterator(
4721        &self,
4722        owner: SuiAddress,
4723        // If `Some`, the query will start from the next item after the specified cursor
4724        cursor: Option<ObjectID>,
4725        filter: Option<SuiObjectDataFilter>,
4726    ) -> SuiResult<impl Iterator<Item = ObjectInfo> + '_> {
4727        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
4728        if let Some(indexes) = &self.indexes {
4729            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4730        } else {
4731            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4732        }
4733    }
4734
4735    #[instrument(level = "trace", skip_all)]
4736    pub async fn get_move_objects<T>(
4737        &self,
4738        owner: SuiAddress,
4739        type_: MoveObjectType,
4740    ) -> SuiResult<Vec<T>>
4741    where
4742        T: DeserializeOwned,
4743    {
4744        let object_ids = self
4745            .get_owner_objects_iterator(owner, None, None)?
4746            .filter(|o| match &o.type_ {
4747                ObjectType::Struct(s) => &type_ == s,
4748                ObjectType::Package => false,
4749            })
4750            .map(|info| ObjectKey(info.object_id, info.version))
4751            .collect::<Vec<_>>();
4752        let mut move_objects = vec![];
4753
4754        let objects = self
4755            .get_object_store()
4756            .multi_get_objects_by_key(&object_ids);
4757
4758        for (o, id) in objects.into_iter().zip(object_ids) {
4759            let object = o.ok_or_else(|| {
4760                SuiError::from(UserInputError::ObjectNotFound {
4761                    object_id: id.0,
4762                    version: Some(id.1),
4763                })
4764            })?;
4765            let move_object = object.data.try_as_move().ok_or_else(|| {
4766                SuiError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4767            })?;
4768            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4769                SuiErrorKind::ObjectDeserializationError {
4770                    error: format!("{e}"),
4771                }
4772            })?);
4773        }
4774        Ok(move_objects)
4775    }
4776
4777    #[instrument(level = "trace", skip_all)]
4778    pub fn get_dynamic_fields(
4779        &self,
4780        owner: ObjectID,
4781        // If `Some`, the query will start from the next item after the specified cursor
4782        cursor: Option<ObjectID>,
4783        limit: usize,
4784    ) -> SuiResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4785        Ok(self
4786            .get_dynamic_fields_iterator(owner, cursor)?
4787            .take(limit)
4788            .collect::<Result<Vec<_>, _>>()?)
4789    }
4790
4791    fn get_dynamic_fields_iterator(
4792        &self,
4793        owner: ObjectID,
4794        // If `Some`, the query will start from the next item after the specified cursor
4795        cursor: Option<ObjectID>,
4796    ) -> SuiResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4797    {
4798        if let Some(indexes) = &self.indexes {
4799            indexes.get_dynamic_fields_iterator(owner, cursor)
4800        } else {
4801            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4802        }
4803    }
4804
4805    #[instrument(level = "trace", skip_all)]
4806    pub fn get_dynamic_field_object_id(
4807        &self,
4808        owner: ObjectID,
4809        name_type: TypeTag,
4810        name_bcs_bytes: &[u8],
4811    ) -> SuiResult<Option<ObjectID>> {
4812        if let Some(indexes) = &self.indexes {
4813            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4814        } else {
4815            Err(SuiErrorKind::IndexStoreNotAvailable.into())
4816        }
4817    }
4818
4819    #[instrument(level = "trace", skip_all)]
4820    pub fn get_total_transaction_blocks(&self) -> SuiResult<u64> {
4821        Ok(self.get_indexes()?.next_sequence_number())
4822    }
4823
4824    #[instrument(level = "trace", skip_all)]
4825    pub async fn get_executed_transaction_and_effects(
4826        &self,
4827        digest: TransactionDigest,
4828        kv_store: Arc<TransactionKeyValueStore>,
4829    ) -> SuiResult<(Transaction, TransactionEffects)> {
4830        let transaction = kv_store.get_tx(digest).await?;
4831        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4832        Ok((transaction, effects))
4833    }
4834
4835    #[instrument(level = "trace", skip_all)]
4836    pub fn multi_get_checkpoint_by_sequence_number(
4837        &self,
4838        sequence_numbers: &[CheckpointSequenceNumber],
4839    ) -> SuiResult<Vec<Option<VerifiedCheckpoint>>> {
4840        Ok(self
4841            .checkpoint_store
4842            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4843    }
4844
4845    #[instrument(level = "trace", skip_all)]
4846    pub fn get_transaction_events(
4847        &self,
4848        digest: &TransactionDigest,
4849    ) -> SuiResult<TransactionEvents> {
4850        self.get_transaction_cache_reader()
4851            .get_events(digest)
4852            .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: *digest }.into())
4853    }
4854
4855    pub fn get_transaction_input_objects(
4856        &self,
4857        effects: &TransactionEffects,
4858    ) -> SuiResult<Vec<Object>> {
4859        sui_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4860            .map_err(Into::into)
4861    }
4862
4863    pub fn get_transaction_output_objects(
4864        &self,
4865        effects: &TransactionEffects,
4866    ) -> SuiResult<Vec<Object>> {
4867        sui_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4868            .map_err(Into::into)
4869    }
4870
4871    fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
4872        match &self.indexes {
4873            Some(i) => Ok(i.clone()),
4874            None => Err(SuiErrorKind::UnsupportedFeatureError {
4875                error: "extended object indexing is not enabled on this server".into(),
4876            }
4877            .into()),
4878        }
4879    }
4880
4881    pub async fn get_transactions_for_tests(
4882        self: &Arc<Self>,
4883        filter: Option<TransactionFilter>,
4884        cursor: Option<TransactionDigest>,
4885        limit: Option<usize>,
4886        reverse: bool,
4887    ) -> SuiResult<Vec<TransactionDigest>> {
4888        let metrics = KeyValueStoreMetrics::new_for_tests();
4889        let kv_store = Arc::new(TransactionKeyValueStore::new(
4890            "rocksdb",
4891            metrics,
4892            self.clone(),
4893        ));
4894        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4895            .await
4896    }
4897
4898    #[instrument(level = "trace", skip_all)]
4899    pub async fn get_transactions(
4900        &self,
4901        kv_store: &Arc<TransactionKeyValueStore>,
4902        filter: Option<TransactionFilter>,
4903        // If `Some`, the query will start from the next item after the specified cursor
4904        cursor: Option<TransactionDigest>,
4905        limit: Option<usize>,
4906        reverse: bool,
4907    ) -> SuiResult<Vec<TransactionDigest>> {
4908        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4909            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4910            let iter = checkpoint_contents.iter().map(|c| c.transaction);
4911            if reverse {
4912                let iter = iter
4913                    .rev()
4914                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4915                    .skip(usize::from(cursor.is_some()));
4916                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4917            } else {
4918                let iter = iter
4919                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4920                    .skip(usize::from(cursor.is_some()));
4921                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4922            }
4923        }
4924        self.get_indexes()?
4925            .get_transactions(filter, cursor, limit, reverse)
4926    }
4927
    /// Returns a reference to the shared checkpoint store handle held by this
    /// authority state.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
4931
4932    pub fn get_latest_checkpoint_sequence_number(&self) -> SuiResult<CheckpointSequenceNumber> {
4933        self.get_checkpoint_store()
4934            .get_highest_executed_checkpoint_seq_number()?
4935            .ok_or(
4936                SuiErrorKind::UserInputError {
4937                    error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4938                }
4939                .into(),
4940            )
4941    }
4942
4943    #[cfg(msim)]
4944    pub fn get_highest_pruned_checkpoint_for_testing(&self) -> SuiResult<CheckpointSequenceNumber> {
4945        self.database_for_testing()
4946            .perpetual_tables
4947            .get_highest_pruned_checkpoint()
4948            .map(|c| c.unwrap_or(0))
4949            .map_err(Into::into)
4950    }
4951
4952    #[instrument(level = "trace", skip_all)]
4953    pub fn get_checkpoint_summary_by_sequence_number(
4954        &self,
4955        sequence_number: CheckpointSequenceNumber,
4956    ) -> SuiResult<CheckpointSummary> {
4957        let verified_checkpoint = self
4958            .get_checkpoint_store()
4959            .get_checkpoint_by_sequence_number(sequence_number)?;
4960        match verified_checkpoint {
4961            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4962            None => Err(SuiErrorKind::UserInputError {
4963                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4964            }
4965            .into()),
4966        }
4967    }
4968
4969    #[instrument(level = "trace", skip_all)]
4970    pub fn get_checkpoint_summary_by_digest(
4971        &self,
4972        digest: CheckpointDigest,
4973    ) -> SuiResult<CheckpointSummary> {
4974        let verified_checkpoint = self
4975            .get_checkpoint_store()
4976            .get_checkpoint_by_digest(&digest)?;
4977        match verified_checkpoint {
4978            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4979            None => Err(SuiErrorKind::UserInputError {
4980                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4981            }
4982            .into()),
4983        }
4984    }
4985
4986    #[instrument(level = "trace", skip_all)]
4987    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> SuiResult<TransactionDigest> {
4988        if is_system_package(package_id) {
4989            return self.find_genesis_txn_digest();
4990        }
4991        Ok(self
4992            .get_object_read(&package_id)?
4993            .into_object()?
4994            .previous_transaction)
4995    }
4996
4997    #[instrument(level = "trace", skip_all)]
4998    pub fn find_genesis_txn_digest(&self) -> SuiResult<TransactionDigest> {
4999        let summary = self
5000            .get_verified_checkpoint_by_sequence_number(0)?
5001            .into_message();
5002        let content = self.get_checkpoint_contents(summary.content_digest)?;
5003        let genesis_transaction = content.enumerate_transactions(&summary).next();
5004        Ok(genesis_transaction
5005            .ok_or(SuiErrorKind::UserInputError {
5006                error: UserInputError::GenesisTransactionNotFound,
5007            })?
5008            .1
5009            .transaction)
5010    }
5011
5012    #[instrument(level = "trace", skip_all)]
5013    pub fn get_verified_checkpoint_by_sequence_number(
5014        &self,
5015        sequence_number: CheckpointSequenceNumber,
5016    ) -> SuiResult<VerifiedCheckpoint> {
5017        let verified_checkpoint = self
5018            .get_checkpoint_store()
5019            .get_checkpoint_by_sequence_number(sequence_number)?;
5020        match verified_checkpoint {
5021            Some(verified_checkpoint) => Ok(verified_checkpoint),
5022            None => Err(SuiErrorKind::UserInputError {
5023                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5024            }
5025            .into()),
5026        }
5027    }
5028
5029    #[instrument(level = "trace", skip_all)]
5030    pub fn get_verified_checkpoint_summary_by_digest(
5031        &self,
5032        digest: CheckpointDigest,
5033    ) -> SuiResult<VerifiedCheckpoint> {
5034        let verified_checkpoint = self
5035            .get_checkpoint_store()
5036            .get_checkpoint_by_digest(&digest)?;
5037        match verified_checkpoint {
5038            Some(verified_checkpoint) => Ok(verified_checkpoint),
5039            None => Err(SuiErrorKind::UserInputError {
5040                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
5041            }
5042            .into()),
5043        }
5044    }
5045
5046    #[instrument(level = "trace", skip_all)]
5047    pub fn get_checkpoint_contents(
5048        &self,
5049        digest: CheckpointContentsDigest,
5050    ) -> SuiResult<CheckpointContents> {
5051        self.get_checkpoint_store()
5052            .get_checkpoint_contents(&digest)?
5053            .ok_or(
5054                SuiErrorKind::UserInputError {
5055                    error: UserInputError::CheckpointContentsNotFound(digest),
5056                }
5057                .into(),
5058            )
5059    }
5060
5061    #[instrument(level = "trace", skip_all)]
5062    pub fn get_checkpoint_contents_by_sequence_number(
5063        &self,
5064        sequence_number: CheckpointSequenceNumber,
5065    ) -> SuiResult<CheckpointContents> {
5066        let verified_checkpoint = self
5067            .get_checkpoint_store()
5068            .get_checkpoint_by_sequence_number(sequence_number)?;
5069        match verified_checkpoint {
5070            Some(verified_checkpoint) => {
5071                let content_digest = verified_checkpoint.into_inner().content_digest;
5072                self.get_checkpoint_contents(content_digest)
5073            }
5074            None => Err(SuiErrorKind::UserInputError {
5075                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
5076            }
5077            .into()),
5078        }
5079    }
5080
5081    #[instrument(level = "trace", skip_all)]
5082    pub async fn query_events(
5083        &self,
5084        kv_store: &Arc<TransactionKeyValueStore>,
5085        query: EventFilter,
5086        // If `Some`, the query will start from the next item after the specified cursor
5087        cursor: Option<EventID>,
5088        limit: usize,
5089        descending: bool,
5090    ) -> SuiResult<Vec<SuiEvent>> {
5091        let index_store = self.get_indexes()?;
5092
5093        //Get the tx_num from tx_digest
5094        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
5095            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
5096                SuiErrorKind::TransactionNotFound {
5097                    digest: cursor.tx_digest,
5098                },
5099            )?;
5100            (tx_seq, cursor.event_seq as usize)
5101        } else if descending {
5102            (u64::MAX, usize::MAX)
5103        } else {
5104            (0, 0)
5105        };
5106
5107        let limit = limit + 1;
5108        let mut event_keys = match query {
5109            EventFilter::All([]) => index_store.all_events(tx_num, event_num, limit, descending)?,
5110            EventFilter::Transaction(digest) => {
5111                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
5112            }
5113            EventFilter::MoveModule { package, module } => {
5114                let module_id = ModuleId::new(package.into(), module);
5115                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
5116            }
5117            EventFilter::MoveEventType(struct_name) => index_store
5118                .events_by_move_event_struct_name(
5119                    &struct_name,
5120                    tx_num,
5121                    event_num,
5122                    limit,
5123                    descending,
5124                )?,
5125            EventFilter::Sender(sender) => {
5126                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
5127            }
5128            EventFilter::TimeRange {
5129                start_time,
5130                end_time,
5131            } => index_store
5132                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
5133            EventFilter::MoveEventModule { package, module } => index_store
5134                .events_by_move_event_module(
5135                    &ModuleId::new(package.into(), module),
5136                    tx_num,
5137                    event_num,
5138                    limit,
5139                    descending,
5140                )?,
5141            // not using "_ =>" because we want to make sure we remember to add new variants here
5142            EventFilter::Any(_) => {
5143                return Err(SuiErrorKind::UserInputError {
5144                    error: UserInputError::Unsupported(
5145                        "'Any' queries are not supported by the fullnode.".to_string(),
5146                    ),
5147                }
5148                .into());
5149            }
5150        };
5151
5152        // skip one event if exclusive cursor is provided,
5153        // otherwise truncate to the original limit.
5154        if cursor.is_some() {
5155            if !event_keys.is_empty() {
5156                event_keys.remove(0);
5157            }
5158        } else {
5159            event_keys.truncate(limit - 1);
5160        }
5161
5162        // get the unique set of digests from the event_keys
5163        let transaction_digests = event_keys
5164            .iter()
5165            .map(|(_, digest, _, _)| *digest)
5166            .collect::<HashSet<_>>()
5167            .into_iter()
5168            .collect::<Vec<_>>();
5169
5170        let events = kv_store
5171            .multi_get_events_by_tx_digests(&transaction_digests)
5172            .await?;
5173
5174        let events_map: HashMap<_, _> =
5175            transaction_digests.iter().zip(events.into_iter()).collect();
5176
5177        let stored_events = event_keys
5178            .into_iter()
5179            .map(|k| {
5180                (
5181                    k,
5182                    events_map
5183                        .get(&k.1)
5184                        .expect("fetched digest is missing")
5185                        .clone()
5186                        .and_then(|e| e.data.get(k.2).cloned()),
5187                )
5188            })
5189            .map(
5190                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
5191                    event
5192                        .map(|e| (e, tx_digest, event_seq, timestamp))
5193                        .ok_or(SuiErrorKind::TransactionEventsNotFound { digest: tx_digest })
5194                },
5195            )
5196            .collect::<Result<Vec<_>, _>>()?;
5197
5198        let epoch_store = self.load_epoch_store_one_call_per_task();
5199        let backing_store = self.get_backing_package_store().as_ref();
5200        let mut layout_resolver = epoch_store
5201            .executor()
5202            .type_layout_resolver(Box::new(backing_store));
5203        let mut events = vec![];
5204        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
5205            events.push(SuiEvent::try_from(
5206                e.clone(),
5207                tx_digest,
5208                event_seq as u64,
5209                Some(timestamp),
5210                layout_resolver.get_annotated_layout(&e.type_)?,
5211            )?)
5212        }
5213        Ok(events)
5214    }
5215
5216    pub async fn insert_genesis_object(&self, object: Object) {
5217        self.get_reconfig_api().insert_genesis_object(object);
5218    }
5219
5220    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
5221        futures::future::join_all(
5222            objects
5223                .iter()
5224                .map(|o| self.insert_genesis_object(o.clone())),
5225        )
5226        .await;
5227    }
5228
5229    /// Make a status response for a transaction
5230    #[instrument(level = "trace", skip_all)]
5231    pub fn get_transaction_status(
5232        &self,
5233        transaction_digest: &TransactionDigest,
5234        epoch_store: &Arc<AuthorityPerEpochStore>,
5235    ) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
5236        // TODO: In the case of read path, we should not have to re-sign the effects.
5237        if let Some(effects) =
5238            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
5239        {
5240            if let Some(transaction) = self
5241                .get_transaction_cache_reader()
5242                .get_transaction_block(transaction_digest)
5243            {
5244                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
5245                let events = if effects.events_digest().is_some() {
5246                    self.get_transaction_events(effects.transaction_digest())?
5247                } else {
5248                    TransactionEvents::default()
5249                };
5250                return Ok(Some((
5251                    (*transaction).clone().into_message(),
5252                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
5253                )));
5254            } else {
5255                // The read of effects and read of transaction are not atomic. It's possible that we reverted
5256                // the transaction (during epoch change) in between the above two reads, and we end up
5257                // having effects but not transaction. In this case, we just fall through.
5258                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
5259            }
5260        }
5261        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
5262            self.metrics.tx_already_processed.inc();
5263            let (transaction, sig) = signed.into_inner().into_data_and_sig();
5264            Ok(Some((transaction, TransactionStatus::Signed(sig))))
5265        } else {
5266            Ok(None)
5267        }
5268    }
5269
5270    /// Get the signed effects of the given transaction. If the effects was signed in a previous
5271    /// epoch, re-sign it so that the caller is able to form a cert of the effects in the current
5272    /// epoch.
5273    #[instrument(level = "trace", skip_all)]
5274    pub fn get_signed_effects_and_maybe_resign(
5275        &self,
5276        transaction_digest: &TransactionDigest,
5277        epoch_store: &Arc<AuthorityPerEpochStore>,
5278    ) -> SuiResult<Option<VerifiedSignedTransactionEffects>> {
5279        let effects = self
5280            .get_transaction_cache_reader()
5281            .get_executed_effects(transaction_digest);
5282        match effects {
5283            Some(effects) => {
5284                // If the transaction was executed in previous epochs, the validator will
5285                // re-sign the effects with new current epoch so that a client is always able to
5286                // obtain an effects certificate at the current epoch.
5287                //
5288                // Why is this necessary? Consider the following case:
5289                // - assume there are 4 validators
5290                // - Quorum driver gets 2 signed effects before reconfig halt
5291                // - The tx makes it into final checkpoint.
5292                // - 2 validators go away and are replaced in the new epoch.
5293                // - The new epoch begins.
5294                // - The quorum driver cannot complete the partial effects cert from the previous epoch,
5295                //   because it may not be able to reach either of the 2 former validators.
5296                // - But, if the 2 validators that stayed are willing to re-sign the effects in the new
5297                //   epoch, the QD can make a new effects cert and return it to the client.
5298                //
5299                // This is a considered a short-term workaround. Eventually, Quorum Driver should be able
5300                // to return either an effects certificate, -or- a proof of inclusion in a checkpoint. In
5301                // the case above, the Quorum Driver would return a proof of inclusion in the final
5302                // checkpoint, and this code would no longer be necessary.
5303                if effects.executed_epoch() != epoch_store.epoch() {
5304                    debug!(
5305                        tx_digest=?transaction_digest,
5306                        effects_epoch=?effects.executed_epoch(),
5307                        epoch=?epoch_store.epoch(),
5308                        "Re-signing the effects with the current epoch"
5309                    );
5310                }
5311                Ok(Some(self.sign_effects(effects, epoch_store)?))
5312            }
5313            None => Ok(None),
5314        }
5315    }
5316
5317    #[instrument(level = "trace", skip_all)]
5318    pub(crate) fn sign_effects(
5319        &self,
5320        effects: TransactionEffects,
5321        epoch_store: &Arc<AuthorityPerEpochStore>,
5322    ) -> SuiResult<VerifiedSignedTransactionEffects> {
5323        let tx_digest = *effects.transaction_digest();
5324        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
5325            Some(sig) => {
5326                debug_assert!(sig.epoch == epoch_store.epoch());
5327                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
5328            }
5329            _ => {
5330                let sig = AuthoritySignInfo::new(
5331                    epoch_store.epoch(),
5332                    &effects,
5333                    Intent::sui_app(IntentScope::TransactionEffects),
5334                    self.name,
5335                    &*self.secret,
5336                );
5337
5338                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
5339
5340                epoch_store.insert_effects_digest_and_signature(
5341                    &tx_digest,
5342                    effects.digest(),
5343                    &sig,
5344                )?;
5345
5346                effects
5347            }
5348        };
5349
5350        Ok(VerifiedSignedTransactionEffects::new_unchecked(
5351            signed_effects,
5352        ))
5353    }
5354
5355    // Returns coin objects for indexing for fullnode if indexing is enabled.
5356    #[instrument(level = "trace", skip_all)]
5357    fn fullnode_only_get_tx_coins_for_indexing(
5358        name: AuthorityName,
5359        object_store: &Arc<dyn ObjectStore + Send + Sync>,
5360        effects: &TransactionEffects,
5361        inner_temporary_store: &InnerTemporaryStore,
5362        epoch_store: &Arc<AuthorityPerEpochStore>,
5363    ) -> Option<TxCoins> {
5364        if epoch_store.committee().authority_exists(&name) {
5365            return None;
5366        }
5367        let written_coin_objects = inner_temporary_store
5368            .written
5369            .iter()
5370            .filter_map(|(k, v)| {
5371                if v.is_coin() {
5372                    Some((*k, v.clone()))
5373                } else {
5374                    None
5375                }
5376            })
5377            .collect();
5378        let mut input_coin_objects = inner_temporary_store
5379            .input_objects
5380            .iter()
5381            .filter_map(|(k, v)| {
5382                if v.is_coin() {
5383                    Some((*k, v.clone()))
5384                } else {
5385                    None
5386                }
5387            })
5388            .collect::<ObjectMap>();
5389
5390        // Check for receiving objects that were actually used and modified during execution. Their
5391        // updated version will already showup in "written_coins" but their input isn't included in
5392        // the set of input objects in a inner_temporary_store.
5393        for (object_id, version) in effects.modified_at_versions() {
5394            if inner_temporary_store
5395                .loaded_runtime_objects
5396                .contains_key(&object_id)
5397                && let Some(object) = object_store.get_object_by_key(&object_id, version)
5398                && object.is_coin()
5399            {
5400                input_coin_objects.insert(object_id, object);
5401            }
5402        }
5403
5404        Some((input_coin_objects, written_coin_objects))
5405    }
5406
5407    /// Get the TransactionEnvelope that currently locks the given object, if any.
5408    /// Since object locks are only valid for one epoch, we also need the epoch_id in the query.
5409    /// Returns UserInputError::ObjectNotFound if no lock records for the given object can be found.
5410    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the object record is at a different version.
5411    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a certain transaction.
5412    /// Returns None if a lock record is initialized for the given ObjectRef but not yet locked by any transaction,
5413    ///     or cannot find the transaction in transaction table, because of data race etc.
5414    #[instrument(level = "trace", skip_all)]
5415    pub async fn get_transaction_lock(
5416        &self,
5417        object_ref: &ObjectRef,
5418        epoch_store: &AuthorityPerEpochStore,
5419    ) -> SuiResult<Option<VerifiedSignedTransaction>> {
5420        let lock_info = self
5421            .get_object_cache_reader()
5422            .get_lock(*object_ref, epoch_store)?;
5423        let lock_info = match lock_info {
5424            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
5425                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
5426                    provided_obj_ref: *object_ref,
5427                    current_version: locked_ref.1,
5428                }
5429                .into());
5430            }
5431            ObjectLockStatus::Initialized => {
5432                return Ok(None);
5433            }
5434            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
5435        };
5436
5437        epoch_store.get_signed_transaction(&lock_info.tx_digest)
5438    }
5439
5440    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
5441        self.get_object_cache_reader().get_objects(objects)
5442    }
5443
5444    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
5445        self.get_object_cache_reader()
5446            .get_latest_object_ref_or_tombstone(object_id)
5447    }
5448
5449    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
5450    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the upgrade.
5451    ///
5452    /// This method can be used to dynamic adjust the amount of buffer. If set to 0, the upgrade
5453    /// will go through with only 2f+1 votes.
5454    ///
5455    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all should have the same
5456    /// value), or you risk halting the chain.
5457    pub fn set_override_protocol_upgrade_buffer_stake(
5458        &self,
5459        expected_epoch: EpochId,
5460        buffer_stake_bps: u64,
5461    ) -> SuiResult {
5462        let epoch_store = self.load_epoch_store_one_call_per_task();
5463        let actual_epoch = epoch_store.epoch();
5464        if actual_epoch != expected_epoch {
5465            return Err(SuiErrorKind::WrongEpoch {
5466                expected_epoch,
5467                actual_epoch,
5468            }
5469            .into());
5470        }
5471
5472        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
5473    }
5474
5475    pub fn clear_override_protocol_upgrade_buffer_stake(
5476        &self,
5477        expected_epoch: EpochId,
5478    ) -> SuiResult {
5479        let epoch_store = self.load_epoch_store_one_call_per_task();
5480        let actual_epoch = epoch_store.epoch();
5481        if actual_epoch != expected_epoch {
5482            return Err(SuiErrorKind::WrongEpoch {
5483                expected_epoch,
5484                actual_epoch,
5485            }
5486            .into());
5487        }
5488
5489        epoch_store.clear_override_protocol_upgrade_buffer_stake()
5490    }
5491
5492    /// Get the set of system packages that are compiled in to this build, if those packages are
5493    /// compatible with the current versions of those packages on-chain.
5494    pub async fn get_available_system_packages(
5495        &self,
5496        binary_config: &BinaryConfig,
5497    ) -> Vec<ObjectRef> {
5498        let mut results = vec![];
5499
5500        let system_packages = BuiltInFramework::iter_system_packages();
5501
5502        // Add extra framework packages during simtest
5503        #[cfg(msim)]
5504        let extra_packages = framework_injection::get_extra_packages(self.name);
5505        #[cfg(msim)]
5506        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
5507
5508        for system_package in system_packages {
5509            let modules = system_package.modules().to_vec();
5510            // In simtests, we could override the current built-in framework packages.
5511            #[cfg(msim)]
5512            let modules =
5513                match framework_injection::get_override_modules(&system_package.id, self.name) {
5514                    Some(overrides) if overrides.is_empty() => continue,
5515                    Some(overrides) => overrides,
5516                    None => modules,
5517                };
5518
5519            let Some(obj_ref) = sui_framework::compare_system_package(
5520                &self.get_object_store(),
5521                &system_package.id,
5522                &modules,
5523                system_package.dependencies.to_vec(),
5524                binary_config,
5525            )
5526            .await
5527            else {
5528                return vec![];
5529            };
5530            results.push(obj_ref);
5531        }
5532
5533        results
5534    }
5535
5536    /// Return the new versions, module bytes, and dependencies for the packages that have been
5537    /// committed to for a framework upgrade, in `system_packages`.  Loads the module contents from
5538    /// the binary, and performs the following checks:
5539    ///
5540    /// - Whether its contents matches what is on-chain already, in which case no upgrade is
5541    ///   required, and its contents are omitted from the output.
5542    /// - Whether the contents in the binary can form a package whose digest matches the input,
5543    ///   meaning the framework will be upgraded, and this authority can satisfy that upgrade, in
5544    ///   which case the contents are included in the output.
5545    ///
5546    /// If the current version of the framework can't be loaded, the binary does not contain the
5547    /// bytes for that framework ID, or the resulting package fails the digest check, `None` is
5548    /// returned indicating that this authority cannot run the upgrade that the network voted on.
5549    async fn get_system_package_bytes(
5550        &self,
5551        system_packages: Vec<ObjectRef>,
5552        binary_config: &BinaryConfig,
5553    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
5554        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
5555        let objects = self.get_objects(&ids).await;
5556
5557        let mut res = Vec::with_capacity(system_packages.len());
5558        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
5559            let prev_transaction = match object {
5560                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
5561                    // Skip this one because it doesn't need to be upgraded.
5562                    info!("Framework {} does not need updating", system_package_ref.0);
5563                    continue;
5564                }
5565
5566                Some(cur_object) => cur_object.previous_transaction,
5567                None => TransactionDigest::genesis_marker(),
5568            };
5569
5570            #[cfg(msim)]
5571            let SystemPackage {
5572                id: _,
5573                bytes,
5574                dependencies,
5575            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
5576                .unwrap_or_else(|| {
5577                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
5578                });
5579
5580            #[cfg(not(msim))]
5581            let SystemPackage {
5582                id: _,
5583                bytes,
5584                dependencies,
5585            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
5586
5587            let modules: Vec<_> = bytes
5588                .iter()
5589                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
5590                .collect();
5591
5592            let new_object = Object::new_system_package(
5593                &modules,
5594                system_package_ref.1,
5595                dependencies.clone(),
5596                prev_transaction,
5597            );
5598
5599            let new_ref = new_object.compute_object_reference();
5600            if new_ref != system_package_ref {
5601                if cfg!(msim) {
5602                    // debug_fatal required here for test_framework_upgrade_conflicting_versions to pass
5603                    debug_fatal!(
5604                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
5605                    );
5606                } else {
5607                    error!(
5608                        "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
5609                    );
5610                }
5611                return None;
5612            }
5613
5614            res.push((system_package_ref.1, bytes, dependencies));
5615        }
5616
5617        Some(res)
5618    }
5619
5620    fn is_protocol_version_supported_v2(
5621        current_protocol_version: ProtocolVersion,
5622        proposed_protocol_version: ProtocolVersion,
5623        protocol_config: &ProtocolConfig,
5624        committee: &Committee,
5625        capabilities: Vec<AuthorityCapabilitiesV2>,
5626        mut buffer_stake_bps: u64,
5627    ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
5628        if proposed_protocol_version > current_protocol_version + 1
5629            && !protocol_config.advance_to_highest_supported_protocol_version()
5630        {
5631            return None;
5632        }
5633
5634        if buffer_stake_bps > 10000 {
5635            warn!("clamping buffer_stake_bps to 10000");
5636            buffer_stake_bps = 10000;
5637        }
5638
5639        // For each validator, gather the protocol version and system packages that it would like
5640        // to upgrade to in the next epoch.
5641        let mut desired_upgrades: Vec<_> = capabilities
5642            .into_iter()
5643            .filter_map(|mut cap| {
5644                // A validator that lists no packages is voting against any change at all.
5645                if cap.available_system_packages.is_empty() {
5646                    return None;
5647                }
5648
5649                cap.available_system_packages.sort();
5650
5651                info!(
5652                    "validator {:?} supports {:?} with system packages: {:?}",
5653                    cap.authority.concise(),
5654                    cap.supported_protocol_versions,
5655                    cap.available_system_packages,
5656                );
5657
5658                // A validator that only supports the current protocol version is also voting
5659                // against any change, because framework upgrades always require a protocol version
5660                // bump.
5661                cap.supported_protocol_versions
5662                    .get_version_digest(proposed_protocol_version)
5663                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
5664            })
5665            .collect();
5666
5667        // There can only be one set of votes that have a majority, find one if it exists.
5668        desired_upgrades.sort();
5669        desired_upgrades
5670            .into_iter()
5671            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5672            .into_iter()
5673            .find_map(|((digest, packages), group)| {
5674                // should have been filtered out earlier.
5675                assert!(!packages.is_empty());
5676
5677                let mut stake_aggregator: StakeAggregator<(), true> =
5678                    StakeAggregator::new(Arc::new(committee.clone()));
5679
5680                for (_, _, authority) in group {
5681                    stake_aggregator.insert_generic(authority, ());
5682                }
5683
5684                let total_votes = stake_aggregator.total_votes();
5685                let quorum_threshold = committee.quorum_threshold();
5686                let f = committee.total_votes() - committee.quorum_threshold();
5687
5688                // multiple by buffer_stake_bps / 10000, rounded up.
5689                let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
5690                let effective_threshold = quorum_threshold + buffer_stake;
5691
5692                info!(
5693                    protocol_config_digest = ?digest,
5694                    ?total_votes,
5695                    ?quorum_threshold,
5696                    ?buffer_stake_bps,
5697                    ?effective_threshold,
5698                    ?proposed_protocol_version,
5699                    ?packages,
5700                    "support for upgrade"
5701                );
5702
5703                let has_support = total_votes >= effective_threshold;
5704                has_support.then_some((proposed_protocol_version, packages))
5705            })
5706    }
5707
5708    fn choose_protocol_version_and_system_packages_v2(
5709        current_protocol_version: ProtocolVersion,
5710        protocol_config: &ProtocolConfig,
5711        committee: &Committee,
5712        capabilities: Vec<AuthorityCapabilitiesV2>,
5713        buffer_stake_bps: u64,
5714    ) -> (ProtocolVersion, Vec<ObjectRef>) {
5715        assert!(protocol_config.authority_capabilities_v2());
5716        let mut next_protocol_version = current_protocol_version;
5717        let mut system_packages = vec![];
5718
5719        while let Some((version, packages)) = Self::is_protocol_version_supported_v2(
5720            current_protocol_version,
5721            next_protocol_version + 1,
5722            protocol_config,
5723            committee,
5724            capabilities.clone(),
5725            buffer_stake_bps,
5726        ) {
5727            next_protocol_version = version;
5728            system_packages = packages;
5729        }
5730
5731        (next_protocol_version, system_packages)
5732    }
5733
5734    #[instrument(level = "debug", skip_all)]
5735    fn create_authenticator_state_tx(
5736        &self,
5737        epoch_store: &Arc<AuthorityPerEpochStore>,
5738    ) -> Option<EndOfEpochTransactionKind> {
5739        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5740            info!("authenticator state transactions not enabled");
5741            return None;
5742        }
5743
5744        let authenticator_state_exists = epoch_store.authenticator_state_exists();
5745        let tx = if authenticator_state_exists {
5746            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5747            let min_epoch =
5748                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5749            let authenticator_obj_initial_shared_version = epoch_store
5750                .epoch_start_config()
5751                .authenticator_obj_initial_shared_version()
5752                .expect("initial version must exist");
5753
5754            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5755                min_epoch,
5756                authenticator_obj_initial_shared_version,
5757            );
5758
5759            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5760
5761            tx
5762        } else {
5763            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5764            info!("Creating AuthenticatorStateCreate tx");
5765            tx
5766        };
5767        Some(tx)
5768    }
5769
5770    #[instrument(level = "debug", skip_all)]
5771    fn create_randomness_state_tx(
5772        &self,
5773        epoch_store: &Arc<AuthorityPerEpochStore>,
5774    ) -> Option<EndOfEpochTransactionKind> {
5775        if !epoch_store.protocol_config().random_beacon() {
5776            info!("randomness state transactions not enabled");
5777            return None;
5778        }
5779
5780        if epoch_store.randomness_state_exists() {
5781            return None;
5782        }
5783
5784        let tx = EndOfEpochTransactionKind::new_randomness_state_create();
5785        info!("Creating RandomnessStateCreate tx");
5786        Some(tx)
5787    }
5788
5789    #[instrument(level = "debug", skip_all)]
5790    fn create_accumulator_root_tx(
5791        &self,
5792        epoch_store: &Arc<AuthorityPerEpochStore>,
5793    ) -> Option<EndOfEpochTransactionKind> {
5794        if !epoch_store
5795            .protocol_config()
5796            .create_root_accumulator_object()
5797        {
5798            info!("accumulator root creation not enabled");
5799            return None;
5800        }
5801
5802        if epoch_store.accumulator_root_exists() {
5803            return None;
5804        }
5805
5806        let tx = EndOfEpochTransactionKind::new_accumulator_root_create();
5807        info!("Creating AccumulatorRootCreate tx");
5808        Some(tx)
5809    }
5810
5811    #[instrument(level = "debug", skip_all)]
5812    fn create_write_accumulator_storage_cost_tx(
5813        &self,
5814        epoch_store: &Arc<AuthorityPerEpochStore>,
5815    ) -> Option<EndOfEpochTransactionKind> {
5816        if !epoch_store.accumulator_root_exists() {
5817            info!("accumulator root does not exist yet");
5818            return None;
5819        }
5820        if !epoch_store.protocol_config().enable_accumulators() {
5821            info!("accumulators not enabled");
5822            return None;
5823        }
5824
5825        let object_store = self.get_object_store();
5826        let object_count =
5827            match accumulator_metadata::get_accumulator_object_count(object_store.as_ref()) {
5828                Ok(Some(count)) => count,
5829                Ok(None) => return None,
5830                Err(e) => {
5831                    fatal!("failed to read accumulator object count: {e}");
5832                }
5833            };
5834
5835        let storage_cost = object_count.saturating_mul(
5836            epoch_store
5837                .protocol_config()
5838                .accumulator_object_storage_cost(),
5839        );
5840
5841        let tx = EndOfEpochTransactionKind::new_write_accumulator_storage_cost(storage_cost);
5842        info!(
5843            object_count,
5844            storage_cost, "Creating WriteAccumulatorStorageCost tx"
5845        );
5846        Some(tx)
5847    }
5848
5849    #[instrument(level = "debug", skip_all)]
5850    fn create_coin_registry_tx(
5851        &self,
5852        epoch_store: &Arc<AuthorityPerEpochStore>,
5853    ) -> Option<EndOfEpochTransactionKind> {
5854        if !epoch_store.protocol_config().enable_coin_registry() {
5855            info!("coin registry not enabled");
5856            return None;
5857        }
5858
5859        if epoch_store.coin_registry_exists() {
5860            return None;
5861        }
5862
5863        let tx = EndOfEpochTransactionKind::new_coin_registry_create();
5864        info!("Creating CoinRegistryCreate tx");
5865        Some(tx)
5866    }
5867
5868    #[instrument(level = "debug", skip_all)]
5869    fn create_display_registry_tx(
5870        &self,
5871        epoch_store: &Arc<AuthorityPerEpochStore>,
5872    ) -> Option<EndOfEpochTransactionKind> {
5873        if !epoch_store.protocol_config().enable_display_registry() {
5874            info!("display registry not enabled");
5875            return None;
5876        }
5877
5878        if epoch_store.display_registry_exists() {
5879            return None;
5880        }
5881
5882        let tx = EndOfEpochTransactionKind::new_display_registry_create();
5883        info!("Creating DisplayRegistryCreate tx");
5884        Some(tx)
5885    }
5886
5887    #[instrument(level = "debug", skip_all)]
5888    fn create_bridge_tx(
5889        &self,
5890        epoch_store: &Arc<AuthorityPerEpochStore>,
5891    ) -> Option<EndOfEpochTransactionKind> {
5892        if !epoch_store.protocol_config().enable_bridge() {
5893            info!("bridge not enabled");
5894            return None;
5895        }
5896        if epoch_store.bridge_exists() {
5897            return None;
5898        }
5899        let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
5900        info!("Creating Bridge Create tx");
5901        Some(tx)
5902    }
5903
5904    #[instrument(level = "debug", skip_all)]
5905    fn init_bridge_committee_tx(
5906        &self,
5907        epoch_store: &Arc<AuthorityPerEpochStore>,
5908    ) -> Option<EndOfEpochTransactionKind> {
5909        if !epoch_store.protocol_config().enable_bridge() {
5910            info!("bridge not enabled");
5911            return None;
5912        }
5913        if !epoch_store
5914            .protocol_config()
5915            .should_try_to_finalize_bridge_committee()
5916        {
5917            info!("should not try to finalize bridge committee yet");
5918            return None;
5919        }
5920        // Only create this transaction if bridge exists
5921        if !epoch_store.bridge_exists() {
5922            return None;
5923        }
5924
5925        if epoch_store.bridge_committee_initiated() {
5926            return None;
5927        }
5928
5929        let bridge_initial_shared_version = epoch_store
5930            .epoch_start_config()
5931            .bridge_obj_initial_shared_version()
5932            .expect("initial version must exist");
5933        let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
5934        info!("Init Bridge committee tx");
5935        Some(tx)
5936    }
5937
5938    #[instrument(level = "debug", skip_all)]
5939    fn create_deny_list_state_tx(
5940        &self,
5941        epoch_store: &Arc<AuthorityPerEpochStore>,
5942    ) -> Option<EndOfEpochTransactionKind> {
5943        if !epoch_store.protocol_config().enable_coin_deny_list_v1() {
5944            return None;
5945        }
5946
5947        if epoch_store.coin_deny_list_state_exists() {
5948            return None;
5949        }
5950
5951        let tx = EndOfEpochTransactionKind::new_deny_list_state_create();
5952        info!("Creating DenyListStateCreate tx");
5953        Some(tx)
5954    }
5955
5956    #[instrument(level = "debug", skip_all)]
5957    fn create_address_alias_state_tx(
5958        &self,
5959        epoch_store: &Arc<AuthorityPerEpochStore>,
5960    ) -> Option<EndOfEpochTransactionKind> {
5961        if !epoch_store.protocol_config().address_aliases() {
5962            info!("address aliases not enabled");
5963            return None;
5964        }
5965
5966        if epoch_store.address_alias_state_exists() {
5967            return None;
5968        }
5969
5970        let tx = EndOfEpochTransactionKind::new_address_alias_state_create();
5971        info!("Creating AddressAliasStateCreate tx");
5972        Some(tx)
5973    }
5974
5975    #[instrument(level = "debug", skip_all)]
5976    fn create_execution_time_observations_tx(
5977        &self,
5978        epoch_store: &Arc<AuthorityPerEpochStore>,
5979        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
5980        last_checkpoint_before_end_of_epoch: CheckpointSequenceNumber,
5981    ) -> Option<EndOfEpochTransactionKind> {
5982        let PerObjectCongestionControlMode::ExecutionTimeEstimate(params) = epoch_store
5983            .protocol_config()
5984            .per_object_congestion_control_mode()
5985        else {
5986            return None;
5987        };
5988
5989        // Load tx in the last N checkpoints before end-of-epoch, and save only the
5990        // execution time observations for commands in these checkpoints.
5991        let start_checkpoint = std::cmp::max(
5992            last_checkpoint_before_end_of_epoch
5993                .saturating_sub(params.stored_observations_num_included_checkpoints - 1),
5994            // If we have <N checkpoints in the epoch, use all of them.
5995            epoch_store
5996                .epoch()
5997                .checked_sub(1)
5998                .map(|prev_epoch| {
5999                    self.checkpoint_store
6000                        .get_epoch_last_checkpoint_seq_number(prev_epoch)
6001                        .expect("typed store must not fail")
6002                        .expect(
6003                            "sequence number of last checkpoint of preceding epoch must be saved",
6004                        )
6005                        + 1
6006                })
6007                .unwrap_or(1),
6008        );
6009        info!(
6010            "reading checkpoint range {:?}..={:?}",
6011            start_checkpoint, last_checkpoint_before_end_of_epoch
6012        );
6013        let sequence_numbers =
6014            (start_checkpoint..=last_checkpoint_before_end_of_epoch).collect::<Vec<_>>();
6015        let contents_digests: Vec<_> = self
6016            .checkpoint_store
6017            .multi_get_locally_computed_checkpoints(&sequence_numbers)
6018            .expect("typed store must not fail")
6019            .into_iter()
6020            .zip(sequence_numbers)
6021            .map(|(maybe_checkpoint, sequence_number)| {
6022                if let Some(checkpoint) = maybe_checkpoint {
6023                    checkpoint.content_digest
6024                } else {
6025                    // If locally computed checkpoint summary was already pruned, load the
6026                    // certified checkpoint.
6027                    self.checkpoint_store
6028                        .get_checkpoint_by_sequence_number(sequence_number)
6029                        .expect("typed store must not fail")
6030                        .unwrap_or_else(|| {
6031                            fatal!("preceding checkpoints must exist by end of epoch")
6032                        })
6033                        .data()
6034                        .content_digest
6035                }
6036            })
6037            .collect();
6038        let tx_digests: Vec<_> = self
6039            .checkpoint_store
6040            .multi_get_checkpoint_content(&contents_digests)
6041            .expect("typed store must not fail")
6042            .into_iter()
6043            .flat_map(|maybe_contents| {
6044                maybe_contents
6045                    .expect("preceding checkpoint contents must exist by end of epoch")
6046                    .into_inner()
6047                    .into_iter()
6048                    .map(|digests| digests.transaction)
6049            })
6050            .collect();
6051        let included_execution_time_observations: HashSet<_> = self
6052            .get_transaction_cache_reader()
6053            .multi_get_transaction_blocks(&tx_digests)
6054            .into_iter()
6055            .flat_map(|maybe_tx| {
6056                if let TransactionKind::ProgrammableTransaction(ptb) = maybe_tx
6057                    .expect("preceding transaction must exist by end of epoch")
6058                    .transaction_data()
6059                    .kind()
6060                {
6061                    #[allow(clippy::unnecessary_to_owned)]
6062                    itertools::Either::Left(
6063                        ptb.commands
6064                            .to_owned()
6065                            .into_iter()
6066                            .map(|cmd| ExecutionTimeObservationKey::from_command(&cmd)),
6067                    )
6068                } else {
6069                    itertools::Either::Right(std::iter::empty())
6070                }
6071            })
6072            .chain(end_of_epoch_observation_keys.into_iter())
6073            .collect();
6074
6075        let tx = EndOfEpochTransactionKind::new_store_execution_time_observations(
6076            epoch_store
6077                .get_end_of_epoch_execution_time_observations()
6078                .filter_and_sort_v1(
6079                    |(key, _)| included_execution_time_observations.contains(key),
6080                    params.stored_observations_limit.try_into().unwrap(),
6081                ),
6082        );
6083        info!("Creating StoreExecutionTimeObservations tx");
6084        Some(tx)
6085    }
6086
6087    /// Creates and execute the advance epoch transaction to effects without committing it to the database.
6088    /// The effects of the change epoch tx are only written to the database after a certified checkpoint has been
6089    /// formed and executed by CheckpointExecutor.
6090    ///
6091    /// When a framework upgraded has been decided on, but the validator does not have the new
6092    /// versions of the packages locally, the validator cannot form the ChangeEpochTx. In this case
6093    /// it returns Err, indicating that the checkpoint builder should give up trying to make the
6094    /// final checkpoint. As long as the network is able to create a certified checkpoint (which
6095    /// should be ensured by the capabilities vote), it will arrive via state sync and be executed
6096    /// by CheckpointExecutor.
6097    #[instrument(level = "error", skip_all)]
6098    pub async fn create_and_execute_advance_epoch_tx(
6099        &self,
6100        epoch_store: &Arc<AuthorityPerEpochStore>,
6101        gas_cost_summary: &GasCostSummary,
6102        checkpoint: CheckpointSequenceNumber,
6103        epoch_start_timestamp_ms: CheckpointTimestamp,
6104        end_of_epoch_observation_keys: Vec<ExecutionTimeObservationKey>,
6105        // This may be less than `checkpoint - 1` if the end-of-epoch PendingCheckpoint produced
6106        // >1 checkpoint.
6107        last_checkpoint: CheckpointSequenceNumber,
6108    ) -> CheckpointBuilderResult<(SuiSystemState, TransactionEffects)> {
6109        let mut txns = Vec::new();
6110
6111        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
6112            txns.push(tx);
6113        }
6114        if let Some(tx) = self.create_randomness_state_tx(epoch_store) {
6115            txns.push(tx);
6116        }
6117        if let Some(tx) = self.create_bridge_tx(epoch_store) {
6118            txns.push(tx);
6119        }
6120        if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
6121            txns.push(tx);
6122        }
6123        if let Some(tx) = self.create_deny_list_state_tx(epoch_store) {
6124            txns.push(tx);
6125        }
6126        if let Some(tx) = self.create_execution_time_observations_tx(
6127            epoch_store,
6128            end_of_epoch_observation_keys,
6129            last_checkpoint,
6130        ) {
6131            txns.push(tx);
6132        }
6133        if let Some(tx) = self.create_accumulator_root_tx(epoch_store) {
6134            txns.push(tx);
6135        }
6136
6137        if let Some(tx) = self.create_coin_registry_tx(epoch_store) {
6138            txns.push(tx);
6139        }
6140        if let Some(tx) = self.create_display_registry_tx(epoch_store) {
6141            txns.push(tx);
6142        }
6143        if let Some(tx) = self.create_address_alias_state_tx(epoch_store) {
6144            txns.push(tx);
6145        }
6146        if let Some(tx) = self.create_write_accumulator_storage_cost_tx(epoch_store) {
6147            txns.push(tx);
6148        }
6149
6150        let next_epoch = epoch_store.epoch() + 1;
6151
6152        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
6153
6154        let (next_epoch_protocol_version, next_epoch_system_packages) =
6155            Self::choose_protocol_version_and_system_packages_v2(
6156                epoch_store.protocol_version(),
6157                epoch_store.protocol_config(),
6158                epoch_store.committee(),
6159                epoch_store
6160                    .get_capabilities_v2()
6161                    .expect("read capabilities from db cannot fail"),
6162                buffer_stake_bps,
6163            );
6164
6165        // since system packages are created during the current epoch, they should abide by the
6166        // rules of the current epoch, including the current epoch's max Move binary format version
6167        let config = epoch_store.protocol_config();
6168        let binary_config = config.binary_config(None);
6169        let Some(next_epoch_system_package_bytes) = self
6170            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
6171            .await
6172        else {
6173            if next_epoch_protocol_version <= ProtocolVersion::MAX {
6174                // This case should only be hit if the validator supports the new protocol version,
6175                // but carries a different framework. The validator should still be able to
6176                // reconfigure, as the correct framework will be installed by the change epoch txn.
6177                debug_fatal!(
6178                    "upgraded system packages {:?} are not locally available, cannot create \
6179                    ChangeEpochTx. validator binary must be upgraded to the correct version!",
6180                    next_epoch_system_packages
6181                );
6182            } else {
6183                error!(
6184                    "validator does not support next_epoch_protocol_version {:?} - will shut down after reconfig unless upgraded",
6185                    next_epoch_protocol_version
6186                );
6187            }
6188            // the checkpoint builder will keep retrying forever when it hits this error.
6189            // Eventually, one of two things will happen:
6190            // - The operator will upgrade this binary to one that has the new packages locally,
6191            //   and this function will succeed.
6192            // - The final checkpoint will be certified by other validators, we will receive it via
6193            //   state sync, and execute it. This will upgrade the framework packages, reconfigure,
6194            //   and most likely shut down in the new epoch (this validator likely doesn't support
6195            //   the new protocol version, or else it should have had the packages.)
6196            return Err(CheckpointBuilderError::SystemPackagesMissing);
6197        };
6198
6199        let tx = if epoch_store
6200            .protocol_config()
6201            .end_of_epoch_transaction_supported()
6202        {
6203            txns.push(EndOfEpochTransactionKind::new_change_epoch(
6204                next_epoch,
6205                next_epoch_protocol_version,
6206                gas_cost_summary.storage_cost,
6207                gas_cost_summary.computation_cost,
6208                gas_cost_summary.storage_rebate,
6209                gas_cost_summary.non_refundable_storage_fee,
6210                epoch_start_timestamp_ms,
6211                next_epoch_system_package_bytes,
6212            ));
6213
6214            VerifiedTransaction::new_end_of_epoch_transaction(txns)
6215        } else {
6216            VerifiedTransaction::new_change_epoch(
6217                next_epoch,
6218                next_epoch_protocol_version,
6219                gas_cost_summary.storage_cost,
6220                gas_cost_summary.computation_cost,
6221                gas_cost_summary.storage_rebate,
6222                gas_cost_summary.non_refundable_storage_fee,
6223                epoch_start_timestamp_ms,
6224                next_epoch_system_package_bytes,
6225            )
6226        };
6227
6228        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
6229            tx.clone(),
6230            epoch_store.epoch(),
6231            checkpoint,
6232        );
6233
6234        let tx_digest = executable_tx.digest();
6235
6236        info!(
6237            ?next_epoch,
6238            ?next_epoch_protocol_version,
6239            ?next_epoch_system_packages,
6240            computation_cost=?gas_cost_summary.computation_cost,
6241            storage_cost=?gas_cost_summary.storage_cost,
6242            storage_rebate=?gas_cost_summary.storage_rebate,
6243            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
6244            ?tx_digest,
6245            "Creating advance epoch transaction"
6246        );
6247
6248        fail_point_async!("change_epoch_tx_delay");
6249        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
6250
6251        // The tx could have been executed by state sync already - if so simply return an error.
6252        // The checkpoint builder will shortly be terminated by reconfiguration anyway.
6253        if self
6254            .get_transaction_cache_reader()
6255            .is_tx_already_executed(tx_digest)
6256        {
6257            warn!("change epoch tx has already been executed via state sync");
6258            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6259        }
6260
6261        let Some(execution_guard) = self.execution_lock_for_executable_transaction(&executable_tx)
6262        else {
6263            return Err(CheckpointBuilderError::ChangeEpochTxAlreadyExecuted);
6264        };
6265
6266        // We must manually assign the shared object versions to the transaction before executing it.
6267        // This is because we do not sequence end-of-epoch transactions through consensus.
6268        let assigned_versions = epoch_store.assign_shared_object_versions_idempotent(
6269            self.get_object_cache_reader().as_ref(),
6270            std::iter::once(&Schedulable::Transaction(&executable_tx)),
6271        )?;
6272
6273        assert_eq!(assigned_versions.0.len(), 1);
6274        let assigned_versions = assigned_versions.0.into_iter().next().unwrap().1;
6275
6276        let input_objects = self.read_objects_for_execution(
6277            &tx_lock,
6278            &executable_tx,
6279            &assigned_versions,
6280            epoch_store,
6281        )?;
6282
6283        let (transaction_outputs, _timings, _execution_error_opt) = self
6284            .execute_certificate(
6285                &execution_guard,
6286                &executable_tx,
6287                input_objects,
6288                None,
6289                ExecutionEnv::default(),
6290                epoch_store,
6291            )
6292            .unwrap();
6293        let system_obj = get_sui_system_state(&transaction_outputs.written)
6294            .expect("change epoch tx must write to system object");
6295
6296        let effects = transaction_outputs.effects;
6297        // We must write tx and effects to the state sync tables so that state sync is able to
6298        // deliver to the transaction to CheckpointExecutor after it is included in a certified
6299        // checkpoint.
6300        self.get_state_sync_store()
6301            .insert_transaction_and_effects(&tx, &effects);
6302
6303        info!(
6304            "Effects summary of the change epoch transaction: {:?}",
6305            effects.summary_for_debug()
6306        );
6307        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
6308        // The change epoch transaction cannot fail to execute.
6309        assert!(effects.status().is_ok());
6310        Ok((system_obj, effects))
6311    }
6312
6313    #[instrument(level = "error", skip_all)]
6314    async fn reopen_epoch_db(
6315        &self,
6316        cur_epoch_store: &AuthorityPerEpochStore,
6317        new_committee: Committee,
6318        epoch_start_configuration: EpochStartConfiguration,
6319        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
6320        epoch_last_checkpoint: CheckpointSequenceNumber,
6321    ) -> SuiResult<Arc<AuthorityPerEpochStore>> {
6322        let new_epoch = new_committee.epoch;
6323        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
6324        assert_eq!(
6325            epoch_start_configuration.epoch_start_state().epoch(),
6326            new_committee.epoch
6327        );
6328        fail_point!("before-open-new-epoch-store");
6329        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
6330            self.name,
6331            new_committee,
6332            epoch_start_configuration,
6333            self.get_backing_package_store().clone(),
6334            self.get_object_store().clone(),
6335            expensive_safety_check_config,
6336            epoch_last_checkpoint,
6337        )?;
6338        self.epoch_store.store(new_epoch_store.clone());
6339        Ok(new_epoch_store)
6340    }
6341
6342    #[cfg(test)]
6343    pub(crate) fn iter_live_object_set_for_testing(
6344        &self,
6345    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
6346        let include_wrapped_object = !self
6347            .epoch_store_for_testing()
6348            .protocol_config()
6349            .simplified_unwrap_then_delete();
6350        self.get_global_state_hash_store()
6351            .iter_cached_live_object_set_for_testing(include_wrapped_object)
6352    }
6353
6354    #[cfg(test)]
6355    pub(crate) fn shutdown_execution_for_test(&self) {
6356        self.tx_execution_shutdown
6357            .lock()
6358            .take()
6359            .unwrap()
6360            .send(())
6361            .unwrap();
6362    }
6363
6364    /// NOTE: this function is only to be used for fuzzing and testing. Never use in prod
6365    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> SuiResult {
6366        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
6367        self.get_object_cache_reader()
6368            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
6369        self.get_reconfig_api()
6370            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
6371        Ok(())
6372    }
6373}
6374
6375pub struct RandomnessRoundReceiver {
6376    authority_state: Arc<AuthorityState>,
6377    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6378}
6379
6380impl RandomnessRoundReceiver {
6381    pub fn spawn(
6382        authority_state: Arc<AuthorityState>,
6383        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
6384    ) -> JoinHandle<()> {
6385        let rrr = RandomnessRoundReceiver {
6386            authority_state,
6387            randomness_rx,
6388        };
6389        spawn_monitored_task!(rrr.run())
6390    }
6391
6392    async fn run(mut self) {
6393        info!("RandomnessRoundReceiver event loop started");
6394
6395        loop {
6396            tokio::select! {
6397                maybe_recv = self.randomness_rx.recv() => {
6398                    if let Some((epoch, round, bytes)) = maybe_recv {
6399                        self.handle_new_randomness(epoch, round, bytes).await;
6400                    } else {
6401                        break;
6402                    }
6403                },
6404            }
6405        }
6406
6407        info!("RandomnessRoundReceiver event loop ended");
6408    }
6409
6410    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
6411    async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
6412        fail_point_async!("randomness-delay");
6413
6414        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
6415        if epoch_store.epoch() != epoch {
6416            warn!(
6417                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
6418                epoch_store.epoch()
6419            );
6420            return;
6421        }
6422        let key = TransactionKey::RandomnessRound(epoch, round);
6423        let transaction = VerifiedTransaction::new_randomness_state_update(
6424            epoch,
6425            round,
6426            bytes,
6427            epoch_store
6428                .epoch_start_config()
6429                .randomness_obj_initial_shared_version()
6430                .expect("randomness state obj must exist"),
6431        );
6432        debug!(
6433            "created randomness state update transaction with digest: {:?}",
6434            transaction.digest()
6435        );
6436        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
6437        let digest = *transaction.digest();
6438
6439        // Randomness state updates contain the full bls signature for the random round,
6440        // which cannot necessarily be reconstructed again later. Therefore we must immediately
6441        // persist this transaction. If we crash before its outputs are committed, this
6442        // ensures we will be able to re-execute it.
6443        self.authority_state
6444            .get_cache_commit()
6445            .persist_transaction(&transaction);
6446
6447        // Notify the scheduler that the transaction key now has a known digest
6448        if epoch_store.insert_tx_key(key, digest).is_err() {
6449            warn!("epoch ended while handling new randomness");
6450        }
6451
6452        let authority_state = self.authority_state.clone();
6453        spawn_monitored_task!(async move {
6454            // Wait for transaction execution in a separate task, to avoid deadlock in case of
6455            // out-of-order randomness generation. (Each RandomnessStateUpdate depends on the
6456            // output of the RandomnessStateUpdate from the previous round.)
6457            //
6458            // We set a very long timeout so that in case this gets stuck for some reason, the
6459            // validator will eventually crash rather than continuing in a zombie mode.
6460            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
6461            let result = tokio::time::timeout(
6462                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
6463                authority_state
6464                    .get_transaction_cache_reader()
6465                    .notify_read_executed_effects(
6466                        "RandomnessRoundReceiver::notify_read_executed_effects_first",
6467                        &[digest],
6468                    ),
6469            )
6470            .await;
6471            let mut effects = match result {
6472                Ok(result) => result,
6473                Err(_) => {
6474                    // Crash on randomness update execution timeout in debug builds.
6475                    debug_fatal!(
6476                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
6477                    );
6478                    // Continue waiting as long as necessary in non-debug builds.
6479                    authority_state
6480                        .get_transaction_cache_reader()
6481                        .notify_read_executed_effects(
6482                            "RandomnessRoundReceiver::notify_read_executed_effects_second",
6483                            &[digest],
6484                        )
6485                        .await
6486                }
6487            };
6488
6489            let effects = effects.pop().expect("should return effects");
6490            if *effects.status() != ExecutionStatus::Success {
6491                fatal!(
6492                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
6493                );
6494            }
6495            debug!(
6496                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
6497            );
6498        });
6499    }
6500}
6501
6502#[async_trait]
6503impl TransactionKeyValueStoreTrait for AuthorityState {
6504    #[instrument(skip(self))]
6505    async fn multi_get(
6506        &self,
6507        transactions: &[TransactionDigest],
6508        effects: &[TransactionDigest],
6509    ) -> SuiResult<(Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>)> {
6510        let txns = if !transactions.is_empty() {
6511            self.get_transaction_cache_reader()
6512                .multi_get_transaction_blocks(transactions)
6513                .into_iter()
6514                .map(|t| t.map(|t| (*t).clone().into_inner()))
6515                .collect()
6516        } else {
6517            vec![]
6518        };
6519
6520        let fx = if !effects.is_empty() {
6521            self.get_transaction_cache_reader()
6522                .multi_get_executed_effects(effects)
6523        } else {
6524            vec![]
6525        };
6526
6527        Ok((txns, fx))
6528    }
6529
6530    #[instrument(skip(self))]
6531    async fn multi_get_checkpoints(
6532        &self,
6533        checkpoint_summaries: &[CheckpointSequenceNumber],
6534        checkpoint_contents: &[CheckpointSequenceNumber],
6535        checkpoint_summaries_by_digest: &[CheckpointDigest],
6536    ) -> SuiResult<(
6537        Vec<Option<CertifiedCheckpointSummary>>,
6538        Vec<Option<CheckpointContents>>,
6539        Vec<Option<CertifiedCheckpointSummary>>,
6540    )> {
6541        // TODO: use multi-get methods if it ever becomes important (unlikely)
6542        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6543        let store = self.get_checkpoint_store();
6544        for seq in checkpoint_summaries {
6545            let checkpoint = store
6546                .get_checkpoint_by_sequence_number(*seq)?
6547                .map(|c| c.into_inner());
6548
6549            summaries.push(checkpoint);
6550        }
6551
6552        let mut contents = Vec::with_capacity(checkpoint_contents.len());
6553        for seq in checkpoint_contents {
6554            let checkpoint = store
6555                .get_checkpoint_by_sequence_number(*seq)?
6556                .and_then(|summary| {
6557                    store
6558                        .get_checkpoint_contents(&summary.content_digest)
6559                        .expect("db read cannot fail")
6560                });
6561            contents.push(checkpoint);
6562        }
6563
6564        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6565        for digest in checkpoint_summaries_by_digest {
6566            let checkpoint = store
6567                .get_checkpoint_by_digest(digest)?
6568                .map(|c| c.into_inner());
6569            summaries_by_digest.push(checkpoint);
6570        }
6571        Ok((summaries, contents, summaries_by_digest))
6572    }
6573
6574    #[instrument(skip(self))]
6575    async fn deprecated_get_transaction_checkpoint(
6576        &self,
6577        digest: TransactionDigest,
6578    ) -> SuiResult<Option<CheckpointSequenceNumber>> {
6579        Ok(self
6580            .get_checkpoint_cache()
6581            .deprecated_get_transaction_checkpoint(&digest)
6582            .map(|(_epoch, checkpoint)| checkpoint))
6583    }
6584
6585    #[instrument(skip(self))]
6586    async fn get_object(
6587        &self,
6588        object_id: ObjectID,
6589        version: VersionNumber,
6590    ) -> SuiResult<Option<Object>> {
6591        Ok(self
6592            .get_object_cache_reader()
6593            .get_object_by_key(&object_id, version))
6594    }
6595
6596    #[instrument(skip_all)]
6597    async fn multi_get_objects(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
6598        Ok(self
6599            .get_object_cache_reader()
6600            .multi_get_objects_by_key(object_keys))
6601    }
6602
6603    #[instrument(skip(self))]
6604    async fn multi_get_transaction_checkpoint(
6605        &self,
6606        digests: &[TransactionDigest],
6607    ) -> SuiResult<Vec<Option<CheckpointSequenceNumber>>> {
6608        let res = self
6609            .get_checkpoint_cache()
6610            .deprecated_multi_get_transaction_checkpoint(digests);
6611
6612        Ok(res
6613            .into_iter()
6614            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6615            .collect())
6616    }
6617
6618    #[instrument(skip(self))]
6619    async fn multi_get_events_by_tx_digests(
6620        &self,
6621        digests: &[TransactionDigest],
6622    ) -> SuiResult<Vec<Option<TransactionEvents>>> {
6623        if digests.is_empty() {
6624            return Ok(vec![]);
6625        }
6626
6627        Ok(self
6628            .get_transaction_cache_reader()
6629            .multi_get_events(digests))
6630    }
6631}
6632
#[cfg(msim)]
pub mod framework_injection {
    //! Simulation-only hooks for replacing or adding system packages, either for
    //! every validator or selectively per validator.

    use move_binary_format::CompiledModule;
    use std::cell::RefCell;
    use std::collections::{BTreeMap, BTreeSet};
    use sui_framework::{BuiltInFramework, SystemPackage};
    use sui_types::base_types::{AuthorityName, ObjectID};
    use sui_types::is_system_package;

    /// The modules that make up one package.
    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which modules (if any) replace a package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a single package id is overridden.
    enum PackageOverrideConfig {
        /// Same modules for every validator.
        Global(Framework),
        /// Modules chosen by a callback that receives the validator name.
        PerValidator(PackageUpgradeCallback),
    }

    /// Registered overrides, keyed by package id.
    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    /// Serializes every module at its own recorded version.
    ///
    /// Panics if a module fails to serialize.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Registers `modules` as the override of `package_id` for all validators,
    /// replacing any previous override of that package.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|overrides| {
            overrides
                .borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules));
        });
    }

    /// Registers a per-validator callback as the override of `package_id`,
    /// replacing any previous override of that package.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|overrides| {
            overrides
                .borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func));
        });
    }

    /// Installs `packages` as global overrides. Every built-in package id that is
    /// not among `packages` is overridden with an empty module list.
    pub fn set_system_packages(packages: Vec<SystemPackage>) {
        // Start from all built-in ids and strike out the ones that are supplied.
        let mut not_supplied: BTreeSet<_> =
            BuiltInFramework::all_package_ids().into_iter().collect();

        OVERRIDE.with(|overrides| {
            let mut overrides = overrides.borrow_mut();
            for pkg in packages {
                not_supplied.remove(&pkg.id);
                overrides.insert(pkg.id, PackageOverrideConfig::Global(pkg.modules()));
            }
            for missing_id in not_supplied {
                overrides.insert(missing_id, PackageOverrideConfig::Global(vec![]));
            }
        });
    }

    /// Returns the serialized override modules of `package_id` as seen by validator
    /// `name`, or `None` when no override is registered or the per-validator
    /// callback declines.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|overrides| {
            let overrides = overrides.borrow();
            match overrides.get(package_id) {
                Some(PackageOverrideConfig::Global(framework)) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                Some(PackageOverrideConfig::PerValidator(func)) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
                None => None,
            }
        })
    }

    /// Same lookup as `get_override_bytes`, but returns the compiled modules
    /// themselves instead of their serialized form.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|overrides| {
            let overrides = overrides.borrow();
            match overrides.get(package_id) {
                Some(PackageOverrideConfig::Global(framework)) => Some(framework.clone()),
                Some(PackageOverrideConfig::PerValidator(func)) => func(name),
                None => None,
            }
        })
    }

    /// Builds a `SystemPackage` from the override of `package_id`, if one applies
    /// to validator `name`.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        get_override_bytes(package_id, name).map(|bytes| {
            let dependencies = if is_system_package(*package_id) {
                BuiltInFramework::get_package_by_id(package_id)
                    .dependencies
                    .to_vec()
            } else {
                // Assume that entirely new injected packages depend on all existing system packages.
                BuiltInFramework::all_package_ids()
            };
            SystemPackage {
                id: *package_id,
                bytes,
                dependencies,
            }
        })
    }

    /// Returns every overridden package whose id is not a built-in package id, as
    /// seen by validator `name`. Each one is given all built-in packages as
    /// dependencies.
    ///
    /// Panics if an override yields no modules for `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();

        // Collect the ids first; `get_override_bytes` takes its own borrow of `OVERRIDE`.
        let extra_ids: Vec<ObjectID> = OVERRIDE.with(|overrides| {
            overrides
                .borrow()
                .keys()
                .filter(|id| !built_in.contains(*id))
                .copied()
                .collect()
        });

        let mut extra_packages = Vec::with_capacity(extra_ids.len());
        for id in extra_ids {
            extra_packages.push(SystemPackage {
                id,
                bytes: get_override_bytes(&id, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        extra_packages
    }
}
6766
/// An object stored together with the components of its object reference, as
/// recorded in a `NodeStateDump`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// First component of the object's computed reference.
    pub id: ObjectID,
    /// Second component of the object's computed reference.
    pub version: VersionNumber,
    /// Third component of the object's computed reference.
    pub digest: ObjectDigest,
    /// The full object.
    pub object: Object,
}
6774
6775impl ObjDumpFormat {
6776    fn new(object: Object) -> Self {
6777        let oref = object.compute_object_reference();
6778        Self {
6779            id: oref.0,
6780            version: oref.1,
6781            digest: oref.2,
6782            object,
6783        }
6784    }
6785}
6786
/// Serializable record of a transaction, its effects, epoch parameters and the
/// objects gathered for it by `NodeStateDump::new`. It can be written to and
/// read back from a JSON file.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the transaction this dump describes.
    pub tx_digest: TransactionDigest,
    /// The message extracted from the executed certificate.
    pub sender_signed_data: SenderSignedData,
    /// Epoch reported by the epoch store when the dump was built.
    pub executed_epoch: u64,
    /// Reference gas price reported by the epoch store.
    pub reference_gas_price: u64,
    /// Protocol version reported by the epoch store, as a plain integer.
    pub protocol_version: u64,
    /// Epoch start timestamp taken from the epoch start configuration.
    pub epoch_start_timestamp_ms: u64,
    /// Copy of the effects passed to `NodeStateDump::new`.
    pub computed_effects: TransactionEffects,
    /// Effects digest supplied by the caller as the expected one.
    pub expected_effects_digest: TransactionEffectsDigest,
    /// Built-in framework packages that were found in the object store.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Consensus input objects listed as mutated or read-only in the effects,
    /// fetched at the referenced version.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Objects listed in the temporary store's loaded runtime objects, fetched at
    /// the version they were loaded at.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects fetched at the versions listed by the effects' `modified_at_versions`.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages the temporary store recorded as loaded from the database at runtime.
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// Input objects held by the temporary store.
    pub input_objects: Vec<ObjDumpFormat>,
}
6804
6805impl NodeStateDump {
6806    pub fn new(
6807        tx_digest: &TransactionDigest,
6808        effects: &TransactionEffects,
6809        expected_effects_digest: TransactionEffectsDigest,
6810        object_store: &dyn ObjectStore,
6811        epoch_store: &Arc<AuthorityPerEpochStore>,
6812        inner_temporary_store: &InnerTemporaryStore,
6813        certificate: &VerifiedExecutableTransaction,
6814    ) -> SuiResult<Self> {
6815        // Epoch info
6816        let executed_epoch = epoch_store.epoch();
6817        let reference_gas_price = epoch_store.reference_gas_price();
6818        let epoch_start_config = epoch_store.epoch_start_config();
6819        let protocol_version = epoch_store.protocol_version().as_u64();
6820        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6821
6822        // Record all system packages at this version
6823        let mut relevant_system_packages = Vec::new();
6824        for sys_package_id in BuiltInFramework::all_package_ids() {
6825            if let Some(w) = object_store.get_object(&sys_package_id) {
6826                relevant_system_packages.push(ObjDumpFormat::new(w))
6827            }
6828        }
6829
6830        // Record all the shared objects
6831        let mut shared_objects = Vec::new();
6832        for kind in effects.input_consensus_objects() {
6833            match kind {
6834                InputConsensusObject::Mutate(obj_ref) | InputConsensusObject::ReadOnly(obj_ref) => {
6835                    if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1) {
6836                        shared_objects.push(ObjDumpFormat::new(w))
6837                    }
6838                }
6839                InputConsensusObject::ReadConsensusStreamEnded(..)
6840                | InputConsensusObject::MutateConsensusStreamEnded(..)
6841                | InputConsensusObject::Cancelled(..) => (), // TODO: consider record congested objects.
6842            }
6843        }
6844
6845        // Record all loaded child objects
6846        // Child objects which are read but not mutated are not tracked anywhere else
6847        let mut loaded_child_objects = Vec::new();
6848        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6849            if let Some(w) = object_store.get_object_by_key(id, meta.version) {
6850                loaded_child_objects.push(ObjDumpFormat::new(w))
6851            }
6852        }
6853
6854        // Record all modified objects
6855        let mut modified_at_versions = Vec::new();
6856        for (id, ver) in effects.modified_at_versions() {
6857            if let Some(w) = object_store.get_object_by_key(&id, ver) {
6858                modified_at_versions.push(ObjDumpFormat::new(w))
6859            }
6860        }
6861
6862        // Packages read at runtime, which were not previously loaded into the temoorary store
6863        // Some packages may be fetched at runtime and wont show up in input objects
6864        let mut runtime_reads = Vec::new();
6865        for obj in inner_temporary_store
6866            .runtime_packages_loaded_from_db
6867            .values()
6868        {
6869            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6870        }
6871
6872        // All other input objects should already be in `inner_temporary_store.objects`
6873
6874        Ok(Self {
6875            tx_digest: *tx_digest,
6876            executed_epoch,
6877            reference_gas_price,
6878            epoch_start_timestamp_ms,
6879            protocol_version,
6880            relevant_system_packages,
6881            shared_objects,
6882            loaded_child_objects,
6883            modified_at_versions,
6884            runtime_reads,
6885            sender_signed_data: certificate.clone().into_message(),
6886            input_objects: inner_temporary_store
6887                .input_objects
6888                .values()
6889                .map(|o| ObjDumpFormat::new(o.clone()))
6890                .collect(),
6891            computed_effects: effects.clone(),
6892            expected_effects_digest,
6893        })
6894    }
6895
6896    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6897        let mut objects = Vec::new();
6898        objects.extend(self.relevant_system_packages.clone());
6899        objects.extend(self.shared_objects.clone());
6900        objects.extend(self.loaded_child_objects.clone());
6901        objects.extend(self.modified_at_versions.clone());
6902        objects.extend(self.runtime_reads.clone());
6903        objects.extend(self.input_objects.clone());
6904        objects
6905    }
6906
6907    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6908        let file_name = format!(
6909            "{}_{}_NODE_DUMP.json",
6910            self.tx_digest,
6911            AuthorityState::unixtime_now_ms()
6912        );
6913        let mut path = path.to_path_buf();
6914        path.push(&file_name);
6915        let mut file = File::create(path.clone())?;
6916        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6917        Ok(path)
6918    }
6919
6920    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6921        let file = File::open(path)?;
6922        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6923    }
6924}